summaryrefslogtreecommitdiff
path: root/system/gd/rust/stack/src/link/acl/fragment.rs
blob: 6e52cd50c5577efe62ed3497a7eb2894953012bc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! Handles fragmentation & reassembly of ACL packets into whole L2CAP payloads

use bt_common::Bluetooth;
use bt_packets::hci::PacketBoundaryFlag::{
    ContinuingFragment, FirstAutomaticallyFlushable, FirstNonAutomaticallyFlushable,
};
use bt_packets::hci::{AclBuilder, AclChild, AclPacket, BroadcastFlag};
use bytes::{Buf, Bytes, BytesMut};
use futures::stream::{self, StreamExt};
use log::{error, info, warn};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;

const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;

pub struct Reassembler {
    buffer: Option<BytesMut>,
    remaining: usize,
    out: Sender<Bytes>,
}

impl Reassembler {
    /// Create a new reassembler
    pub fn new(out: Sender<Bytes>) -> Self {
        Self { buffer: None, remaining: 0, out }
    }

    /// Injest the packet and send out if fully reassembled
    pub async fn on_packet(&mut self, packet: AclPacket) {
        let payload = match packet.specialize() {
            AclChild::Payload(payload) => payload,
            AclChild::None => {
                info!("dropping ACL packet with empty payload");
                return;
            }
        };

        if let BroadcastFlag::ActivePeripheralBroadcast = packet.get_broadcast_flag() {
            // we do not accept broadcast packets
            return;
        }

        match packet.get_packet_boundary_flag() {
            FirstNonAutomaticallyFlushable => error!("not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"),
            FirstAutomaticallyFlushable => {
                if self.buffer.take().is_some() {
                    error!("got a start packet without finishing previous reassembly - dropping previous");
                }

                let full_size = get_l2cap_pdu_size(&payload);
                self.remaining = full_size - (payload.len() - L2CAP_BASIC_FRAME_HEADER_LEN);
                if self.remaining > 0 {
                    let mut buffer = BytesMut::with_capacity(full_size);
                    buffer.extend_from_slice(&payload[..]);
                    self.buffer = Some(buffer);
                } else {
                    self.out.send(payload).await.unwrap();
                }
            },
            ContinuingFragment => {
                match self.buffer.take() {
                    None => warn!("got continuation packet without pending reassembly"),
                    Some(_) if self.remaining < payload.len() => warn!("remote sent unexpected L2CAP PDU - dropping entire packet"),
                    Some(mut buffer) => {
                        self.remaining -= payload.len();
                        buffer.extend_from_slice(&payload[..]);
                        if self.remaining == 0 {
                            self.out.send(buffer.freeze()).await.unwrap();
                        } else {
                            self.buffer = Some(buffer);
                        }
                    }
                }
            },
        }
    }
}

fn get_l2cap_pdu_size(first_packet: &Bytes) -> usize {
    if first_packet.len() <= L2CAP_BASIC_FRAME_HEADER_LEN {
        error!("invalid l2cap starting packet");

        0
    } else {
        (&first_packet[..]).get_u16_le() as usize
    }
}

pub fn fragmenting_stream(
    rx: ReceiverStream<Bytes>,
    mtu: usize,
    handle: u16,
    bt: Bluetooth,
    close_rx: oneshot::Receiver<()>,
) -> std::pin::Pin<
    std::boxed::Box<dyn futures::Stream<Item = bt_packets::hci::AclPacket> + std::marker::Send>,
> {
    rx.flat_map(move |data| {
        stream::iter(
            data.chunks(mtu)
                .enumerate()
                .map(move |(i, chunk)| {
                    AclBuilder {
                        handle,
                        packet_boundary_flag: match bt {
                            Bluetooth::Classic if i == 0 => FirstAutomaticallyFlushable,
                            Bluetooth::Le if i == 0 => FirstNonAutomaticallyFlushable,
                            _ => ContinuingFragment,
                        },
                        broadcast_flag: BroadcastFlag::PointToPoint,
                        payload: Some(Bytes::copy_from_slice(chunk)),
                    }
                    .build()
                })
                .collect::<Vec<AclPacket>>(),
        )
    })
    .take_until(close_rx)
    .boxed()
}