Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ documentation = "https://docs.rs/packet-ipc/"
repository = "https://github.com/protectwise/packet-ipc"

[dependencies]
blocking = "0.5"
bincode = "1"
crossbeam-channel = "0.3"
ipc-channel = "0.14"
Expand All @@ -27,3 +28,4 @@ thiserror = "1"

[dev-dependencies]
env_logger = "0.7"
tokio-test = "0.2"
2 changes: 2 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum Error {
Bincode(#[from] bincode::Error),
#[error("Error receiving: {0:?}")]
Recv(#[from] crossbeam_channel::RecvError),
#[error("Mutex was poisoned")]
Mutex(),
}

unsafe impl Sync for Error {}
Expand Down
18 changes: 9 additions & 9 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ use serde::{Deserialize, Serialize};

pub trait AsIpcPacket {
fn timestamp(&self) -> &std::time::SystemTime;
fn data(&self) -> &[u8];
fn data(&self) -> Vec<u8>;
}

#[derive(Debug, Deserialize, Serialize)]
pub struct IpcPacket<'a> {
pub struct IpcPacket {
timestamp: std::time::SystemTime,
#[serde(with = "serde_bytes")]
data: &'a [u8],
data: Vec<u8>,
}

impl<'a, T: AsIpcPacket> From<&'a T> for IpcPacket<'a> {
fn from(v: &'a T) -> Self {
impl<T: AsIpcPacket> From<&T> for IpcPacket {
fn from(v: &T) -> Self {
IpcPacket {
timestamp: v.timestamp().clone(),
data: v.data(),
}
}
}

impl<'a> From<IpcPacket<'a>> for Packet {
fn from(v: IpcPacket<'a>) -> Self {
impl From<IpcPacket> for Packet {
fn from(v: IpcPacket) -> Self {
Packet {
ts: v.timestamp.clone(),
data: v.data.to_vec(),
Expand Down Expand Up @@ -50,8 +50,8 @@ impl AsIpcPacket for Packet {
fn timestamp(&self) -> &std::time::SystemTime {
&self.ts
}
fn data(&self) -> &[u8] {
self.data.as_ref()
fn data(&self) -> Vec<u8> {
self.data.clone()
}
}

Expand Down
47 changes: 31 additions & 16 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ use crate::errors::Error;
use crate::packet::{AsIpcPacket, IpcPacket};
use ipc_channel::ipc::{IpcOneShotServer, IpcSender};
use log::*;
use std::sync::{Arc, Mutex};

pub type SenderMessage<'a> = Option<Vec<IpcPacket<'a>>>;
pub type Sender<'a> = IpcSender<SenderMessage<'a>>;
pub type SenderMessage = Option<Vec<IpcPacket>>;
pub type Sender = IpcSender<SenderMessage>;

pub struct Server<'a> {
server: IpcOneShotServer<Sender<'a>>,
pub struct Server {
server: IpcOneShotServer<Sender>,
name: String,
}

impl<'a> Server<'a> {
impl Server {
pub fn name(&self) -> &String {
&self.name
}

pub fn new() -> Result<Server<'a>, Error> {
pub fn new() -> Result<Server, Error> {
let (server, server_name) = IpcOneShotServer::new().map_err(Error::Io)?;

Ok(Server {
Expand All @@ -26,30 +27,44 @@ impl<'a> Server<'a> {
})
}

pub fn accept(self) -> Result<ConnectedIpc<'a>, Error> {
pub fn accept(self) -> Result<ConnectedIpc, Error> {
let (_, tx) = self.server.accept().map_err(Error::Bincode)?;

info!("Accepted connection from {:?}", tx);

let tx = Arc::new(Mutex::new(tx));

Ok(ConnectedIpc { connection: tx })
}
}

pub struct ConnectedIpc<'a> {
connection: Sender<'a>,

pub struct ConnectedIpc {
connection: Arc<Mutex<Sender>>,
}

impl<'a> ConnectedIpc<'a> {
pub fn send<T: AsIpcPacket>(&'a self, packets: &'a [T]) -> Result<(), Error> {
impl ConnectedIpc {
pub async fn send<T: AsIpcPacket>(&self, packets:Vec<T>) -> Result<(), Error> {
let ipc_packets: Vec<_> = packets.iter().map(IpcPacket::from).collect();
self.connection.send(Some(ipc_packets)).map_err(|e| {
error!("Failed to send {:?}", e);
Error::Bincode(e)
})
Self::internal_send(Arc::clone(&self.connection), ipc_packets).await
}

async fn internal_send(sender: Arc<Mutex<Sender>>, ipc_packets: Vec<IpcPacket>) -> Result<(), Error> {
blocking::Unblock::new(()).with_mut(move |_| {
let sender = Arc::clone(&sender);
let sender = sender.lock().map_err(|_| Error::Mutex())?;
sender.send(Some(ipc_packets)).map_err(|e| {
error!("Failed to send {:?}", e);
Error::Bincode(e)
})
}).await

}

pub fn close(&mut self) -> Result<(), Error> {
self.connection.send(None).map_err(Error::Bincode)?;
let connection = Arc::clone(&self.connection);
let connection = connection.lock().map_err(|_| Error::Mutex())?;
connection.send(None).map_err(Error::Bincode)?;
Ok(())
}
}
13 changes: 7 additions & 6 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use packet_ipc::{AsIpcPacket, Client, Error, IpcPacket, Packet, Server};
use tokio_test::block_on;

#[test]
fn test_roundtrip() {
Expand Down Expand Up @@ -45,9 +46,9 @@ fn test_packet_receive() {
});

let mut server_tx = server.accept().expect("Failed to accept connection");

block_on(
server_tx
.send(&vec![Packet::new(std::time::SystemTime::now(), vec![3u8])])
.send(vec![Packet::new(std::time::SystemTime::now(), vec![3u8])]))
.expect("Failed to send");

server_tx.close().expect("Failed to close");
Expand Down Expand Up @@ -86,11 +87,11 @@ fn test_multiple_packet_receive() {

let mut server_tx = server.accept().expect("Failed to accept connection");

server_tx
.send(&vec![Packet::new(std::time::SystemTime::now(), vec![3u8])])
block_on(server_tx
.send(vec![Packet::new(std::time::SystemTime::now(), vec![3u8])]))
.expect("Failed to send");
server_tx
.send(&vec![Packet::new(std::time::SystemTime::now(), vec![4u8])])
block_on(server_tx
.send(vec![Packet::new(std::time::SystemTime::now(), vec![4u8])]))
.expect("Failed to send");

server_tx.close().expect("Failed to close");
Expand Down