summaryrefslogtreecommitdiff
path: root/tools/rootcanal/scripts/simple_link_layer_socket.py
diff options
context:
space:
mode:
authorDavid Duarte <licorne@google.com>2022-03-07 10:08:48 +0000
committerDavid Duarte <licorne@google.com>2022-03-08 18:02:43 +0000
commit18c8bb01235ed88875247cd92e22fffc8c724494 (patch)
treee5cbfd81df768f6a4047f05f9edd61e9b1f62bbb /tools/rootcanal/scripts/simple_link_layer_socket.py
parentb8f22dd1663d8ff64503a27d3fe00f5871c24d86 (diff)
RootCanal: Move into tools/
Rename the namespace from test_vendor_lib to rootcanal Test: None Change-Id: I9cf72eef6c6100888faa0e6344ebd4c5b7ec716e
Diffstat (limited to 'tools/rootcanal/scripts/simple_link_layer_socket.py')
-rw-r--r--tools/rootcanal/scripts/simple_link_layer_socket.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/tools/rootcanal/scripts/simple_link_layer_socket.py b/tools/rootcanal/scripts/simple_link_layer_socket.py
new file mode 100644
index 0000000000..3baabb7263
--- /dev/null
+++ b/tools/rootcanal/scripts/simple_link_layer_socket.py
@@ -0,0 +1,196 @@
+#
+# Copyright 2018 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Script for sending data to a port.
+
+This script provides a simple shell interface for sending data at run-time to a
+port.
+
+Usage:
+ 1. Choose a port to use. Use 'adb forward tcp:<port>
+ tcp:<port>' to forward the port to the device.
+ 2. In a separate shell, build and push the test vendor library to the device
+ using the script mentioned in option A (i.e. without the --test-channel flag
+ set).
+ 3. Once logcat has started, turn Bluetooth on from the device.
+ 4. Run this program, in the shell from step 1, the port, also from step 1,
+ as arguments.
+
+
+ length type source dest addr_type adv_type EIR Data
+
+ adv public connectable Name (TATA)
+send 18000000 07 010203040506 000000000000 00 00 050954415441
+ 020106
+
+ length adv source dest public non-conn Name (TETE)
+send 18000000 07 010203040507 000000000000 00 03 050954455445
+ 020106
+
+ length scan_rsp source dest public scan_rsp Name (TFTF)
+send 18000000 09 010203040506 4de24c67454b 00 04 050954465446
+ 020106
+
+"""
+
+#!/usr/bin/env python3
+
+import binascii
+import cmd
+import random
+import socket
+import string
+import struct
+import sys
+
+DEVICE_NAME_LENGTH = 6
+DEVICE_ADDRESS_LENGTH = 6
+
+
+# Used to generate fake device names and addresses during discovery.
+def generate_random_name():
+ return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
+ string.digits) for _ in range(DEVICE_NAME_LENGTH))
+
+
+def generate_random_address():
+ return ''.join(random.SystemRandom().choice(string.digits) for _ in \
+ range(DEVICE_ADDRESS_LENGTH))
+
+
+class Connection(object):
+ """Simple wrapper class for a socket object.
+
+ Attributes:
+ socket: The underlying socket created for the specified address and port.
+ """
+
+ def __init__(self, port):
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect(('localhost', port))
+ self._socket.setblocking(0)
+
+ def close(self):
+ self._socket.close()
+
+ def send(self, data):
+ self._socket.sendall(data)
+
+ def receive(self, size):
+ return self._socket.recv(size)
+
+
+class RawPort(object):
+ """Checks outgoing commands and sends them once verified.
+
+ Attributes:
+ connection: The connection to the HCI port.
+ """
+
+ def __init__(self, port):
+ self._connection = Connection(port)
+ self._closed = False
+
+ def close(self):
+ self._connection.close()
+ self._closed = True
+
+ def send_binary(self, args):
+ joined_args = ''.join(arg for arg in args)
+ print(joined_args)
+ packet = binascii.a2b_hex(joined_args)
+ if self._closed:
+ return
+ self._connection.send(packet)
+
+ def receive_response(self):
+ if self._closed:
+ return
+ size_chars = self._connection.receive(4)
+ if not size_chars:
+ print('Debug: No response')
+ return False
+ size_bytes = bytearray(size_chars)
+ response_size = 0
+ for i in range(0, len(size_chars) - 1):
+ response_size |= ord(size_chars[i]) << (8 * i)
+ response = self._connection.receive(response_size)
+ return response
+
+ def lint_command(self, name, args, name_size, args_size):
+ assert name_size == len(name) and args_size == len(args)
+ try:
+ name.encode('utf-8')
+ for arg in args:
+ arg.encode('utf-8')
+ except UnicodeError:
+ print('Unrecognized characters.')
+ raise
+ if name_size > 255 or args_size > 255:
+ raise ValueError # Size must be encodable in one octet.
+ for arg in args:
+ if len(arg) > 255:
+ raise ValueError # Size must be encodable in one octet.
+
+
+class RawPortShell(cmd.Cmd):
+ """Shell for sending binary data to a port."""
+
+ def __init__(self, raw_port):
+ cmd.Cmd.__init__(self)
+ self._raw_port = raw_port
+
+ def do_send(self, args):
+ """Arguments: dev_type_str Add a new device of type dev_type_str."""
+ self._raw_port.send_binary(args.split())
+
+ def do_quit(self, args):
+ """Arguments: None.
+
+ Exits.
+ """
+ self._raw_port.close()
+ print('Goodbye.')
+ return True
+
+ def do_help(self, args):
+ """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr."""
+ if (len(args) == 0):
+ cmd.Cmd.do_help(self, args)
+
+
+def main(argv):
+ if len(argv) != 2:
+ print('Usage: python raw_port.py [port]')
+ return
+ try:
+ port = int(argv[1])
+ except ValueError:
+ print('Error parsing port.')
+ else:
+ try:
+ raw_port = RawPort(port)
+ except (socket.error, e):
+ print('Error connecting to socket: %s' % e)
+ except:
+ print('Error creating (check arguments).')
+ else:
+ raw_port_shell = RawPortShell(raw_port)
+ raw_port_shell.prompt = '$ '
+ raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.')
+
+
+if __name__ == '__main__':
+ main(sys.argv)