aboutsummaryrefslogtreecommitdiff
path: root/src/client_channel.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-16 10:46:41 +0100
committerdiogo464 <[email protected]>2025-07-16 10:46:41 +0100
commitf319d7ab5278a3cfb43d38875d81c28cc2dce1e1 (patch)
treecb161fd990643e267bbc373fb09ccd7b689a23b5 /src/client_channel.rs
Initial commit - extracted urpc from monorepo
Diffstat (limited to 'src/client_channel.rs')
-rw-r--r--src/client_channel.rs124
1 files changed, 124 insertions, 0 deletions
diff --git a/src/client_channel.rs b/src/client_channel.rs
new file mode 100644
index 0000000..5666a16
--- /dev/null
+++ b/src/client_channel.rs
@@ -0,0 +1,124 @@
1use std::{collections::HashMap, sync::Arc};
2
3use bytes::Bytes;
4use futures::{SinkExt, StreamExt};
5use tokio::{
6 sync::{mpsc, oneshot},
7 task::AbortHandle,
8};
9
10use crate::{
11 Channel,
12 protocol::{RpcCall, RpcMessage, RpcResponse},
13};
14
15const CLIENT_CHANNEL_BUFFER_SIZE: usize = 64;
16
17struct ClientChannelMessage {
18 service: String,
19 method: String,
20 arguments: Bytes,
21 responder: oneshot::Sender<std::io::Result<Bytes>>,
22}
23
24struct ClientChannelInner {
25 sender: mpsc::Sender<ClientChannelMessage>,
26 abort_handle: AbortHandle,
27}
28
29impl Drop for ClientChannelInner {
30 fn drop(&mut self) {
31 self.abort_handle.abort();
32 }
33}
34
35#[derive(Clone)]
36pub struct ClientChannel(Arc<ClientChannelInner>);
37
38impl ClientChannel {
39 pub fn new<C: Channel>(channel: C) -> Self {
40 let (tx, rx) = mpsc::channel(CLIENT_CHANNEL_BUFFER_SIZE);
41 let abort_handle = tokio::spawn(client_channel_loop(channel, rx)).abort_handle();
42 Self(Arc::new(ClientChannelInner {
43 sender: tx,
44 abort_handle,
45 }))
46 }
47
48 pub async fn call(
49 &self,
50 service: String,
51 method: String,
52 arguments: Bytes,
53 ) -> std::io::Result<Bytes> {
54 let (tx, rx) = oneshot::channel();
55 self.0
56 .sender
57 .send(ClientChannelMessage {
58 service,
59 method,
60 arguments,
61 responder: tx,
62 })
63 .await
64 .expect("client channel task should never shutdown while a client is alive");
65 rx.await
66 .expect("client channel task should never shutdown while a client is alive")
67 }
68}
69
70async fn client_channel_loop<C: Channel>(
71 mut channel: C,
72 mut rx: mpsc::Receiver<ClientChannelMessage>,
73) {
74 enum Select {
75 RpcMessage(RpcMessage),
76 ClientChannelMessage(ClientChannelMessage),
77 }
78
79 let mut responders = HashMap::<u64, oneshot::Sender<std::io::Result<Bytes>>>::default();
80 let mut rpc_call_id = 0;
81
82 loop {
83 let select = tokio::select! {
84 Some(Ok(v)) = channel.next() => Select::RpcMessage(v),
85 Some(v) = rx.recv() => Select::ClientChannelMessage(v),
86 };
87
88 match select {
89 Select::RpcMessage(RpcMessage::Response(RpcResponse { id, value })) => {
90 if let Some(responder) = responders.remove(&id) {
91 let _ = responder.send(Ok(value));
92 }
93 }
94 Select::RpcMessage(_) => todo!(),
95 Select::ClientChannelMessage(ClientChannelMessage {
96 service,
97 method,
98 arguments,
99 responder,
100 }) => {
101 let id = rpc_call_id;
102 rpc_call_id += 1;
103
104 let result = channel
105 .send(RpcMessage::Call(RpcCall {
106 id,
107 service,
108 method,
109 arguments,
110 }))
111 .await;
112
113 match result {
114 Ok(()) => {
115 responders.insert(id, responder);
116 }
117 Err(err) => {
118 let _ = responder.send(Err(err));
119 }
120 }
121 }
122 }
123 }
124}