aboutsummaryrefslogtreecommitdiff
path: root/src/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel.rs')
-rw-r--r--src/channel.rs162
1 files changed, 162 insertions, 0 deletions
diff --git a/src/channel.rs b/src/channel.rs
new file mode 100644
index 0000000..f80ca87
--- /dev/null
+++ b/src/channel.rs
@@ -0,0 +1,162 @@
1//! MPSC channel backed channel and listener implementations.
2//!
3//! This module provides an mpsc channel based listener/channel implementation that can be useful
4//! to for tests.
5//!
6//! The current implementation uses [`tokio`]'s unbounded mpsc channels.
7//!
8//! ```
9//! #[urpc::service]
10//! trait Hello {
11//! type Error = ();
12//!
13//! async fn hello(name: String) -> String;
14//! }
15//!
16//! struct HelloServer;
17//!
18//! impl Hello for HelloServer {
19//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
20//! Ok(format!("Hello, {name}!"))
21//! }
22//! }
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>>{
26//! let (dialer, listener) = urpc::channel::new();
27//!
28//! // spawn the server
29//! tokio::spawn(async move {
30//! urpc::Server::default()
31//! .with_listener(listener)
32//! .with_service(HelloServer.into_service())
33//! .serve()
34//! .await
35//! });
36//!
37//! // create a client
38//! let channel = urpc::ClientChannel::new(dialer.connect()?);
39//! let client = HelloClient::new(channel);
40//! let greeting = client.hello("World".into()).await.unwrap();
41//! assert_eq!(greeting, "Hello, World!");
42//! Ok(())
43//! }
44//! ```
45
46use futures::{Sink, Stream};
47use tokio::sync::mpsc;
48
49use crate::protocol::RpcMessage;
50
51pub struct ChannelListener {
52 receiver: mpsc::UnboundedReceiver<Channel>,
53}
54
55impl Stream for ChannelListener {
56 type Item = std::io::Result<Channel>;
57
58 fn poll_next(
59 self: std::pin::Pin<&mut Self>,
60 cx: &mut std::task::Context<'_>,
61 ) -> std::task::Poll<Option<Self::Item>> {
62 match self.get_mut().receiver.poll_recv(cx) {
63 std::task::Poll::Ready(Some(c)) => std::task::Poll::Ready(Some(Ok(c))),
64 std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
65 std::task::Poll::Pending => std::task::Poll::Pending,
66 }
67 }
68}
69
70impl crate::Listener<Channel> for ChannelListener {}
71
72pub struct ChannelDialer {
73 sender: mpsc::UnboundedSender<Channel>,
74}
75
76impl ChannelDialer {
77 fn new() -> (Self, ChannelListener) {
78 let (sender, receiver) = mpsc::unbounded_channel();
79 (Self { sender }, ChannelListener { receiver })
80 }
81
82 pub fn connect(&self) -> std::io::Result<Channel> {
83 let (ch1, ch2) = Channel::new();
84 self.sender.send(ch1).expect("TODO: remove this");
85 Ok(ch2)
86 }
87}
88
89pub struct Channel {
90 sender: mpsc::UnboundedSender<RpcMessage>,
91 receiver: mpsc::UnboundedReceiver<RpcMessage>,
92}
93
94impl Channel {
95 fn new() -> (Self, Self) {
96 let (sender0, receiver0) = mpsc::unbounded_channel();
97 let (sender1, receiver1) = mpsc::unbounded_channel();
98 (
99 Self {
100 sender: sender0,
101 receiver: receiver1,
102 },
103 Self {
104 sender: sender1,
105 receiver: receiver0,
106 },
107 )
108 }
109}
110
111impl Stream for Channel {
112 type Item = std::io::Result<RpcMessage>;
113
114 fn poll_next(
115 self: std::pin::Pin<&mut Self>,
116 cx: &mut std::task::Context<'_>,
117 ) -> std::task::Poll<Option<Self::Item>> {
118 match self.get_mut().receiver.poll_recv(cx) {
119 std::task::Poll::Ready(Some(msg)) => std::task::Poll::Ready(Some(Ok(msg))),
120 std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
121 std::task::Poll::Pending => std::task::Poll::Pending,
122 }
123 }
124}
125
126impl Sink<RpcMessage> for Channel {
127 type Error = std::io::Error;
128
129 fn poll_ready(
130 self: std::pin::Pin<&mut Self>,
131 _cx: &mut std::task::Context<'_>,
132 ) -> std::task::Poll<Result<(), Self::Error>> {
133 return std::task::Poll::Ready(Ok(()));
134 }
135
136 fn start_send(self: std::pin::Pin<&mut Self>, item: RpcMessage) -> Result<(), Self::Error> {
137 match self.sender.send(item) {
138 Ok(()) => Ok(()),
139 Err(err) => Err(std::io::Error::other(err)),
140 }
141 }
142
143 fn poll_flush(
144 self: std::pin::Pin<&mut Self>,
145 _cx: &mut std::task::Context<'_>,
146 ) -> std::task::Poll<Result<(), Self::Error>> {
147 std::task::Poll::Ready(Ok(()))
148 }
149
150 fn poll_close(
151 self: std::pin::Pin<&mut Self>,
152 _cx: &mut std::task::Context<'_>,
153 ) -> std::task::Poll<Result<(), Self::Error>> {
154 std::task::Poll::Ready(Ok(()))
155 }
156}
157
158impl crate::Channel for Channel {}
159
160pub fn new() -> (ChannelDialer, ChannelListener) {
161 ChannelDialer::new()
162}