1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
//! Types used by the RPC protocol.
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
/// A call message naming a service, a method, and an argument payload.
///
/// Travels inside [`RpcMessage::Call`]. The field order is part of the
/// serialized layout when a positional format (such as the bincode encoding
/// used by `RpcMessageCodec`) is applied, so do not reorder fields casually.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RpcCall {
/// Identifier of this call.
/// NOTE(review): presumably echoed by the matching `RpcResponse::id` /
/// `RpcCancel::id` — confirm against the code that issues calls.
pub id: u64,
/// Name of the service being addressed.
pub service: String,
/// Name of the method being addressed on that service.
pub method: String,
/// Argument payload as raw bytes; its encoding is not defined here.
pub arguments: Bytes,
}
/// A cancellation message carrying only an identifier.
///
/// Travels inside [`RpcMessage::Cancel`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RpcCancel {
/// Identifier being cancelled.
/// NOTE(review): presumably the `id` of an earlier `RpcCall` — confirm
/// against the code that handles cancellation.
pub id: u64,
}
/// A response message carrying an identifier and a result payload.
///
/// Travels inside [`RpcMessage::Response`]. The field order is part of the
/// serialized layout when a positional format is used.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RpcResponse {
/// Identifier of this response.
/// NOTE(review): presumably equal to the `id` of the `RpcCall` being
/// answered — confirm against the code that dispatches responses.
pub id: u64,
/// Result payload as raw bytes; its encoding is not defined here.
pub value: Bytes,
}
/// The set of messages that can be serialized for the RPC protocol.
///
/// This is the item type encoded and decoded by `RpcMessageCodec`. Variant
/// order determines the serialized discriminant under positional formats, so
/// new variants should be appended rather than inserted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RpcMessage {
/// A call, see [`RpcCall`].
Call(RpcCall),
/// A response, see [`RpcResponse`].
Response(RpcResponse),
/// A cancellation, see [`RpcCancel`].
Cancel(RpcCancel),
}
/// Error type with two origins: the transport, or a caller-chosen type `E`.
///
/// An [`std::io::Error`] converts into the `Transport` variant through the
/// `From` implementation in this file, so `?` can be used on I/O results.
#[derive(Debug)]
pub enum RpcError<E> {
/// An I/O error; displayed with the prefix `transport error: `.
Transport(std::io::Error),
/// An error of the generic type `E`; displayed with the prefix
/// `remote error: `.
/// NOTE(review): presumably the error reported by the remote side of a
/// call — confirm against the code that constructs this variant.
Remote(E),
}
impl<E> std::fmt::Display for RpcError<E>
where
E: std::fmt::Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RpcError::Transport(error) => write!(f, "transport error: {error}"),
RpcError::Remote(error) => write!(f, "remote error: {error}"),
}
}
}
impl<E> std::error::Error for RpcError<E>
where
E: std::error::Error,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
RpcError::Transport(error) => error.source(),
RpcError::Remote(error) => error.source(),
}
}
}
impl<E> From<std::io::Error> for RpcError<E> {
fn from(value: std::io::Error) -> Self {
Self::Transport(value)
}
}
/// Codec that turns [`RpcMessage`] values into length-delimited frames and
/// back again.
///
/// It wraps a [`LengthDelimitedCodec`], which handles the framing; the frame
/// payload is the bincode (serde, standard configuration) encoding of the
/// message. `Default` builds the wrapper around the inner codec's default
/// settings.
#[derive(Debug, Default)]
pub struct RpcMessageCodec(LengthDelimitedCodec);
/// Serializes an [`RpcMessage`] and writes it as one length-delimited frame.
impl Encoder<RpcMessage> for RpcMessageCodec {
    type Error = std::io::Error;

    /// Appends the framed encoding of `item` to `dst`.
    ///
    /// # Errors
    ///
    /// * A bincode serialization failure is wrapped with
    ///   [`std::io::Error::other`].
    /// * A framing failure reported by the inner [`LengthDelimitedCodec`] is
    ///   returned unchanged, so its original [`std::io::ErrorKind`] stays
    ///   visible to the caller.
    fn encode(
        &mut self,
        item: RpcMessage,
        dst: &mut bytes::BytesMut,
    ) -> std::result::Result<(), Self::Error> {
        let payload = bincode::serde::encode_to_vec(&item, bincode::config::standard())
            .map_err(std::io::Error::other)?;
        // The inner codec already yields `std::io::Error`; wrapping it again
        // would replace its kind with `ErrorKind::Other`.
        self.0.encode(Bytes::from(payload), dst)
    }
}
/// Reads one length-delimited frame and deserializes it into an
/// [`RpcMessage`].
impl Decoder for RpcMessageCodec {
    type Item = RpcMessage;
    type Error = std::io::Error;

    /// Tries to decode one message from `src`.
    ///
    /// Returns `Ok(None)` when the inner [`LengthDelimitedCodec`] has not yet
    /// produced a complete frame.
    ///
    /// # Errors
    ///
    /// * A framing failure reported by the inner codec is returned unchanged,
    ///   so its original [`std::io::ErrorKind`] stays visible to the caller.
    /// * A bincode deserialization failure is wrapped with
    ///   [`std::io::Error::other`].
    fn decode(
        &mut self,
        src: &mut bytes::BytesMut,
    ) -> std::result::Result<Option<Self::Item>, Self::Error> {
        // The inner codec already yields `std::io::Error`, so `?` forwards it
        // without erasing its kind.
        let Some(frame) = self.0.decode(src)? else {
            return Ok(None);
        };
        // The second tuple element is the number of bytes bincode consumed;
        // it is not checked against the frame length.
        let (message, _consumed) =
            bincode::serde::decode_from_slice(&frame, bincode::config::standard())
                .map_err(std::io::Error::other)?;
        Ok(Some(message))
    }
}
|