//! Types used by the RPC protocol. use bytes::Bytes; use serde::{Deserialize, Serialize}; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RpcCall { pub id: u64, pub service: String, pub method: String, pub arguments: Bytes, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RpcCancel { pub id: u64, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RpcResponse { pub id: u64, pub value: Bytes, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum RpcMessage { Call(RpcCall), Response(RpcResponse), Cancel(RpcCancel), } #[derive(Debug)] pub enum RpcError { Transport(std::io::Error), Remote(E), } impl std::fmt::Display for RpcError where E: std::fmt::Display, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RpcError::Transport(error) => write!(f, "transport error: {error}"), RpcError::Remote(error) => write!(f, "remote error: {error}"), } } } impl std::error::Error for RpcError where E: std::error::Error, { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { RpcError::Transport(error) => error.source(), RpcError::Remote(error) => error.source(), } } } impl From for RpcError { fn from(value: std::io::Error) -> Self { Self::Transport(value) } } #[derive(Default)] pub struct RpcMessageCodec(LengthDelimitedCodec); impl Encoder for RpcMessageCodec { type Error = std::io::Error; fn encode( &mut self, item: RpcMessage, dst: &mut bytes::BytesMut, ) -> std::result::Result<(), Self::Error> { let encoded = bincode::serde::encode_to_vec(&item, bincode::config::standard()) .map_err(std::io::Error::other)?; let encoded = Bytes::from(encoded); self.0.encode(encoded, dst).map_err(std::io::Error::other)?; Ok(()) } } impl Decoder for RpcMessageCodec { type Item = RpcMessage; type Error = std::io::Error; fn decode( &mut self, src: &mut bytes::BytesMut, ) -> std::result::Result, Self::Error> { match self.0.decode(src) { Ok(Some(frame)) => { let (message, _) = bincode::serde::decode_from_slice(&frame, bincode::config::standard()) .map_err(std::io::Error::other)?; Ok(Some(message)) } Ok(None) => Ok(None), Err(err) => Err(std::io::Error::other(err)), } } }