//! MPSC channel backed channel and listener implementations. //! //! This module provides an mpsc channel based listener/channel implementation that can be useful //! to for tests. //! //! The current implementation uses [`tokio`]'s unbounded mpsc channels. //! //! ``` //! #[urpc::service] //! trait Hello { //! type Error = (); //! //! async fn hello(name: String) -> String; //! } //! //! struct HelloServer; //! //! impl Hello for HelloServer { //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { //! Ok(format!("Hello, {name}!")) //! } //! } //! //! #[tokio::main] //! async fn main() -> Result<(), Box>{ //! let (dialer, listener) = urpc::channel::new(); //! //! // spawn the server //! tokio::spawn(async move { //! urpc::Server::default() //! .with_listener(listener) //! .with_service(HelloServer.into_service()) //! .serve() //! .await //! }); //! //! // create a client //! let channel = urpc::ClientChannel::new(dialer.connect()?); //! let client = HelloClient::new(channel); //! let greeting = client.hello("World".into()).await.unwrap(); //! assert_eq!(greeting, "Hello, World!"); //! Ok(()) //! } //! ``` use futures::{Sink, Stream}; use tokio::sync::mpsc; use crate::protocol::RpcMessage; pub struct ChannelListener { receiver: mpsc::UnboundedReceiver, } impl Stream for ChannelListener { type Item = std::io::Result; fn poll_next( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { match self.get_mut().receiver.poll_recv(cx) { std::task::Poll::Ready(Some(c)) => std::task::Poll::Ready(Some(Ok(c))), std::task::Poll::Ready(None) => std::task::Poll::Ready(None), std::task::Poll::Pending => std::task::Poll::Pending, } } } impl crate::Listener for ChannelListener {} pub struct ChannelDialer { sender: mpsc::UnboundedSender, } impl ChannelDialer { fn new() -> (Self, ChannelListener) { let (sender, receiver) = mpsc::unbounded_channel(); (Self { sender }, ChannelListener { receiver }) } pub fn connect(&self) -> std::io::Result { let (ch1, ch2) = Channel::new(); self.sender.send(ch1).expect("TODO: remove this"); Ok(ch2) } } pub struct Channel { sender: mpsc::UnboundedSender, receiver: mpsc::UnboundedReceiver, } impl Channel { fn new() -> (Self, Self) { let (sender0, receiver0) = mpsc::unbounded_channel(); let (sender1, receiver1) = mpsc::unbounded_channel(); ( Self { sender: sender0, receiver: receiver1, }, Self { sender: sender1, receiver: receiver0, }, ) } } impl Stream for Channel { type Item = std::io::Result; fn poll_next( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { match self.get_mut().receiver.poll_recv(cx) { std::task::Poll::Ready(Some(msg)) => std::task::Poll::Ready(Some(Ok(msg))), std::task::Poll::Ready(None) => std::task::Poll::Ready(None), std::task::Poll::Pending => std::task::Poll::Pending, } } } impl Sink for Channel { type Error = std::io::Error; fn poll_ready( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { return std::task::Poll::Ready(Ok(())); } fn start_send(self: std::pin::Pin<&mut Self>, item: RpcMessage) -> Result<(), Self::Error> { match self.sender.send(item) { Ok(()) => Ok(()), Err(err) => Err(std::io::Error::other(err)), } } fn poll_flush( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } fn poll_close( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } } impl crate::Channel for Channel {} pub fn new() -> (ChannelDialer, ChannelListener) { ChannelDialer::new() }