diff options
Diffstat (limited to 'startop/scripts/iorap/compile_handcrafted_file.py')
-rwxr-xr-x | startop/scripts/iorap/compile_handcrafted_file.py | 297 |
1 files changed, 297 insertions, 0 deletions
diff --git a/startop/scripts/iorap/compile_handcrafted_file.py b/startop/scripts/iorap/compile_handcrafted_file.py new file mode 100755 index 000000000000..6dbbeaf91571 --- /dev/null +++ b/startop/scripts/iorap/compile_handcrafted_file.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +# +# Copyright 2019, The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import asyncio +import csv +import itertools +import os +import re +import struct +import sys +import tempfile +import time +import zipfile +from typing import Any, Callable, Dict, Generic, Iterable, List, NamedTuple, TextIO, Tuple, TypeVar, Optional, Union + +# Include generated protos. +dir_name = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(dir_name + "/generated") + +from TraceFile_pb2 import * + + +def parse_options(argv: List[str] = None): + """Parse command line arguments and return an argparse Namespace object.""" + parser = argparse.ArgumentParser(description="Compile a TraceFile.proto from a manual text file.") + # argparse considers args starting with - and -- optional in --help, even though required=True. + # by using a named argument group --help will clearly say that it's required instead of optional. + required_named = parser.add_argument_group('required named arguments') + + # optional arguments + # use a group here to get the required arguments to appear 'above' the optional arguments in help. + optional_named = parser.add_argument_group('optional named arguments') + optional_named.add_argument('-opb', '--output-proto-binary', dest='output_proto_binary', action='store', help='Write binary proto output to file.') + optional_named.add_argument('-pm', '--pinlist-meta', dest='pinlist_meta', action='store', help='Path to pinlist.meta (default=none) binary file.') + optional_named.add_argument('-pmp', '--pinlist-meta-parent', dest='pinlist_meta_parent', action='store', help='Device path that the pinlist.meta applies to (e.g. /data/.../somefile.apk)') + optional_named.add_argument('-i', '--input', dest='input', action='store', help='Input text file (default stdin).') + optional_named.add_argument('-zp', '--zip_path', dest='zip_path', action='append', help='Directory containing zip files.') + optional_named.add_argument('-d', '--debug', dest='debug', action='store_true', help='Add extra debugging output') + optional_named.add_argument('-ot', '--output-text', dest='output_text', action='store', help='Output text file (default stdout).') + + return parser.parse_args(argv) + +# TODO: refactor this with a common library file with analyze_metrics.py +def _debug_print(*args, **kwargs): + """Print the args to sys.stderr if the --debug/-d flag was passed in.""" + if _debug: + print(*args, **kwargs, file=sys.stderr) + +class BadInputError(Exception): + pass + +InputRecord = NamedTuple('InputRecord', [('filepath', str), ('offset', int), ('length', int), ('remark', str)]) + +def find_zip_in_paths(original_name, zip_paths): + # /foo/bar/bax.zip -> bax.zip + file_basename = os.path.split(original_name)[1] + + # the file must be located in one of the --zip-path arguments + matched = None + for zip_path in zip_paths: + for dir_entry in os.listdir(zip_path): + if dir_entry == file_basename: + matched = os.path.join(zip_path, dir_entry) + break + if matched: + break + + if not matched: + raise ValueError("%s could not be found in any of the --zip_path specified." %(file_basename)) + + _debug_print("found zip file ", file_basename, " in ", matched) + + if not zipfile.is_zipfile(matched): + raise ValueError("%s is not a zip file" %(matched)) + + return matched + +def handle_zip_entry(input_record, zip_paths): + + res = re.match("([^!]+)[!](.*)", input_record.filepath) + + if not res: + return input_record + + # 'foo!bar' + in_filepath = res[1] # -> 'foo' + in_zip_entry = res[2] # -> 'bar' + + matched = find_zip_in_paths(in_filepath, zip_paths) + + zip = zipfile.ZipFile(matched) + + try: + zip_info = zip.getinfo(in_zip_entry) + except KeyError: + raise ValueError("%s is not an item in the zip file %s" %(in_zip_entry, matched)) + + # TODO: do we also need to add header size to this? + in_offset = zip_info.header_offset + + # TODO: if a range is specified, use that instead. + in_length = zip_info.compress_size + + return InputRecord(in_filepath, in_offset, in_length, 'zip entry (%s)' %(in_zip_entry)) + +def parse_input_file(input: Iterable[str], zip_paths: List[str]) -> Iterable[InputRecord]: + for line in input: + line = line.strip() + + _debug_print("Line = ", line) + if not line: + _debug_print(" skip empty line", line) + continue + elif line[0] == "#": + _debug_print(" skip commented line", line) + continue + + res = re.match("([^\s]+)\s+(\d+)\s+(\d+)", line) + if not res: + raise BadInputError("Expected input of form: <str:filepath> <int:offset> <int:length>") + + in_filepath = res[1] + in_offset = int(res[2]) + in_length = int(res[3]) + + yield handle_zip_entry(InputRecord(in_filepath, in_offset, in_length, 'regular file'), zip_paths) + +# format: +# (<big_endian(i32):file_offset> <big_endian(i32):range_length>)+ +PIN_META_FORMAT = ">ii" +PIN_META_READ_SIZE = struct.calcsize(PIN_META_FORMAT) + +def parse_pin_meta(pin_meta_file, pinlist_meta_parent, zip_paths): + if not pin_meta_file: + return () + + global PIN_META_FORMAT + global PIN_META_READ_SIZE + + # '/data/app/com.google.android.GoogleCamera-aNQhzSznf4h_bvJ_MRbweQ==/base.apk' + # -> 'com.google.android.GoogleCamera' + package_name_match = re.match('/.*/(.*)-.*=/base.apk', pinlist_meta_parent) + + if not package_name_match: + raise ValueError("%s did not contain the <packagename>.apk" %(pinlist_meta_parent)) + + package_name = package_name_match[1] + # "com.google.android.GoogleCamera" -> "GoogleCamera.apk" + apk_name = package_name.split(".")[-1] + ".apk" + + path_to_zip_on_host = find_zip_in_paths(apk_name, zip_paths) + apk_file_size = os.path.getsize(path_to_zip_on_host) + _debug_print("APK path '%s' file size '%d'" %(path_to_zip_on_host, apk_file_size)) + + while True: + data = pin_meta_file.read(PIN_META_READ_SIZE) + + if not data: + break + + (pin_offset, pin_length) = struct.unpack(PIN_META_FORMAT, data) # (offset, length) + + remark = 'regular file (pinlist.meta)' + + remaining_size = apk_file_size - pin_offset + if remaining_size < 0: + print("WARNING: Clamp entry (%d, %d), offset too large (max file size = %d)" %(pin_offset, pin_length, apk_file_size)) + + pin_length = pin_length + remaining_size + pin_offset = pin_offset + remaining_size + + if pin_offset < 0: + pin_offset = 0 + + remark += '[clamped.offset]' + + pin_last_offset = pin_offset + pin_length + remaining_size = apk_file_size - pin_last_offset + + if remaining_size < 0: + print("WARNING: Clamp entry (%d, %d), length too large (max file size = %d)" %(pin_offset, pin_length, apk_file_size)) + pin_length = pin_length + remaining_size + + remark += '[clamped.length]' + + yield InputRecord(pinlist_meta_parent, pin_offset, pin_length, remark) + +def write_text_file_output(input_records: Iterable[InputRecord], output_text_file): + for rec in input_records: + output_text_file.write("%s %d %d #%s\n" %(rec.filepath, rec.offset, rec.length, rec.remark)) + +def build_trace_file(input_records: Iterable[InputRecord]) -> TraceFile: + trace_file = TraceFile() + trace_file_index = trace_file.index + + file_id_counter = 0 + file_id_map = {} # filename -> id + + stats_length_total = 0 + filename_stats = {} # filename -> total size + + for rec in input_records: + filename = rec.filepath + + file_id = file_id_map.get(filename) + if not file_id: + file_id = file_id_counter + file_id_map[filename] = file_id_counter + file_id_counter = file_id_counter + 1 + + file_index_entry = trace_file_index.entries.add() + file_index_entry.id = file_id + file_index_entry.file_name = filename + + # already in the file index, add the file entry. + file_entry = trace_file.list.entries.add() + file_entry.index_id = file_id + file_entry.file_length = rec.length + stats_length_total += file_entry.file_length + file_entry.file_offset = rec.offset + + filename_stats[filename] = filename_stats.get(filename, 0) + file_entry.file_length + + return trace_file + +def main(): + global _debug + + options= parse_options() + _debug = options.debug + _debug_print("parsed options: ", options) + + if not options.input: + input_file = sys.stdin + _debug_print("input = stdin") + else: + input_file = open(options.input) + _debug_print("input = (file)", options.input) + + if not options.output_proto_binary: + output_proto_file = None + else: + output_proto_file = open(options.output_proto_binary, 'wb') + _debug_print("output_proto_binary = ", output_proto_file) + + pinlist_meta_parent = options.pinlist_meta_parent + if options.pinlist_meta: + pin_meta_file = open(options.pinlist_meta, 'rb') + else: + pin_meta_file = None + + if (pinlist_meta_parent == None) != (pin_meta_file == None): + print("Options must be used together: --pinlist-meta and --pinlist-meta-path") + return 1 + + if not options.output_text: + output_text_file = sys.stdout + _debug_print("output = stdout") + else: + output_text_file = open(options.output_text, 'w') + _debug_print("output = (file)", options.output_text) + + zip_paths = options.zip_path or [] + + input_records = list(parse_pin_meta(pin_meta_file, pinlist_meta_parent, zip_paths)) + input_records = input_records + list(parse_input_file(input_file, zip_paths)) + + for p in input_records: + _debug_print(p) + + write_text_file_output(input_records, output_text_file) + output_text_file.close() + + out_proto = build_trace_file(input_records) + + if output_proto_file: + output_proto_file.write(out_proto.SerializeToString()) + output_proto_file.close() + + return 0 + +if __name__ == '__main__': + sys.exit(main()) |