//! UNIX Domain Socket backed channel and listener implementations. //! //! ```no_run //! #[urpc::service] //! trait Hello { //! type Error = (); //! //! async fn hello(name: String) -> String; //! } //! //! struct HelloServer; //! //! impl Hello for HelloServer { //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { //! Ok(format!("Hello, {name}!")) //! } //! } //! //! #[tokio::main] //! async fn main() -> Result<(), Box>{ //! let listener = urpc::unix::bind("./hello.service").await?; //! //! // spawn the server //! tokio::spawn(async move { //! urpc::Server::default() //! .with_listener(listener) //! .with_service(HelloServer.into_service()) //! .serve() //! .await //! }); //! //! // create a client //! let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await?); //! let client = HelloClient::new(channel); //! let greeting = client.hello("World".into()).await.unwrap(); //! assert_eq!(greeting, "Hello, World!"); //! Ok(()) //! } //! ``` use std::{path::Path, pin::Pin}; use futures::{Sink, Stream}; use tokio::{ net::{UnixListener, UnixStream}, sync::mpsc::Receiver, task::AbortHandle, }; use tokio_util::codec::Framed; use crate::{ Channel, Listener, protocol::{RpcMessage, RpcMessageCodec}, }; pub struct UnixChannel(Framed); impl UnixChannel { fn new(stream: UnixStream) -> Self { Self(Framed::new(stream, RpcMessageCodec::default())) } pub async fn connect(path: impl AsRef) -> std::io::Result { let stream = UnixStream::connect(path).await?; Ok(Self::new(stream)) } } impl Sink for UnixChannel { type Error = std::io::Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx) } fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> { Sink::start_send(Pin::new(&mut self.get_mut().0), item) } fn poll_flush( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Sink::poll_close(Pin::new(&mut self.get_mut().0), cx) } } impl Stream for UnixChannel { type Item = std::io::Result; fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) } } impl Channel for UnixChannel {} pub struct UnixChannelListener { receiver: Receiver, abort: AbortHandle, } impl Drop for UnixChannelListener { fn drop(&mut self) { self.abort.abort(); } } impl UnixChannelListener { pub async fn bind(path: impl AsRef) -> std::io::Result { let path = path.as_ref(); if tokio::fs::try_exists(path).await? { tokio::fs::remove_file(path).await?; } let listener = UnixListener::bind(path)?; let (sender, receiver) = tokio::sync::mpsc::channel(8); let abort = tokio::spawn(async move { while let Ok((stream, _addr)) = listener.accept().await { if sender.send(UnixChannel::new(stream)).await.is_err() { break; } } }) .abort_handle(); Ok(Self { receiver, abort }) } } impl Stream for UnixChannelListener { type Item = std::io::Result; fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok)) } } impl Listener for UnixChannelListener {} pub async fn bind(path: impl AsRef) -> std::io::Result { UnixChannelListener::bind(path).await } pub async fn connect(path: impl AsRef) -> std::io::Result { UnixChannel::connect(path).await }