From f319d7ab5278a3cfb43d38875d81c28cc2dce1e1 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Wed, 16 Jul 2025 10:46:41 +0100 Subject: Initial commit - extracted urpc from monorepo --- .gitignore | 22 ++++ Cargo.toml | 26 +++++ LICENSE-APACHE | 208 +++++++++++++++++++++++++++++++++++++ LICENSE-MIT | 21 ++++ README.md | 64 ++++++++++++ src/channel.rs | 162 +++++++++++++++++++++++++++++ src/client_channel.rs | 124 ++++++++++++++++++++++ src/internal.rs | 2 + src/lib.rs | 52 ++++++++++ src/protocol.rs | 107 +++++++++++++++++++ src/server.rs | 166 +++++++++++++++++++++++++++++ src/tcp.rs | 156 ++++++++++++++++++++++++++++ src/unix.rs | 160 ++++++++++++++++++++++++++++ tests/tcp.rs | 34 ++++++ tests/unix.rs | 34 ++++++ urpc-macro/Cargo.toml | 18 ++++ urpc-macro/src/lib.rs | 282 ++++++++++++++++++++++++++++++++++++++++++++++++++ 17 files changed, 1638 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 README.md create mode 100644 src/channel.rs create mode 100644 src/client_channel.rs create mode 100644 src/internal.rs create mode 100644 src/lib.rs create mode 100644 src/protocol.rs create mode 100644 src/server.rs create mode 100644 src/tcp.rs create mode 100644 src/unix.rs create mode 100644 tests/tcp.rs create mode 100644 tests/unix.rs create mode 100644 urpc-macro/Cargo.toml create mode 100644 urpc-macro/src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3a9c95e --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Rust build artifacts +/target/ +Cargo.lock + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Test artifacts +hello.service \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..13032d9 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "urpc" +version = "0.1.0" +edition = "2021" +authors = ["Your Name "] +description = "A minimalistic RPC framework for Rust" +license = "MIT OR Apache-2.0" +repository = "https://github.com/your-username/urpc" +keywords = ["rpc", "async", "networking"] +categories = ["network-programming", "asynchronous"] + +[workspace] +members = ["urpc-macro"] + +[dependencies] +urpc-macro = { path = "urpc-macro" } + +bincode = { version = "2.0.1", features = ["serde"] } +bytes = { version = "1.10.1", features = ["serde"] } +futures = "0.3.31" +serde = { version = "1.0.219", features = ["derive"] } +tokio = { version = "1.44.1", features = ["full"] } +tokio-util = { version = "0.7.14", features = ["codec"] } + +[dev-dependencies] +tokio-test = "0.4" diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..1d9467e --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,208 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (which shall not include communication that is conspicuously + marked or otherwise designated in writing by the copyright owner + as "Not a Contribution"). + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright + owner or by an individual or Legal Entity authorized to submit on + behalf of the copyright owner. For the purposes of this definition, + "submitted" means any form of electronic, verbal, or written + communication sent to the Licensor or its representatives, + including but not limited to communication on electronic mailing + lists, source code control systems, and issue tracking systems that + are managed by, or on behalf of, the Licensor for the purpose of + discussing and improving the Work, but excluding communication that + is conspicuously marked or otherwise designated in writing by the + copyright owner as "Not a Contribution". + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to use, reproduce, modify, distribute, prepare + Derivative Works of, publicly display, publicly perform, sublicense, + and sell the Work and to permit persons to whom the Work is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Work. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright notice to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Support. You can have agreements requiring + that any person who redistributes the Work or Derivative Works + thereof provide a warranty or support for the Work, but You may + not do so on behalf of any Contributor or impose any warranty + or support obligations on any Contributor. Any such warranty or + support must be offered by You alone, and You hereby agree to + indemnify any Contributor for any liability incurred by such + Contributor as a result of warranty or support terms offered by You. + + 10. No Trademark License. This License does not grant permission to use + the trade names, trademarks, service marks, or product names of the + Licensor, except as required for reasonable and customary use in + describing the origin of the Work and reproducing the content of + the NOTICE file. + + 11. Additional Terms. You may choose to offer, and to charge a fee for, + warranty, support, indemnity, or other obligations and/or rights + consistent with this License. However, in accepting such obligations, + You may act only on Your own behalf and on Your sole responsibility, + not on behalf of any other Contributor, and only if You agree to + indemnify, defend, and hold each Contributor harmless for any + liability incurred by such Contributor as a result of warranty, + support, indemnity, or other terms You offer. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "about" line as the copyright notice for easier + identification within third-party archives. + + Copyright 2025 urpc contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..c21a6e5 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 urpc contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..6b84471 --- /dev/null +++ b/README.md @@ -0,0 +1,64 @@ +# urpc + +A minimalistic RPC framework for Rust, designed for simplicity and performance. + +## Features + +- Async/await support +- TCP and Unix socket transport +- Procedural macros for service definitions +- Serialization with bincode +- Lightweight and fast + +## Example + +```rust +use urpc::*; + +#[urpc::service] +trait Hello { + type Error = (); + + async fn hello(name: String) -> String; +} + +struct HelloServer; + +impl Hello for HelloServer { + async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { + Ok(format!("Hello, {}!", name)) + } +} + +#[tokio::main] +async fn main() { + // Server + let listener = urpc::tcp::bind("127.0.0.1:8080").await.unwrap(); + + tokio::spawn(async move { + urpc::Server::default() + .with_listener(listener) + .with_service(HelloServer.into_service()) + .serve() + .await + }); + + // Client + let channel = urpc::ClientChannel::new( + urpc::tcp::connect("127.0.0.1:8080").await.unwrap() + ); + let client = HelloClient::new(channel); + + let response = client.hello("World".to_string()).await.unwrap(); + println!("{}", response); // "Hello, World!" +} +``` + +## License + +Licensed under either of + +- Apache License, Version 2.0 +- MIT License + +at your option. \ No newline at end of file diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..f80ca87 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,162 @@ +//! MPSC channel backed channel and listener implementations. +//! +//! This module provides an mpsc channel based listener/channel implementation that can be useful +//! to for tests. +//! +//! The current implementation uses [`tokio`]'s unbounded mpsc channels. +//! +//! ``` +//! #[urpc::service] +//! trait Hello { +//! type Error = (); +//! +//! async fn hello(name: String) -> String; +//! } +//! +//! struct HelloServer; +//! +//! impl Hello for HelloServer { +//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { +//! Ok(format!("Hello, {name}!")) +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box>{ +//! let (dialer, listener) = urpc::channel::new(); +//! +//! // spawn the server +//! tokio::spawn(async move { +//! urpc::Server::default() +//! .with_listener(listener) +//! .with_service(HelloServer.into_service()) +//! .serve() +//! .await +//! }); +//! +//! // create a client +//! let channel = urpc::ClientChannel::new(dialer.connect()?); +//! let client = HelloClient::new(channel); +//! let greeting = client.hello("World".into()).await.unwrap(); +//! assert_eq!(greeting, "Hello, World!"); +//! Ok(()) +//! } +//! ``` + +use futures::{Sink, Stream}; +use tokio::sync::mpsc; + +use crate::protocol::RpcMessage; + +pub struct ChannelListener { + receiver: mpsc::UnboundedReceiver, +} + +impl Stream for ChannelListener { + type Item = std::io::Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.get_mut().receiver.poll_recv(cx) { + std::task::Poll::Ready(Some(c)) => std::task::Poll::Ready(Some(Ok(c))), + std::task::Poll::Ready(None) => std::task::Poll::Ready(None), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +impl crate::Listener for ChannelListener {} + +pub struct ChannelDialer { + sender: mpsc::UnboundedSender, +} + +impl ChannelDialer { + fn new() -> (Self, ChannelListener) { + let (sender, receiver) = mpsc::unbounded_channel(); + (Self { sender }, ChannelListener { receiver }) + } + + pub fn connect(&self) -> std::io::Result { + let (ch1, ch2) = Channel::new(); + self.sender.send(ch1).expect("TODO: remove this"); + Ok(ch2) + } +} + +pub struct Channel { + sender: mpsc::UnboundedSender, + receiver: mpsc::UnboundedReceiver, +} + +impl Channel { + fn new() -> (Self, Self) { + let (sender0, receiver0) = mpsc::unbounded_channel(); + let (sender1, receiver1) = mpsc::unbounded_channel(); + ( + Self { + sender: sender0, + receiver: receiver1, + }, + Self { + sender: sender1, + receiver: receiver0, + }, + ) + } +} + +impl Stream for Channel { + type Item = std::io::Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.get_mut().receiver.poll_recv(cx) { + std::task::Poll::Ready(Some(msg)) => std::task::Poll::Ready(Some(Ok(msg))), + std::task::Poll::Ready(None) => std::task::Poll::Ready(None), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +impl Sink for Channel { + type Error = std::io::Error; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + return std::task::Poll::Ready(Ok(())); + } + + fn start_send(self: std::pin::Pin<&mut Self>, item: RpcMessage) -> Result<(), Self::Error> { + match self.sender.send(item) { + Ok(()) => Ok(()), + Err(err) => Err(std::io::Error::other(err)), + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +impl crate::Channel for Channel {} + +pub fn new() -> (ChannelDialer, ChannelListener) { + ChannelDialer::new() +} diff --git a/src/client_channel.rs b/src/client_channel.rs new file mode 100644 index 0000000..5666a16 --- /dev/null +++ b/src/client_channel.rs @@ -0,0 +1,124 @@ +use std::{collections::HashMap, sync::Arc}; + +use bytes::Bytes; +use futures::{SinkExt, StreamExt}; +use tokio::{ + sync::{mpsc, oneshot}, + task::AbortHandle, +}; + +use crate::{ + Channel, + protocol::{RpcCall, RpcMessage, RpcResponse}, +}; + +const CLIENT_CHANNEL_BUFFER_SIZE: usize = 64; + +struct ClientChannelMessage { + service: String, + method: String, + arguments: Bytes, + responder: oneshot::Sender>, +} + +struct ClientChannelInner { + sender: mpsc::Sender, + abort_handle: AbortHandle, +} + +impl Drop for ClientChannelInner { + fn drop(&mut self) { + self.abort_handle.abort(); + } +} + +#[derive(Clone)] +pub struct ClientChannel(Arc); + +impl ClientChannel { + pub fn new(channel: C) -> Self { + let (tx, rx) = mpsc::channel(CLIENT_CHANNEL_BUFFER_SIZE); + let abort_handle = tokio::spawn(client_channel_loop(channel, rx)).abort_handle(); + Self(Arc::new(ClientChannelInner { + sender: tx, + abort_handle, + })) + } + + pub async fn call( + &self, + service: String, + method: String, + arguments: Bytes, + ) -> std::io::Result { + let (tx, rx) = oneshot::channel(); + self.0 + .sender + .send(ClientChannelMessage { + service, + method, + arguments, + responder: tx, + }) + .await + .expect("client channel task should never shutdown while a client is alive"); + rx.await + .expect("client channel task should never shutdown while a client is alive") + } +} + +async fn client_channel_loop( + mut channel: C, + mut rx: mpsc::Receiver, +) { + enum Select { + RpcMessage(RpcMessage), + ClientChannelMessage(ClientChannelMessage), + } + + let mut responders = HashMap::>>::default(); + let mut rpc_call_id = 0; + + loop { + let select = tokio::select! { + Some(Ok(v)) = channel.next() => Select::RpcMessage(v), + Some(v) = rx.recv() => Select::ClientChannelMessage(v), + }; + + match select { + Select::RpcMessage(RpcMessage::Response(RpcResponse { id, value })) => { + if let Some(responder) = responders.remove(&id) { + let _ = responder.send(Ok(value)); + } + } + Select::RpcMessage(_) => todo!(), + Select::ClientChannelMessage(ClientChannelMessage { + service, + method, + arguments, + responder, + }) => { + let id = rpc_call_id; + rpc_call_id += 1; + + let result = channel + .send(RpcMessage::Call(RpcCall { + id, + service, + method, + arguments, + })) + .await; + + match result { + Ok(()) => { + responders.insert(id, responder); + } + Err(err) => { + let _ = responder.send(Err(err)); + } + } + } + } + } +} diff --git a/src/internal.rs b/src/internal.rs new file mode 100644 index 0000000..a3c203a --- /dev/null +++ b/src/internal.rs @@ -0,0 +1,2 @@ +pub use bincode; +pub use bytes; diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..e65f659 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,52 @@ +#[doc(hidden)] +pub mod internal; + +pub mod channel; +pub mod protocol; +pub mod tcp; +pub mod unix; + +mod client_channel; +mod server; + +pub use client_channel::ClientChannel; +pub use server::Server; +pub use urpc_macro::service; + +use protocol::RpcMessage; + +use std::pin::Pin; +use std::future::Future; + +use bytes::Bytes; +use futures::{Sink, Stream}; + +#[derive(Debug, Default)] +pub struct Context; + +pub trait Service: Send + Sync + 'static { + fn name() -> &'static str + where + Self: Sized; + + fn call( + &self, + method: String, + arguments: Bytes, + ) -> Pin> + Send + '_>>; +} + +pub trait Channel: + Stream> + + Sink + + Send + + Unpin + + 'static +{ +} + +pub trait Listener: Stream> + Send + Unpin + 'static +where + C: Channel, +{ +} diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..baf886b --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,107 @@ +//! Types used by the RPC protocol. +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RpcCall { + pub id: u64, + pub service: String, + pub method: String, + pub arguments: Bytes, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RpcCancel { + pub id: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RpcResponse { + pub id: u64, + pub value: Bytes, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum RpcMessage { + Call(RpcCall), + Response(RpcResponse), + Cancel(RpcCancel), +} + +#[derive(Debug)] +pub enum RpcError { + Transport(std::io::Error), + Remote(E), +} + +impl std::fmt::Display for RpcError +where + E: std::fmt::Display, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RpcError::Transport(error) => write!(f, "transport error: {error}"), + RpcError::Remote(error) => write!(f, "remote error: {error}"), + } + } +} + +impl std::error::Error for RpcError +where + E: std::error::Error, +{ + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + RpcError::Transport(error) => error.source(), + RpcError::Remote(error) => error.source(), + } + } +} + +impl From for RpcError { + fn from(value: std::io::Error) -> Self { + Self::Transport(value) + } +} + +#[derive(Default)] +pub struct RpcMessageCodec(LengthDelimitedCodec); + +impl Encoder for RpcMessageCodec { + type Error = std::io::Error; + + fn encode( + &mut self, + item: RpcMessage, + dst: &mut bytes::BytesMut, + ) -> std::result::Result<(), Self::Error> { + let encoded = bincode::serde::encode_to_vec(&item, bincode::config::standard()) + .map_err(std::io::Error::other)?; + let encoded = Bytes::from(encoded); + self.0.encode(encoded, dst).map_err(std::io::Error::other)?; + Ok(()) + } +} + +impl Decoder for RpcMessageCodec { + type Item = RpcMessage; + + type Error = std::io::Error; + + fn decode( + &mut self, + src: &mut bytes::BytesMut, + ) -> std::result::Result, Self::Error> { + match self.0.decode(src) { + Ok(Some(frame)) => { + let (message, _) = + bincode::serde::decode_from_slice(&frame, bincode::config::standard()) + .map_err(std::io::Error::other)?; + Ok(Some(message)) + } + Ok(None) => Ok(None), + Err(err) => Err(std::io::Error::other(err)), + } + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..414340e --- /dev/null +++ b/src/server.rs @@ -0,0 +1,166 @@ +use std::{collections::HashMap, sync::Arc}; + +use futures::{SinkExt, StreamExt}; +use tokio::task::{AbortHandle, JoinSet}; + +use crate::{ + Channel, Listener, Service, + protocol::{RpcCall, RpcCancel, RpcMessage, RpcResponse}, +}; + +#[derive(Clone)] +struct Services(Arc>>); + +impl Services { + fn new(services: HashMap>) -> Self { + Self(Arc::new(services)) + } + + fn get_service(&self, name: &str) -> std::io::Result> { + match self.0.get(name) { + Some(service) => Ok(service.clone()), + None => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "service not found", + )), + } + } +} + +type ListenerSpawner = + Box>, Services) -> AbortHandle + Send + 'static>; + +#[derive(Debug, Default)] +struct AbortHandles(Vec); + +impl Drop for AbortHandles { + fn drop(&mut self) { + for handle in &self.0 { + handle.abort(); + } + } +} + +impl AbortHandles { + pub fn push(&mut self, handle: AbortHandle) { + self.0.push(handle); + } +} + +#[derive(Default)] +pub struct Server { + services: HashMap>, + listener_spawners: Vec, +} + +impl Server { + pub fn with_service(mut self, service: T) -> Self + where + T: Service, + { + let name = T::name(); + let service = Arc::new(service); + self.services.insert(name.to_string(), service); + self + } + + pub fn with_listener(mut self, listener: L) -> Self + where + C: Channel, + L: Listener, + { + self.listener_spawners + .push(Box::new(move |join_set, services| { + join_set.spawn(listener_loop(listener, services)) + })); + self + } + + pub async fn serve(self) -> std::io::Result<()> { + let services = Services::new(self.services); + let mut join_set = JoinSet::default(); + let mut abort_handles = AbortHandles::default(); + for spawner in self.listener_spawners { + let abort_handle = (spawner)(&mut join_set, services.clone()); + abort_handles.push(abort_handle); + } + match join_set.join_next().await { + Some(Ok(Ok(()))) => Ok(()), + Some(Ok(Err(err))) => Err(err), + Some(Err(err)) => Err(std::io::Error::other(err)), + None => Ok(()), + } + } +} + +async fn listener_loop(mut listener: L, services: Services) -> std::io::Result<()> +where + C: Channel, + L: Listener, +{ + while let Some(result) = listener.next().await { + let channel = result?; + let services = services.clone(); + tokio::spawn(channel_handler(channel, services)); + } + Ok(()) +} + +async fn channel_handler(mut channel: C, services: Services) -> std::io::Result<()> { + enum Select { + Empty, + Message(RpcMessage), + } + + let (response_tx, mut response_rx) = + tokio::sync::mpsc::unbounded_channel::>(); + let mut requests: HashMap = Default::default(); + loop { + let select = tokio::select! { + reqopt = channel.next() => match reqopt { + Some(Ok(message)) => Select::Message(message), + Some(Err(err)) => return Err(err), + None => Select::Empty, + }, + Some(response) = response_rx.recv() => match response { + Ok(response) => Select::Message(RpcMessage::Response(response)), + Err(err) => return Err(err), + } + }; + + match select { + Select::Empty => break, + Select::Message(message) => match message { + RpcMessage::Call(RpcCall { + id, + service, + method, + arguments, + }) => { + let response_tx = response_tx.clone(); + let service = services.get_service(&service)?; + let handle = tokio::spawn(async move { + let response = match service.call(method, arguments).await { + Ok(value) => Ok(RpcResponse { id, value }), + Err(err) => Err(err), + }; + let _ = response_tx.send(response); + }) + .abort_handle(); + requests.insert(id, handle); + } + RpcMessage::Cancel(RpcCancel { id }) => { + if let Some(handle) = requests.remove(&id) { + handle.abort(); + } + } + RpcMessage::Response(response) => { + requests.remove(&response.id); + channel.send(RpcMessage::Response(response)).await?; + } + }, + } + } + + Ok(()) +} diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..5684c41 --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,156 @@ +//! TCP backed channel and listener implementations. +//! +//! ```no_run +//! #[urpc::service] +//! trait Hello { +//! type Error = (); +//! +//! async fn hello(name: String) -> String; +//! } +//! +//! struct HelloServer; +//! +//! impl Hello for HelloServer { +//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { +//! Ok(format!("Hello, {name}!")) +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box>{ +//! let listener = urpc::tcp::bind("0.0.0.0:3000").await?; +//! +//! // spawn the server +//! tokio::spawn(async move { +//! urpc::Server::default() +//! .with_listener(listener) +//! .with_service(HelloServer.into_service()) +//! .serve() +//! .await +//! }); +//! +//! // create a client +//! let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await?); +//! let client = HelloClient::new(channel); +//! let greeting = client.hello("World".into()).await.unwrap(); +//! assert_eq!(greeting, "Hello, World!"); +//! Ok(()) +//! } +//! ``` +use std::pin::Pin; + +use futures::{Sink, Stream}; +use tokio::{ + net::{TcpListener, TcpStream, ToSocketAddrs}, + sync::mpsc::Receiver, + task::AbortHandle, +}; +use tokio_util::codec::Framed; + +use crate::{ + Channel, Listener, + protocol::{RpcMessage, RpcMessageCodec}, +}; + +pub struct TcpChannel(Framed); + +impl TcpChannel { + fn new(stream: TcpStream) -> Self { + Self(Framed::new(stream, RpcMessageCodec::default())) + } + + pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result { + let stream = TcpStream::connect(addrs).await?; + Ok(Self::new(stream)) + } +} + +impl Sink for TcpChannel { + type Error = std::io::Error; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx) + } + + fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> { + Sink::start_send(Pin::new(&mut self.get_mut().0), item) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Sink::poll_close(Pin::new(&mut self.get_mut().0), cx) + } +} + +impl Stream for TcpChannel { + type Item = std::io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) + } +} + +impl Channel for TcpChannel {} + +pub struct TcpChannelListener { + receiver: Receiver, + abort: AbortHandle, +} + +impl Drop for TcpChannelListener { + fn drop(&mut self) { + self.abort.abort(); + } +} + +impl TcpChannelListener { + pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result { + let listener = TcpListener::bind(addrs).await?; + let (sender, receiver) = tokio::sync::mpsc::channel(8); + let abort = tokio::spawn(async move { + while let Ok((stream, _addr)) = listener.accept().await { + if sender.send(TcpChannel::new(stream)).await.is_err() { + break; + } + } + }) + .abort_handle(); + Ok(Self { receiver, abort }) + } +} + +impl Stream for TcpChannelListener { + type Item = std::io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok)) + } +} + +impl Listener for TcpChannelListener {} + +pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result { + TcpChannelListener::bind(addrs).await +} + +pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result { + TcpChannel::connect(addrs).await +} diff --git a/src/unix.rs b/src/unix.rs new file mode 100644 index 0000000..e241b64 --- /dev/null +++ b/src/unix.rs @@ -0,0 +1,160 @@ +//! UNIX Domain Socket backed channel and listener implementations. +//! +//! ```no_run +//! #[urpc::service] +//! trait Hello { +//! type Error = (); +//! +//! async fn hello(name: String) -> String; +//! } +//! +//! struct HelloServer; +//! +//! impl Hello for HelloServer { +//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { +//! Ok(format!("Hello, {name}!")) +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box>{ +//! let listener = urpc::unix::bind("./hello.service").await?; +//! +//! // spawn the server +//! tokio::spawn(async move { +//! urpc::Server::default() +//! .with_listener(listener) +//! .with_service(HelloServer.into_service()) +//! .serve() +//! .await +//! }); +//! +//! // create a client +//! let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await?); +//! let client = HelloClient::new(channel); +//! let greeting = client.hello("World".into()).await.unwrap(); +//! assert_eq!(greeting, "Hello, World!"); +//! Ok(()) +//! } +//! ``` +use std::{path::Path, pin::Pin}; + +use futures::{Sink, Stream}; +use tokio::{ + net::{UnixListener, UnixStream}, + sync::mpsc::Receiver, + task::AbortHandle, +}; +use tokio_util::codec::Framed; + +use crate::{ + Channel, Listener, + protocol::{RpcMessage, RpcMessageCodec}, +}; + +pub struct UnixChannel(Framed); + +impl UnixChannel { + fn new(stream: UnixStream) -> Self { + Self(Framed::new(stream, RpcMessageCodec::default())) + } + + pub async fn connect(path: impl AsRef) -> std::io::Result { + let stream = UnixStream::connect(path).await?; + Ok(Self::new(stream)) + } +} + +impl Sink for UnixChannel { + type Error = std::io::Error; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx) + } + + fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> { + Sink::start_send(Pin::new(&mut self.get_mut().0), item) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Sink::poll_close(Pin::new(&mut self.get_mut().0), cx) + } +} + +impl Stream for UnixChannel { + type Item = std::io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) + } +} + +impl Channel for UnixChannel {} + +pub struct UnixChannelListener { + receiver: Receiver, + abort: AbortHandle, +} + +impl Drop for UnixChannelListener { + fn drop(&mut self) { + self.abort.abort(); + } +} + +impl UnixChannelListener { + pub async fn bind(path: impl AsRef) -> std::io::Result { + let path = path.as_ref(); + if tokio::fs::try_exists(path).await? { + tokio::fs::remove_file(path).await?; + } + let listener = UnixListener::bind(path)?; + let (sender, receiver) = tokio::sync::mpsc::channel(8); + let abort = tokio::spawn(async move { + while let Ok((stream, _addr)) = listener.accept().await { + if sender.send(UnixChannel::new(stream)).await.is_err() { + break; + } + } + }) + .abort_handle(); + Ok(Self { receiver, abort }) + } +} + +impl Stream for UnixChannelListener { + type Item = std::io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok)) + } +} + +impl Listener for UnixChannelListener {} + +pub async fn bind(path: impl AsRef) -> std::io::Result { + UnixChannelListener::bind(path).await +} + +pub async fn connect(path: impl AsRef) -> std::io::Result { + UnixChannel::connect(path).await +} diff --git a/tests/tcp.rs b/tests/tcp.rs new file mode 100644 index 0000000..a2f647d --- /dev/null +++ b/tests/tcp.rs @@ -0,0 +1,34 @@ +#[tokio::test] +async fn test_hello_service() { + #[urpc::service] + trait Hello { + type Error = (); + + async fn hello(name: String) -> String; + } + + struct HelloServer; + + impl Hello for HelloServer { + async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { + Ok(format!("Hello, {name}!")) + } + } + + let listener = urpc::tcp::bind("0.0.0.0:3000").await.unwrap(); + + // spawn the server + tokio::spawn(async move { + urpc::Server::default() + .with_listener(listener) + .with_service(HelloServer.into_service()) + .serve() + .await + }); + + // create a client + let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await.unwrap()); + let client = HelloClient::new(channel); + let greeting = client.hello("World".into()).await.unwrap(); + assert_eq!(greeting, "Hello, World!"); +} diff --git a/tests/unix.rs b/tests/unix.rs new file mode 100644 index 0000000..bfa5a69 --- /dev/null +++ b/tests/unix.rs @@ -0,0 +1,34 @@ +#[tokio::test] +async fn test_hello_service() { + #[urpc::service] + trait Hello { + type Error = (); + + async fn hello(name: String) -> String; + } + + struct HelloServer; + + impl Hello for HelloServer { + async fn hello(&self, _ctx: urpc::Context, name: String) -> Result { + Ok(format!("Hello, {name}!")) + } + } + + let listener = urpc::unix::bind("./hello.service").await.unwrap(); + + // spawn the server + tokio::spawn(async move { + urpc::Server::default() + .with_listener(listener) + .with_service(HelloServer.into_service()) + .serve() + .await + }); + + // create a client + let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await.unwrap()); + let client = HelloClient::new(channel); + let greeting = client.hello("World".into()).await.unwrap(); + assert_eq!(greeting, "Hello, World!"); +} diff --git a/urpc-macro/Cargo.toml b/urpc-macro/Cargo.toml new file mode 100644 index 0000000..16b5650 --- /dev/null +++ b/urpc-macro/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "urpc-macro" +version = "0.1.0" +edition = "2021" +authors = ["Your Name "] +description = "Procedural macros for urpc" +license = "MIT OR Apache-2.0" +repository = "https://github.com/your-username/urpc" +keywords = ["rpc", "async", "networking", "macros"] +categories = ["network-programming", "asynchronous"] + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1.0.95" +quote = "1.0.40" +syn = { version = "2.0.101", features = ["full"] } diff --git a/urpc-macro/src/lib.rs b/urpc-macro/src/lib.rs new file mode 100644 index 0000000..4facc60 --- /dev/null +++ b/urpc-macro/src/lib.rs @@ -0,0 +1,282 @@ +use proc_macro2::{Punct, Spacing, Span, TokenStream}; +use quote::{ToTokens, TokenStreamExt}; +use syn::parse::Parse; + +struct Service { + vis: syn::Visibility, + ident: syn::Ident, + error: syn::Type, + methods: Vec, +} + +impl Service { + fn server_trait_ident(&self) -> syn::Ident { + self.ident.clone() + } + + fn client_struct_ident(&self) -> syn::Ident { + syn::Ident::new(&format!("{}Client", self.ident), Span::call_site()) + } +} + +impl Parse for Service { + fn parse(input: syn::parse::ParseStream) -> syn::Result { + let vis = input.parse()?; + input.parse::()?; + let ident = input.parse()?; + + let content; + syn::braced!(content in input); + + content.parse::()?; + let error_ident = content.parse::()?; + if error_ident != "Error" { + return Err(syn::Error::new_spanned( + error_ident, + "expected Error associated type", + )); + } + content.parse::()?; + let error = content.parse()?; + content.parse::()?; + + let mut methods = Vec::default(); + while !content.is_empty() { + let method = content.parse()?; + methods.push(method); + } + + Ok(Self { + vis, + ident, + error, + methods, + }) + } +} + +struct MethodArg { + ident: syn::Ident, + ty: syn::Type, +} + +impl ToTokens for MethodArg { + fn to_tokens(&self, tokens: &mut TokenStream) { + self.ident.to_tokens(tokens); + tokens.append(Punct::new(':', Spacing::Alone)); + self.ty.to_tokens(tokens); + } +} + +impl Parse for MethodArg { + fn parse(input: syn::parse::ParseStream) -> syn::Result { + let ident = input.parse()?; + input.parse::()?; + let ty = input.parse()?; + Ok(Self { ident, ty }) + } +} + +struct Method { + ident: syn::Ident, + arguments: Vec, + return_ty: Option, +} + +impl Parse for Method { + fn parse(input: syn::parse::ParseStream) -> syn::Result { + input.parse::()?; + input.parse::()?; + let ident = input.parse::()?; + let arguments_content; + syn::parenthesized!(arguments_content in input); + + let mut arguments = Vec::default(); + while !arguments_content.is_empty() { + if !arguments.is_empty() { + arguments_content.parse::()?; + } + let argument = arguments_content.parse::()?; + arguments.push(argument); + } + + let return_ty = match input.peek(syn::Token![->]) { + true => { + input.parse::]>()?; + Some(input.parse()?) + } + false => None, + }; + + input.parse::()?; + + Ok(Self { + ident, + arguments, + return_ty, + }) + } +} + +fn generate_service_into_service_match_arm(service: &Service, method: &Method) -> TokenStream { + let method_ident = &method.ident; + let method_name = method.ident.to_string(); + let server_trait = service.server_trait_ident(); + let arg_idents = method + .arguments + .iter() + .map(|arg| &arg.ident) + .collect::>(); + + quote::quote! { + #method_name => { + let ((#(#arg_idents),*), _) = + urpc::internal::bincode::serde::decode_from_slice(&arguments, urpc::internal::bincode::config::standard()).map_err(std::io::Error::other)?; + let ctx = Default::default(); + let ret = ::#method_ident(&self.0, ctx, #(#arg_idents),*).await; + let value = urpc::internal::bincode::serde::encode_to_vec(&ret, urpc::internal::bincode::config::standard()).map_err(std::io::Error::other)?; + Ok(From::from(value)) + } + } +} + +fn generate_service_into_service(service: &Service) -> TokenStream { + let server_trait = service.server_trait_ident(); + let service_name = service.ident.to_string(); + let match_arms = service + .methods + .iter() + .map(|m| generate_service_into_service_match_arm(service, m)) + .collect::>(); + quote::quote! { + fn into_service(self) -> impl urpc::Service { + struct Adapter(T); + + impl urpc::Service for Adapter + where + T: #server_trait, + { + fn name() -> &'static str { + #service_name + } + + fn call( + &self, + method: String, + arguments: urpc::internal::bytes::Bytes, + ) -> std::pin::Pin> + Send + '_>> { + Box::pin(async move { + match method.as_str() { + #(#match_arms)* + _ => panic!("unknown method for service {}", "Router"), + } + }) + } + } + + Adapter(self) + } + } +} + +fn generate_service_server_method(service: &Service, method: &Method) -> TokenStream { + let ident = &method.ident; + let service_error = &service.error; + let args = &method.arguments; + let return_ty = match &method.return_ty { + Some(t) => quote::quote! { std::result::Result<#t, #service_error> }, + None => quote::quote! { std::result::Result<(), #service_error> }, + }; + quote::quote! { + fn #ident(&self, ctx: urpc::Context, #(#args),*) -> impl std::future::Future + Send; + } +} + +fn generate_service_server(service: &Service) -> TokenStream { + let vis = &service.vis; + let ident = service.server_trait_ident(); + let methods = service + .methods + .iter() + .map(|m| generate_service_server_method(service, m)); + let into_service = generate_service_into_service(service); + + quote::quote! { + #vis trait #ident : Sized + Send + Sync + 'static { + #(#methods)* + #into_service + } + } +} + +fn generate_service_client_method(service: &Service, method: &Method) -> TokenStream { + let service_name = service.ident.to_string(); + let service_error = &service.error; + let method_ident = &method.ident; + let method_string = method.ident.to_string(); + let method_arg_idents = method + .arguments + .iter() + .map(|arg| &arg.ident) + .collect::>(); + let method_arg_types = method.arguments.iter().map(|arg| &arg.ty); + let method_return_type = match &method.return_ty { + Some(ty) => quote::quote! { #ty }, + None => quote::quote! { () }, + }; + quote::quote! { + pub async fn #method_ident(&self, #(#method_arg_idents: #method_arg_types),*) -> std::result::Result<#method_return_type, urpc::protocol::RpcError<#service_error>> { + let service = String::from(#service_name); + let method = String::from(#method_string); + let arguments = (#(#method_arg_idents),*); + let arguments = match urpc::internal::bincode::serde::encode_to_vec(&arguments, urpc::internal::bincode::config::standard()) { + Ok(arguments) => From::from(arguments), + Err(err) => return Err(urpc::protocol::RpcError::Transport(std::io::Error::other(err))), + }; + let response = match self.channel.call(service, method, arguments).await { + Ok(response) => response, + Err(err) => return Err(urpc::protocol::RpcError::Transport(err)), + }; + match urpc::internal::bincode::serde::decode_from_slice::, _>(&response, urpc::internal::bincode::config::standard()) { + Ok((result, _)) => result.map_err(urpc::protocol::RpcError::Remote), + Err(err) => Err(urpc::protocol::RpcError::Transport(std::io::Error::other(err))), + } + } + } +} + +fn generate_service_client(service: &Service) -> TokenStream { + let vis = &service.vis; + let client_ident = service.client_struct_ident(); + let methods = service + .methods + .iter() + .map(|m| generate_service_client_method(service, m)); + quote::quote! { + #vis struct #client_ident { + channel: urpc::ClientChannel + } + + impl #client_ident { + pub fn new(channel: urpc::ClientChannel) -> Self { + Self { channel } + } + + #(#methods)* + } + } +} + +#[proc_macro_attribute] +pub fn service( + _attr: proc_macro::TokenStream, + item: proc_macro::TokenStream, +) -> proc_macro::TokenStream { + let service = syn::parse_macro_input!(item as Service); + let service_server = generate_service_server(&service); + let service_client = generate_service_client(&service); + From::from(quote::quote! { + #service_server + #service_client + }) +} -- cgit