diff options
| author | diogo464 <[email protected]> | 2025-07-16 10:46:41 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-16 10:46:41 +0100 |
| commit | f319d7ab5278a3cfb43d38875d81c28cc2dce1e1 (patch) | |
| tree | cb161fd990643e267bbc373fb09ccd7b689a23b5 | |
Initial commit - extracted urpc from monorepo
| -rw-r--r-- | .gitignore | 22 | ||||
| -rw-r--r-- | Cargo.toml | 26 | ||||
| -rw-r--r-- | LICENSE-APACHE | 208 | ||||
| -rw-r--r-- | LICENSE-MIT | 21 | ||||
| -rw-r--r-- | README.md | 64 | ||||
| -rw-r--r-- | src/channel.rs | 162 | ||||
| -rw-r--r-- | src/client_channel.rs | 124 | ||||
| -rw-r--r-- | src/internal.rs | 2 | ||||
| -rw-r--r-- | src/lib.rs | 52 | ||||
| -rw-r--r-- | src/protocol.rs | 107 | ||||
| -rw-r--r-- | src/server.rs | 166 | ||||
| -rw-r--r-- | src/tcp.rs | 156 | ||||
| -rw-r--r-- | src/unix.rs | 160 | ||||
| -rw-r--r-- | tests/tcp.rs | 34 | ||||
| -rw-r--r-- | tests/unix.rs | 34 | ||||
| -rw-r--r-- | urpc-macro/Cargo.toml | 18 | ||||
| -rw-r--r-- | urpc-macro/src/lib.rs | 282 |
17 files changed, 1638 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3a9c95e --- /dev/null +++ b/.gitignore | |||
| @@ -0,0 +1,22 @@ | |||
| 1 | # Rust build artifacts | ||
| 2 | /target/ | ||
| 3 | Cargo.lock | ||
| 4 | |||
| 5 | # IDE files | ||
| 6 | .vscode/ | ||
| 7 | .idea/ | ||
| 8 | *.swp | ||
| 9 | *.swo | ||
| 10 | *~ | ||
| 11 | |||
| 12 | # OS generated files | ||
| 13 | .DS_Store | ||
| 14 | .DS_Store? | ||
| 15 | ._* | ||
| 16 | .Spotlight-V100 | ||
| 17 | .Trashes | ||
| 18 | ehthumbs.db | ||
| 19 | Thumbs.db | ||
| 20 | |||
| 21 | # Test artifacts | ||
| 22 | hello.service \ No newline at end of file | ||
diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..13032d9 --- /dev/null +++ b/Cargo.toml | |||
| @@ -0,0 +1,26 @@ | |||
| 1 | [package] | ||
| 2 | name = "urpc" | ||
| 3 | version = "0.1.0" | ||
| 4 | edition = "2021" | ||
| 5 | authors = ["Your Name <[email protected]>"] | ||
| 6 | description = "A minimalistic RPC framework for Rust" | ||
| 7 | license = "MIT OR Apache-2.0" | ||
| 8 | repository = "https://github.com/your-username/urpc" | ||
| 9 | keywords = ["rpc", "async", "networking"] | ||
| 10 | categories = ["network-programming", "asynchronous"] | ||
| 11 | |||
| 12 | [workspace] | ||
| 13 | members = ["urpc-macro"] | ||
| 14 | |||
| 15 | [dependencies] | ||
| 16 | urpc-macro = { path = "urpc-macro" } | ||
| 17 | |||
| 18 | bincode = { version = "2.0.1", features = ["serde"] } | ||
| 19 | bytes = { version = "1.10.1", features = ["serde"] } | ||
| 20 | futures = "0.3.31" | ||
| 21 | serde = { version = "1.0.219", features = ["derive"] } | ||
| 22 | tokio = { version = "1.44.1", features = ["full"] } | ||
| 23 | tokio-util = { version = "0.7.14", features = ["codec"] } | ||
| 24 | |||
| 25 | [dev-dependencies] | ||
| 26 | tokio-test = "0.4" | ||
diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..1d9467e --- /dev/null +++ b/LICENSE-APACHE | |||
| @@ -0,0 +1,208 @@ | |||
| 1 | Apache License | ||
| 2 | Version 2.0, January 2004 | ||
| 3 | http://www.apache.org/licenses/ | ||
| 4 | |||
| 5 | TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION | ||
| 6 | |||
| 7 | 1. Definitions. | ||
| 8 | |||
| 9 | "License" shall mean the terms and conditions for use, reproduction, | ||
| 10 | and distribution as defined by Sections 1 through 9 of this document. | ||
| 11 | |||
| 12 | "Licensor" shall mean the copyright owner or entity granting the License. | ||
| 13 | |||
| 14 | "Legal Entity" shall mean the union of the acting entity and all | ||
| 15 | other entities that control, are controlled by, or are under common | ||
| 16 | control with that entity. For the purposes of this definition, | ||
| 17 | "control" means (i) the power, direct or indirect, to cause the | ||
| 18 | direction or management of such entity, whether by contract or | ||
| 19 | otherwise, or (ii) ownership of fifty percent (50%) or more of the | ||
| 20 | outstanding shares, or (iii) beneficial ownership of such entity. | ||
| 21 | |||
| 22 | "You" (or "Your") shall mean an individual or Legal Entity | ||
| 23 | exercising permissions granted by this License. | ||
| 24 | |||
| 25 | "Source" form shall mean the preferred form for making modifications, | ||
| 26 | including but not limited to software source code, documentation | ||
| 27 | source, and configuration files. | ||
| 28 | |||
| 29 | "Object" form shall mean any form resulting from mechanical | ||
| 30 | transformation or translation of a Source form, including but | ||
| 31 | not limited to compiled object code, generated documentation, | ||
| 32 | and conversions to other media types. | ||
| 33 | |||
| 34 | "Work" shall mean the work of authorship, whether in Source or | ||
| 35 | Object form, made available under the License, as indicated by a | ||
| 36 | copyright notice that is included in or attached to the work | ||
| 37 | (which shall not include communication that is conspicuously | ||
| 38 | marked or otherwise designated in writing by the copyright owner | ||
| 39 | as "Not a Contribution"). | ||
| 40 | |||
| 41 | "Contribution" shall mean any work of authorship, including | ||
| 42 | the original version of the Work and any modifications or additions | ||
| 43 | to that Work or Derivative Works thereof, that is intentionally | ||
| 44 | submitted to Licensor for inclusion in the Work by the copyright | ||
| 45 | owner or by an individual or Legal Entity authorized to submit on | ||
| 46 | behalf of the copyright owner. For the purposes of this definition, | ||
| 47 | "submitted" means any form of electronic, verbal, or written | ||
| 48 | communication sent to the Licensor or its representatives, | ||
| 49 | including but not limited to communication on electronic mailing | ||
| 50 | lists, source code control systems, and issue tracking systems that | ||
| 51 | are managed by, or on behalf of, the Licensor for the purpose of | ||
| 52 | discussing and improving the Work, but excluding communication that | ||
| 53 | is conspicuously marked or otherwise designated in writing by the | ||
| 54 | copyright owner as "Not a Contribution". | ||
| 55 | |||
| 56 | 2. Grant of Copyright License. Subject to the terms and conditions of | ||
| 57 | this License, each Contributor hereby grants to You a perpetual, | ||
| 58 | worldwide, non-exclusive, no-charge, royalty-free, irrevocable | ||
| 59 | copyright license to use, reproduce, modify, distribute, prepare | ||
| 60 | Derivative Works of, publicly display, publicly perform, sublicense, | ||
| 61 | and sell the Work and to permit persons to whom the Work is | ||
| 62 | furnished to do so, subject to the following conditions: | ||
| 63 | |||
| 64 | The above copyright notice and this permission notice shall be | ||
| 65 | included in all copies or substantial portions of the Work. | ||
| 66 | |||
| 67 | 3. Grant of Patent License. Subject to the terms and conditions of | ||
| 68 | this License, each Contributor hereby grants to You a perpetual, | ||
| 69 | worldwide, non-exclusive, no-charge, royalty-free, irrevocable | ||
| 70 | (except as stated in this section) patent license to make, have made, | ||
| 71 | use, offer to sell, sell, import, and otherwise transfer the Work, | ||
| 72 | where such license applies only to those patent claims licensable | ||
| 73 | by such Contributor that are necessarily infringed by their | ||
| 74 | Contribution(s) alone or by combination of their Contribution(s) | ||
| 75 | with the Work to which such Contribution(s) was submitted. If You | ||
| 76 | institute patent litigation against any entity (including a | ||
| 77 | cross-claim or counterclaim in a lawsuit) alleging that the Work | ||
| 78 | or a Contribution incorporated within the Work constitutes direct | ||
| 79 | or contributory patent infringement, then any patent licenses | ||
| 80 | granted to You under this License for that Work shall terminate | ||
| 81 | as of the date such litigation is filed. | ||
| 82 | |||
| 83 | 4. Redistribution. You may reproduce and distribute copies of the | ||
| 84 | Work or Derivative Works thereof in any medium, with or without | ||
| 85 | modifications, and in Source or Object form, provided that You | ||
| 86 | meet the following conditions: | ||
| 87 | |||
| 88 | (a) You must give any other recipients of the Work or | ||
| 89 | Derivative Works a copy of this License; and | ||
| 90 | |||
| 91 | (b) You must cause any modified files to carry prominent notices | ||
| 92 | stating that You changed the files; and | ||
| 93 | |||
| 94 | (c) You must retain, in the Source form of any Derivative Works | ||
| 95 | that You distribute, all copyright, patent, trademark, and | ||
| 96 | attribution notices from the Source form of the Work, | ||
| 97 | excluding those notices that do not pertain to any part of | ||
| 98 | the Derivative Works; and | ||
| 99 | |||
| 100 | (d) If the Work includes a "NOTICE" text file as part of its | ||
| 101 | distribution, then any Derivative Works that You distribute must | ||
| 102 | include a readable copy of the attribution notices contained | ||
| 103 | within such NOTICE file, excluding those notices that do not | ||
| 104 | pertain to any part of the Derivative Works, in at least one | ||
| 105 | of the following places: within a NOTICE text file distributed | ||
| 106 | as part of the Derivative Works; within the Source form or | ||
| 107 | documentation, if provided along with the Derivative Works; or, | ||
| 108 | within a display generated by the Derivative Works, if and | ||
| 109 | wherever such third-party notices normally appear. The contents | ||
| 110 | of the NOTICE file are for informational purposes only and | ||
| 111 | do not modify the License. You may add Your own attribution | ||
| 112 | notices within Derivative Works that You distribute, alongside | ||
| 113 | or as an addendum to the NOTICE text from the Work, provided | ||
| 114 | that such additional attribution notices cannot be construed | ||
| 115 | as modifying the License. | ||
| 116 | |||
| 117 | You may add Your own copyright notice to Your modifications and | ||
| 118 | may provide additional or different license terms and conditions | ||
| 119 | for use, reproduction, or distribution of Your modifications, or | ||
| 120 | for any such Derivative Works as a whole, provided Your use, | ||
| 121 | reproduction, and distribution of the Work otherwise complies with | ||
| 122 | the conditions stated in this License. | ||
| 123 | |||
| 124 | 5. Submission of Contributions. Unless You explicitly state otherwise, | ||
| 125 | any Contribution intentionally submitted for inclusion in the Work | ||
| 126 | by You to the Licensor shall be under the terms and conditions of | ||
| 127 | this License, without any additional terms or conditions. | ||
| 128 | Notwithstanding the above, nothing herein shall supersede or modify | ||
| 129 | the terms of any separate license agreement you may have executed | ||
| 130 | with Licensor regarding such Contributions. | ||
| 131 | |||
| 132 | 6. Trademarks. This License does not grant permission to use the trade | ||
| 133 | names, trademarks, service marks, or product names of the Licensor, | ||
| 134 | except as required for reasonable and customary use in describing the | ||
| 135 | origin of the Work and reproducing the content of the NOTICE file. | ||
| 136 | |||
| 137 | 7. Disclaimer of Warranty. Unless required by applicable law or | ||
| 138 | agreed to in writing, Licensor provides the Work (and each | ||
| 139 | Contributor provides its Contributions) on an "AS IS" BASIS, | ||
| 140 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
| 141 | implied, including, without limitation, any warranties or conditions | ||
| 142 | of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A | ||
| 143 | PARTICULAR PURPOSE. You are solely responsible for determining the | ||
| 144 | appropriateness of using or redistributing the Work and assume any | ||
| 145 | risks associated with Your exercise of permissions under this License. | ||
| 146 | |||
| 147 | 8. Limitation of Liability. In no event and under no legal theory, | ||
| 148 | whether in tort (including negligence), contract, or otherwise, | ||
| 149 | unless required by applicable law (such as deliberate and grossly | ||
| 150 | negligent acts) or agreed to in writing, shall any Contributor be | ||
| 151 | liable to You for damages, including any direct, indirect, special, | ||
| 152 | incidental, or consequential damages of any character arising as a | ||
| 153 | result of this License or out of the use or inability to use the | ||
| 154 | Work (including but not limited to damages for loss of goodwill, | ||
| 155 | work stoppage, computer failure or malfunction, or any and all | ||
| 156 | other commercial damages or losses), even if such Contributor | ||
| 157 | has been advised of the possibility of such damages. | ||
| 158 | |||
| 159 | 9. Accepting Warranty or Support. You can have agreements requiring | ||
| 160 | that any person who redistributes the Work or Derivative Works | ||
| 161 | thereof provide a warranty or support for the Work, but You may | ||
| 162 | not do so on behalf of any Contributor or impose any warranty | ||
| 163 | or support obligations on any Contributor. Any such warranty or | ||
| 164 | support must be offered by You alone, and You hereby agree to | ||
| 165 | indemnify any Contributor for any liability incurred by such | ||
| 166 | Contributor as a result of warranty or support terms offered by You. | ||
| 167 | |||
| 168 | 10. No Trademark License. This License does not grant permission to use | ||
| 169 | the trade names, trademarks, service marks, or product names of the | ||
| 170 | Licensor, except as required for reasonable and customary use in | ||
| 171 | describing the origin of the Work and reproducing the content of | ||
| 172 | the NOTICE file. | ||
| 173 | |||
| 174 | 11. Additional Terms. You may choose to offer, and to charge a fee for, | ||
| 175 | warranty, support, indemnity, or other obligations and/or rights | ||
| 176 | consistent with this License. However, in accepting such obligations, | ||
| 177 | You may act only on Your own behalf and on Your sole responsibility, | ||
| 178 | not on behalf of any other Contributor, and only if You agree to | ||
| 179 | indemnify, defend, and hold each Contributor harmless for any | ||
| 180 | liability incurred by such Contributor as a result of warranty, | ||
| 181 | support, indemnity, or other terms You offer. | ||
| 182 | |||
| 183 | END OF TERMS AND CONDITIONS | ||
| 184 | |||
| 185 | APPENDIX: How to apply the Apache License to your work. | ||
| 186 | |||
| 187 | To apply the Apache License to your work, attach the following | ||
| 188 | boilerplate notice, with the fields enclosed by brackets "[]" | ||
| 189 | replaced with your own identifying information. (Don't include | ||
| 190 | the brackets!) The text should be enclosed in the appropriate | ||
| 191 | comment syntax for the file format. We also recommend that a | ||
| 192 | file or class name and description of purpose be included on the | ||
| 193 | same "about" line as the copyright notice for easier | ||
| 194 | identification within third-party archives. | ||
| 195 | |||
| 196 | Copyright 2025 urpc contributors | ||
| 197 | |||
| 198 | Licensed under the Apache License, Version 2.0 (the "License"); | ||
| 199 | you may not use this file except in compliance with the License. | ||
| 200 | You may obtain a copy of the License at | ||
| 201 | |||
| 202 | http://www.apache.org/licenses/LICENSE-2.0 | ||
| 203 | |||
| 204 | Unless required by applicable law or agreed to in writing, software | ||
| 205 | distributed under the License is distributed on an "AS IS" BASIS, | ||
| 206 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| 207 | See the License for the specific language governing permissions and | ||
| 208 | limitations under the License. \ No newline at end of file | ||
diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..c21a6e5 --- /dev/null +++ b/LICENSE-MIT | |||
| @@ -0,0 +1,21 @@ | |||
| 1 | MIT License | ||
| 2 | |||
| 3 | Copyright (c) 2025 urpc contributors | ||
| 4 | |||
| 5 | Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| 6 | of this software and associated documentation files (the "Software"), to deal | ||
| 7 | in the Software without restriction, including without limitation the rights | ||
| 8 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| 9 | copies of the Software, and to permit persons to whom the Software is | ||
| 10 | furnished to do so, subject to the following conditions: | ||
| 11 | |||
| 12 | The above copyright notice and this permission notice shall be included in all | ||
| 13 | copies or substantial portions of the Software. | ||
| 14 | |||
| 15 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| 16 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| 17 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| 18 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| 19 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| 20 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| 21 | SOFTWARE. \ No newline at end of file | ||
diff --git a/README.md b/README.md new file mode 100644 index 0000000..6b84471 --- /dev/null +++ b/README.md | |||
| @@ -0,0 +1,64 @@ | |||
| 1 | # urpc | ||
| 2 | |||
| 3 | A minimalistic RPC framework for Rust, designed for simplicity and performance. | ||
| 4 | |||
| 5 | ## Features | ||
| 6 | |||
| 7 | - Async/await support | ||
| 8 | - TCP and Unix socket transport | ||
| 9 | - Procedural macros for service definitions | ||
| 10 | - Serialization with bincode | ||
| 11 | - Lightweight and fast | ||
| 12 | |||
| 13 | ## Example | ||
| 14 | |||
| 15 | ```rust | ||
| 16 | use urpc::*; | ||
| 17 | |||
| 18 | #[urpc::service] | ||
| 19 | trait Hello { | ||
| 20 | type Error = (); | ||
| 21 | |||
| 22 | async fn hello(name: String) -> String; | ||
| 23 | } | ||
| 24 | |||
| 25 | struct HelloServer; | ||
| 26 | |||
| 27 | impl Hello for HelloServer { | ||
| 28 | async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 29 | Ok(format!("Hello, {}!", name)) | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 33 | #[tokio::main] | ||
| 34 | async fn main() { | ||
| 35 | // Server | ||
| 36 | let listener = urpc::tcp::bind("127.0.0.1:8080").await.unwrap(); | ||
| 37 | |||
| 38 | tokio::spawn(async move { | ||
| 39 | urpc::Server::default() | ||
| 40 | .with_listener(listener) | ||
| 41 | .with_service(HelloServer.into_service()) | ||
| 42 | .serve() | ||
| 43 | .await | ||
| 44 | }); | ||
| 45 | |||
| 46 | // Client | ||
| 47 | let channel = urpc::ClientChannel::new( | ||
| 48 | urpc::tcp::connect("127.0.0.1:8080").await.unwrap() | ||
| 49 | ); | ||
| 50 | let client = HelloClient::new(channel); | ||
| 51 | |||
| 52 | let response = client.hello("World".to_string()).await.unwrap(); | ||
| 53 | println!("{}", response); // "Hello, World!" | ||
| 54 | } | ||
| 55 | ``` | ||
| 56 | |||
| 57 | ## License | ||
| 58 | |||
| 59 | Licensed under either of | ||
| 60 | |||
| 61 | - Apache License, Version 2.0 | ||
| 62 | - MIT License | ||
| 63 | |||
| 64 | at your option. \ No newline at end of file | ||
diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..f80ca87 --- /dev/null +++ b/src/channel.rs | |||
| @@ -0,0 +1,162 @@ | |||
| 1 | //! MPSC channel backed channel and listener implementations. | ||
| 2 | //! | ||
| 3 | //! This module provides an mpsc channel based listener/channel implementation that can be useful | ||
| 4 | //! to for tests. | ||
| 5 | //! | ||
| 6 | //! The current implementation uses [`tokio`]'s unbounded mpsc channels. | ||
| 7 | //! | ||
| 8 | //! ``` | ||
| 9 | //! #[urpc::service] | ||
| 10 | //! trait Hello { | ||
| 11 | //! type Error = (); | ||
| 12 | //! | ||
| 13 | //! async fn hello(name: String) -> String; | ||
| 14 | //! } | ||
| 15 | //! | ||
| 16 | //! struct HelloServer; | ||
| 17 | //! | ||
| 18 | //! impl Hello for HelloServer { | ||
| 19 | //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 20 | //! Ok(format!("Hello, {name}!")) | ||
| 21 | //! } | ||
| 22 | //! } | ||
| 23 | //! | ||
| 24 | //! #[tokio::main] | ||
| 25 | //! async fn main() -> Result<(), Box<dyn std::error::Error>>{ | ||
| 26 | //! let (dialer, listener) = urpc::channel::new(); | ||
| 27 | //! | ||
| 28 | //! // spawn the server | ||
| 29 | //! tokio::spawn(async move { | ||
| 30 | //! urpc::Server::default() | ||
| 31 | //! .with_listener(listener) | ||
| 32 | //! .with_service(HelloServer.into_service()) | ||
| 33 | //! .serve() | ||
| 34 | //! .await | ||
| 35 | //! }); | ||
| 36 | //! | ||
| 37 | //! // create a client | ||
| 38 | //! let channel = urpc::ClientChannel::new(dialer.connect()?); | ||
| 39 | //! let client = HelloClient::new(channel); | ||
| 40 | //! let greeting = client.hello("World".into()).await.unwrap(); | ||
| 41 | //! assert_eq!(greeting, "Hello, World!"); | ||
| 42 | //! Ok(()) | ||
| 43 | //! } | ||
| 44 | //! ``` | ||
| 45 | |||
| 46 | use futures::{Sink, Stream}; | ||
| 47 | use tokio::sync::mpsc; | ||
| 48 | |||
| 49 | use crate::protocol::RpcMessage; | ||
| 50 | |||
| 51 | pub struct ChannelListener { | ||
| 52 | receiver: mpsc::UnboundedReceiver<Channel>, | ||
| 53 | } | ||
| 54 | |||
| 55 | impl Stream for ChannelListener { | ||
| 56 | type Item = std::io::Result<Channel>; | ||
| 57 | |||
| 58 | fn poll_next( | ||
| 59 | self: std::pin::Pin<&mut Self>, | ||
| 60 | cx: &mut std::task::Context<'_>, | ||
| 61 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 62 | match self.get_mut().receiver.poll_recv(cx) { | ||
| 63 | std::task::Poll::Ready(Some(c)) => std::task::Poll::Ready(Some(Ok(c))), | ||
| 64 | std::task::Poll::Ready(None) => std::task::Poll::Ready(None), | ||
| 65 | std::task::Poll::Pending => std::task::Poll::Pending, | ||
| 66 | } | ||
| 67 | } | ||
| 68 | } | ||
| 69 | |||
| 70 | impl crate::Listener<Channel> for ChannelListener {} | ||
| 71 | |||
| 72 | pub struct ChannelDialer { | ||
| 73 | sender: mpsc::UnboundedSender<Channel>, | ||
| 74 | } | ||
| 75 | |||
| 76 | impl ChannelDialer { | ||
| 77 | fn new() -> (Self, ChannelListener) { | ||
| 78 | let (sender, receiver) = mpsc::unbounded_channel(); | ||
| 79 | (Self { sender }, ChannelListener { receiver }) | ||
| 80 | } | ||
| 81 | |||
| 82 | pub fn connect(&self) -> std::io::Result<Channel> { | ||
| 83 | let (ch1, ch2) = Channel::new(); | ||
| 84 | self.sender.send(ch1).expect("TODO: remove this"); | ||
| 85 | Ok(ch2) | ||
| 86 | } | ||
| 87 | } | ||
| 88 | |||
| 89 | pub struct Channel { | ||
| 90 | sender: mpsc::UnboundedSender<RpcMessage>, | ||
| 91 | receiver: mpsc::UnboundedReceiver<RpcMessage>, | ||
| 92 | } | ||
| 93 | |||
| 94 | impl Channel { | ||
| 95 | fn new() -> (Self, Self) { | ||
| 96 | let (sender0, receiver0) = mpsc::unbounded_channel(); | ||
| 97 | let (sender1, receiver1) = mpsc::unbounded_channel(); | ||
| 98 | ( | ||
| 99 | Self { | ||
| 100 | sender: sender0, | ||
| 101 | receiver: receiver1, | ||
| 102 | }, | ||
| 103 | Self { | ||
| 104 | sender: sender1, | ||
| 105 | receiver: receiver0, | ||
| 106 | }, | ||
| 107 | ) | ||
| 108 | } | ||
| 109 | } | ||
| 110 | |||
| 111 | impl Stream for Channel { | ||
| 112 | type Item = std::io::Result<RpcMessage>; | ||
| 113 | |||
| 114 | fn poll_next( | ||
| 115 | self: std::pin::Pin<&mut Self>, | ||
| 116 | cx: &mut std::task::Context<'_>, | ||
| 117 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 118 | match self.get_mut().receiver.poll_recv(cx) { | ||
| 119 | std::task::Poll::Ready(Some(msg)) => std::task::Poll::Ready(Some(Ok(msg))), | ||
| 120 | std::task::Poll::Ready(None) => std::task::Poll::Ready(None), | ||
| 121 | std::task::Poll::Pending => std::task::Poll::Pending, | ||
| 122 | } | ||
| 123 | } | ||
| 124 | } | ||
| 125 | |||
| 126 | impl Sink<RpcMessage> for Channel { | ||
| 127 | type Error = std::io::Error; | ||
| 128 | |||
| 129 | fn poll_ready( | ||
| 130 | self: std::pin::Pin<&mut Self>, | ||
| 131 | _cx: &mut std::task::Context<'_>, | ||
| 132 | ) -> std::task::Poll<Result<(), Self::Error>> { | ||
| 133 | return std::task::Poll::Ready(Ok(())); | ||
| 134 | } | ||
| 135 | |||
| 136 | fn start_send(self: std::pin::Pin<&mut Self>, item: RpcMessage) -> Result<(), Self::Error> { | ||
| 137 | match self.sender.send(item) { | ||
| 138 | Ok(()) => Ok(()), | ||
| 139 | Err(err) => Err(std::io::Error::other(err)), | ||
| 140 | } | ||
| 141 | } | ||
| 142 | |||
| 143 | fn poll_flush( | ||
| 144 | self: std::pin::Pin<&mut Self>, | ||
| 145 | _cx: &mut std::task::Context<'_>, | ||
| 146 | ) -> std::task::Poll<Result<(), Self::Error>> { | ||
| 147 | std::task::Poll::Ready(Ok(())) | ||
| 148 | } | ||
| 149 | |||
| 150 | fn poll_close( | ||
| 151 | self: std::pin::Pin<&mut Self>, | ||
| 152 | _cx: &mut std::task::Context<'_>, | ||
| 153 | ) -> std::task::Poll<Result<(), Self::Error>> { | ||
| 154 | std::task::Poll::Ready(Ok(())) | ||
| 155 | } | ||
| 156 | } | ||
| 157 | |||
| 158 | impl crate::Channel for Channel {} | ||
| 159 | |||
| 160 | pub fn new() -> (ChannelDialer, ChannelListener) { | ||
| 161 | ChannelDialer::new() | ||
| 162 | } | ||
diff --git a/src/client_channel.rs b/src/client_channel.rs new file mode 100644 index 0000000..5666a16 --- /dev/null +++ b/src/client_channel.rs | |||
| @@ -0,0 +1,124 @@ | |||
| 1 | use std::{collections::HashMap, sync::Arc}; | ||
| 2 | |||
| 3 | use bytes::Bytes; | ||
| 4 | use futures::{SinkExt, StreamExt}; | ||
| 5 | use tokio::{ | ||
| 6 | sync::{mpsc, oneshot}, | ||
| 7 | task::AbortHandle, | ||
| 8 | }; | ||
| 9 | |||
| 10 | use crate::{ | ||
| 11 | Channel, | ||
| 12 | protocol::{RpcCall, RpcMessage, RpcResponse}, | ||
| 13 | }; | ||
| 14 | |||
| 15 | const CLIENT_CHANNEL_BUFFER_SIZE: usize = 64; | ||
| 16 | |||
| 17 | struct ClientChannelMessage { | ||
| 18 | service: String, | ||
| 19 | method: String, | ||
| 20 | arguments: Bytes, | ||
| 21 | responder: oneshot::Sender<std::io::Result<Bytes>>, | ||
| 22 | } | ||
| 23 | |||
| 24 | struct ClientChannelInner { | ||
| 25 | sender: mpsc::Sender<ClientChannelMessage>, | ||
| 26 | abort_handle: AbortHandle, | ||
| 27 | } | ||
| 28 | |||
| 29 | impl Drop for ClientChannelInner { | ||
| 30 | fn drop(&mut self) { | ||
| 31 | self.abort_handle.abort(); | ||
| 32 | } | ||
| 33 | } | ||
| 34 | |||
| 35 | #[derive(Clone)] | ||
| 36 | pub struct ClientChannel(Arc<ClientChannelInner>); | ||
| 37 | |||
| 38 | impl ClientChannel { | ||
| 39 | pub fn new<C: Channel>(channel: C) -> Self { | ||
| 40 | let (tx, rx) = mpsc::channel(CLIENT_CHANNEL_BUFFER_SIZE); | ||
| 41 | let abort_handle = tokio::spawn(client_channel_loop(channel, rx)).abort_handle(); | ||
| 42 | Self(Arc::new(ClientChannelInner { | ||
| 43 | sender: tx, | ||
| 44 | abort_handle, | ||
| 45 | })) | ||
| 46 | } | ||
| 47 | |||
| 48 | pub async fn call( | ||
| 49 | &self, | ||
| 50 | service: String, | ||
| 51 | method: String, | ||
| 52 | arguments: Bytes, | ||
| 53 | ) -> std::io::Result<Bytes> { | ||
| 54 | let (tx, rx) = oneshot::channel(); | ||
| 55 | self.0 | ||
| 56 | .sender | ||
| 57 | .send(ClientChannelMessage { | ||
| 58 | service, | ||
| 59 | method, | ||
| 60 | arguments, | ||
| 61 | responder: tx, | ||
| 62 | }) | ||
| 63 | .await | ||
| 64 | .expect("client channel task should never shutdown while a client is alive"); | ||
| 65 | rx.await | ||
| 66 | .expect("client channel task should never shutdown while a client is alive") | ||
| 67 | } | ||
| 68 | } | ||
| 69 | |||
| 70 | async fn client_channel_loop<C: Channel>( | ||
| 71 | mut channel: C, | ||
| 72 | mut rx: mpsc::Receiver<ClientChannelMessage>, | ||
| 73 | ) { | ||
| 74 | enum Select { | ||
| 75 | RpcMessage(RpcMessage), | ||
| 76 | ClientChannelMessage(ClientChannelMessage), | ||
| 77 | } | ||
| 78 | |||
| 79 | let mut responders = HashMap::<u64, oneshot::Sender<std::io::Result<Bytes>>>::default(); | ||
| 80 | let mut rpc_call_id = 0; | ||
| 81 | |||
| 82 | loop { | ||
| 83 | let select = tokio::select! { | ||
| 84 | Some(Ok(v)) = channel.next() => Select::RpcMessage(v), | ||
| 85 | Some(v) = rx.recv() => Select::ClientChannelMessage(v), | ||
| 86 | }; | ||
| 87 | |||
| 88 | match select { | ||
| 89 | Select::RpcMessage(RpcMessage::Response(RpcResponse { id, value })) => { | ||
| 90 | if let Some(responder) = responders.remove(&id) { | ||
| 91 | let _ = responder.send(Ok(value)); | ||
| 92 | } | ||
| 93 | } | ||
| 94 | Select::RpcMessage(_) => todo!(), | ||
| 95 | Select::ClientChannelMessage(ClientChannelMessage { | ||
| 96 | service, | ||
| 97 | method, | ||
| 98 | arguments, | ||
| 99 | responder, | ||
| 100 | }) => { | ||
| 101 | let id = rpc_call_id; | ||
| 102 | rpc_call_id += 1; | ||
| 103 | |||
| 104 | let result = channel | ||
| 105 | .send(RpcMessage::Call(RpcCall { | ||
| 106 | id, | ||
| 107 | service, | ||
| 108 | method, | ||
| 109 | arguments, | ||
| 110 | })) | ||
| 111 | .await; | ||
| 112 | |||
| 113 | match result { | ||
| 114 | Ok(()) => { | ||
| 115 | responders.insert(id, responder); | ||
| 116 | } | ||
| 117 | Err(err) => { | ||
| 118 | let _ = responder.send(Err(err)); | ||
| 119 | } | ||
| 120 | } | ||
| 121 | } | ||
| 122 | } | ||
| 123 | } | ||
| 124 | } | ||
diff --git a/src/internal.rs b/src/internal.rs new file mode 100644 index 0000000..a3c203a --- /dev/null +++ b/src/internal.rs | |||
| @@ -0,0 +1,2 @@ | |||
| 1 | pub use bincode; | ||
| 2 | pub use bytes; | ||
diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..e65f659 --- /dev/null +++ b/src/lib.rs | |||
| @@ -0,0 +1,52 @@ | |||
| 1 | #[doc(hidden)] | ||
| 2 | pub mod internal; | ||
| 3 | |||
| 4 | pub mod channel; | ||
| 5 | pub mod protocol; | ||
| 6 | pub mod tcp; | ||
| 7 | pub mod unix; | ||
| 8 | |||
| 9 | mod client_channel; | ||
| 10 | mod server; | ||
| 11 | |||
| 12 | pub use client_channel::ClientChannel; | ||
| 13 | pub use server::Server; | ||
| 14 | pub use urpc_macro::service; | ||
| 15 | |||
| 16 | use protocol::RpcMessage; | ||
| 17 | |||
| 18 | use std::pin::Pin; | ||
| 19 | use std::future::Future; | ||
| 20 | |||
| 21 | use bytes::Bytes; | ||
| 22 | use futures::{Sink, Stream}; | ||
| 23 | |||
| 24 | #[derive(Debug, Default)] | ||
| 25 | pub struct Context; | ||
| 26 | |||
| 27 | pub trait Service: Send + Sync + 'static { | ||
| 28 | fn name() -> &'static str | ||
| 29 | where | ||
| 30 | Self: Sized; | ||
| 31 | |||
| 32 | fn call( | ||
| 33 | &self, | ||
| 34 | method: String, | ||
| 35 | arguments: Bytes, | ||
| 36 | ) -> Pin<Box<dyn Future<Output = std::io::Result<Bytes>> + Send + '_>>; | ||
| 37 | } | ||
| 38 | |||
| 39 | pub trait Channel: | ||
| 40 | Stream<Item = std::io::Result<RpcMessage>> | ||
| 41 | + Sink<RpcMessage, Error = std::io::Error> | ||
| 42 | + Send | ||
| 43 | + Unpin | ||
| 44 | + 'static | ||
| 45 | { | ||
| 46 | } | ||
| 47 | |||
| 48 | pub trait Listener<C>: Stream<Item = std::io::Result<C>> + Send + Unpin + 'static | ||
| 49 | where | ||
| 50 | C: Channel, | ||
| 51 | { | ||
| 52 | } | ||
diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..baf886b --- /dev/null +++ b/src/protocol.rs | |||
| @@ -0,0 +1,107 @@ | |||
| 1 | //! Types used by the RPC protocol. | ||
| 2 | use bytes::Bytes; | ||
| 3 | use serde::{Deserialize, Serialize}; | ||
| 4 | use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; | ||
| 5 | |||
| 6 | #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||
| 7 | pub struct RpcCall { | ||
| 8 | pub id: u64, | ||
| 9 | pub service: String, | ||
| 10 | pub method: String, | ||
| 11 | pub arguments: Bytes, | ||
| 12 | } | ||
| 13 | |||
| 14 | #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||
| 15 | pub struct RpcCancel { | ||
| 16 | pub id: u64, | ||
| 17 | } | ||
| 18 | |||
| 19 | #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||
| 20 | pub struct RpcResponse { | ||
| 21 | pub id: u64, | ||
| 22 | pub value: Bytes, | ||
| 23 | } | ||
| 24 | |||
| 25 | #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||
| 26 | pub enum RpcMessage { | ||
| 27 | Call(RpcCall), | ||
| 28 | Response(RpcResponse), | ||
| 29 | Cancel(RpcCancel), | ||
| 30 | } | ||
| 31 | |||
| 32 | #[derive(Debug)] | ||
| 33 | pub enum RpcError<E> { | ||
| 34 | Transport(std::io::Error), | ||
| 35 | Remote(E), | ||
| 36 | } | ||
| 37 | |||
| 38 | impl<E> std::fmt::Display for RpcError<E> | ||
| 39 | where | ||
| 40 | E: std::fmt::Display, | ||
| 41 | { | ||
| 42 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 43 | match self { | ||
| 44 | RpcError::Transport(error) => write!(f, "transport error: {error}"), | ||
| 45 | RpcError::Remote(error) => write!(f, "remote error: {error}"), | ||
| 46 | } | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | impl<E> std::error::Error for RpcError<E> | ||
| 51 | where | ||
| 52 | E: std::error::Error, | ||
| 53 | { | ||
| 54 | fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { | ||
| 55 | match self { | ||
| 56 | RpcError::Transport(error) => error.source(), | ||
| 57 | RpcError::Remote(error) => error.source(), | ||
| 58 | } | ||
| 59 | } | ||
| 60 | } | ||
| 61 | |||
| 62 | impl<E> From<std::io::Error> for RpcError<E> { | ||
| 63 | fn from(value: std::io::Error) -> Self { | ||
| 64 | Self::Transport(value) | ||
| 65 | } | ||
| 66 | } | ||
| 67 | |||
| 68 | #[derive(Default)] | ||
| 69 | pub struct RpcMessageCodec(LengthDelimitedCodec); | ||
| 70 | |||
| 71 | impl Encoder<RpcMessage> for RpcMessageCodec { | ||
| 72 | type Error = std::io::Error; | ||
| 73 | |||
| 74 | fn encode( | ||
| 75 | &mut self, | ||
| 76 | item: RpcMessage, | ||
| 77 | dst: &mut bytes::BytesMut, | ||
| 78 | ) -> std::result::Result<(), Self::Error> { | ||
| 79 | let encoded = bincode::serde::encode_to_vec(&item, bincode::config::standard()) | ||
| 80 | .map_err(std::io::Error::other)?; | ||
| 81 | let encoded = Bytes::from(encoded); | ||
| 82 | self.0.encode(encoded, dst).map_err(std::io::Error::other)?; | ||
| 83 | Ok(()) | ||
| 84 | } | ||
| 85 | } | ||
| 86 | |||
| 87 | impl Decoder for RpcMessageCodec { | ||
| 88 | type Item = RpcMessage; | ||
| 89 | |||
| 90 | type Error = std::io::Error; | ||
| 91 | |||
| 92 | fn decode( | ||
| 93 | &mut self, | ||
| 94 | src: &mut bytes::BytesMut, | ||
| 95 | ) -> std::result::Result<Option<Self::Item>, Self::Error> { | ||
| 96 | match self.0.decode(src) { | ||
| 97 | Ok(Some(frame)) => { | ||
| 98 | let (message, _) = | ||
| 99 | bincode::serde::decode_from_slice(&frame, bincode::config::standard()) | ||
| 100 | .map_err(std::io::Error::other)?; | ||
| 101 | Ok(Some(message)) | ||
| 102 | } | ||
| 103 | Ok(None) => Ok(None), | ||
| 104 | Err(err) => Err(std::io::Error::other(err)), | ||
| 105 | } | ||
| 106 | } | ||
| 107 | } | ||
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..414340e --- /dev/null +++ b/src/server.rs | |||
| @@ -0,0 +1,166 @@ | |||
| 1 | use std::{collections::HashMap, sync::Arc}; | ||
| 2 | |||
| 3 | use futures::{SinkExt, StreamExt}; | ||
| 4 | use tokio::task::{AbortHandle, JoinSet}; | ||
| 5 | |||
| 6 | use crate::{ | ||
| 7 | Channel, Listener, Service, | ||
| 8 | protocol::{RpcCall, RpcCancel, RpcMessage, RpcResponse}, | ||
| 9 | }; | ||
| 10 | |||
| 11 | #[derive(Clone)] | ||
| 12 | struct Services(Arc<HashMap<String, Arc<dyn Service>>>); | ||
| 13 | |||
| 14 | impl Services { | ||
| 15 | fn new(services: HashMap<String, Arc<dyn Service>>) -> Self { | ||
| 16 | Self(Arc::new(services)) | ||
| 17 | } | ||
| 18 | |||
| 19 | fn get_service(&self, name: &str) -> std::io::Result<Arc<dyn Service>> { | ||
| 20 | match self.0.get(name) { | ||
| 21 | Some(service) => Ok(service.clone()), | ||
| 22 | None => Err(std::io::Error::new( | ||
| 23 | std::io::ErrorKind::NotFound, | ||
| 24 | "service not found", | ||
| 25 | )), | ||
| 26 | } | ||
| 27 | } | ||
| 28 | } | ||
| 29 | |||
| 30 | type ListenerSpawner = | ||
| 31 | Box<dyn FnOnce(&mut JoinSet<std::io::Result<()>>, Services) -> AbortHandle + Send + 'static>; | ||
| 32 | |||
| 33 | #[derive(Debug, Default)] | ||
| 34 | struct AbortHandles(Vec<AbortHandle>); | ||
| 35 | |||
| 36 | impl Drop for AbortHandles { | ||
| 37 | fn drop(&mut self) { | ||
| 38 | for handle in &self.0 { | ||
| 39 | handle.abort(); | ||
| 40 | } | ||
| 41 | } | ||
| 42 | } | ||
| 43 | |||
| 44 | impl AbortHandles { | ||
| 45 | pub fn push(&mut self, handle: AbortHandle) { | ||
| 46 | self.0.push(handle); | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | #[derive(Default)] | ||
| 51 | pub struct Server { | ||
| 52 | services: HashMap<String, Arc<dyn Service>>, | ||
| 53 | listener_spawners: Vec<ListenerSpawner>, | ||
| 54 | } | ||
| 55 | |||
| 56 | impl Server { | ||
| 57 | pub fn with_service<T>(mut self, service: T) -> Self | ||
| 58 | where | ||
| 59 | T: Service, | ||
| 60 | { | ||
| 61 | let name = T::name(); | ||
| 62 | let service = Arc::new(service); | ||
| 63 | self.services.insert(name.to_string(), service); | ||
| 64 | self | ||
| 65 | } | ||
| 66 | |||
| 67 | pub fn with_listener<L, C>(mut self, listener: L) -> Self | ||
| 68 | where | ||
| 69 | C: Channel, | ||
| 70 | L: Listener<C>, | ||
| 71 | { | ||
| 72 | self.listener_spawners | ||
| 73 | .push(Box::new(move |join_set, services| { | ||
| 74 | join_set.spawn(listener_loop(listener, services)) | ||
| 75 | })); | ||
| 76 | self | ||
| 77 | } | ||
| 78 | |||
| 79 | pub async fn serve(self) -> std::io::Result<()> { | ||
| 80 | let services = Services::new(self.services); | ||
| 81 | let mut join_set = JoinSet::default(); | ||
| 82 | let mut abort_handles = AbortHandles::default(); | ||
| 83 | for spawner in self.listener_spawners { | ||
| 84 | let abort_handle = (spawner)(&mut join_set, services.clone()); | ||
| 85 | abort_handles.push(abort_handle); | ||
| 86 | } | ||
| 87 | match join_set.join_next().await { | ||
| 88 | Some(Ok(Ok(()))) => Ok(()), | ||
| 89 | Some(Ok(Err(err))) => Err(err), | ||
| 90 | Some(Err(err)) => Err(std::io::Error::other(err)), | ||
| 91 | None => Ok(()), | ||
| 92 | } | ||
| 93 | } | ||
| 94 | } | ||
| 95 | |||
| 96 | async fn listener_loop<L, C>(mut listener: L, services: Services) -> std::io::Result<()> | ||
| 97 | where | ||
| 98 | C: Channel, | ||
| 99 | L: Listener<C>, | ||
| 100 | { | ||
| 101 | while let Some(result) = listener.next().await { | ||
| 102 | let channel = result?; | ||
| 103 | let services = services.clone(); | ||
| 104 | tokio::spawn(channel_handler(channel, services)); | ||
| 105 | } | ||
| 106 | Ok(()) | ||
| 107 | } | ||
| 108 | |||
| 109 | async fn channel_handler<C: Channel>(mut channel: C, services: Services) -> std::io::Result<()> { | ||
| 110 | enum Select { | ||
| 111 | Empty, | ||
| 112 | Message(RpcMessage), | ||
| 113 | } | ||
| 114 | |||
| 115 | let (response_tx, mut response_rx) = | ||
| 116 | tokio::sync::mpsc::unbounded_channel::<std::io::Result<RpcResponse>>(); | ||
| 117 | let mut requests: HashMap<u64, AbortHandle> = Default::default(); | ||
| 118 | loop { | ||
| 119 | let select = tokio::select! { | ||
| 120 | reqopt = channel.next() => match reqopt { | ||
| 121 | Some(Ok(message)) => Select::Message(message), | ||
| 122 | Some(Err(err)) => return Err(err), | ||
| 123 | None => Select::Empty, | ||
| 124 | }, | ||
| 125 | Some(response) = response_rx.recv() => match response { | ||
| 126 | Ok(response) => Select::Message(RpcMessage::Response(response)), | ||
| 127 | Err(err) => return Err(err), | ||
| 128 | } | ||
| 129 | }; | ||
| 130 | |||
| 131 | match select { | ||
| 132 | Select::Empty => break, | ||
| 133 | Select::Message(message) => match message { | ||
| 134 | RpcMessage::Call(RpcCall { | ||
| 135 | id, | ||
| 136 | service, | ||
| 137 | method, | ||
| 138 | arguments, | ||
| 139 | }) => { | ||
| 140 | let response_tx = response_tx.clone(); | ||
| 141 | let service = services.get_service(&service)?; | ||
| 142 | let handle = tokio::spawn(async move { | ||
| 143 | let response = match service.call(method, arguments).await { | ||
| 144 | Ok(value) => Ok(RpcResponse { id, value }), | ||
| 145 | Err(err) => Err(err), | ||
| 146 | }; | ||
| 147 | let _ = response_tx.send(response); | ||
| 148 | }) | ||
| 149 | .abort_handle(); | ||
| 150 | requests.insert(id, handle); | ||
| 151 | } | ||
| 152 | RpcMessage::Cancel(RpcCancel { id }) => { | ||
| 153 | if let Some(handle) = requests.remove(&id) { | ||
| 154 | handle.abort(); | ||
| 155 | } | ||
| 156 | } | ||
| 157 | RpcMessage::Response(response) => { | ||
| 158 | requests.remove(&response.id); | ||
| 159 | channel.send(RpcMessage::Response(response)).await?; | ||
| 160 | } | ||
| 161 | }, | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | Ok(()) | ||
| 166 | } | ||
diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..5684c41 --- /dev/null +++ b/src/tcp.rs | |||
| @@ -0,0 +1,156 @@ | |||
| 1 | //! TCP backed channel and listener implementations. | ||
| 2 | //! | ||
| 3 | //! ```no_run | ||
| 4 | //! #[urpc::service] | ||
| 5 | //! trait Hello { | ||
| 6 | //! type Error = (); | ||
| 7 | //! | ||
| 8 | //! async fn hello(name: String) -> String; | ||
| 9 | //! } | ||
| 10 | //! | ||
| 11 | //! struct HelloServer; | ||
| 12 | //! | ||
| 13 | //! impl Hello for HelloServer { | ||
| 14 | //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 15 | //! Ok(format!("Hello, {name}!")) | ||
| 16 | //! } | ||
| 17 | //! } | ||
| 18 | //! | ||
| 19 | //! #[tokio::main] | ||
| 20 | //! async fn main() -> Result<(), Box<dyn std::error::Error>>{ | ||
| 21 | //! let listener = urpc::tcp::bind("0.0.0.0:3000").await?; | ||
| 22 | //! | ||
| 23 | //! // spawn the server | ||
| 24 | //! tokio::spawn(async move { | ||
| 25 | //! urpc::Server::default() | ||
| 26 | //! .with_listener(listener) | ||
| 27 | //! .with_service(HelloServer.into_service()) | ||
| 28 | //! .serve() | ||
| 29 | //! .await | ||
| 30 | //! }); | ||
| 31 | //! | ||
| 32 | //! // create a client | ||
| 33 | //! let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await?); | ||
| 34 | //! let client = HelloClient::new(channel); | ||
| 35 | //! let greeting = client.hello("World".into()).await.unwrap(); | ||
| 36 | //! assert_eq!(greeting, "Hello, World!"); | ||
| 37 | //! Ok(()) | ||
| 38 | //! } | ||
| 39 | //! ``` | ||
| 40 | use std::pin::Pin; | ||
| 41 | |||
| 42 | use futures::{Sink, Stream}; | ||
| 43 | use tokio::{ | ||
| 44 | net::{TcpListener, TcpStream, ToSocketAddrs}, | ||
| 45 | sync::mpsc::Receiver, | ||
| 46 | task::AbortHandle, | ||
| 47 | }; | ||
| 48 | use tokio_util::codec::Framed; | ||
| 49 | |||
| 50 | use crate::{ | ||
| 51 | Channel, Listener, | ||
| 52 | protocol::{RpcMessage, RpcMessageCodec}, | ||
| 53 | }; | ||
| 54 | |||
| 55 | pub struct TcpChannel(Framed<TcpStream, RpcMessageCodec>); | ||
| 56 | |||
| 57 | impl TcpChannel { | ||
| 58 | fn new(stream: TcpStream) -> Self { | ||
| 59 | Self(Framed::new(stream, RpcMessageCodec::default())) | ||
| 60 | } | ||
| 61 | |||
| 62 | pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result<Self> { | ||
| 63 | let stream = TcpStream::connect(addrs).await?; | ||
| 64 | Ok(Self::new(stream)) | ||
| 65 | } | ||
| 66 | } | ||
| 67 | |||
| 68 | impl Sink<RpcMessage> for TcpChannel { | ||
| 69 | type Error = std::io::Error; | ||
| 70 | |||
| 71 | fn poll_ready( | ||
| 72 | self: Pin<&mut Self>, | ||
| 73 | cx: &mut std::task::Context<'_>, | ||
| 74 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 75 | Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx) | ||
| 76 | } | ||
| 77 | |||
| 78 | fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> { | ||
| 79 | Sink::start_send(Pin::new(&mut self.get_mut().0), item) | ||
| 80 | } | ||
| 81 | |||
| 82 | fn poll_flush( | ||
| 83 | self: Pin<&mut Self>, | ||
| 84 | cx: &mut std::task::Context<'_>, | ||
| 85 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 86 | Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx) | ||
| 87 | } | ||
| 88 | |||
| 89 | fn poll_close( | ||
| 90 | self: Pin<&mut Self>, | ||
| 91 | cx: &mut std::task::Context<'_>, | ||
| 92 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 93 | Sink::poll_close(Pin::new(&mut self.get_mut().0), cx) | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | impl Stream for TcpChannel { | ||
| 98 | type Item = std::io::Result<RpcMessage>; | ||
| 99 | |||
| 100 | fn poll_next( | ||
| 101 | self: Pin<&mut Self>, | ||
| 102 | cx: &mut std::task::Context<'_>, | ||
| 103 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 104 | Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | impl Channel for TcpChannel {} | ||
| 109 | |||
| 110 | pub struct TcpChannelListener { | ||
| 111 | receiver: Receiver<TcpChannel>, | ||
| 112 | abort: AbortHandle, | ||
| 113 | } | ||
| 114 | |||
| 115 | impl Drop for TcpChannelListener { | ||
| 116 | fn drop(&mut self) { | ||
| 117 | self.abort.abort(); | ||
| 118 | } | ||
| 119 | } | ||
| 120 | |||
| 121 | impl TcpChannelListener { | ||
| 122 | pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result<Self> { | ||
| 123 | let listener = TcpListener::bind(addrs).await?; | ||
| 124 | let (sender, receiver) = tokio::sync::mpsc::channel(8); | ||
| 125 | let abort = tokio::spawn(async move { | ||
| 126 | while let Ok((stream, _addr)) = listener.accept().await { | ||
| 127 | if sender.send(TcpChannel::new(stream)).await.is_err() { | ||
| 128 | break; | ||
| 129 | } | ||
| 130 | } | ||
| 131 | }) | ||
| 132 | .abort_handle(); | ||
| 133 | Ok(Self { receiver, abort }) | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | impl Stream for TcpChannelListener { | ||
| 138 | type Item = std::io::Result<TcpChannel>; | ||
| 139 | |||
| 140 | fn poll_next( | ||
| 141 | self: Pin<&mut Self>, | ||
| 142 | cx: &mut std::task::Context<'_>, | ||
| 143 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 144 | self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok)) | ||
| 145 | } | ||
| 146 | } | ||
| 147 | |||
| 148 | impl Listener<TcpChannel> for TcpChannelListener {} | ||
| 149 | |||
| 150 | pub async fn bind(addrs: impl ToSocketAddrs) -> std::io::Result<TcpChannelListener> { | ||
| 151 | TcpChannelListener::bind(addrs).await | ||
| 152 | } | ||
| 153 | |||
| 154 | pub async fn connect(addrs: impl ToSocketAddrs) -> std::io::Result<TcpChannel> { | ||
| 155 | TcpChannel::connect(addrs).await | ||
| 156 | } | ||
diff --git a/src/unix.rs b/src/unix.rs new file mode 100644 index 0000000..e241b64 --- /dev/null +++ b/src/unix.rs | |||
| @@ -0,0 +1,160 @@ | |||
| 1 | //! UNIX Domain Socket backed channel and listener implementations. | ||
| 2 | //! | ||
| 3 | //! ```no_run | ||
| 4 | //! #[urpc::service] | ||
| 5 | //! trait Hello { | ||
| 6 | //! type Error = (); | ||
| 7 | //! | ||
| 8 | //! async fn hello(name: String) -> String; | ||
| 9 | //! } | ||
| 10 | //! | ||
| 11 | //! struct HelloServer; | ||
| 12 | //! | ||
| 13 | //! impl Hello for HelloServer { | ||
| 14 | //! async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 15 | //! Ok(format!("Hello, {name}!")) | ||
| 16 | //! } | ||
| 17 | //! } | ||
| 18 | //! | ||
| 19 | //! #[tokio::main] | ||
| 20 | //! async fn main() -> Result<(), Box<dyn std::error::Error>>{ | ||
| 21 | //! let listener = urpc::unix::bind("./hello.service").await?; | ||
| 22 | //! | ||
| 23 | //! // spawn the server | ||
| 24 | //! tokio::spawn(async move { | ||
| 25 | //! urpc::Server::default() | ||
| 26 | //! .with_listener(listener) | ||
| 27 | //! .with_service(HelloServer.into_service()) | ||
| 28 | //! .serve() | ||
| 29 | //! .await | ||
| 30 | //! }); | ||
| 31 | //! | ||
| 32 | //! // create a client | ||
| 33 | //! let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await?); | ||
| 34 | //! let client = HelloClient::new(channel); | ||
| 35 | //! let greeting = client.hello("World".into()).await.unwrap(); | ||
| 36 | //! assert_eq!(greeting, "Hello, World!"); | ||
| 37 | //! Ok(()) | ||
| 38 | //! } | ||
| 39 | //! ``` | ||
| 40 | use std::{path::Path, pin::Pin}; | ||
| 41 | |||
| 42 | use futures::{Sink, Stream}; | ||
| 43 | use tokio::{ | ||
| 44 | net::{UnixListener, UnixStream}, | ||
| 45 | sync::mpsc::Receiver, | ||
| 46 | task::AbortHandle, | ||
| 47 | }; | ||
| 48 | use tokio_util::codec::Framed; | ||
| 49 | |||
| 50 | use crate::{ | ||
| 51 | Channel, Listener, | ||
| 52 | protocol::{RpcMessage, RpcMessageCodec}, | ||
| 53 | }; | ||
| 54 | |||
| 55 | pub struct UnixChannel(Framed<UnixStream, RpcMessageCodec>); | ||
| 56 | |||
| 57 | impl UnixChannel { | ||
| 58 | fn new(stream: UnixStream) -> Self { | ||
| 59 | Self(Framed::new(stream, RpcMessageCodec::default())) | ||
| 60 | } | ||
| 61 | |||
| 62 | pub async fn connect(path: impl AsRef<Path>) -> std::io::Result<Self> { | ||
| 63 | let stream = UnixStream::connect(path).await?; | ||
| 64 | Ok(Self::new(stream)) | ||
| 65 | } | ||
| 66 | } | ||
| 67 | |||
| 68 | impl Sink<RpcMessage> for UnixChannel { | ||
| 69 | type Error = std::io::Error; | ||
| 70 | |||
| 71 | fn poll_ready( | ||
| 72 | self: Pin<&mut Self>, | ||
| 73 | cx: &mut std::task::Context<'_>, | ||
| 74 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 75 | Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx) | ||
| 76 | } | ||
| 77 | |||
| 78 | fn start_send(self: Pin<&mut Self>, item: RpcMessage) -> std::result::Result<(), Self::Error> { | ||
| 79 | Sink::start_send(Pin::new(&mut self.get_mut().0), item) | ||
| 80 | } | ||
| 81 | |||
| 82 | fn poll_flush( | ||
| 83 | self: Pin<&mut Self>, | ||
| 84 | cx: &mut std::task::Context<'_>, | ||
| 85 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 86 | Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx) | ||
| 87 | } | ||
| 88 | |||
| 89 | fn poll_close( | ||
| 90 | self: Pin<&mut Self>, | ||
| 91 | cx: &mut std::task::Context<'_>, | ||
| 92 | ) -> std::task::Poll<std::result::Result<(), Self::Error>> { | ||
| 93 | Sink::poll_close(Pin::new(&mut self.get_mut().0), cx) | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | impl Stream for UnixChannel { | ||
| 98 | type Item = std::io::Result<RpcMessage>; | ||
| 99 | |||
| 100 | fn poll_next( | ||
| 101 | self: Pin<&mut Self>, | ||
| 102 | cx: &mut std::task::Context<'_>, | ||
| 103 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 104 | Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | impl Channel for UnixChannel {} | ||
| 109 | |||
| 110 | pub struct UnixChannelListener { | ||
| 111 | receiver: Receiver<UnixChannel>, | ||
| 112 | abort: AbortHandle, | ||
| 113 | } | ||
| 114 | |||
| 115 | impl Drop for UnixChannelListener { | ||
| 116 | fn drop(&mut self) { | ||
| 117 | self.abort.abort(); | ||
| 118 | } | ||
| 119 | } | ||
| 120 | |||
| 121 | impl UnixChannelListener { | ||
| 122 | pub async fn bind(path: impl AsRef<Path>) -> std::io::Result<Self> { | ||
| 123 | let path = path.as_ref(); | ||
| 124 | if tokio::fs::try_exists(path).await? { | ||
| 125 | tokio::fs::remove_file(path).await?; | ||
| 126 | } | ||
| 127 | let listener = UnixListener::bind(path)?; | ||
| 128 | let (sender, receiver) = tokio::sync::mpsc::channel(8); | ||
| 129 | let abort = tokio::spawn(async move { | ||
| 130 | while let Ok((stream, _addr)) = listener.accept().await { | ||
| 131 | if sender.send(UnixChannel::new(stream)).await.is_err() { | ||
| 132 | break; | ||
| 133 | } | ||
| 134 | } | ||
| 135 | }) | ||
| 136 | .abort_handle(); | ||
| 137 | Ok(Self { receiver, abort }) | ||
| 138 | } | ||
| 139 | } | ||
| 140 | |||
| 141 | impl Stream for UnixChannelListener { | ||
| 142 | type Item = std::io::Result<UnixChannel>; | ||
| 143 | |||
| 144 | fn poll_next( | ||
| 145 | self: Pin<&mut Self>, | ||
| 146 | cx: &mut std::task::Context<'_>, | ||
| 147 | ) -> std::task::Poll<Option<Self::Item>> { | ||
| 148 | self.get_mut().receiver.poll_recv(cx).map(|v| v.map(Ok)) | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | impl Listener<UnixChannel> for UnixChannelListener {} | ||
| 153 | |||
| 154 | pub async fn bind(path: impl AsRef<Path>) -> std::io::Result<UnixChannelListener> { | ||
| 155 | UnixChannelListener::bind(path).await | ||
| 156 | } | ||
| 157 | |||
| 158 | pub async fn connect(path: impl AsRef<Path>) -> std::io::Result<UnixChannel> { | ||
| 159 | UnixChannel::connect(path).await | ||
| 160 | } | ||
diff --git a/tests/tcp.rs b/tests/tcp.rs new file mode 100644 index 0000000..a2f647d --- /dev/null +++ b/tests/tcp.rs | |||
| @@ -0,0 +1,34 @@ | |||
| 1 | #[tokio::test] | ||
| 2 | async fn test_hello_service() { | ||
| 3 | #[urpc::service] | ||
| 4 | trait Hello { | ||
| 5 | type Error = (); | ||
| 6 | |||
| 7 | async fn hello(name: String) -> String; | ||
| 8 | } | ||
| 9 | |||
| 10 | struct HelloServer; | ||
| 11 | |||
| 12 | impl Hello for HelloServer { | ||
| 13 | async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 14 | Ok(format!("Hello, {name}!")) | ||
| 15 | } | ||
| 16 | } | ||
| 17 | |||
| 18 | let listener = urpc::tcp::bind("0.0.0.0:3000").await.unwrap(); | ||
| 19 | |||
| 20 | // spawn the server | ||
| 21 | tokio::spawn(async move { | ||
| 22 | urpc::Server::default() | ||
| 23 | .with_listener(listener) | ||
| 24 | .with_service(HelloServer.into_service()) | ||
| 25 | .serve() | ||
| 26 | .await | ||
| 27 | }); | ||
| 28 | |||
| 29 | // create a client | ||
| 30 | let channel = urpc::ClientChannel::new(urpc::tcp::connect("127.0.0.1:3000").await.unwrap()); | ||
| 31 | let client = HelloClient::new(channel); | ||
| 32 | let greeting = client.hello("World".into()).await.unwrap(); | ||
| 33 | assert_eq!(greeting, "Hello, World!"); | ||
| 34 | } | ||
diff --git a/tests/unix.rs b/tests/unix.rs new file mode 100644 index 0000000..bfa5a69 --- /dev/null +++ b/tests/unix.rs | |||
| @@ -0,0 +1,34 @@ | |||
| 1 | #[tokio::test] | ||
| 2 | async fn test_hello_service() { | ||
| 3 | #[urpc::service] | ||
| 4 | trait Hello { | ||
| 5 | type Error = (); | ||
| 6 | |||
| 7 | async fn hello(name: String) -> String; | ||
| 8 | } | ||
| 9 | |||
| 10 | struct HelloServer; | ||
| 11 | |||
| 12 | impl Hello for HelloServer { | ||
| 13 | async fn hello(&self, _ctx: urpc::Context, name: String) -> Result<String, ()> { | ||
| 14 | Ok(format!("Hello, {name}!")) | ||
| 15 | } | ||
| 16 | } | ||
| 17 | |||
| 18 | let listener = urpc::unix::bind("./hello.service").await.unwrap(); | ||
| 19 | |||
| 20 | // spawn the server | ||
| 21 | tokio::spawn(async move { | ||
| 22 | urpc::Server::default() | ||
| 23 | .with_listener(listener) | ||
| 24 | .with_service(HelloServer.into_service()) | ||
| 25 | .serve() | ||
| 26 | .await | ||
| 27 | }); | ||
| 28 | |||
| 29 | // create a client | ||
| 30 | let channel = urpc::ClientChannel::new(urpc::unix::connect("./hello.service").await.unwrap()); | ||
| 31 | let client = HelloClient::new(channel); | ||
| 32 | let greeting = client.hello("World".into()).await.unwrap(); | ||
| 33 | assert_eq!(greeting, "Hello, World!"); | ||
| 34 | } | ||
diff --git a/urpc-macro/Cargo.toml b/urpc-macro/Cargo.toml new file mode 100644 index 0000000..16b5650 --- /dev/null +++ b/urpc-macro/Cargo.toml | |||
| @@ -0,0 +1,18 @@ | |||
| 1 | [package] | ||
| 2 | name = "urpc-macro" | ||
| 3 | version = "0.1.0" | ||
| 4 | edition = "2021" | ||
| 5 | authors = ["Your Name <[email protected]>"] | ||
| 6 | description = "Procedural macros for urpc" | ||
| 7 | license = "MIT OR Apache-2.0" | ||
| 8 | repository = "https://github.com/your-username/urpc" | ||
| 9 | keywords = ["rpc", "async", "networking", "macros"] | ||
| 10 | categories = ["network-programming", "asynchronous"] | ||
| 11 | |||
| 12 | [lib] | ||
| 13 | proc-macro = true | ||
| 14 | |||
| 15 | [dependencies] | ||
| 16 | proc-macro2 = "1.0.95" | ||
| 17 | quote = "1.0.40" | ||
| 18 | syn = { version = "2.0.101", features = ["full"] } | ||
diff --git a/urpc-macro/src/lib.rs b/urpc-macro/src/lib.rs new file mode 100644 index 0000000..4facc60 --- /dev/null +++ b/urpc-macro/src/lib.rs | |||
| @@ -0,0 +1,282 @@ | |||
| 1 | use proc_macro2::{Punct, Spacing, Span, TokenStream}; | ||
| 2 | use quote::{ToTokens, TokenStreamExt}; | ||
| 3 | use syn::parse::Parse; | ||
| 4 | |||
| 5 | struct Service { | ||
| 6 | vis: syn::Visibility, | ||
| 7 | ident: syn::Ident, | ||
| 8 | error: syn::Type, | ||
| 9 | methods: Vec<Method>, | ||
| 10 | } | ||
| 11 | |||
| 12 | impl Service { | ||
| 13 | fn server_trait_ident(&self) -> syn::Ident { | ||
| 14 | self.ident.clone() | ||
| 15 | } | ||
| 16 | |||
| 17 | fn client_struct_ident(&self) -> syn::Ident { | ||
| 18 | syn::Ident::new(&format!("{}Client", self.ident), Span::call_site()) | ||
| 19 | } | ||
| 20 | } | ||
| 21 | |||
| 22 | impl Parse for Service { | ||
| 23 | fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> { | ||
| 24 | let vis = input.parse()?; | ||
| 25 | input.parse::<syn::Token![trait]>()?; | ||
| 26 | let ident = input.parse()?; | ||
| 27 | |||
| 28 | let content; | ||
| 29 | syn::braced!(content in input); | ||
| 30 | |||
| 31 | content.parse::<syn::Token![type]>()?; | ||
| 32 | let error_ident = content.parse::<syn::Ident>()?; | ||
| 33 | if error_ident != "Error" { | ||
| 34 | return Err(syn::Error::new_spanned( | ||
| 35 | error_ident, | ||
| 36 | "expected Error associated type", | ||
| 37 | )); | ||
| 38 | } | ||
| 39 | content.parse::<syn::Token![=]>()?; | ||
| 40 | let error = content.parse()?; | ||
| 41 | content.parse::<syn::Token![;]>()?; | ||
| 42 | |||
| 43 | let mut methods = Vec::default(); | ||
| 44 | while !content.is_empty() { | ||
| 45 | let method = content.parse()?; | ||
| 46 | methods.push(method); | ||
| 47 | } | ||
| 48 | |||
| 49 | Ok(Self { | ||
| 50 | vis, | ||
| 51 | ident, | ||
| 52 | error, | ||
| 53 | methods, | ||
| 54 | }) | ||
| 55 | } | ||
| 56 | } | ||
| 57 | |||
| 58 | struct MethodArg { | ||
| 59 | ident: syn::Ident, | ||
| 60 | ty: syn::Type, | ||
| 61 | } | ||
| 62 | |||
| 63 | impl ToTokens for MethodArg { | ||
| 64 | fn to_tokens(&self, tokens: &mut TokenStream) { | ||
| 65 | self.ident.to_tokens(tokens); | ||
| 66 | tokens.append(Punct::new(':', Spacing::Alone)); | ||
| 67 | self.ty.to_tokens(tokens); | ||
| 68 | } | ||
| 69 | } | ||
| 70 | |||
| 71 | impl Parse for MethodArg { | ||
| 72 | fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> { | ||
| 73 | let ident = input.parse()?; | ||
| 74 | input.parse::<syn::Token![:]>()?; | ||
| 75 | let ty = input.parse()?; | ||
| 76 | Ok(Self { ident, ty }) | ||
| 77 | } | ||
| 78 | } | ||
| 79 | |||
| 80 | struct Method { | ||
| 81 | ident: syn::Ident, | ||
| 82 | arguments: Vec<MethodArg>, | ||
| 83 | return_ty: Option<syn::Type>, | ||
| 84 | } | ||
| 85 | |||
| 86 | impl Parse for Method { | ||
| 87 | fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> { | ||
| 88 | input.parse::<syn::Token![async]>()?; | ||
| 89 | input.parse::<syn::Token![fn]>()?; | ||
| 90 | let ident = input.parse::<syn::Ident>()?; | ||
| 91 | let arguments_content; | ||
| 92 | syn::parenthesized!(arguments_content in input); | ||
| 93 | |||
| 94 | let mut arguments = Vec::default(); | ||
| 95 | while !arguments_content.is_empty() { | ||
| 96 | if !arguments.is_empty() { | ||
| 97 | arguments_content.parse::<syn::Token![,]>()?; | ||
| 98 | } | ||
| 99 | let argument = arguments_content.parse::<MethodArg>()?; | ||
| 100 | arguments.push(argument); | ||
| 101 | } | ||
| 102 | |||
| 103 | let return_ty = match input.peek(syn::Token![->]) { | ||
| 104 | true => { | ||
| 105 | input.parse::<syn::Token![->]>()?; | ||
| 106 | Some(input.parse()?) | ||
| 107 | } | ||
| 108 | false => None, | ||
| 109 | }; | ||
| 110 | |||
| 111 | input.parse::<syn::Token![;]>()?; | ||
| 112 | |||
| 113 | Ok(Self { | ||
| 114 | ident, | ||
| 115 | arguments, | ||
| 116 | return_ty, | ||
| 117 | }) | ||
| 118 | } | ||
| 119 | } | ||
| 120 | |||
| 121 | fn generate_service_into_service_match_arm(service: &Service, method: &Method) -> TokenStream { | ||
| 122 | let method_ident = &method.ident; | ||
| 123 | let method_name = method.ident.to_string(); | ||
| 124 | let server_trait = service.server_trait_ident(); | ||
| 125 | let arg_idents = method | ||
| 126 | .arguments | ||
| 127 | .iter() | ||
| 128 | .map(|arg| &arg.ident) | ||
| 129 | .collect::<Vec<_>>(); | ||
| 130 | |||
| 131 | quote::quote! { | ||
| 132 | #method_name => { | ||
| 133 | let ((#(#arg_idents),*), _) = | ||
| 134 | urpc::internal::bincode::serde::decode_from_slice(&arguments, urpc::internal::bincode::config::standard()).map_err(std::io::Error::other)?; | ||
| 135 | let ctx = Default::default(); | ||
| 136 | let ret = <T as #server_trait>::#method_ident(&self.0, ctx, #(#arg_idents),*).await; | ||
| 137 | let value = urpc::internal::bincode::serde::encode_to_vec(&ret, urpc::internal::bincode::config::standard()).map_err(std::io::Error::other)?; | ||
| 138 | Ok(From::from(value)) | ||
| 139 | } | ||
| 140 | } | ||
| 141 | } | ||
| 142 | |||
| 143 | fn generate_service_into_service(service: &Service) -> TokenStream { | ||
| 144 | let server_trait = service.server_trait_ident(); | ||
| 145 | let service_name = service.ident.to_string(); | ||
| 146 | let match_arms = service | ||
| 147 | .methods | ||
| 148 | .iter() | ||
| 149 | .map(|m| generate_service_into_service_match_arm(service, m)) | ||
| 150 | .collect::<Vec<_>>(); | ||
| 151 | quote::quote! { | ||
| 152 | fn into_service(self) -> impl urpc::Service { | ||
| 153 | struct Adapter<T>(T); | ||
| 154 | |||
| 155 | impl<T> urpc::Service for Adapter<T> | ||
| 156 | where | ||
| 157 | T: #server_trait, | ||
| 158 | { | ||
| 159 | fn name() -> &'static str { | ||
| 160 | #service_name | ||
| 161 | } | ||
| 162 | |||
| 163 | fn call( | ||
| 164 | &self, | ||
| 165 | method: String, | ||
| 166 | arguments: urpc::internal::bytes::Bytes, | ||
| 167 | ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::io::Result<urpc::internal::bytes::Bytes>> + Send + '_>> { | ||
| 168 | Box::pin(async move { | ||
| 169 | match method.as_str() { | ||
| 170 | #(#match_arms)* | ||
| 171 | _ => panic!("unknown method for service {}", "Router"), | ||
| 172 | } | ||
| 173 | }) | ||
| 174 | } | ||
| 175 | } | ||
| 176 | |||
| 177 | Adapter(self) | ||
| 178 | } | ||
| 179 | } | ||
| 180 | } | ||
| 181 | |||
| 182 | fn generate_service_server_method(service: &Service, method: &Method) -> TokenStream { | ||
| 183 | let ident = &method.ident; | ||
| 184 | let service_error = &service.error; | ||
| 185 | let args = &method.arguments; | ||
| 186 | let return_ty = match &method.return_ty { | ||
| 187 | Some(t) => quote::quote! { std::result::Result<#t, #service_error> }, | ||
| 188 | None => quote::quote! { std::result::Result<(), #service_error> }, | ||
| 189 | }; | ||
| 190 | quote::quote! { | ||
| 191 | fn #ident(&self, ctx: urpc::Context, #(#args),*) -> impl std::future::Future<Output = #return_ty> + Send; | ||
| 192 | } | ||
| 193 | } | ||
| 194 | |||
| 195 | fn generate_service_server(service: &Service) -> TokenStream { | ||
| 196 | let vis = &service.vis; | ||
| 197 | let ident = service.server_trait_ident(); | ||
| 198 | let methods = service | ||
| 199 | .methods | ||
| 200 | .iter() | ||
| 201 | .map(|m| generate_service_server_method(service, m)); | ||
| 202 | let into_service = generate_service_into_service(service); | ||
| 203 | |||
| 204 | quote::quote! { | ||
| 205 | #vis trait #ident : Sized + Send + Sync + 'static { | ||
| 206 | #(#methods)* | ||
| 207 | #into_service | ||
| 208 | } | ||
| 209 | } | ||
| 210 | } | ||
| 211 | |||
| 212 | fn generate_service_client_method(service: &Service, method: &Method) -> TokenStream { | ||
| 213 | let service_name = service.ident.to_string(); | ||
| 214 | let service_error = &service.error; | ||
| 215 | let method_ident = &method.ident; | ||
| 216 | let method_string = method.ident.to_string(); | ||
| 217 | let method_arg_idents = method | ||
| 218 | .arguments | ||
| 219 | .iter() | ||
| 220 | .map(|arg| &arg.ident) | ||
| 221 | .collect::<Vec<_>>(); | ||
| 222 | let method_arg_types = method.arguments.iter().map(|arg| &arg.ty); | ||
| 223 | let method_return_type = match &method.return_ty { | ||
| 224 | Some(ty) => quote::quote! { #ty }, | ||
| 225 | None => quote::quote! { () }, | ||
| 226 | }; | ||
| 227 | quote::quote! { | ||
| 228 | pub async fn #method_ident(&self, #(#method_arg_idents: #method_arg_types),*) -> std::result::Result<#method_return_type, urpc::protocol::RpcError<#service_error>> { | ||
| 229 | let service = String::from(#service_name); | ||
| 230 | let method = String::from(#method_string); | ||
| 231 | let arguments = (#(#method_arg_idents),*); | ||
| 232 | let arguments = match urpc::internal::bincode::serde::encode_to_vec(&arguments, urpc::internal::bincode::config::standard()) { | ||
| 233 | Ok(arguments) => From::from(arguments), | ||
| 234 | Err(err) => return Err(urpc::protocol::RpcError::Transport(std::io::Error::other(err))), | ||
| 235 | }; | ||
| 236 | let response = match self.channel.call(service, method, arguments).await { | ||
| 237 | Ok(response) => response, | ||
| 238 | Err(err) => return Err(urpc::protocol::RpcError::Transport(err)), | ||
| 239 | }; | ||
| 240 | match urpc::internal::bincode::serde::decode_from_slice::<std::result::Result<#method_return_type, #service_error>, _>(&response, urpc::internal::bincode::config::standard()) { | ||
| 241 | Ok((result, _)) => result.map_err(urpc::protocol::RpcError::Remote), | ||
| 242 | Err(err) => Err(urpc::protocol::RpcError::Transport(std::io::Error::other(err))), | ||
| 243 | } | ||
| 244 | } | ||
| 245 | } | ||
| 246 | } | ||
| 247 | |||
| 248 | fn generate_service_client(service: &Service) -> TokenStream { | ||
| 249 | let vis = &service.vis; | ||
| 250 | let client_ident = service.client_struct_ident(); | ||
| 251 | let methods = service | ||
| 252 | .methods | ||
| 253 | .iter() | ||
| 254 | .map(|m| generate_service_client_method(service, m)); | ||
| 255 | quote::quote! { | ||
| 256 | #vis struct #client_ident { | ||
| 257 | channel: urpc::ClientChannel | ||
| 258 | } | ||
| 259 | |||
| 260 | impl #client_ident { | ||
| 261 | pub fn new(channel: urpc::ClientChannel) -> Self { | ||
| 262 | Self { channel } | ||
| 263 | } | ||
| 264 | |||
| 265 | #(#methods)* | ||
| 266 | } | ||
| 267 | } | ||
| 268 | } | ||
| 269 | |||
| 270 | #[proc_macro_attribute] | ||
| 271 | pub fn service( | ||
| 272 | _attr: proc_macro::TokenStream, | ||
| 273 | item: proc_macro::TokenStream, | ||
| 274 | ) -> proc_macro::TokenStream { | ||
| 275 | let service = syn::parse_macro_input!(item as Service); | ||
| 276 | let service_server = generate_service_server(&service); | ||
| 277 | let service_client = generate_service_client(&service); | ||
| 278 | From::from(quote::quote! { | ||
| 279 | #service_server | ||
| 280 | #service_client | ||
| 281 | }) | ||
| 282 | } | ||
