aboutsummaryrefslogtreecommitdiff
path: root/src/protocol.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-16 10:46:41 +0100
committerdiogo464 <[email protected]>2025-07-16 10:46:41 +0100
commitf319d7ab5278a3cfb43d38875d81c28cc2dce1e1 (patch)
treecb161fd990643e267bbc373fb09ccd7b689a23b5 /src/protocol.rs
Initial commit - extracted urpc from monorepo
Diffstat (limited to 'src/protocol.rs')
-rw-r--r--src/protocol.rs107
1 files changed, 107 insertions, 0 deletions
diff --git a/src/protocol.rs b/src/protocol.rs
new file mode 100644
index 0000000..baf886b
--- /dev/null
+++ b/src/protocol.rs
@@ -0,0 +1,107 @@
1//! Types used by the RPC protocol.
2use bytes::Bytes;
3use serde::{Deserialize, Serialize};
4use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
5
6#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
7pub struct RpcCall {
8 pub id: u64,
9 pub service: String,
10 pub method: String,
11 pub arguments: Bytes,
12}
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct RpcCancel {
16 pub id: u64,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct RpcResponse {
21 pub id: u64,
22 pub value: Bytes,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub enum RpcMessage {
27 Call(RpcCall),
28 Response(RpcResponse),
29 Cancel(RpcCancel),
30}
31
32#[derive(Debug)]
33pub enum RpcError<E> {
34 Transport(std::io::Error),
35 Remote(E),
36}
37
38impl<E> std::fmt::Display for RpcError<E>
39where
40 E: std::fmt::Display,
41{
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 match self {
44 RpcError::Transport(error) => write!(f, "transport error: {error}"),
45 RpcError::Remote(error) => write!(f, "remote error: {error}"),
46 }
47 }
48}
49
50impl<E> std::error::Error for RpcError<E>
51where
52 E: std::error::Error,
53{
54 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
55 match self {
56 RpcError::Transport(error) => error.source(),
57 RpcError::Remote(error) => error.source(),
58 }
59 }
60}
61
62impl<E> From<std::io::Error> for RpcError<E> {
63 fn from(value: std::io::Error) -> Self {
64 Self::Transport(value)
65 }
66}
67
68#[derive(Default)]
69pub struct RpcMessageCodec(LengthDelimitedCodec);
70
71impl Encoder<RpcMessage> for RpcMessageCodec {
72 type Error = std::io::Error;
73
74 fn encode(
75 &mut self,
76 item: RpcMessage,
77 dst: &mut bytes::BytesMut,
78 ) -> std::result::Result<(), Self::Error> {
79 let encoded = bincode::serde::encode_to_vec(&item, bincode::config::standard())
80 .map_err(std::io::Error::other)?;
81 let encoded = Bytes::from(encoded);
82 self.0.encode(encoded, dst).map_err(std::io::Error::other)?;
83 Ok(())
84 }
85}
86
87impl Decoder for RpcMessageCodec {
88 type Item = RpcMessage;
89
90 type Error = std::io::Error;
91
92 fn decode(
93 &mut self,
94 src: &mut bytes::BytesMut,
95 ) -> std::result::Result<Option<Self::Item>, Self::Error> {
96 match self.0.decode(src) {
97 Ok(Some(frame)) => {
98 let (message, _) =
99 bincode::serde::decode_from_slice(&frame, bincode::config::standard())
100 .map_err(std::io::Error::other)?;
101 Ok(Some(message))
102 }
103 Ok(None) => Ok(None),
104 Err(err) => Err(std::io::Error::other(err)),
105 }
106 }
107}