From f319d7ab5278a3cfb43d38875d81c28cc2dce1e1 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Wed, 16 Jul 2025 10:46:41 +0100 Subject: Initial commit - extracted urpc from monorepo --- src/protocol.rs | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 src/protocol.rs (limited to 'src/protocol.rs') diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..baf886b --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,107 @@ +//! Types used by the RPC protocol. +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RpcCall { + pub id: u64, + pub service: String, + pub method: String, + pub arguments: Bytes, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RpcCancel { + pub id: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RpcResponse { + pub id: u64, + pub value: Bytes, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum RpcMessage { + Call(RpcCall), + Response(RpcResponse), + Cancel(RpcCancel), +} + +#[derive(Debug)] +pub enum RpcError { + Transport(std::io::Error), + Remote(E), +} + +impl std::fmt::Display for RpcError +where + E: std::fmt::Display, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RpcError::Transport(error) => write!(f, "transport error: {error}"), + RpcError::Remote(error) => write!(f, "remote error: {error}"), + } + } +} + +impl std::error::Error for RpcError +where + E: std::error::Error, +{ + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + RpcError::Transport(error) => error.source(), + RpcError::Remote(error) => error.source(), + } + } +} + +impl From for RpcError { + fn from(value: std::io::Error) -> Self { + Self::Transport(value) + } +} + +#[derive(Default)] +pub struct RpcMessageCodec(LengthDelimitedCodec); + +impl Encoder for RpcMessageCodec { + type Error = std::io::Error; + + fn encode( + &mut self, + item: RpcMessage, + dst: &mut bytes::BytesMut, + ) -> std::result::Result<(), Self::Error> { + let encoded = bincode::serde::encode_to_vec(&item, bincode::config::standard()) + .map_err(std::io::Error::other)?; + let encoded = Bytes::from(encoded); + self.0.encode(encoded, dst).map_err(std::io::Error::other)?; + Ok(()) + } +} + +impl Decoder for RpcMessageCodec { + type Item = RpcMessage; + + type Error = std::io::Error; + + fn decode( + &mut self, + src: &mut bytes::BytesMut, + ) -> std::result::Result, Self::Error> { + match self.0.decode(src) { + Ok(Some(frame)) => { + let (message, _) = + bincode::serde::decode_from_slice(&frame, bincode::config::standard()) + .map_err(std::io::Error::other)?; + Ok(Some(message)) + } + Ok(None) => Ok(None), + Err(err) => Err(std::io::Error::other(err)), + } + } +} -- cgit