aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore22
-rw-r--r--Cargo.toml26
-rw-r--r--LICENSE-APACHE208
-rw-r--r--LICENSE-MIT21
-rw-r--r--README.md64
-rw-r--r--src/channel.rs162
-rw-r--r--src/client_channel.rs124
-rw-r--r--src/internal.rs2
-rw-r--r--src/lib.rs52
-rw-r--r--src/protocol.rs107
-rw-r--r--src/server.rs166
-rw-r--r--src/tcp.rs156
-rw-r--r--src/unix.rs160
-rw-r--r--tests/tcp.rs34
-rw-r--r--tests/unix.rs34
-rw-r--r--urpc-macro/Cargo.toml18
-rw-r--r--urpc-macro/src/lib.rs282
17 files changed, 1638 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..3a9c95e
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,22 @@
1# Rust build artifacts
2/target/
3Cargo.lock
4
5# IDE files
6.vscode/
7.idea/
8*.swp
9*.swo
10*~
11
12# OS generated files
13.DS_Store
14.DS_Store?
15._*
16.Spotlight-V100
17.Trashes
18ehthumbs.db
19Thumbs.db
20
21# Test artifacts
22hello.service \ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..13032d9
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,26 @@
1[package]
2name = "urpc"
3version = "0.1.0"
4edition = "2021"
5authors = ["Your Name <[email protected]>"]
6description = "A minimalistic RPC framework for Rust"
7license = "MIT OR Apache-2.0"
8repository = "https://github.com/your-username/urpc"
9keywords = ["rpc", "async", "networking"]
10categories = ["network-programming", "asynchronous"]
11
12[workspace]
13members = ["urpc-macro"]
14
15[dependencies]
16urpc-macro = { path = "urpc-macro" }
17
18bincode = { version = "2.0.1", features = ["serde"] }
19bytes = { version = "1.10.1", features = ["serde"] }
20futures = "0.3.31"
21serde = { version = "1.0.219", features = ["derive"] }
22tokio = { version = "1.44.1", features = ["full"] }
23tokio-util = { version = "0.7.14", features = ["codec"] }
24
25[dev-dependencies]
26tokio-test = "0.4"
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 0000000..1d9467e
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,208 @@
1 Apache License
2 Version 2.0, January 2004
3 http://www.apache.org/licenses/
4
5 TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
6
7 1. Definitions.
8
9 "License" shall mean the terms and conditions for use, reproduction,
10 and distribution as defined by Sections 1 through 9 of this document.
11
12 "Licensor" shall mean the copyright owner or entity granting the License.
13
14 "Legal Entity" shall mean the union of the acting entity and all
15 other entities that control, are controlled by, or are under common
16 control with that entity. For the purposes of this definition,
17 "control" means (i) the power, direct or indirect, to cause the
18 direction or management of such entity, whether by contract or
19 otherwise, or (ii) ownership of fifty percent (50%) or more of the
20 outstanding shares, or (iii) beneficial ownership of such entity.
21
22 "You" (or "Your") shall mean an individual or Legal Entity
23 exercising permissions granted by this License.
24
25 "Source" form shall mean the preferred form for making modifications,
26 including but not limited to software source code, documentation
27 source, and configuration files.
28
29 "Object" form shall mean any form resulting from mechanical
30 transformation or translation of a Source form, including but
31 not limited to compiled object code, generated documentation,
32 and conversions to other media types.
33
34 "Work" shall mean the work of authorship, whether in Source or
35 Object form, made available under the License, as indicated by a
36 copyright notice that is included in or attached to the work
37 (which shall not include communication that is conspicuously
38 marked or otherwise designated in writing by the copyright owner
39 as "Not a Contribution").
40
41 "Contribution" shall mean any work of authorship, including
42 the original version of the Work and any modifications or additions
43 to that Work or Derivative Works thereof, that is intentionally
44 submitted to Licensor for inclusion in the Work by the copyright
45 owner or by an individual or Legal Entity authorized to submit on
46 behalf of the copyright owner. For the purposes of this definition,
47 "submitted" means any form of electronic, verbal, or written
48 communication sent to the Licensor or its representatives,
49 including but not limited to communication on electronic mailing
50 lists, source code control systems, and issue tracking systems that
51 are managed by, or on behalf of, the Licensor for the purpose of
52 discussing and improving the Work, but excluding communication that
53 is conspicuously marked or otherwise designated in writing by the
54 copyright owner as "Not a Contribution".
55
56 2. Grant of Copyright License. Subject to the terms and conditions of
57 this License, each Contributor hereby grants to You a perpetual,
58 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
59 copyright license to use, reproduce, modify, distribute, prepare
60 Derivative Works of, publicly display, publicly perform, sublicense,
61 and sell the Work and to permit persons to whom the Work is
62 furnished to do so, subject to the following conditions:
63
64 The above copyright notice and this permission notice shall be
65 included in all copies or substantial portions of the Work.
66
67 3. Grant of Patent License. Subject to the terms and conditions of
68 this License, each Contributor hereby grants to You a perpetual,
69 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
70 (except as stated in this section) patent license to make, have made,
71 use, offer to sell, sell, import, and otherwise transfer the Work,
72 where such license applies only to those patent claims licensable
73 by such Contributor that are necessarily infringed by their
74 Contribution(s) alone or by combination of their Contribution(s)
75 with the Work to which such Contribution(s) was submitted. If You
76 institute patent litigation against any entity (including a
77 cross-claim or counterclaim in a lawsuit) alleging that the Work
78 or a Contribution incorporated within the Work constitutes direct
79 or contributory patent infringement, then any patent licenses
80 granted to You under this License for that Work shall terminate
81 as of the date such litigation is filed.
82
83 4. Redistribution. You may reproduce and distribute copies of the
84 Work or Derivative Works thereof in any medium, with or without
85 modifications, and in Source or Object form, provided that You
86 meet the following conditions:
87
88 (a) You must give any other recipients of the Work or
89 Derivative Works a copy of this License; and
90
91 (b) You must cause any modified files to carry prominent notices
92 stating that You changed the files; and
93
94 (c) You must retain, in the Source form of any Derivative Works
95 that You distribute, all copyright, patent, trademark, and
96 attribution notices from the Source form of the Work,
97 excluding those notices that do not pertain to any part of
98 the Derivative Works; and
99
100 (d) If the Work includes a "NOTICE" text file as part of its
101 distribution, then any Derivative Works that You distribute must
102 include a readable copy of the attribution notices contained
103 within such NOTICE file, excluding those notices that do not
104 pertain to any part of the Derivative Works, in at least one
105 of the following places: within a NOTICE text file distributed
106 as part of the Derivative Works; within the Source form or
107 documentation, if provided along with the Derivative Works; or,
108 within a display generated by the Derivative Works, if and
109 wherever such third-party notices normally appear. The contents
110 of the NOTICE file are for informational purposes only and
111 do not modify the License. You may add Your own attribution
112 notices within Derivative Works that You distribute, alongside
113 or as an addendum to the NOTICE text from the Work, provided
114 that such additional attribution notices cannot be construed
115 as modifying the License.
116
117 You may add Your own copyright notice to Your modifications and
118 may provide additional or different license terms and conditions
119 for use, reproduction, or distribution of Your modifications, or
120 for any such Derivative Works as a whole, provided Your use,
121 reproduction, and distribution of the Work otherwise complies with
122 the conditions stated in this License.
123
124 5. Submission of Contributions. Unless You explicitly state otherwise,
125 any Contribution intentionally submitted for inclusion in the Work
126 by You to the Licensor shall be under the terms and conditions of
127 this License, without any additional terms or conditions.
128 Notwithstanding the above, nothing herein shall supersede or modify
129 the terms of any separate license agreement you may have executed
130 with Licensor regarding such Contributions.
131
132 6. Trademarks. This License does not grant permission to use the trade
133 names, trademarks, service marks, or product names of the Licensor,
134 except as required for reasonable and customary use in describing the
135 origin of the Work and reproducing the content of the NOTICE file.
136
137 7. Disclaimer of Warranty. Unless required by applicable law or
138 agreed to in writing, Licensor provides the Work (and each
139 Contributor provides its Contributions) on an "AS IS" BASIS,
140 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
141 implied, including, without limitation, any warranties or conditions
142 of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
143 PARTICULAR PURPOSE. You are solely responsible for determining the
144 appropriateness of using or redistributing the Work and assume any
145 risks associated with Your exercise of permissions under this License.
146
147 8. Limitation of Liability. In no event and under no legal theory,
148 whether in tort (including negligence), contract, or otherwise,
149 unless required by applicable law (such as deliberate and grossly
150 negligent acts) or agreed to in writing, shall any Contributor be
151 liable to You for damages, including any direct, indirect, special,
152 incidental, or consequential damages of any character arising as a
153 result of this License or out of the use or inability to use the
154 Work (including but not limited to damages for loss of goodwill,
155 work stoppage, computer failure or malfunction, or any and all
156 other commercial damages or losses), even if such Contributor
157 has been advised of the possibility of such damages.
158
159 9. Accepting Warranty or Support. You can have agreements requiring
160 that any person who redistributes the Work or Derivative Works
161 thereof provide a warranty or support for the Work, but You may
162 not do so on behalf of any Contributor or impose any warranty
163 or support obligations on any Contributor. Any such warranty or
164 support must be offered by You alone, and You hereby agree to
165 indemnify any Contributor for any liability incurred by such
166 Contributor as a result of warranty or support terms offered by You.
167
168 10. No Trademark License. This License does not grant permission to use
169 the trade names, trademarks, service marks, or product names of the
170 Licensor, except as required for reasonable and customary use in
171 describing the origin of the Work and reproducing the content of
172 the NOTICE file.
173
174 11. Additional Terms. You may choose to offer, and to charge a fee for,
175 warranty, support, indemnity, or other obligations and/or rights
176 consistent with this License. However, in accepting such obligations,
177 You may act only on Your own behalf and on Your sole responsibility,
178 not on behalf of any other Contributor, and only if You agree to
179 indemnify, defend, and hold each Contributor harmless for any
180 liability incurred by such Contributor as a result of warranty,
181 support, indemnity, or other terms You offer.
182
183 END OF TERMS AND CONDITIONS
184
185 APPENDIX: How to apply the Apache License to your work.
186
187 To apply the Apache License to your work, attach the following
188 boilerplate notice, with the fields enclosed by brackets "[]"
189 replaced with your own identifying information. (Don't include
190 the brackets!) The text should be enclosed in the appropriate
191 comment syntax for the file format. We also recommend that a
192 file or class name and description of purpose be included on the
193 same "about" line as the copyright notice for easier
194 identification within third-party archives.
195
196 Copyright 2025 urpc contributors
197
198 Licensed under the Apache License, Version 2.0 (the "License");
199 you may not use this file except in compliance with the License.
200 You may obtain a copy of the License at
201
202 http://www.apache.org/licenses/LICENSE-2.0
203
204 Unless required by applicable law or agreed to in writing, software
205 distributed under the License is distributed on an "AS IS" BASIS,
206 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
207 See the License for the specific language governing permissions and
208 limitations under the License. \ No newline at end of file
diff --git a/LICENSE-MIT b/LICENSE-MIT
new file mode 100644
index 0000000..c21a6e5
--- /dev/null
+++ b/LICENSE-MIT
@@ -0,0 +1,21 @@
1MIT License
2
3Copyright (c) 2025 urpc contributors
4
5Permission is hereby granted, free of charge, to any person obtaining a copy
6of this software and associated documentation files (the "Software"), to deal
7in the Software without restriction, including without limitation the rights
8to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9copies of the Software, and to permit persons to whom the Software is
10furnished to do so, subject to the following conditions:
11
12The above copyright notice and this permission notice shall be included in all
13copies or substantial portions of the Software.
14
15THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21SOFTWARE. \ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6b84471
--- /dev/null
+++ b/README.md
@@ -0,0 +1,64 @@
1# urpc
2
3A minimalistic RPC framework for Rust, designed for simplicity and performance.
4
5## Features
6
7- Async/await support
8- TCP and Unix socket transport
9- Procedural macros for service definitions
10- Serialization with bincode
11- Lightweight and fast
12
13## Example
14
15```rust
16use urpc::*;
17
18#[urpc::service]
19trait Hello {
20 type Error = ();
21
22 async fn hello(name: String) -> String;
23}
24
25struct HelloServer;
26
27impl Hello for HelloServer {
28 async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
29 Ok(format!("Hello, {}!", name))
30 }
31}
32
33#[tokio::main]
34async fn main() {
35 // Server
36 let listener = urpc::tcp::bind("127.0.0.1:8080").await.unwrap();
37
38 tokio::spawn(async move {
39 urpc::Server::default()
40 .with_listener(listener)
41 .with_service(HelloServer.into_service())
42 .serve()
43 .await
44 });
45
46 // Client
47 let channel = urpc::ClientChannel::new(
48 urpc::tcp::connect("127.0.0.1:8080").await.unwrap()
49 );
50 let client = HelloClient::new(channel);
51
52 let response = client.hello("World".to_string()).await.unwrap();
53 println!("{}", response); // "Hello, World!"
54}
55```
56
57## License
58
59Licensed under either of
60
61- Apache License, Version 2.0
62- MIT License
63
64at your option. \ No newline at end of file
diff --git a/src/channel.rs b/src/channel.rs
new file mode 100644
index 0000000..f80ca87
--- /dev/null
+++ b/src/channel.rs
@@ -0,0 +1,162 @@
1//! MPSC channel backed channel and listener implementations.
2//!
3//! This module provides an mpsc channel based listener/channel implementation that can be useful
4//! to for tests.
5//!
6//! The current implementation uses [`tokio`]'s unbounded mpsc channels.
7//!
8//! ```
9//! #[urpc::service]
10//! trait Hello {
11//! type Error = ();
12//!
13//! async fn hello(name: String) -> String;
14//! }
15//!
16//! struct HelloServer;
17//!
18//! impl Hello for HelloServer {
19//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
20//! Ok(format!("Hello, {name}!"))
21//! }
22//! }
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>>{
26//! let (dialer, listener) = urpc::channel::new();
27//!
28//! // spawn the server
29//! tokio::spawn(async move {
30//! urpc::Server::default()
31//! .with_listener(listener)
32//! .with_service(HelloServer.into_service())
33//! .serve()
34//! .await
35//! });
36//!
37//! // create a client
38//! let channel = urpc::ClientChannel::new(dialer.connect()?);
39//! let client = HelloClient::new(channel);
40//! let greeting = client.hello("World".into()).await.unwrap();
41//! assert_eq!(greeting, "Hello, World!");
42//! Ok(())
43//! }
44//! ```
45
46use futures::{Sink, Stream};
47use tokio::sync::mpsc;
48
49use crate::protocol::RpcMessage;
50
51pub struct ChannelListener {
52 receiver: mpsc::UnboundedReceiver<Channel>,
53}
54
55impl Stream for ChannelListener {
56 type Item = std::io::Result<Channel>;
57
58 fn poll_next(
59 self: std::pin::Pin<&mut Self>,
60 cx: &mut std::task::Context<'_>,
61 ) -> std::task::Poll<Option<Self::Item>> {
62 match self.get_mut().receiver.poll_recv(cx) {
63 std::task::Poll::Ready(Some(c)) => std::task::Poll::Ready(Some(Ok(c))),
64 std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
65 std::task::Poll::Pending => std::task::Poll::Pending,
66 }
67 }
68}
69
70impl crate::Listener<Channel> for ChannelListener {}
71
72pub struct ChannelDialer {
73 sender: mpsc::UnboundedSender<Channel>,
74}
75
76impl ChannelDialer {
77 fn new() -> (Self, ChannelListener) {
78 let (sender, receiver) = mpsc::unbounded_channel();
79 (Self { sender }, ChannelListener { receiver })
80 }
81
82 pub fn connect(&self) -> std::io::Result<Channel> {
83 let (ch1, ch2) = Channel::new();
84 self.sender.send(ch1).expect("TODO: remove this");
85 Ok(ch2)
86 }
87}
88
89pub struct Channel {
90 sender: mpsc::UnboundedSender<RpcMessage>,
91 receiver: mpsc::UnboundedReceiver<RpcMessage>,
92}
93
94impl Channel {
95 fn new() -> (Self, Self) {
96 let (sender0, receiver0) = mpsc::unbounded_channel();
97 let (sender1, receiver1) = mpsc::unbounded_channel();
98 (
99 Self {
100 sender: sender0,
101 receiver: receiver1,
102 },
103 Self {
104 sender: sender1,
105 receiver: receiver0,
106 },
107 )
108 }
109}
110
111impl Stream for Channel {
112 type Item = std::io::Result<RpcMessage>;
113
114 fn poll_next(
115 self: std::pin::Pin<&mut Self>,
116 cx: &mut std::task::Context<'_>,
117 ) -> std::task::Poll<Option<Self::Item>> {
118 match self.get_mut().receiver.poll_recv(cx) {
119 std::task::Poll::Ready(Some(msg)) => std::task::Poll::Ready(Some(Ok(msg))),
120 std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
121 std::task::Poll::Pending => std::task::Poll::Pending,
122 }
123 }
124}
125
126impl Sink<RpcMessage> for Channel {
127 type Error = std::io::Error;
128
129 fn poll_ready(
130 self: std::pin::Pin<&mut Self>,
131 _cx: &mut std::task::Context<'_>,
132 ) -> std::task::Poll<Result<(), Self::Error>> {
133 return std::task::Poll::Ready(Ok(()));
134 }
135
136 fn start_send(self: std::pin::Pin<&mut Self>, item: RpcMessage) -> Result<(), Self::Error> {
137 match self.sender.send(item) {
138 Ok(()) => Ok(()),
139 Err(err) => Err(std::io::Error::other(err)),
140 }
141 }
142
143 fn poll_flush(
144 self: std::pin::Pin<&mut Self>,
145 _cx: &mut std::task::Context<'_>,
146 ) -> std::task::Poll<Result<(), Self::Error>> {
147 std::task::Poll::Ready(Ok(()))
148 }
149
150 fn poll_close(
151 self: std::pin::Pin<&mut Self>,
152 _cx: &mut std::task::Context<'_>,
153 ) -> std::task::Poll<Result<(), Self::Error>> {
154 std::task::Poll::Ready(Ok(()))
155 }
156}
157
158impl crate::Channel for Channel {}
159
160pub fn new() -> (ChannelDialer, ChannelListener) {
161 ChannelDialer::new()
162}
diff --git a/src/client_channel.rs b/src/client_channel.rs
new file mode 100644
index 0000000..5666a16
--- /dev/null
+++ b/src/client_channel.rs
@@ -0,0 +1,124 @@
1use std::{collections::HashMap, sync::Arc};
2
3use bytes::Bytes;
4use futures::{SinkExt, StreamExt};
5use tokio::{
6 sync::{mpsc, oneshot},
7 task::AbortHandle,
8};
9
10use crate::{
11 Channel,
12 protocol::{RpcCall, RpcMessage, RpcResponse},
13};
14
15const CLIENT_CHANNEL_BUFFER_SIZE: usize = 64;
16
17struct ClientChannelMessage {
18 service: String,
19 method: String,
20 arguments: Bytes,
21 responder: oneshot::Sender<std::io::Result<Bytes>>,
22}
23
24struct ClientChannelInner {
25 sender: mpsc::Sender<ClientChannelMessage>,
26 abort_handle: AbortHandle,
27}
28
29impl Drop for ClientChannelInner {
30 fn drop(&mut self) {
31 self.abort_handle.abort();
32 }
33}
34
35#[derive(Clone)]
36pub struct ClientChannel(Arc<ClientChannelInner>);
37
38impl ClientChannel {
39 pub fn new<C: Channel>(channel: C) -> Self {
40 let (tx, rx) = mpsc::channel(CLIENT_CHANNEL_BUFFER_SIZE);
41 let abort_handle = tokio::spawn(client_channel_loop(channel, rx)).abort_handle();
42 Self(Arc::new(ClientChannelInner {
43 sender: tx,
44 abort_handle,
45 }))
46 }
47
48 pub async fn call(
49 &self,
50 service: String,
51 method: String,
52 arguments: Bytes,
53 ) -> std::io::Result<Bytes> {
54 let (tx, rx) = oneshot::channel();
55 self.0
56 .sender
57 .send(ClientChannelMessage {
58 service,
59 method,
60 arguments,
61 responder: tx,
62 })
63 .await
64 .expect("client channel task should never shutdown while a client is alive");
65 rx.await
66 .expect("client channel task should never shutdown while a client is alive")
67 }
68}
69
70async fn client_channel_loop<C: Channel>(
71 mut channel: C,
72 mut rx: mpsc::Receiver<ClientChannelMessage>,
73) {
74 enum Select {
75 RpcMessage(RpcMessage),
76 ClientChannelMessage(ClientChannelMessage),
77 }
78
79 let mut responders = HashMap::<u64, oneshot::Sender<std::io::Result<Bytes>>>::default();
80 let mut rpc_call_id = 0;
81
82 loop {
83 let select = tokio::select! {
84 Some(Ok(v)) = channel.next() => Select::RpcMessage(v),
85 Some(v) = rx.recv() => Select::ClientChannelMessage(v),
86 };
87
88 match select {
89 Select::RpcMessage(RpcMessage::Response(RpcResponse { id, value })) => {
90 if let Some(responder) = responders.remove(&id) {
91 let _ = responder.send(Ok(value));
92 }
93 }
94 Select::RpcMessage(_) => todo!(),
95 Select::ClientChannelMessage(ClientChannelMessage {
96 service,
97 method,
98 arguments,
99 responder,
100 }) => {
101 let id = rpc_call_id;
102 rpc_call_id += 1;
103
104 let result = channel
105 .send(RpcMessage::Call(RpcCall {
106 id,
107 service,
108 method,
109 arguments,
110 }))
111 .await;
112
113 match result {
114 Ok(()) => {
115 responders.insert(id, responder);
116 }
117 Err(err) => {
118 let _ = responder.send(Err(err));
119 }
120 }
121 }
122 }
123 }
124}
diff --git a/src/internal.rs b/src/internal.rs
new file mode 100644
index 0000000..a3c203a
--- /dev/null
+++ b/src/internal.rs
@@ -0,0 +1,2 @@
1pub use bincode;
2pub use bytes;
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..e65f659
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,52 @@
1#[doc(hidden)]
2pub mod internal;
3
4pub mod channel;
5pub mod protocol;
6pub mod tcp;
7pub mod unix;
8
9mod client_channel;
10mod server;
11
12pub use client_channel::ClientChannel;
13pub use server::Server;
14pub use urpc_macro::service;
15
16use protocol::RpcMessage;
17
18use std::pin::Pin;
19use std::future::Future;
20
21use bytes::Bytes;
22use futures::{Sink, Stream};
23
24#[derive(Debug, Default)]
25pub struct Context;
26
27pub trait Service: Send + Sync + 'static {
28 fn name() -> &'static str
29 where
30 Self: Sized;
31
32 fn call(
33 &self,
34 method: String,
35 arguments: Bytes,
36 ) -> Pin<Box<dyn Future<Output = std::io::Result<Bytes>> + Send + '_>>;
37}
38
39pub trait Channel:
40 Stream<Item = std::io::Result<RpcMessage>>
41 + Sink<RpcMessage, Error = std::io::Error>
42 + Send
43 + Unpin
44 + 'static
45{
46}
47
48pub trait Listener<C>: Stream<Item = std::io::Result<C>> + Send + Unpin + 'static
49where
50 C: Channel,
51{
52}
diff --git a/src/protocol.rs b/src/protocol.rs
new file mode 100644
index 0000000..baf886b
--- /dev/null
+++ b/src/protocol.rs
@@ -0,0 +1,107 @@
1//! Types used by the RPC protocol.
2use bytes::Bytes;
3use serde::{Deserialize, Serialize};
4use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
5
6#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
7pub struct RpcCall {
8 pub id: u64,
9 pub service: String,
10 pub method: String,
11 pub arguments: Bytes,
12}
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct RpcCancel {
16 pub id: u64,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct RpcResponse {
21 pub id: u64,
22 pub value: Bytes,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub enum RpcMessage {
27 Call(RpcCall),
28 Response(RpcResponse),
29 Cancel(RpcCancel),
30}
31
32#[derive(Debug)]
33pub enum RpcError<E> {
34 Transport(std::io::Error),
35 Remote(E),
36}
37
38impl<E> std::fmt::Display for RpcError<E>
39where
40 E: std::fmt::Display,
41{
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 match self {
44 RpcError::Transport(error) => write!(f, "transport error: {error}"),
45 RpcError::Remote(error) => write!(f, "remote error: {error}"),
46 }
47 }
48}
49
50impl<E> std::error::Error for RpcError<E>
51where
52 E: std::error::Error,
53{
54 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
55 match self {
56 RpcError::Transport(error) => error.source(),
57 RpcError::Remote(error) => error.source(),
58 }
59 }
60}
61
62impl<E> From<std::io::Error> for RpcError<E> {
63 fn from(value: std::io::Error) -> Self {
64 Self::Transport(value)
65 }
66}
67
68#[derive(Default)]
69pub struct RpcMessageCodec(LengthDelimitedCodec);
70
71impl Encoder<RpcMessage> for RpcMessageCodec {
72 type Error = std::io::Error;
73
74 fn encode(
75 &mut self,
76 item: RpcMessage,
77 dst: &mut bytes::BytesMut,
78 ) -> std::result::Result<(), Self::Error> {
79 let encoded = bincode::serde::encode_to_vec(&item, bincode::config::standard())
80 .map_err(std::io::Error::other)?;
81 let encoded = Bytes::from(encoded);
82 self.0.encode(encoded, dst).map_err(std::io::Error::other)?;
83 Ok(())
84 }
85}
86
87impl Decoder for RpcMessageCodec {
88 type Item = RpcMessage;
89
90 type Error = std::io::Error;
91
92 fn decode(
93 &mut self,
94 src: &mut bytes::BytesMut,
95 ) -> std::result::Result<Option<Self::Item>, Self::Error> {
96 match self.0.decode(src) {
97 Ok(Some(frame)) => {
98 let (message, _) =
99 bincode::serde::decode_from_slice(&frame, bincode::config::standard())
100 .map_err(std::io::Error::other)?;
101 Ok(Some(message))
102 }
103 Ok(None) => Ok(None),
104 Err(err) => Err(std::io::Error::other(err)),
105 }
106 }
107}
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..414340e
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,166 @@
1use std::{collections::HashMap, sync::Arc};
2
3use futures::{SinkExt, StreamExt};
4use tokio::task::{AbortHandle, JoinSet};
5
6use crate::{
7 Channel, Listener, Service,
8 protocol::{RpcCall, RpcCancel, RpcMessage, RpcResponse},
9};
10
11#[derive(Clone)]
12struct Services(Arc<HashMap<String, Arc<dyn Service>>>);
13
14impl Services {
15 fn new(services: HashMap<String, Arc<dyn Service>>) -> Self {
16 Self(Arc::new(services))
17 }
18
19 fn get_service(&self, name: &str) -> std::io::Result<Arc<dyn Service>> {
20 match self.0.get(name) {
21 Some(service) => Ok(service.clone()),
22 None => Err(std::io::Error::new(
23 std::io::ErrorKind::NotFound,
24 "service not found",
25 )),
26 }
27 }
28}
29
30type ListenerSpawner =
31 Box<dyn FnOnce(&mut JoinSet<std::io::Result<()>>, Services) -> AbortHandle + Send + 'static>;
32
33#[derive(Debug, Default)]
34struct AbortHandles(Vec<AbortHandle>);
35
36impl Drop for AbortHandles {
37 fn drop(&mut self) {
38 for handle in &self.0 {
39 handle.abort();
40 }
41 }
42}
43
44impl AbortHandles {
45 pub fn push(&mut self, handle: AbortHandle) {
46 self.0.push(handle);
47 }
48}
49
50#[derive(Default)]
51pub struct Server {
52 services: HashMap<String, Arc<dyn Service>>,
53 listener_spawners: Vec<ListenerSpawner>,
54}
55
56impl Server {
57 pub fn with_service<T>(mut self, service: T) -> Self
58 where
59 T: Service,
60 {
61 let name = T::name();
62 let service = Arc::new(service);
63 self.services.insert(name.to_string(), service);
64 self
65 }
66
67 pub fn with_listener<L, C>(mut self, listener: L) -> Self
68 where
69 C: Channel,
70 L: Listener<C>,
71 {
72 self.listener_spawners
73 .push(Box::new(move |join_set, services| {
74 join_set.spawn(listener_loop(listener, services))
75 }));
76 self
77 }
78
79 pub async fn serve(self) -> std::io::Result<()> {
80 let services = Services::new(self.services);
81 let mut join_set = JoinSet::default();
82 let mut abort_handles = AbortHandles::default();
83 for spawner in self.listener_spawners {
84 let abort_handle = (spawner)(&mut join_set, services.clone());
85 abort_handles.push(abort_handle);
86 }
87 match join_set.join_next().await {
88 Some(Ok(Ok(()))) => Ok(()),
89 Some(Ok(Err(err))) => Err(err),
90 Some(Err(err)) => Err(std::io::Error::other(err)),
91 None => Ok(()),
92 }
93 }
94}
95
96async fn listener_loop<L, C>(mut listener: L, services: Services) -> std::io::Result<()>
97where
98 C: Channel,
99 L: Listener<C>,
100{
101 while let Some(result) = listener.next().await {
102 let channel = result?;
103 let services = services.clone();
104 tokio::spawn(channel_handler(channel, services));
105 }
106 Ok(())
107}
108
109async fn channel_handler<C: Channel>(mut channel: C, services: Services) -> std::io::Result<()> {
110 enum Select {
111 Empty,
112 Message(RpcMessage),
113 }
114
115 let (response_tx, mut response_rx) =
116 tokio::sync::mpsc::unbounded_channel::<std::io::Result<RpcResponse>>();
117 let mut requests: HashMap<u64, AbortHandle> = Default::default();
118 loop {
119 let select = tokio::select! {
120 reqopt = channel.next() => match reqopt {
121 Some(Ok(message)) => Select::Message(message),
122 Some(Err(err)) => return Err(err),
123 None => Select::Empty,
124 },
125 Some(response) = response_rx.recv() => match response {
126 Ok(response) => Select::Message(RpcMessage::Response(response)),
127 Err(err) => return Err(err),
128 }
129 };
130
131 match select {
132 Select::Empty => break,
133 Select::Message(message) => match message {
134 RpcMessage::Call(RpcCall {
135 id,
136 service,
137 method,
138 arguments,
139 }) => {
140 let response_tx = response_tx.clone();
141 let service = services.get_service(&service)?;
142 let handle = tokio::spawn(async move {
143 let response = match service.call(method, arguments).await {
144 Ok(value) => Ok(RpcResponse { id, value }),
145 Err(err) => Err(err),
146 };
147 let _ = response_tx.send(response);
148 })
149 .abort_handle();
150 requests.insert(id, handle);
151 }
152 RpcMessage::Cancel(RpcCancel { id }) => {
153 if let Some(handle) = requests.remove(&id) {
154 handle.abort();
155 }
156 }
157 RpcMessage::Response(response) => {
158 requests.remove(&response.id);
159 channel.send(RpcMessage::Response(response)).await?;
160 }
161 },
162 }
163 }
164
165 Ok(())
166}
diff --git a/src/tcp.rs b/src/tcp.rs
new file mode 100644
index 0000000..5684c41
--- /dev/null
+++ b/src/tcp.rs
@@ -0,0 +1,156 @@
1//! TCP backed channel and listener implementations.
2//!
3//! ```no_run
4//! #[urpc::service]
5//! trait Hello {
6//! type Error = ();
7//!
8//! async fn hello(name: String) -> String;
9//! }
10//!
11//! struct HelloServer;
12//!
13//! impl Hello for HelloServer {
14//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
15//! Ok(format!("Hello, {name}!"))
16//! }
17//! }
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>>{
21//! let listener = urpc::tcp::bind("0.0.0.0:3000").await?;
22//!
23//! // spawn the server
24//! tokio::spawn(async move {
25//! urpc::Server::default()
26//! .with_listener(listener)
27//! .with_service(HelloServer.into_service())
28//! .serve()
29//! .await
30//! });
31//!
32//! // create a client
33//! let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await?);
34//! let client = HelloClient::new(channel);
35//! let greeting = client.hello("World".into()).await.unwrap();
36//! assert_eq!(greeting, "Hello, World!");
37//! Ok(())
38//! }
39//! ```
40use std::pin::Pin;
41
42use futures::{Sink, Stream};
43use tokio::{
44 net::{TcpListener, TcpStream, ToSocketAddrs},
45 sync::mpsc::Receiver,
46 task::AbortHandle,
47};
48use tokio_util::codec::Framed;
49
50use crate::{
51 Channel, Listener,
52 protocol::{RpcMessage, RpcMessageCodec},
53};
54
55pub struct TcpChannel(Framed<TcpStream, RpcMessageCodec>);
56
57impl TcpChannel {
58 fn new(stream: TcpStream) -> Self {
59 Self(Framed::new(stream, RpcMessageCodec::default()))
60 }
61
62 pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result<Self> {
63 let stream = TcpStream::connect(addrs).await?;
64 Ok(Self::new(stream))
65 }
66}
67
68impl Sink<RpcMessage> for TcpChannel {
69 type Error = std::io::Error;
70
71 fn poll_ready(
72 self: Pin<&mut Self>,
73 cx: &mut std::task::Context<'_>,
74 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
75 Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx)
76 }
77
78 fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> {
79 Sink::start_send(Pin::new(&mut self.get_mut().0), item)
80 }
81
82 fn poll_flush(
83 self: Pin<&mut Self>,
84 cx: &mut std::task::Context<'_>,
85 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
86 Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx)
87 }
88
89 fn poll_close(
90 self: Pin<&mut Self>,
91 cx: &mut std::task::Context<'_>,
92 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
93 Sink::poll_close(Pin::new(&mut self.get_mut().0), cx)
94 }
95}
96
97impl Stream for TcpChannel {
98 type Item = std::io::Result<RpcMessage>;
99
100 fn poll_next(
101 self: Pin<&mut Self>,
102 cx: &mut std::task::Context<'_>,
103 ) -> std::task::Poll<Option<Self::Item>> {
104 Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
105 }
106}
107
108impl Channel for TcpChannel {}
109
110pub struct TcpChannelListener {
111 receiver: Receiver<TcpChannel>,
112 abort: AbortHandle,
113}
114
115impl Drop for TcpChannelListener {
116 fn drop(&mut self) {
117 self.abort.abort();
118 }
119}
120
121impl TcpChannelListener {
122 pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result<Self> {
123 let listener = TcpListener::bind(addrs).await?;
124 let (sender, receiver) = tokio::sync::mpsc::channel(8);
125 let abort = tokio::spawn(async move {
126 while let Ok((stream, _addr)) = listener.accept().await {
127 if sender.send(TcpChannel::new(stream)).await.is_err() {
128 break;
129 }
130 }
131 })
132 .abort_handle();
133 Ok(Self { receiver, abort })
134 }
135}
136
137impl Stream for TcpChannelListener {
138 type Item = std::io::Result<TcpChannel>;
139
140 fn poll_next(
141 self: Pin<&mut Self>,
142 cx: &mut std::task::Context<'_>,
143 ) -> std::task::Poll<Option<Self::Item>> {
144 self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok))
145 }
146}
147
148impl Listener<TcpChannel> for TcpChannelListener {}
149
150pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result<TcpChannelListener> {
151 TcpChannelListener::bind(addrs).await
152}
153
154pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result<TcpChannel> {
155 TcpChannel::connect(addrs).await
156}
diff --git a/src/unix.rs b/src/unix.rs
new file mode 100644
index 0000000..e241b64
--- /dev/null
+++ b/src/unix.rs
@@ -0,0 +1,160 @@
1//! UNIX Domain Socket backed channel and listener implementations.
2//!
3//! ```no_run
4//! #[urpc::service]
5//! trait Hello {
6//! type Error = ();
7//!
8//! async fn hello(name: String) -> String;
9//! }
10//!
11//! struct HelloServer;
12//!
13//! impl Hello for HelloServer {
14//! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
15//! Ok(format!("Hello, {name}!"))
16//! }
17//! }
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>>{
21//! let listener = urpc::unix::bind("./hello.service").await?;
22//!
23//! // spawn the server
24//! tokio::spawn(async move {
25//! urpc::Server::default()
26//! .with_listener(listener)
27//! .with_service(HelloServer.into_service())
28//! .serve()
29//! .await
30//! });
31//!
32//! // create a client
33//! let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await?);
34//! let client = HelloClient::new(channel);
35//! let greeting = client.hello("World".into()).await.unwrap();
36//! assert_eq!(greeting, "Hello, World!");
37//! Ok(())
38//! }
39//! ```
40use std::{path::Path, pin::Pin};
41
42use futures::{Sink, Stream};
43use tokio::{
44 net::{UnixListener, UnixStream},
45 sync::mpsc::Receiver,
46 task::AbortHandle,
47};
48use tokio_util::codec::Framed;
49
50use crate::{
51 Channel, Listener,
52 protocol::{RpcMessage, RpcMessageCodec},
53};
54
55pub struct UnixChannel(Framed<UnixStream, RpcMessageCodec>);
56
57impl UnixChannel {
58 fn new(stream: UnixStream) -> Self {
59 Self(Framed::new(stream, RpcMessageCodec::default()))
60 }
61
62 pub async fn connect(path: impl AsRef<Path>) -> std::io::Result<Self> {
63 let stream = UnixStream::connect(path).await?;
64 Ok(Self::new(stream))
65 }
66}
67
68impl Sink<RpcMessage> for UnixChannel {
69 type Error = std::io::Error;
70
71 fn poll_ready(
72 self: Pin<&mut Self>,
73 cx: &mut std::task::Context<'_>,
74 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
75 Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx)
76 }
77
78 fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> {
79 Sink::start_send(Pin::new(&mut self.get_mut().0), item)
80 }
81
82 fn poll_flush(
83 self: Pin<&mut Self>,
84 cx: &mut std::task::Context<'_>,
85 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
86 Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx)
87 }
88
89 fn poll_close(
90 self: Pin<&mut Self>,
91 cx: &mut std::task::Context<'_>,
92 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
93 Sink::poll_close(Pin::new(&mut self.get_mut().0), cx)
94 }
95}
96
97impl Stream for UnixChannel {
98 type Item = std::io::Result<RpcMessage>;
99
100 fn poll_next(
101 self: Pin<&mut Self>,
102 cx: &mut std::task::Context<'_>,
103 ) -> std::task::Poll<Option<Self::Item>> {
104 Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
105 }
106}
107
108impl Channel for UnixChannel {}
109
110pub struct UnixChannelListener {
111 receiver: Receiver<UnixChannel>,
112 abort: AbortHandle,
113}
114
115impl Drop for UnixChannelListener {
116 fn drop(&mut self) {
117 self.abort.abort();
118 }
119}
120
121impl UnixChannelListener {
122 pub async fn bind(path: impl AsRef<Path>) -> std::io::Result<Self> {
123 let path = path.as_ref();
124 if tokio::fs::try_exists(path).await? {
125 tokio::fs::remove_file(path).await?;
126 }
127 let listener = UnixListener::bind(path)?;
128 let (sender, receiver) = tokio::sync::mpsc::channel(8);
129 let abort = tokio::spawn(async move {
130 while let Ok((stream, _addr)) = listener.accept().await {
131 if sender.send(UnixChannel::new(stream)).await.is_err() {
132 break;
133 }
134 }
135 })
136 .abort_handle();
137 Ok(Self { receiver, abort })
138 }
139}
140
141impl Stream for UnixChannelListener {
142 type Item = std::io::Result<UnixChannel>;
143
144 fn poll_next(
145 self: Pin<&mut Self>,
146 cx: &mut std::task::Context<'_>,
147 ) -> std::task::Poll<Option<Self::Item>> {
148 self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok))
149 }
150}
151
152impl Listener<UnixChannel> for UnixChannelListener {}
153
154pub async fn bind(path: impl AsRef<Path>) -> std::io::Result<UnixChannelListener> {
155 UnixChannelListener::bind(path).await
156}
157
158pub async fn connect(path: impl AsRef<Path>) -> std::io::Result<UnixChannel> {
159 UnixChannel::connect(path).await
160}
diff --git a/tests/tcp.rs b/tests/tcp.rs
new file mode 100644
index 0000000..a2f647d
--- /dev/null
+++ b/tests/tcp.rs
@@ -0,0 +1,34 @@
1#[tokio::test]
2async fn test_hello_service() {
3 #[urpc::service]
4 trait Hello {
5 type Error = ();
6
7 async fn hello(name: String) -> String;
8 }
9
10 struct HelloServer;
11
12 impl Hello for HelloServer {
13 async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
14 Ok(format!("Hello, {name}!"))
15 }
16 }
17
18 let listener = urpc::tcp::bind("0.0.0.0:3000").await.unwrap();
19
20 // spawn the server
21 tokio::spawn(async move {
22 urpc::Server::default()
23 .with_listener(listener)
24 .with_service(HelloServer.into_service())
25 .serve()
26 .await
27 });
28
29 // create a client
30 let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await.unwrap());
31 let client = HelloClient::new(channel);
32 let greeting = client.hello("World".into()).await.unwrap();
33 assert_eq!(greeting, "Hello, World!");
34}
diff --git a/tests/unix.rs b/tests/unix.rs
new file mode 100644
index 0000000..bfa5a69
--- /dev/null
+++ b/tests/unix.rs
@@ -0,0 +1,34 @@
1#[tokio::test]
2async fn test_hello_service() {
3 #[urpc::service]
4 trait Hello {
5 type Error = ();
6
7 async fn hello(name: String) -> String;
8 }
9
10 struct HelloServer;
11
12 impl Hello for HelloServer {
13 async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> {
14 Ok(format!("Hello, {name}!"))
15 }
16 }
17
18 let listener = urpc::unix::bind("./hello.service").await.unwrap();
19
20 // spawn the server
21 tokio::spawn(async move {
22 urpc::Server::default()
23 .with_listener(listener)
24 .with_service(HelloServer.into_service())
25 .serve()
26 .await
27 });
28
29 // create a client
30 let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await.unwrap());
31 let client = HelloClient::new(channel);
32 let greeting = client.hello("World".into()).await.unwrap();
33 assert_eq!(greeting, "Hello, World!");
34}
diff --git a/urpc-macro/Cargo.toml b/urpc-macro/Cargo.toml
new file mode 100644
index 0000000..16b5650
--- /dev/null
+++ b/urpc-macro/Cargo.toml
@@ -0,0 +1,18 @@
1[package]
2name = "urpc-macro"
3version = "0.1.0"
4edition = "2021"
5authors = ["Your Name <[email protected]>"]
6description = "Procedural macros for urpc"
7license = "MIT OR Apache-2.0"
8repository = "https://github.com/your-username/urpc"
9keywords = ["rpc", "async", "networking", "macros"]
10categories = ["network-programming", "asynchronous"]
11
12[lib]
13proc-macro = true
14
15[dependencies]
16proc-macro2 = "1.0.95"
17quote = "1.0.40"
18syn = { version = "2.0.101", features = ["full"] }
diff --git a/urpc-macro/src/lib.rs b/urpc-macro/src/lib.rs
new file mode 100644
index 0000000..4facc60
--- /dev/null
+++ b/urpc-macro/src/lib.rs
@@ -0,0 +1,282 @@
1use proc_macro2::{Punct, Spacing, Span, TokenStream};
2use quote::{ToTokens, TokenStreamExt};
3use syn::parse::Parse;
4
5struct Service {
6 vis: syn::Visibility,
7 ident: syn::Ident,
8 error: syn::Type,
9 methods: Vec<Method>,
10}
11
12impl Service {
13 fn server_trait_ident(&self) -> syn::Ident {
14 self.ident.clone()
15 }
16
17 fn client_struct_ident(&self) -> syn::Ident {
18 syn::Ident::new(&format!("{}Client", self.ident), Span::call_site())
19 }
20}
21
22impl Parse for Service {
23 fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
24 let vis = input.parse()?;
25 input.parse::<syn::Token![trait]>()?;
26 let ident = input.parse()?;
27
28 let content;
29 syn::braced!(content in input);
30
31 content.parse::<syn::Token![type]>()?;
32 let error_ident = content.parse::<syn::Ident>()?;
33 if error_ident != "Error" {
34 return Err(syn::Error::new_spanned(
35 error_ident,
36 "expected Error associated type",
37 ));
38 }
39 content.parse::<syn::Token![=]>()?;
40 let error = content.parse()?;
41 content.parse::<syn::Token![;]>()?;
42
43 let mut methods = Vec::default();
44 while !content.is_empty() {
45 let method = content.parse()?;
46 methods.push(method);
47 }
48
49 Ok(Self {
50 vis,
51 ident,
52 error,
53 methods,
54 })
55 }
56}
57
58struct MethodArg {
59 ident: syn::Ident,
60 ty: syn::Type,
61}
62
63impl ToTokens for MethodArg {
64 fn to_tokens(&self, tokens: &mut TokenStream) {
65 self.ident.to_tokens(tokens);
66 tokens.append(Punct::new(':', Spacing::Alone));
67 self.ty.to_tokens(tokens);
68 }
69}
70
71impl Parse for MethodArg {
72 fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
73 let ident = input.parse()?;
74 input.parse::<syn::Token![:]>()?;
75 let ty = input.parse()?;
76 Ok(Self { ident, ty })
77 }
78}
79
80struct Method {
81 ident: syn::Ident,
82 arguments: Vec<MethodArg>,
83 return_ty: Option<syn::Type>,
84}
85
86impl Parse for Method {
87 fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
88 input.parse::<syn::Token![async]>()?;
89 input.parse::<syn::Token![fn]>()?;
90 let ident = input.parse::<syn::Ident>()?;
91 let arguments_content;
92 syn::parenthesized!(arguments_content in input);
93
94 let mut arguments = Vec::default();
95 while !arguments_content.is_empty() {
96 if !arguments.is_empty() {
97 arguments_content.parse::<syn::Token![,]>()?;
98 }
99 let argument = arguments_content.parse::<MethodArg>()?;
100 arguments.push(argument);
101 }
102
103 let return_ty = match input.peek(syn::Token![->]) {
104 true => {
105 input.parse::<syn::Token![->]>()?;
106 Some(input.parse()?)
107 }
108 false => None,
109 };
110
111 input.parse::<syn::Token![;]>()?;
112
113 Ok(Self {
114 ident,
115 arguments,
116 return_ty,
117 })
118 }
119}
120
121fn generate_service_into_service_match_arm(service: &Service, method: &Method) -> TokenStream {
122 let method_ident = &method.ident;
123 let method_name = method.ident.to_string();
124 let server_trait = service.server_trait_ident();
125 let arg_idents = method
126 .arguments
127 .iter()
128 .map(|arg| &arg.ident)
129 .collect::<Vec<_>>();
130
131 quote::quote! {
132 #method_name => {
133 let ((#(#arg_idents),*), _) =
134 urpc::internal::bincode::serde::decode_from_slice(&arguments, urpc::internal::bincode::config::standard()).map_err(std::io::Error::other)?;
135 let ctx = Default::default();
136 let ret = <T as #server_trait>::#method_ident(&self.0, ctx, #(#arg_idents),*).await;
137 let value = urpc::internal::bincode::serde::encode_to_vec(&ret, urpc::internal::bincode::config::standard()).map_err(std::io::Error::other)?;
138 Ok(From::from(value))
139 }
140 }
141}
142
143fn generate_service_into_service(service: &Service) -> TokenStream {
144 let server_trait = service.server_trait_ident();
145 let service_name = service.ident.to_string();
146 let match_arms = service
147 .methods
148 .iter()
149 .map(|m| generate_service_into_service_match_arm(service, m))
150 .collect::<Vec<_>>();
151 quote::quote! {
152 fn into_service(self) -> impl urpc::Service {
153 struct Adapter<T>(T);
154
155 impl<T> urpc::Service for Adapter<T>
156 where
157 T: #server_trait,
158 {
159 fn name() -> &'static str {
160 #service_name
161 }
162
163 fn call(
164 &self,
165 method: String,
166 arguments: urpc::internal::bytes::Bytes,
167 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::io::Result<urpc::internal::bytes::Bytes>> + Send + '_>> {
168 Box::pin(async move {
169 match method.as_str() {
170 #(#match_arms)*
171 _ => panic!("unknown method for service {}", "Router"),
172 }
173 })
174 }
175 }
176
177 Adapter(self)
178 }
179 }
180}
181
182fn generate_service_server_method(service: &Service, method: &Method) -> TokenStream {
183 let ident = &method.ident;
184 let service_error = &service.error;
185 let args = &method.arguments;
186 let return_ty = match &method.return_ty {
187 Some(t) => quote::quote! { std::result::Result<#t, #service_error> },
188 None => quote::quote! { std::result::Result<(), #service_error> },
189 };
190 quote::quote! {
191 fn #ident(&self, ctx: urpc::Context, #(#args),*) -> impl std::future::Future<Output = #return_ty> + Send;
192 }
193}
194
195fn generate_service_server(service: &Service) -> TokenStream {
196 let vis = &service.vis;
197 let ident = service.server_trait_ident();
198 let methods = service
199 .methods
200 .iter()
201 .map(|m| generate_service_server_method(service, m));
202 let into_service = generate_service_into_service(service);
203
204 quote::quote! {
205 #vis trait #ident : Sized + Send + Sync + 'static {
206 #(#methods)*
207 #into_service
208 }
209 }
210}
211
212fn generate_service_client_method(service: &Service, method: &Method) -> TokenStream {
213 let service_name = service.ident.to_string();
214 let service_error = &service.error;
215 let method_ident = &method.ident;
216 let method_string = method.ident.to_string();
217 let method_arg_idents = method
218 .arguments
219 .iter()
220 .map(|arg| &arg.ident)
221 .collect::<Vec<_>>();
222 let method_arg_types = method.arguments.iter().map(|arg| &arg.ty);
223 let method_return_type = match &method.return_ty {
224 Some(ty) => quote::quote! { #ty },
225 None => quote::quote! { () },
226 };
227 quote::quote! {
228 pub async fn #method_ident(&self, #(#method_arg_idents: #method_arg_types),*) -> std::result::Result<#method_return_type, urpc::protocol::RpcError<#service_error>> {
229 let service = String::from(#service_name);
230 let method = String::from(#method_string);
231 let arguments = (#(#method_arg_idents),*);
232 let arguments = match urpc::internal::bincode::serde::encode_to_vec(&arguments, urpc::internal::bincode::config::standard()) {
233 Ok(arguments) => From::from(arguments),
234 Err(err) => return Err(urpc::protocol::RpcError::Transport(std::io::Error::other(err))),
235 };
236 let response = match self.channel.call(service, method, arguments).await {
237 Ok(response) => response,
238 Err(err) => return Err(urpc::protocol::RpcError::Transport(err)),
239 };
240 match urpc::internal::bincode::serde::decode_from_slice::<std::result::Result<#method_return_type, #service_error>, _>(&response, urpc::internal::bincode::config::standard()) {
241 Ok((result, _)) => result.map_err(urpc::protocol::RpcError::Remote),
242 Err(err) => Err(urpc::protocol::RpcError::Transport(std::io::Error::other(err))),
243 }
244 }
245 }
246}
247
248fn generate_service_client(service: &Service) -> TokenStream {
249 let vis = &service.vis;
250 let client_ident = service.client_struct_ident();
251 let methods = service
252 .methods
253 .iter()
254 .map(|m| generate_service_client_method(service, m));
255 quote::quote! {
256 #vis struct #client_ident {
257 channel: urpc::ClientChannel
258 }
259
260 impl #client_ident {
261 pub fn new(channel: urpc::ClientChannel) -> Self {
262 Self { channel }
263 }
264
265 #(#methods)*
266 }
267 }
268}
269
270#[proc_macro_attribute]
271pub fn service(
272 _attr: proc_macro::TokenStream,
273 item: proc_macro::TokenStream,
274) -> proc_macro::TokenStream {
275 let service = syn::parse_macro_input!(item as Service);
276 let service_server = generate_service_server(&service);
277 let service_client = generate_service_client(&service);
278 From::from(quote::quote! {
279 #service_server
280 #service_client
281 })
282}