//! TCP backed channel and listener implementations. //! //! ```no_run //! #[urpc::service] //! trait Hello { //! type Error = (); //! //! async fn hello(name: String) -> String; //! } //! //! struct HelloServer; //! //! impl Hello for HelloServer { //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { //! Ok(format!("Hello, {name}!")) //! } //! } //! //! #[tokio::main] //! async fn main() -> Result<(), Box>{ //! let listener = urpc::tcp::bind("0.0.0.0:3000").await?; //! //! // spawn the server //! tokio::spawn(async move { //! urpc::Server::default() //! .with_listener(listener) //! .with_service(HelloServer.into_service()) //! .serve() //! .await //! }); //! //! // create a client //! let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await?); //! let client = HelloClient::new(channel); //! let greeting = client.hello("World".into()).await.unwrap(); //! assert_eq!(greeting, "Hello, World!"); //! Ok(()) //! } //! ``` use std::pin::Pin; use futures::{Sink, Stream}; use tokio::{ net::{TcpListener, TcpStream, ToSocketAddrs}, sync::mpsc::Receiver, task::AbortHandle, }; use tokio_util::codec::Framed; use crate::{ Channel, Listener, protocol::{RpcMessage, RpcMessageCodec}, }; pub struct TcpChannel(Framed); impl TcpChannel { fn new(stream: TcpStream) -> Self { Self(Framed::new(stream, RpcMessageCodec::default())) } pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result { let stream = TcpStream::connect(addrs).await?; Ok(Self::new(stream)) } } impl Sink for TcpChannel { type Error = std::io::Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx) } fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> { Sink::start_send(Pin::new(&mut self.get_mut().0), item) } fn poll_flush( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Sink::poll_close(Pin::new(&mut self.get_mut().0), cx) } } impl Stream for TcpChannel { type Item = std::io::Result; fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) } } impl Channel for TcpChannel {} pub struct TcpChannelListener { receiver: Receiver, abort: AbortHandle, } impl Drop for TcpChannelListener { fn drop(&mut self) { self.abort.abort(); } } impl TcpChannelListener { pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result { let listener = TcpListener::bind(addrs).await?; let (sender, receiver) = tokio::sync::mpsc::channel(8); let abort = tokio::spawn(async move { while let Ok((stream, _addr)) = listener.accept().await { if sender.send(TcpChannel::new(stream)).await.is_err() { break; } } }) .abort_handle(); Ok(Self { receiver, abort }) } } impl Stream for TcpChannelListener { type Item = std::io::Result; fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok)) } } impl Listener for TcpChannelListener {} pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result { TcpChannelListener::bind(addrs).await } pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result { TcpChannel::connect(addrs).await }