summaryrefslogtreecommitdiff
path: root/system/gd/rust/facade/helpers/lib.rs
blob: f2b683842849f9328de143a1f517813a25aa5f7a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//! common facade & shim helpers

use bt_facade_proto::common::Data;
use bytes::Bytes;
use futures::sink::SinkExt;
use grpcio::*;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;

/// Wrapper so we can invoke callbacks
pub trait U8SliceRunnable {
    /// Do the thing
    fn run(&self, data: &[u8]);
}

/// Helper for interfacing channels with shim or gRPC boundaries
#[derive(Clone)]
pub struct RxAdapter<T> {
    rx: Arc<Mutex<Receiver<T>>>,
    running: bool,
}

impl<T: 'static + Into<Vec<u8>> + Into<Bytes> + Send> RxAdapter<T> {
    /// New, from an unwrapped receiver
    pub fn new(rx: Receiver<T>) -> Self {
        Self::from_arc(Arc::new(Mutex::new(rx)))
    }

    /// New, from an already arc mutexed receiver
    pub fn from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self {
        Self { rx, running: false }
    }

    /// Stream out the channel over the provided sink
    pub fn stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>) {
        assert!(!self.running);
        self.running = true;

        let clone_rx = self.rx.clone();
        ctx.spawn(async move {
            while let Some(payload) = clone_rx.lock().await.recv().await {
                let mut data = Data::default();
                data.set_payload(payload.into());
                if let Err(e) = sink.send((data, WriteFlags::default())).await {
                    log::error!("failure sending data: {:?}", e);
                }
            }
        });
    }

    /// Stream out the channel over the provided shim runnable
    pub fn stream_runnable<R: 'static + U8SliceRunnable + Send>(
        &mut self,
        rt: &Arc<Runtime>,
        runnable: R,
    ) {
        assert!(!self.running);
        self.running = true;

        let clone_rx = self.rx.clone();
        rt.spawn(async move {
            while let Some(payload) = clone_rx.lock().await.recv().await {
                let bytes: Bytes = payload.into();
                runnable.run(&bytes);
            }
        });
    }
}