diff options
Diffstat (limited to 'src/tcp.rs')
| -rw-r--r-- | src/tcp.rs | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..5684c41 --- /dev/null +++ b/src/tcp.rs | |||
| @@ -0,0 +1,156 @@ | |||
| 1 | //! TCP backed channel and listener implementations. | ||
| 2 | //! | ||
| 3 | //! ```no_run | ||
| 4 | //! #[urpc::service] | ||
| 5 | //! trait Hello { | ||
| 6 | //! type Error = (); | ||
| 7 | //! | ||
| 8 | //! async fn hello(name: String) -> String; | ||
| 9 | //! } | ||
| 10 | //! | ||
| 11 | //! struct HelloServer; | ||
| 12 | //! | ||
| 13 | //! impl Hello for HelloServer { | ||
| 14 | //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 15 | //! Ok(format!("Hello, {name}!")) | ||
| 16 | //! } | ||
| 17 | //! } | ||
| 18 | //! | ||
| 19 | //! #[tokio::main] | ||
| 20 | //! async fn main() -> Result<(), Box<dyn std::error::Error>>{ | ||
| 21 | //! let listener = urpc::tcp::bind("0.0.0.0:3000").await?; | ||
| 22 | //! | ||
| 23 | //! // spawn the server | ||
| 24 | //! tokio::spawn(async move { | ||
| 25 | //! urpc::Server::default() | ||
| 26 | //! .with_listener(listener) | ||
| 27 | //! .with_service(HelloServer.into_service()) | ||
| 28 | //! .serve() | ||
| 29 | //! .await | ||
| 30 | //! }); | ||
| 31 | //! | ||
| 32 | //! // create a client | ||
| 33 | //! let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await?); | ||
| 34 | //! let client = HelloClient::new(channel); | ||
| 35 | //! let greeting = client.hello("World".into()).await.unwrap(); | ||
| 36 | //! assert_eq!(greeting, "Hello, World!"); | ||
| 37 | //! Ok(()) | ||
| 38 | //! } | ||
| 39 | //! ``` | ||
| 40 | use std::pin::Pin; | ||
| 41 | |||
| 42 | use futures::{Sink, Stream}; | ||
| 43 | use tokio::{ | ||
| 44 | net::{TcpListener, TcpStream, ToSocketAddrs}, | ||
| 45 | sync::mpsc::Receiver, | ||
| 46 | task::AbortHandle, | ||
| 47 | }; | ||
| 48 | use tokio_util::codec::Framed; | ||
| 49 | |||
| 50 | use crate::{ | ||
| 51 | Channel, Listener, | ||
| 52 | protocol::{RpcMessage, RpcMessageCodec}, | ||
| 53 | }; | ||
| 54 | |||
| 55 | pub struct TcpChannel(Framed<TcpStream, RpcMessageCodec>); | ||
| 56 | |||
| 57 | impl TcpChannel { | ||
| 58 | fn new(stream: TcpStream) -> Self { | ||
| 59 | Self(Framed::new(stream, RpcMessageCodec::default())) | ||
| 60 | } | ||
| 61 | |||
| 62 | pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result<Self> { | ||
| 63 | let stream = TcpStream::connect(addrs).await?; | ||
| 64 | Ok(Self::new(stream)) | ||
| 65 | } | ||
| 66 | } | ||
| 67 | |||
| 68 | impl Sink<RpcMessage> for TcpChannel { | ||
| 69 | type Error = std::io::Error; | ||
| 70 | |||
| 71 | fn poll_ready( | ||
| 72 | self: Pin<&mut Self>, | ||
| 73 | cx: &mut std::task::Context<'_>, | ||
| 74 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 75 | Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx) | ||
| 76 | } | ||
| 77 | |||
| 78 | fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> { | ||
| 79 | Sink::start_send(Pin::new(&mut self.get_mut().0), item) | ||
| 80 | } | ||
| 81 | |||
| 82 | fn poll_flush( | ||
| 83 | self: Pin<&mut Self>, | ||
| 84 | cx: &mut std::task::Context<'_>, | ||
| 85 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 86 | Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx) | ||
| 87 | } | ||
| 88 | |||
| 89 | fn poll_close( | ||
| 90 | self: Pin<&mut Self>, | ||
| 91 | cx: &mut std::task::Context<'_>, | ||
| 92 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 93 | Sink::poll_close(Pin::new(&mut self.get_mut().0), cx) | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | impl Stream for TcpChannel { | ||
| 98 | type Item = std::io::Result<RpcMessage>; | ||
| 99 | |||
| 100 | fn poll_next( | ||
| 101 | self: Pin<&mut Self>, | ||
| 102 | cx: &mut std::task::Context<'_>, | ||
| 103 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 104 | Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | impl Channel for TcpChannel {} | ||
| 109 | |||
| 110 | pub struct TcpChannelListener { | ||
| 111 | receiver: Receiver<TcpChannel>, | ||
| 112 | abort: AbortHandle, | ||
| 113 | } | ||
| 114 | |||
| 115 | impl Drop for TcpChannelListener { | ||
| 116 | fn drop(&mut self) { | ||
| 117 | self.abort.abort(); | ||
| 118 | } | ||
| 119 | } | ||
| 120 | |||
| 121 | impl TcpChannelListener { | ||
| 122 | pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result<Self> { | ||
| 123 | let listener = TcpListener::bind(addrs).await?; | ||
| 124 | let (sender, receiver) = tokio::sync::mpsc::channel(8); | ||
| 125 | let abort = tokio::spawn(async move { | ||
| 126 | while let Ok((stream, _addr)) = listener.accept().await { | ||
| 127 | if sender.send(TcpChannel::new(stream)).await.is_err() { | ||
| 128 | break; | ||
| 129 | } | ||
| 130 | } | ||
| 131 | }) | ||
| 132 | .abort_handle(); | ||
| 133 | Ok(Self { receiver, abort }) | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | impl Stream for TcpChannelListener { | ||
| 138 | type Item = std::io::Result<TcpChannel>; | ||
| 139 | |||
| 140 | fn poll_next( | ||
| 141 | self: Pin<&mut Self>, | ||
| 142 | cx: &mut std::task::Context<'_>, | ||
| 143 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 144 | self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok)) | ||
| 145 | } | ||
| 146 | } | ||
| 147 | |||
| 148 | impl Listener<TcpChannel> for TcpChannelListener {} | ||
| 149 | |||
| 150 | pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result<TcpChannelListener> { | ||
| 151 | TcpChannelListener::bind(addrs).await | ||
| 152 | } | ||
| 153 | |||
| 154 | pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result<TcpChannel> { | ||
| 155 | TcpChannel::connect(addrs).await | ||
| 156 | } | ||
