aboutsummaryrefslogtreecommitdiff
path: root/src/unix.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/unix.rs')
-rw-r--r--src/unix.rs160
1 files changed, 160 insertions, 0 deletions
diff --git a/src/unix.rs b/src/unix.rs
new file mode 100644
index 0000000..e241b64
--- /dev/null
+++ b/src/unix.rs
@@ -0,0 +1,160 @@
1//! UNIX Domain Socket backed channel and listener implementations.
2//!
3//! ```no_run
4//! #[urpc::service]
5//! trait Hello {
6//! type Error = ();
7//!
8//! async fn hello(name: String) -> String;
9//! }
10//!
11//! struct HelloServer;
12//!
13//! impl Hello for HelloServer {
14//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
15//! Ok(format!("Hello, {name}!"))
16//! }
17//! }
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>>{
21//! let listener = urpc::unix::bind("./hello.service").await?;
22//!
23//! // spawn the server
24//! tokio::spawn(async move {
25//! urpc::Server::default()
26//! .with_listener(listener)
27//! .with_service(HelloServer.into_service())
28//! .serve()
29//! .await
30//! });
31//!
32//! // create a client
33//! let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await?);
34//! let client = HelloClient::new(channel);
35//! let greeting = client.hello("World".into()).await.unwrap();
36//! assert_eq!(greeting, "Hello, World!");
37//! Ok(())
38//! }
39//! ```
40use std::{path::Path, pin::Pin};
41
42use futures::{Sink, Stream};
43use tokio::{
44 net::{UnixListener, UnixStream},
45 sync::mpsc::Receiver,
46 task::AbortHandle,
47};
48use tokio_util::codec::Framed;
49
50use crate::{
51 Channel, Listener,
52 protocol::{RpcMessage, RpcMessageCodec},
53};
54
55pub struct UnixChannel(Framed<UnixStream, RpcMessageCodec>);
56
57impl UnixChannel {
58 fn new(stream: UnixStream) -> Self {
59 Self(Framed::new(stream, RpcMessageCodec::default()))
60 }
61
62 pub async fn connect(path: impl AsRef<Path>) -> std::io::Result<Self> {
63 let stream = UnixStream::connect(path).await?;
64 Ok(Self::new(stream))
65 }
66}
67
68impl Sink<RpcMessage> for UnixChannel {
69 type Error = std::io::Error;
70
71 fn poll_ready(
72 self: Pin<&mut Self>,
73 cx: &mut std::task::Context<'_>,
74 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
75 Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx)
76 }
77
78 fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> {
79 Sink::start_send(Pin::new(&mut self.get_mut().0), item)
80 }
81
82 fn poll_flush(
83 self: Pin<&mut Self>,
84 cx: &mut std::task::Context<'_>,
85 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
86 Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx)
87 }
88
89 fn poll_close(
90 self: Pin<&mut Self>,
91 cx: &mut std::task::Context<'_>,
92 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
93 Sink::poll_close(Pin::new(&mut self.get_mut().0), cx)
94 }
95}
96
97impl Stream for UnixChannel {
98 type Item = std::io::Result<RpcMessage>;
99
100 fn poll_next(
101 self: Pin<&mut Self>,
102 cx: &mut std::task::Context<'_>,
103 ) -> std::task::Poll<Option<Self::Item>> {
104 Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
105 }
106}
107
108impl Channel for UnixChannel {}
109
110pub struct UnixChannelListener {
111 receiver: Receiver<UnixChannel>,
112 abort: AbortHandle,
113}
114
115impl Drop for UnixChannelListener {
116 fn drop(&mut self) {
117 self.abort.abort();
118 }
119}
120
121impl UnixChannelListener {
122 pub async fn bind(path: impl AsRef<Path>) -> std::io::Result<Self> {
123 let path = path.as_ref();
124 if tokio::fs::try_exists(path).await? {
125 tokio::fs::remove_file(path).await?;
126 }
127 let listener = UnixListener::bind(path)?;
128 let (sender, receiver) = tokio::sync::mpsc::channel(8);
129 let abort = tokio::spawn(async move {
130 while let Ok((stream, _addr)) = listener.accept().await {
131 if sender.send(UnixChannel::new(stream)).await.is_err() {
132 break;
133 }
134 }
135 })
136 .abort_handle();
137 Ok(Self { receiver, abort })
138 }
139}
140
141impl Stream for UnixChannelListener {
142 type Item = std::io::Result<UnixChannel>;
143
144 fn poll_next(
145 self: Pin<&mut Self>,
146 cx: &mut std::task::Context<'_>,
147 ) -> std::task::Poll<Option<Self::Item>> {
148 self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok))
149 }
150}
151
152impl Listener<UnixChannel> for UnixChannelListener {}
153
154pub async fn bind(path: impl AsRef<Path>) -> std::io::Result<UnixChannelListener> {
155 UnixChannelListener::bind(path).await
156}
157
158pub async fn connect(path: impl AsRef<Path>) -> std::io::Result<UnixChannel> {
159 UnixChannel::connect(path).await
160}