diff --git a/Cargo.toml b/Cargo.toml index 821c392..088a669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ documentation = "https://docs.rs/packet-ipc/" repository = "https://github.com/protectwise/packet-ipc" [dependencies] +blocking = "0.5" bincode = "1" crossbeam-channel = "0.3" ipc-channel = "0.14" @@ -27,3 +28,4 @@ thiserror = "1" [dev-dependencies] env_logger = "0.7" +tokio-test = "0.2" diff --git a/src/errors.rs b/src/errors.rs index 6986cdf..9db5ce1 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,6 +12,8 @@ pub enum Error { Bincode(#[from] bincode::Error), #[error("Error receiving: {0:?}")] Recv(#[from] crossbeam_channel::RecvError), + #[error("Mutex was poisoned")] + Mutex(), } unsafe impl Sync for Error {} diff --git a/src/packet.rs b/src/packet.rs index 65dc35a..4dec6d6 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -2,18 +2,18 @@ use serde::{Deserialize, Serialize}; pub trait AsIpcPacket { fn timestamp(&self) -> &std::time::SystemTime; - fn data(&self) -> &[u8]; + fn data(&self) -> Vec; } #[derive(Debug, Deserialize, Serialize)] -pub struct IpcPacket<'a> { +pub struct IpcPacket { timestamp: std::time::SystemTime, #[serde(with = "serde_bytes")] - data: &'a [u8], + data: Vec, } -impl<'a, T: AsIpcPacket> From<&'a T> for IpcPacket<'a> { - fn from(v: &'a T) -> Self { +impl From<&T> for IpcPacket { + fn from(v: &T) -> Self { IpcPacket { timestamp: v.timestamp().clone(), data: v.data(), @@ -21,8 +21,8 @@ impl<'a, T: AsIpcPacket> From<&'a T> for IpcPacket<'a> { } } -impl<'a> From> for Packet { - fn from(v: IpcPacket<'a>) -> Self { +impl From for Packet { + fn from(v: IpcPacket) -> Self { Packet { ts: v.timestamp.clone(), data: v.data.to_vec(), @@ -50,8 +50,8 @@ impl AsIpcPacket for Packet { fn timestamp(&self) -> &std::time::SystemTime { &self.ts } - fn data(&self) -> &[u8] { - self.data.as_ref() + fn data(&self) -> Vec { + self.data.clone() } } diff --git a/src/server.rs b/src/server.rs index f33c9a4..e144dad 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,21 +3,22 @@ use crate::errors::Error; use crate::packet::{AsIpcPacket, IpcPacket}; use ipc_channel::ipc::{IpcOneShotServer, IpcSender}; use log::*; +use std::sync::{Arc, Mutex}; -pub type SenderMessage<'a> = Option>>; -pub type Sender<'a> = IpcSender>; +pub type SenderMessage = Option>; +pub type Sender = IpcSender; -pub struct Server<'a> { - server: IpcOneShotServer>, +pub struct Server { + server: IpcOneShotServer, name: String, } -impl<'a> Server<'a> { +impl Server { pub fn name(&self) -> &String { &self.name } - pub fn new() -> Result, Error> { + pub fn new() -> Result { let (server, server_name) = IpcOneShotServer::new().map_err(Error::Io)?; Ok(Server { @@ -26,30 +27,44 @@ impl<'a> Server<'a> { }) } - pub fn accept(self) -> Result, Error> { + pub fn accept(self) -> Result { let (_, tx) = self.server.accept().map_err(Error::Bincode)?; info!("Accepted connection from {:?}", tx); + let tx = Arc::new(Mutex::new(tx)); + Ok(ConnectedIpc { connection: tx }) } } -pub struct ConnectedIpc<'a> { - connection: Sender<'a>, + +pub struct ConnectedIpc { + connection: Arc>, } -impl<'a> ConnectedIpc<'a> { - pub fn send(&'a self, packets: &'a [T]) -> Result<(), Error> { +impl ConnectedIpc { + pub async fn send(&self, packets:Vec) -> Result<(), Error> { let ipc_packets: Vec<_> = packets.iter().map(IpcPacket::from).collect(); - self.connection.send(Some(ipc_packets)).map_err(|e| { - error!("Failed to send {:?}", e); - Error::Bincode(e) - }) + Self::internal_send(Arc::clone(&self.connection), ipc_packets).await + } + + async fn internal_send(sender: Arc>, ipc_packets: Vec) -> Result<(), Error> { + blocking::Unblock::new(()).with_mut(move |_| { + let sender = Arc::clone(&sender); + let sender = sender.lock().map_err(|_| Error::Mutex())?; + sender.send(Some(ipc_packets)).map_err(|e| { + error!("Failed to send {:?}", e); + Error::Bincode(e) + }) + }).await + } pub fn close(&mut self) -> Result<(), Error> { - self.connection.send(None).map_err(Error::Bincode)?; + let connection = Arc::clone(&self.connection); + let connection = connection.lock().map_err(|_| Error::Mutex())?; + connection.send(None).map_err(Error::Bincode)?; Ok(()) } } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 4d60a54..ec03bbe 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -1,4 +1,5 @@ use packet_ipc::{AsIpcPacket, Client, Error, IpcPacket, Packet, Server}; +use tokio_test::block_on; #[test] fn test_roundtrip() { @@ -45,9 +46,9 @@ fn test_packet_receive() { }); let mut server_tx = server.accept().expect("Failed to accept connection"); - + block_on( server_tx - .send(&vec![Packet::new(std::time::SystemTime::now(), vec![3u8])]) + .send(vec![Packet::new(std::time::SystemTime::now(), vec![3u8])])) .expect("Failed to send"); server_tx.close().expect("Failed to close"); @@ -86,11 +87,11 @@ fn test_multiple_packet_receive() { let mut server_tx = server.accept().expect("Failed to accept connection"); - server_tx - .send(&vec![Packet::new(std::time::SystemTime::now(), vec![3u8])]) + block_on(server_tx + .send(vec![Packet::new(std::time::SystemTime::now(), vec![3u8])])) .expect("Failed to send"); - server_tx - .send(&vec![Packet::new(std::time::SystemTime::now(), vec![4u8])]) + block_on(server_tx + .send(vec![Packet::new(std::time::SystemTime::now(), vec![4u8])])) .expect("Failed to send"); server_tx.close().expect("Failed to close");