diff options
author | David Duarte <licorne@google.com> | 2022-03-07 10:08:48 +0000 |
---|---|---|
committer | David Duarte <licorne@google.com> | 2022-03-08 18:02:43 +0000 |
commit | 18c8bb01235ed88875247cd92e22fffc8c724494 (patch) | |
tree | e5cbfd81df768f6a4047f05f9edd61e9b1f62bbb /tools/rootcanal/scripts/simple_link_layer_socket.py | |
parent | b8f22dd1663d8ff64503a27d3fe00f5871c24d86 (diff) |
RootCanal: Move into tools/
Rename the namespace from test_vendor_lib to rootcanal
Test: None
Change-Id: I9cf72eef6c6100888faa0e6344ebd4c5b7ec716e
Diffstat (limited to 'tools/rootcanal/scripts/simple_link_layer_socket.py')
-rw-r--r-- | tools/rootcanal/scripts/simple_link_layer_socket.py | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/tools/rootcanal/scripts/simple_link_layer_socket.py b/tools/rootcanal/scripts/simple_link_layer_socket.py new file mode 100644 index 0000000000..3baabb7263 --- /dev/null +++ b/tools/rootcanal/scripts/simple_link_layer_socket.py @@ -0,0 +1,196 @@ +# +# Copyright 2018 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Script for sending data to a port. + +This script provides a simple shell interface for sending data at run-time to a +port. + +Usage: + 1. Choose a port to use. Use 'adb forward tcp:<port> + tcp:<port>' to forward the port to the device. + 2. In a separate shell, build and push the test vendor library to the device + using the script mentioned in option A (i.e. without the --test-channel flag + set). + 3. Once logcat has started, turn Bluetooth on from the device. + 4. Run this program, in the shell from step 1, the port, also from step 1, + as arguments. + + + length type source dest addr_type adv_type EIR Data + + adv public connectable Name (TATA) +send 18000000 07 010203040506 000000000000 00 00 050954415441 + 020106 + + length adv source dest public non-conn Name (TETE) +send 18000000 07 010203040507 000000000000 00 03 050954455445 + 020106 + + length scan_rsp source dest public scan_rsp Name (TFTF) +send 18000000 09 010203040506 4de24c67454b 00 04 050954465446 + 020106 + +""" + +#!/usr/bin/env python3 + +import binascii +import cmd +import random +import socket +import string +import struct +import sys + +DEVICE_NAME_LENGTH = 6 +DEVICE_ADDRESS_LENGTH = 6 + + +# Used to generate fake device names and addresses during discovery. +def generate_random_name(): + return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ + string.digits) for _ in range(DEVICE_NAME_LENGTH)) + + +def generate_random_address(): + return ''.join(random.SystemRandom().choice(string.digits) for _ in \ + range(DEVICE_ADDRESS_LENGTH)) + + +class Connection(object): + """Simple wrapper class for a socket object. + + Attributes: + socket: The underlying socket created for the specified address and port. + """ + + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) + self._socket.setblocking(0) + + def close(self): + self._socket.close() + + def send(self, data): + self._socket.sendall(data) + + def receive(self, size): + return self._socket.recv(size) + + +class RawPort(object): + """Checks outgoing commands and sends them once verified. + + Attributes: + connection: The connection to the HCI port. + """ + + def __init__(self, port): + self._connection = Connection(port) + self._closed = False + + def close(self): + self._connection.close() + self._closed = True + + def send_binary(self, args): + joined_args = ''.join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + if self._closed: + return + self._connection.send(packet) + + def receive_response(self): + if self._closed: + return + size_chars = self._connection.receive(4) + if not size_chars: + print('Debug: No response') + return False + size_bytes = bytearray(size_chars) + response_size = 0 + for i in range(0, len(size_chars) - 1): + response_size |= ord(size_chars[i]) << (8 * i) + response = self._connection.receive(response_size) + return response + + def lint_command(self, name, args, name_size, args_size): + assert name_size == len(name) and args_size == len(args) + try: + name.encode('utf-8') + for arg in args: + arg.encode('utf-8') + except UnicodeError: + print('Unrecognized characters.') + raise + if name_size > 255 or args_size > 255: + raise ValueError # Size must be encodable in one octet. + for arg in args: + if len(arg) > 255: + raise ValueError # Size must be encodable in one octet. + + +class RawPortShell(cmd.Cmd): + """Shell for sending binary data to a port.""" + + def __init__(self, raw_port): + cmd.Cmd.__init__(self) + self._raw_port = raw_port + + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str.""" + self._raw_port.send_binary(args.split()) + + def do_quit(self, args): + """Arguments: None. + + Exits. + """ + self._raw_port.close() + print('Goodbye.') + return True + + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.""" + if (len(args) == 0): + cmd.Cmd.do_help(self, args) + + +def main(argv): + if len(argv) != 2: + print('Usage: python raw_port.py [port]') + return + try: + port = int(argv[1]) + except ValueError: + print('Error parsing port.') + else: + try: + raw_port = RawPort(port) + except (socket.error, e): + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + raw_port_shell = RawPortShell(raw_port) + raw_port_shell.prompt = '$ ' + raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.') + + +if __name__ == '__main__': + main(sys.argv) |