diff options
| author | diogo464 <[email protected]> | 2025-07-16 10:46:41 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-16 10:46:41 +0100 |
| commit | f319d7ab5278a3cfb43d38875d81c28cc2dce1e1 (patch) | |
| tree | cb161fd990643e267bbc373fb09ccd7b689a23b5 /src/channel.rs | |
Initial commit - extracted urpc from monorepo
Diffstat (limited to 'src/channel.rs')
| -rw-r--r-- | src/channel.rs | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..f80ca87 --- /dev/null +++ b/src/channel.rs | |||
| @@ -0,0 +1,162 @@ | |||
| 1 | //! MPSC channel backed channel and listener implementations. | ||
| 2 | //! | ||
| 3 | //! This module provides an mpsc channel based listener/channel implementation that can be useful | ||
| 4 | //! to for tests. | ||
| 5 | //! | ||
| 6 | //! The current implementation uses [`tokio`]'s unbounded mpsc channels. | ||
| 7 | //! | ||
| 8 | //! ``` | ||
| 9 | //! #[urpc::service] | ||
| 10 | //! trait Hello { | ||
| 11 | //! type Error = (); | ||
| 12 | //! | ||
| 13 | //! async fn hello(name: String) -> String; | ||
| 14 | //! } | ||
| 15 | //! | ||
| 16 | //! struct HelloServer; | ||
| 17 | //! | ||
| 18 | //! impl Hello for HelloServer { | ||
| 19 | //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 20 | //! Ok(format!("Hello, {name}!")) | ||
| 21 | //! } | ||
| 22 | //! } | ||
| 23 | //! | ||
| 24 | //! #[tokio::main] | ||
| 25 | //! async fn main() -> Result<(), Box<dyn std::error::Error>>{ | ||
| 26 | //! let (dialer, listener) = urpc::channel::new(); | ||
| 27 | //! | ||
| 28 | //! // spawn the server | ||
| 29 | //! tokio::spawn(async move { | ||
| 30 | //! urpc::Server::default() | ||
| 31 | //! .with_listener(listener) | ||
| 32 | //! .with_service(HelloServer.into_service()) | ||
| 33 | //! .serve() | ||
| 34 | //! .await | ||
| 35 | //! }); | ||
| 36 | //! | ||
| 37 | //! // create a client | ||
| 38 | //! let channel = urpc::ClientChannel::new(dialer.connect()?); | ||
| 39 | //! let client = HelloClient::new(channel); | ||
| 40 | //! let greeting = client.hello("World".into()).await.unwrap(); | ||
| 41 | //! assert_eq!(greeting, "Hello, World!"); | ||
| 42 | //! Ok(()) | ||
| 43 | //! } | ||
| 44 | //! ``` | ||
| 45 | |||
| 46 | use futures::{Sink, Stream}; | ||
| 47 | use tokio::sync::mpsc; | ||
| 48 | |||
| 49 | use crate::protocol::RpcMessage; | ||
| 50 | |||
| 51 | pub struct ChannelListener { | ||
| 52 | receiver: mpsc::UnboundedReceiver<Channel>, | ||
| 53 | } | ||
| 54 | |||
| 55 | impl Stream for ChannelListener { | ||
| 56 | type Item = std::io::Result<Channel>; | ||
| 57 | |||
| 58 | fn poll_next( | ||
| 59 | self: std::pin::Pin<&mut Self>, | ||
| 60 | cx: &mut std::task::Context<'_>, | ||
| 61 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 62 | match self.get_mut().receiver.poll_recv(cx) { | ||
| 63 | std::task::Poll::Ready(Some(c)) => std::task::Poll::Ready(Some(Ok(c))), | ||
| 64 | std::task::Poll::Ready(None) => std::task::Poll::Ready(None), | ||
| 65 | std::task::Poll::Pending => std::task::Poll::Pending, | ||
| 66 | } | ||
| 67 | } | ||
| 68 | } | ||
| 69 | |||
| 70 | impl crate::Listener<Channel> for ChannelListener {} | ||
| 71 | |||
| 72 | pub struct ChannelDialer { | ||
| 73 | sender: mpsc::UnboundedSender<Channel>, | ||
| 74 | } | ||
| 75 | |||
| 76 | impl ChannelDialer { | ||
| 77 | fn new() -> (Self, ChannelListener) { | ||
| 78 | let (sender, receiver) = mpsc::unbounded_channel(); | ||
| 79 | (Self { sender }, ChannelListener { receiver }) | ||
| 80 | } | ||
| 81 | |||
| 82 | pub fn connect(&self) -> std::io::Result<Channel> { | ||
| 83 | let (ch1, ch2) = Channel::new(); | ||
| 84 | self.sender.send(ch1).expect("TODO: remove this"); | ||
| 85 | Ok(ch2) | ||
| 86 | } | ||
| 87 | } | ||
| 88 | |||
| 89 | pub struct Channel { | ||
| 90 | sender: mpsc::UnboundedSender<RpcMessage>, | ||
| 91 | receiver: mpsc::UnboundedReceiver<RpcMessage>, | ||
| 92 | } | ||
| 93 | |||
| 94 | impl Channel { | ||
| 95 | fn new() -> (Self, Self) { | ||
| 96 | let (sender0, receiver0) = mpsc::unbounded_channel(); | ||
| 97 | let (sender1, receiver1) = mpsc::unbounded_channel(); | ||
| 98 | ( | ||
| 99 | Self { | ||
| 100 | sender: sender0, | ||
| 101 | receiver: receiver1, | ||
| 102 | }, | ||
| 103 | Self { | ||
| 104 | sender: sender1, | ||
| 105 | receiver: receiver0, | ||
| 106 | }, | ||
| 107 | ) | ||
| 108 | } | ||
| 109 | } | ||
| 110 | |||
| 111 | impl Stream for Channel { | ||
| 112 | type Item = std::io::Result<RpcMessage>; | ||
| 113 | |||
| 114 | fn poll_next( | ||
| 115 | self: std::pin::Pin<&mut Self>, | ||
| 116 | cx: &mut std::task::Context<'_>, | ||
| 117 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 118 | match self.get_mut().receiver.poll_recv(cx) { | ||
| 119 | std::task::Poll::Ready(Some(msg)) => std::task::Poll::Ready(Some(Ok(msg))), | ||
| 120 | std::task::Poll::Ready(None) => std::task::Poll::Ready(None), | ||
| 121 | std::task::Poll::Pending => std::task::Poll::Pending, | ||
| 122 | } | ||
| 123 | } | ||
| 124 | } | ||
| 125 | |||
| 126 | impl Sink<RpcMessage> for Channel { | ||
| 127 | type Error = std::io::Error; | ||
| 128 | |||
| 129 | fn poll_ready( | ||
| 130 | self: std::pin::Pin<&mut Self>, | ||
| 131 | _cx: &mut std::task::Context<'_>, | ||
| 132 | ) -> std::task::Poll<Result<(), Self::Error>> { | ||
| 133 | return std::task::Poll::Ready(Ok(())); | ||
| 134 | } | ||
| 135 | |||
| 136 | fn start_send(self: std::pin::Pin<&mut Self>, item: RpcMessage) -> Result<(), Self::Error> { | ||
| 137 | match self.sender.send(item) { | ||
| 138 | Ok(()) => Ok(()), | ||
| 139 | Err(err) => Err(std::io::Error::other(err)), | ||
| 140 | } | ||
| 141 | } | ||
| 142 | |||
| 143 | fn poll_flush( | ||
| 144 | self: std::pin::Pin<&mut Self>, | ||
| 145 | _cx: &mut std::task::Context<'_>, | ||
| 146 | ) -> std::task::Poll<Result<(), Self::Error>> { | ||
| 147 | std::task::Poll::Ready(Ok(())) | ||
| 148 | } | ||
| 149 | |||
| 150 | fn poll_close( | ||
| 151 | self: std::pin::Pin<&mut Self>, | ||
| 152 | _cx: &mut std::task::Context<'_>, | ||
| 153 | ) -> std::task::Poll<Result<(), Self::Error>> { | ||
| 154 | std::task::Poll::Ready(Ok(())) | ||
| 155 | } | ||
| 156 | } | ||
| 157 | |||
| 158 | impl crate::Channel for Channel {} | ||
| 159 | |||
| 160 | pub fn new() -> (ChannelDialer, ChannelListener) { | ||
| 161 | ChannelDialer::new() | ||
| 162 | } | ||
