diff options
Diffstat (limited to 'scripts/update_payload/checker_unittest.py')
-rwxr-xr-x | scripts/update_payload/checker_unittest.py | 295 |
1 files changed, 144 insertions, 151 deletions
diff --git a/scripts/update_payload/checker_unittest.py b/scripts/update_payload/checker_unittest.py index 4b23bd82..2c686371 100755 --- a/scripts/update_payload/checker_unittest.py +++ b/scripts/update_payload/checker_unittest.py @@ -14,13 +14,13 @@ import itertools import os import unittest -# Pylint cannot find mox. +# pylint cannot find mox. # pylint: disable=F0401 import mox import checker import common -import payload as update_payload # avoid name conflicts later. +import payload as update_payload # Avoid name conflicts later. import test_utils import update_metadata_pb2 @@ -61,11 +61,11 @@ def _GetPayloadCheckerWithData(payload_gen): return checker.PayloadChecker(payload) -# (i) this class doesn't need an __init__(); (ii) unit testing is all about -# running protected methods; (iii) don't bark about missing members of classes -# you cannot import. +# This class doesn't need an __init__(). # pylint: disable=W0232 +# Unit testing is all about running protected methods. # pylint: disable=W0212 +# Don't bark about missing members of classes you cannot import. # pylint: disable=E1101 class PayloadCheckerTest(mox.MoxTestBase): """Tests the PayloadChecker class. @@ -78,7 +78,6 @@ class PayloadCheckerTest(mox.MoxTestBase): individual invocation contexts for each, which are then bound as testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for all such tests is done in AddAllParametricTests(). - """ def MockPayload(self): @@ -97,11 +96,10 @@ class PayloadCheckerTest(mox.MoxTestBase): its default state. Args: - start_block: the starting block of the extent - num_blocks: the number of blocks in the extent + start_block: The starting block of the extent. + num_blocks: The number of blocks in the extent. Returns: An Extent message. - """ ex = update_metadata_pb2.Extent() if start_block >= 0: @@ -115,10 +113,9 @@ class PayloadCheckerTest(mox.MoxTestBase): """Returns an list of extents. Args: - *args: (start_block, num_blocks) pairs defining the extents + *args: (start_block, num_blocks) pairs defining the extents. Returns: A list of Extent objects. - """ ex_list = [] for start_block, num_blocks in args: @@ -131,6 +128,8 @@ class PayloadCheckerTest(mox.MoxTestBase): new_field = repeated_field.add() new_field.CopyFrom(field_val) + # The production environment uses an older Python, this isn't an override. + # pylint: disable=W0221 def assertIsNone(self, val): """Asserts that val is None (TODO remove once we upgrade to Python 2.7). @@ -138,28 +137,26 @@ class PayloadCheckerTest(mox.MoxTestBase): non-None value. Args: - val: value/object to be equated to None - + val: Value/object to be equated to None. """ - self.assertEqual(val, None) + self.assertIs(None, val) def SetupAddElemTest(self, is_present, is_submsg, convert=str, linebreak=False, indent=0): """Setup for testing of _CheckElem() and its derivatives. Args: - is_present: whether or not the element is found in the message - is_submsg: whether the element is a sub-message itself - convert: a representation conversion function - linebreak: whether or not a linebreak is to be used in the report - indent: indentation used for the report + is_present: Whether or not the element is found in the message. + is_submsg: Whether the element is a sub-message itself. + convert: A representation conversion function. + linebreak: Whether or not a linebreak is to be used in the report. + indent: Indentation used for the report. Returns: - msg: a mock message object - report: a mock report object - subreport: a mock sub-report object - name: an element name to check - val: expected element value - + msg: A mock message object. + report: A mock report object. + subreport: A mock sub-report object. + name: An element name to check. + val: Expected element value. """ name = 'foo' val = 'fake submsg' if is_submsg else 'fake field' @@ -186,86 +183,83 @@ class PayloadCheckerTest(mox.MoxTestBase): """Parametric testing of _CheckElem(). Args: - is_present: whether or not the element is found in the message - is_mandatory: whether or not it's a mandatory element - is_submsg: whether the element is a sub-message itself - convert: a representation conversion function - linebreak: whether or not a linebreak is to be used in the report - indent: indentation used for the report - + is_present: Whether or not the element is found in the message. + is_mandatory: Whether or not it's a mandatory element. + is_submsg: Whether the element is a sub-message itself. + convert: A representation conversion function. + linebreak: Whether or not a linebreak is to be used in the report. + indent: Indentation used for the report. """ msg, report, subreport, name, val = self.SetupAddElemTest( is_present, is_submsg, convert, linebreak, indent) - largs = [msg, name, report, is_mandatory, is_submsg] - dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent} + args = (msg, name, report, is_mandatory, is_submsg) + kwargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent} if is_mandatory and not is_present: self.assertRaises(update_payload.PayloadError, - checker.PayloadChecker._CheckElem, *largs, **dargs) + checker.PayloadChecker._CheckElem, *args, **kwargs) else: - ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*largs, - **dargs) - self.assertEquals(ret_val, val if is_present else None) - self.assertEquals(ret_subreport, - subreport if is_present and is_submsg else None) + ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*args, + **kwargs) + self.assertEquals(val if is_present else None, ret_val) + self.assertEquals(subreport if is_present and is_submsg else None, + ret_subreport) def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak, indent): """Parametric testing of _Check{Mandatory,Optional}Field(). Args: - is_mandatory: whether we're testing a mandatory call - is_present: whether or not the element is found in the message - convert: a representation conversion function - linebreak: whether or not a linebreak is to be used in the report - indent: indentation used for the report - + is_mandatory: Whether we're testing a mandatory call. + is_present: Whether or not the element is found in the message. + convert: A representation conversion function. + linebreak: Whether or not a linebreak is to be used in the report. + indent: Indentation used for the report. """ msg, report, _, name, val = self.SetupAddElemTest( is_present, False, convert, linebreak, indent) # Prepare for invocation of the tested method. - largs = [msg, name, report] - dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent} + args = [msg, name, report] + kwargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent} if is_mandatory: - largs.append('bar') + args.append('bar') tested_func = checker.PayloadChecker._CheckMandatoryField else: tested_func = checker.PayloadChecker._CheckOptionalField # Test the method call. if is_mandatory and not is_present: - self.assertRaises(update_payload.PayloadError, tested_func, *largs, - **dargs) + self.assertRaises(update_payload.PayloadError, tested_func, *args, + **kwargs) else: - ret_val = tested_func(*largs, **dargs) - self.assertEquals(ret_val, val if is_present else None) + ret_val = tested_func(*args, **kwargs) + self.assertEquals(val if is_present else None, ret_val) def DoAddSubMsgTest(self, is_mandatory, is_present): """Parametrized testing of _Check{Mandatory,Optional}SubMsg(). Args: - is_mandatory: whether we're testing a mandatory call - is_present: whether or not the element is found in the message - + is_mandatory: Whether we're testing a mandatory call. + is_present: Whether or not the element is found in the message. """ msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True) # Prepare for invocation of the tested method. - largs = [msg, name, report] + args = [msg, name, report] if is_mandatory: - largs.append('bar') + args.append('bar') tested_func = checker.PayloadChecker._CheckMandatorySubMsg else: tested_func = checker.PayloadChecker._CheckOptionalSubMsg # Test the method call. if is_mandatory and not is_present: - self.assertRaises(update_payload.PayloadError, tested_func, *largs) + self.assertRaises(update_payload.PayloadError, tested_func, *args) else: - ret_val, ret_subreport = tested_func(*largs) - self.assertEquals(ret_val, val if is_present else None) - self.assertEquals(ret_subreport, subreport if is_present else None) + ret_val, ret_subreport = tested_func(*args) + self.assertEquals(val if is_present else None, ret_val) + self.assertEquals(subreport if is_present else None, ret_subreport) def testCheckPresentIff(self): """Tests _CheckPresentIff().""" @@ -286,30 +280,31 @@ class PayloadCheckerTest(mox.MoxTestBase): """Parametric testing of _CheckSha256SignatureTest(). Args: - expect_pass: whether or not it should pass - expect_subprocess_call: whether to expect the openssl call to happen - sig_data: the signature raw data - sig_asn1_header: the ASN1 header - returned_signed_hash: the signed hash data retuned by openssl - expected_signed_hash: the signed hash data to compare against - + expect_pass: Whether or not it should pass. + expect_subprocess_call: Whether to expect the openssl call to happen. + sig_data: The signature raw data. + sig_asn1_header: The ASN1 header. + returned_signed_hash: The signed hash data retuned by openssl. + expected_signed_hash: The signed hash data to compare against. """ - # Stub out the subprocess invocation. - self.mox.StubOutWithMock(checker.PayloadChecker, '_Run') - if expect_subprocess_call: - checker.PayloadChecker._Run(mox.IsA(list), send_data=sig_data).AndReturn( - (sig_asn1_header + returned_signed_hash, None)) - - self.mox.ReplayAll() - if expect_pass: - self.assertIsNone(checker.PayloadChecker._CheckSha256Signature( - sig_data, 'foo', expected_signed_hash, 'bar')) - else: - self.assertRaises(update_payload.PayloadError, - checker.PayloadChecker._CheckSha256Signature, - sig_data, 'foo', expected_signed_hash, 'bar') - - self.mox.UnsetStubs() + try: + # Stub out the subprocess invocation. + self.mox.StubOutWithMock(checker.PayloadChecker, '_Run') + if expect_subprocess_call: + checker.PayloadChecker._Run( + mox.IsA(list), send_data=sig_data).AndReturn( + (sig_asn1_header + returned_signed_hash, None)) + + self.mox.ReplayAll() + if expect_pass: + self.assertIsNone(checker.PayloadChecker._CheckSha256Signature( + sig_data, 'foo', expected_signed_hash, 'bar')) + else: + self.assertRaises(update_payload.PayloadError, + checker.PayloadChecker._CheckSha256Signature, + sig_data, 'foo', expected_signed_hash, 'bar') + finally: + self.mox.UnsetStubs() def testCheckSha256Signature_Pass(self): """Tests _CheckSha256Signature(); pass case.""" @@ -321,7 +316,7 @@ class PayloadCheckerTest(mox.MoxTestBase): def testCheckSha256Signature_FailBadSignature(self): """Tests _CheckSha256Signature(); fails due to malformed signature.""" - sig_data = 'fake-signature' # malformed (not 256 bytes in length) + sig_data = 'fake-signature' # Malformed (not 256 bytes in length). signed_hash = hashlib.sha256('fake-data').digest() self.DoCheckSha256SignatureTest(False, False, sig_data, common.SIG_ASN1_HEADER, signed_hash, @@ -330,7 +325,7 @@ class PayloadCheckerTest(mox.MoxTestBase): def testCheckSha256Signature_FailBadOutputLength(self): """Tests _CheckSha256Signature(); fails due to unexpected output length.""" sig_data = 'fake-signature'.ljust(256) - signed_hash = 'fake-hash' # malformed (not 32 bytes in length) + signed_hash = 'fake-hash' # Malformed (not 32 bytes in length). self.DoCheckSha256SignatureTest(False, True, sig_data, common.SIG_ASN1_HEADER, signed_hash, signed_hash) @@ -401,19 +396,18 @@ class PayloadCheckerTest(mox.MoxTestBase): """Parametric testing of _CheckManifest(). Args: - fail_mismatched_block_size: simulate a missing block_size field - fail_bad_sigs: make signatures descriptor inconsistent - fail_mismatched_oki_ori: make old rootfs/kernel info partially present - fail_bad_oki: tamper with old kernel info - fail_bad_ori: tamper with old rootfs info - fail_bad_nki: tamper with new kernel info - fail_bad_nri: tamper with new rootfs info - fail_missing_ops: simulate a manifest without any operations - fail_old_kernel_fs_size: make old kernel fs size too big - fail_old_rootfs_fs_size: make old rootfs fs size too big - fail_new_kernel_fs_size: make new kernel fs size too big - fail_new_rootfs_fs_size: make new rootfs fs size too big - + fail_mismatched_block_size: Simulate a missing block_size field. + fail_bad_sigs: Make signatures descriptor inconsistent. + fail_mismatched_oki_ori: Make old rootfs/kernel info partially present. + fail_bad_oki: Tamper with old kernel info. + fail_bad_ori: Tamper with old rootfs info. + fail_bad_nki: Tamper with new kernel info. + fail_bad_nri: Tamper with new rootfs info. + fail_missing_ops: Simulate a manifest without any operations. + fail_old_kernel_fs_size: Make old kernel fs size too big. + fail_old_rootfs_fs_size: Make old rootfs fs size too big. + fail_new_kernel_fs_size: Make new kernel fs size too big. + fail_new_rootfs_fs_size: Make new rootfs fs size too big. """ # Generate a test payload. For this test, we only care about the manifest # and don't need any data blobs, hence we can use a plain paylaod generator @@ -515,26 +509,26 @@ class PayloadCheckerTest(mox.MoxTestBase): # Passes w/ all real extents. extents = self.NewExtentList((0, 4), (8, 3), (1024, 16)) self.assertEquals( + 23, payload_checker._CheckExtents(extents, (1024 + 16) * block_size, - collections.defaultdict(int), 'foo'), - 23) + collections.defaultdict(int), 'foo')) # Passes w/ pseudo-extents (aka sparse holes). extents = self.NewExtentList((0, 4), (common.PSEUDO_EXTENT_MARKER, 5), (8, 3)) self.assertEquals( + 12, payload_checker._CheckExtents(extents, (1024 + 16) * block_size, collections.defaultdict(int), 'foo', - allow_pseudo=True), - 12) + allow_pseudo=True)) # Passes w/ pseudo-extent due to a signature. extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2)) self.assertEquals( + 2, payload_checker._CheckExtents(extents, (1024 + 16) * block_size, collections.defaultdict(int), 'foo', - allow_signature=True), - 2) + allow_signature=True)) # Fails, extent missing a start block. extents = self.NewExtentList((-1, 4), (8, 3), (1024, 16)) @@ -780,19 +774,19 @@ class PayloadCheckerTest(mox.MoxTestBase): """Parametric testing of _CheckOperation(). Args: - op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF' - is_last: whether we're testing the last operation in a sequence - allow_signature: whether we're testing a signature-capable operation - allow_unhashed: whether we're allowing to not hash the data - fail_src_extents: tamper with src extents - fail_dst_extents: tamper with dst extents - fail_mismatched_data_offset_length: make data_{offset,length} inconsistent - fail_missing_dst_extents: do not include dst extents - fail_src_length: make src length inconsistent - fail_dst_length: make dst length inconsistent - fail_data_hash: tamper with the data blob hash - fail_prev_data_offset: make data space uses incontiguous - + op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'. + is_last: Whether we're testing the last operation in a sequence. + allow_signature: Whether we're testing a signature-capable operation. + allow_unhashed: Whether we're allowing to not hash the data. + fail_src_extents: Tamper with src extents. + fail_dst_extents: Tamper with dst extents. + fail_mismatched_data_offset_length: Make data_{offset,length} + inconsistent. + fail_missing_dst_extents: Do not include dst extents. + fail_src_length: Make src length inconsistent. + fail_dst_length: Make dst length inconsistent. + fail_data_hash: Tamper with the data blob hash. + fail_prev_data_offset: Make data space uses incontiguous. """ op_type = _OpTypeByName(op_type_name) @@ -879,15 +873,15 @@ class PayloadCheckerTest(mox.MoxTestBase): fail_mismatched_data_offset_length or fail_missing_dst_extents or fail_src_length or fail_dst_length or fail_data_hash or fail_prev_data_offset) - largs = [op, 'foo', is_last, old_block_counters, new_block_counters, - old_part_size, new_part_size, prev_data_offset, allow_signature, - blob_hash_counts] + args = (op, 'foo', is_last, old_block_counters, new_block_counters, + old_part_size, new_part_size, prev_data_offset, allow_signature, + blob_hash_counts) if should_fail: self.assertRaises(update_payload.PayloadError, - payload_checker._CheckOperation, *largs) + payload_checker._CheckOperation, *args) else: - self.assertEqual(payload_checker._CheckOperation(*largs), - op.data_length if op.HasField('data_length') else 0) + self.assertEqual(op.data_length if op.HasField('data_length') else 0, + payload_checker._CheckOperation(*args)) def testAllocBlockCounters(self): """Tests _CheckMoveOperation().""" @@ -896,14 +890,14 @@ class PayloadCheckerTest(mox.MoxTestBase): # Check allocation for block-aligned partition size, ensure it's integers. result = payload_checker._AllocBlockCounters(16 * block_size) - self.assertEqual(len(result), 16) - self.assertEqual(type(result[0]), int) + self.assertEqual(16, len(result)) + self.assertEqual(int, type(result[0])) # Check allocation of unaligned partition sizes. result = payload_checker._AllocBlockCounters(16 * block_size - 1) - self.assertEqual(len(result), 16) + self.assertEqual(16, len(result)) result = payload_checker._AllocBlockCounters(16 * block_size + 1) - self.assertEqual(len(result), 17) + self.assertEqual(17, len(result)) def DoCheckOperationsTest(self, fail_bad_type, fail_nonexhaustive_full_update): @@ -942,14 +936,14 @@ class PayloadCheckerTest(mox.MoxTestBase): report = checker._PayloadReport() should_fail = (fail_bad_type or fail_nonexhaustive_full_update) - largs = (payload_checker.payload.manifest.install_operations, report, - 'foo', 0, rootfs_part_size, rootfs_part_size, 0, False) + args = (payload_checker.payload.manifest.install_operations, report, + 'foo', 0, rootfs_part_size, rootfs_part_size, 0, False) if should_fail: self.assertRaises(update_payload.PayloadError, - payload_checker._CheckOperations, *largs) + payload_checker._CheckOperations, *args) else: - self.assertEqual(payload_checker._CheckOperations(*largs), - rootfs_data_length) + self.assertEqual(rootfs_data_length, + payload_checker._CheckOperations(*args)) def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op, fail_mismatched_pseudo_op, fail_sig_missing_fields, @@ -1015,12 +1009,12 @@ class PayloadCheckerTest(mox.MoxTestBase): should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or fail_mismatched_pseudo_op or fail_sig_missing_fields or fail_unknown_sig_version or fail_incorrect_sig) - largs = (report, test_utils._PUBKEY_FILE_NAME) + args = (report, test_utils._PUBKEY_FILE_NAME) if should_fail: self.assertRaises(update_payload.PayloadError, - payload_checker._CheckSignatures, *largs) + payload_checker._CheckSignatures, *args) else: - self.assertIsNone(payload_checker._CheckSignatures(*largs)) + self.assertIsNone(payload_checker._CheckSignatures(*args)) def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size, fail_mismatched_block_size, fail_excess_data): @@ -1050,13 +1044,13 @@ class PayloadCheckerTest(mox.MoxTestBase): # Generate payload (complete w/ signature) and create the test object. if fail_invalid_block_size: - use_block_size = block_size + 5 # not a power of two + use_block_size = block_size + 5 # Not a power of two. elif fail_mismatched_block_size: - use_block_size = block_size * 2 # different that payload stated + use_block_size = block_size * 2 # Different that payload stated. else: use_block_size = block_size - dargs = { + kwargs = { 'payload_gen_dargs': { 'privkey_file_name': test_utils._PRIVKEY_FILE_NAME, 'do_add_pseudo_operation': True, @@ -1067,18 +1061,18 @@ class PayloadCheckerTest(mox.MoxTestBase): 'block_size': use_block_size}} if fail_invalid_block_size: self.assertRaises(update_payload.PayloadError, _GetPayloadChecker, - payload_gen.WriteToFileWithData, **dargs) + payload_gen.WriteToFileWithData, **kwargs) else: payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData, - **dargs) - dargs = {'pubkey_file_name': test_utils._PUBKEY_FILE_NAME} + **kwargs) + kwargs = {'pubkey_file_name': test_utils._PUBKEY_FILE_NAME} should_fail = (fail_wrong_payload_type or fail_mismatched_block_size or fail_excess_data) if should_fail: - self.assertRaises(update_payload.PayloadError, - payload_checker.Run, **dargs) + self.assertRaises(update_payload.PayloadError, payload_checker.Run, + **kwargs) else: - self.assertIsNone(payload_checker.Run(**dargs)) + self.assertIsNone(payload_checker.Run(**kwargs)) # This implements a generic API, hence the occasional unused args. @@ -1123,11 +1117,10 @@ def AddParametricTests(tested_method_name, arg_space, validate_func=None): usual setUp/tearDown mechanics. Args: - tested_method_name: name of the tested PayloadChecker method - arg_space: a dictionary containing variables (keys) and lists of values - (values) associated with them - validate_func: a function used for validating test argument combinations - + tested_method_name: Name of the tested PayloadChecker method. + arg_space: A dictionary containing variables (keys) and lists of values + (values) associated with them. + validate_func: A function used for validating test argument combinations. """ for value_tuple in itertools.product(*arg_space.itervalues()): run_dargs = dict(zip(arg_space.iterkeys(), value_tuple)) |