diff options
Diffstat (limited to 'scripts/update_payload/checker_unittest.py')
-rwxr-xr-x | scripts/update_payload/checker_unittest.py | 482 |
1 files changed, 176 insertions, 306 deletions
diff --git a/scripts/update_payload/checker_unittest.py b/scripts/update_payload/checker_unittest.py index 7e52233e..993b785c 100755 --- a/scripts/update_payload/checker_unittest.py +++ b/scripts/update_payload/checker_unittest.py @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/env python # # Copyright (C) 2013 The Android Open Source Project # @@ -17,35 +17,36 @@ """Unit testing checker.py.""" -from __future__ import print_function +# Disable check for function names to avoid errors based on old code +# pylint: disable-msg=invalid-name + +from __future__ import absolute_import import array import collections -import cStringIO import hashlib +import io import itertools import os import unittest -# pylint cannot find mox. -# pylint: disable=F0401 -import mox +from six.moves import zip + +import mock # pylint: disable=import-error from update_payload import checker from update_payload import common from update_payload import test_utils from update_payload import update_metadata_pb2 from update_payload.error import PayloadError -from update_payload.payload import Payload # Avoid name conflicts later. +from update_payload.payload import Payload # Avoid name conflicts later. def _OpTypeByName(op_name): - """Returns the type of an operation from itsname.""" + """Returns the type of an operation from its name.""" op_name_to_type = { 'REPLACE': common.OpType.REPLACE, 'REPLACE_BZ': common.OpType.REPLACE_BZ, - 'MOVE': common.OpType.MOVE, - 'BSDIFF': common.OpType.BSDIFF, 'SOURCE_COPY': common.OpType.SOURCE_COPY, 'SOURCE_BSDIFF': common.OpType.SOURCE_BSDIFF, 'ZERO': common.OpType.ZERO, @@ -65,7 +66,7 @@ def _GetPayloadChecker(payload_gen_write_to_file_func, payload_gen_dargs=None, if checker_init_dargs is None: checker_init_dargs = {} - payload_file = cStringIO.StringIO() + payload_file = io.BytesIO() payload_gen_write_to_file_func(payload_file, **payload_gen_dargs) payload_file.seek(0) payload = Payload(payload_file) @@ -75,7 +76,7 @@ def _GetPayloadChecker(payload_gen_write_to_file_func, payload_gen_dargs=None, def _GetPayloadCheckerWithData(payload_gen): """Returns a payload checker from a given payload generator.""" - payload_file = cStringIO.StringIO() + payload_file = io.BytesIO() payload_gen.WriteToFile(payload_file) payload_file.seek(0) payload = Payload(payload_file) @@ -89,7 +90,7 @@ def _GetPayloadCheckerWithData(payload_gen): # pylint: disable=W0212 # Don't bark about missing members of classes you cannot import. # pylint: disable=E1101 -class PayloadCheckerTest(mox.MoxTestBase): +class PayloadCheckerTest(unittest.TestCase): """Tests the PayloadChecker class. In addition to ordinary testFoo() methods, which are automatically invoked by @@ -102,11 +103,42 @@ class PayloadCheckerTest(mox.MoxTestBase): all such tests is done in AddAllParametricTests(). """ + def setUp(self): + """setUp function for unittest testcase""" + self.mock_checks = [] + + def tearDown(self): + """tearDown function for unittest testcase""" + # Verify that all mock functions were called. + for check in self.mock_checks: + check.mock_fn.assert_called_once_with(*check.exp_args, **check.exp_kwargs) + + class MockChecksAtTearDown(object): + """Mock data storage. + + This class stores the mock functions and its arguments to be checked at a + later point. + """ + def __init__(self, mock_fn, *args, **kwargs): + self.mock_fn = mock_fn + self.exp_args = args + self.exp_kwargs = kwargs + + def addPostCheckForMockFunction(self, mock_fn, *args, **kwargs): + """Store a mock function and its arguments to self.mock_checks + + Args: + mock_fn: mock function object + args: expected positional arguments for the mock_fn + kwargs: expected named arguments for the mock_fn + """ + self.mock_checks.append(self.MockChecksAtTearDown(mock_fn, *args, **kwargs)) + def MockPayload(self): """Create a mock payload object, complete with a mock manifest.""" - payload = self.mox.CreateMock(Payload) + payload = mock.create_autospec(Payload) payload.is_init = True - payload.manifest = self.mox.CreateMock( + payload.manifest = mock.create_autospec( update_metadata_pb2.DeltaArchiveManifest) return payload @@ -175,19 +207,20 @@ class PayloadCheckerTest(mox.MoxTestBase): subreport = 'fake subreport' # Create a mock message. - msg = self.mox.CreateMock(update_metadata_pb2._message.Message) - msg.HasField(name).AndReturn(is_present) + msg = mock.create_autospec(update_metadata_pb2._message.Message) + self.addPostCheckForMockFunction(msg.HasField, name) + msg.HasField.return_value = is_present setattr(msg, name, val) - # Create a mock report. - report = self.mox.CreateMock(checker._PayloadReport) + report = mock.create_autospec(checker._PayloadReport) if is_present: if is_submsg: - report.AddSubReport(name).AndReturn(subreport) + self.addPostCheckForMockFunction(report.AddSubReport, name) + report.AddSubReport.return_value = subreport else: - report.AddField(name, convert(val), linebreak=linebreak, indent=indent) + self.addPostCheckForMockFunction(report.AddField, name, convert(val), + linebreak=linebreak, indent=indent) - self.mox.ReplayAll() return (msg, report, subreport, name, val) def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert, @@ -213,9 +246,9 @@ class PayloadCheckerTest(mox.MoxTestBase): else: ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*args, **kwargs) - self.assertEquals(val if is_present else None, ret_val) - self.assertEquals(subreport if is_present and is_submsg else None, - ret_subreport) + self.assertEqual(val if is_present else None, ret_val) + self.assertEqual(subreport if is_present and is_submsg else None, + ret_subreport) def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak, indent): @@ -245,7 +278,7 @@ class PayloadCheckerTest(mox.MoxTestBase): self.assertRaises(PayloadError, tested_func, *args, **kwargs) else: ret_val = tested_func(*args, **kwargs) - self.assertEquals(val if is_present else None, ret_val) + self.assertEqual(val if is_present else None, ret_val) def DoAddSubMsgTest(self, is_mandatory, is_present): """Parametrized testing of _Check{Mandatory,Optional}SubMsg(). @@ -269,8 +302,8 @@ class PayloadCheckerTest(mox.MoxTestBase): self.assertRaises(PayloadError, tested_func, *args) else: ret_val, ret_subreport = tested_func(*args) - self.assertEquals(val if is_present else None, ret_val) - self.assertEquals(subreport if is_present else None, ret_subreport) + self.assertEqual(val if is_present else None, ret_val) + self.assertEqual(subreport if is_present else None, ret_subreport) def testCheckPresentIff(self): """Tests _CheckPresentIff().""" @@ -296,15 +329,14 @@ class PayloadCheckerTest(mox.MoxTestBase): returned_signed_hash: The signed hash data retuned by openssl. expected_signed_hash: The signed hash data to compare against. """ - try: - # Stub out the subprocess invocation. - self.mox.StubOutWithMock(checker.PayloadChecker, '_Run') + # Stub out the subprocess invocation. + with mock.patch.object(checker.PayloadChecker, '_Run') \ + as mock_payload_checker: if expect_subprocess_call: - checker.PayloadChecker._Run( - mox.IsA(list), send_data=sig_data).AndReturn( - (sig_asn1_header + returned_signed_hash, None)) + mock_payload_checker([], send_data=sig_data) + mock_payload_checker.return_value = ( + sig_asn1_header + returned_signed_hash, None) - self.mox.ReplayAll() if expect_pass: self.assertIsNone(checker.PayloadChecker._CheckSha256Signature( sig_data, 'foo', expected_signed_hash, 'bar')) @@ -312,13 +344,11 @@ class PayloadCheckerTest(mox.MoxTestBase): self.assertRaises(PayloadError, checker.PayloadChecker._CheckSha256Signature, sig_data, 'foo', expected_signed_hash, 'bar') - finally: - self.mox.UnsetStubs() def testCheckSha256Signature_Pass(self): """Tests _CheckSha256Signature(); pass case.""" sig_data = 'fake-signature'.ljust(256) - signed_hash = hashlib.sha256('fake-data').digest() + signed_hash = hashlib.sha256(b'fake-data').digest() self.DoCheckSha256SignatureTest(True, True, sig_data, common.SIG_ASN1_HEADER, signed_hash, signed_hash) @@ -326,7 +356,7 @@ class PayloadCheckerTest(mox.MoxTestBase): def testCheckSha256Signature_FailBadSignature(self): """Tests _CheckSha256Signature(); fails due to malformed signature.""" sig_data = 'fake-signature' # Malformed (not 256 bytes in length). - signed_hash = hashlib.sha256('fake-data').digest() + signed_hash = hashlib.sha256(b'fake-data').digest() self.DoCheckSha256SignatureTest(False, False, sig_data, common.SIG_ASN1_HEADER, signed_hash, signed_hash) @@ -334,7 +364,7 @@ class PayloadCheckerTest(mox.MoxTestBase): def testCheckSha256Signature_FailBadOutputLength(self): """Tests _CheckSha256Signature(); fails due to unexpected output length.""" sig_data = 'fake-signature'.ljust(256) - signed_hash = 'fake-hash' # Malformed (not 32 bytes in length). + signed_hash = b'fake-hash' # Malformed (not 32 bytes in length). self.DoCheckSha256SignatureTest(False, True, sig_data, common.SIG_ASN1_HEADER, signed_hash, signed_hash) @@ -342,16 +372,16 @@ class PayloadCheckerTest(mox.MoxTestBase): def testCheckSha256Signature_FailBadAsnHeader(self): """Tests _CheckSha256Signature(); fails due to bad ASN1 header.""" sig_data = 'fake-signature'.ljust(256) - signed_hash = hashlib.sha256('fake-data').digest() - bad_asn1_header = 'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER)) + signed_hash = hashlib.sha256(b'fake-data').digest() + bad_asn1_header = b'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER)) self.DoCheckSha256SignatureTest(False, True, sig_data, bad_asn1_header, signed_hash, signed_hash) def testCheckSha256Signature_FailBadHash(self): """Tests _CheckSha256Signature(); fails due to bad hash returned.""" sig_data = 'fake-signature'.ljust(256) - expected_signed_hash = hashlib.sha256('fake-data').digest() - returned_signed_hash = hashlib.sha256('bad-fake-data').digest() + expected_signed_hash = hashlib.sha256(b'fake-data').digest() + returned_signed_hash = hashlib.sha256(b'bad-fake-data').digest() self.DoCheckSha256SignatureTest(False, True, sig_data, common.SIG_ASN1_HEADER, expected_signed_hash, returned_signed_hash) @@ -429,10 +459,10 @@ class PayloadCheckerTest(mox.MoxTestBase): payload_gen.SetBlockSize(test_utils.KiB(4)) # Add some operations. - payload_gen.AddOperation(False, common.OpType.MOVE, + payload_gen.AddOperation(common.ROOTFS, common.OpType.SOURCE_COPY, src_extents=[(0, 16), (16, 497)], dst_extents=[(16, 496), (0, 16)]) - payload_gen.AddOperation(True, common.OpType.MOVE, + payload_gen.AddOperation(common.KERNEL, common.OpType.SOURCE_COPY, src_extents=[(0, 8), (8, 8)], dst_extents=[(8, 8), (0, 8)]) @@ -457,21 +487,23 @@ class PayloadCheckerTest(mox.MoxTestBase): # Add old kernel/rootfs partition info, as required. if fail_mismatched_oki_ori or fail_old_kernel_fs_size or fail_bad_oki: oki_hash = (None if fail_bad_oki - else hashlib.sha256('fake-oki-content').digest()) - payload_gen.SetPartInfo(True, False, old_kernel_fs_size, oki_hash) + else hashlib.sha256(b'fake-oki-content').digest()) + payload_gen.SetPartInfo(common.KERNEL, False, old_kernel_fs_size, + oki_hash) if not fail_mismatched_oki_ori and (fail_old_rootfs_fs_size or fail_bad_ori): ori_hash = (None if fail_bad_ori - else hashlib.sha256('fake-ori-content').digest()) - payload_gen.SetPartInfo(False, False, old_rootfs_fs_size, ori_hash) + else hashlib.sha256(b'fake-ori-content').digest()) + payload_gen.SetPartInfo(common.ROOTFS, False, old_rootfs_fs_size, + ori_hash) # Add new kernel/rootfs partition info. payload_gen.SetPartInfo( - True, True, new_kernel_fs_size, - None if fail_bad_nki else hashlib.sha256('fake-nki-content').digest()) + common.KERNEL, True, new_kernel_fs_size, + None if fail_bad_nki else hashlib.sha256(b'fake-nki-content').digest()) payload_gen.SetPartInfo( - False, True, new_rootfs_fs_size, - None if fail_bad_nri else hashlib.sha256('fake-nri-content').digest()) + common.ROOTFS, True, new_rootfs_fs_size, + None if fail_bad_nri else hashlib.sha256(b'fake-nri-content').digest()) # Set the minor version. payload_gen.SetMinorVersion(0) @@ -518,28 +550,11 @@ class PayloadCheckerTest(mox.MoxTestBase): # Passes w/ all real extents. extents = self.NewExtentList((0, 4), (8, 3), (1024, 16)) - self.assertEquals( + self.assertEqual( 23, payload_checker._CheckExtents(extents, (1024 + 16) * block_size, collections.defaultdict(int), 'foo')) - # Passes w/ pseudo-extents (aka sparse holes). - extents = self.NewExtentList((0, 4), (common.PSEUDO_EXTENT_MARKER, 5), - (8, 3)) - self.assertEquals( - 12, - payload_checker._CheckExtents(extents, (1024 + 16) * block_size, - collections.defaultdict(int), 'foo', - allow_pseudo=True)) - - # Passes w/ pseudo-extent due to a signature. - extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2)) - self.assertEquals( - 2, - payload_checker._CheckExtents(extents, (1024 + 16) * block_size, - collections.defaultdict(int), 'foo', - allow_signature=True)) - # Fails, extent missing a start block. extents = self.NewExtentList((-1, 4), (8, 3), (1024, 16)) self.assertRaises( @@ -570,34 +585,34 @@ class PayloadCheckerTest(mox.MoxTestBase): block_size = payload_checker.block_size data_length = 10000 - op = self.mox.CreateMock( - update_metadata_pb2.InstallOperation) + op = mock.create_autospec(update_metadata_pb2.InstallOperation) op.type = common.OpType.REPLACE # Pass. op.src_extents = [] self.assertIsNone( payload_checker._CheckReplaceOperation( - op, data_length, (data_length + block_size - 1) / block_size, + op, data_length, (data_length + block_size - 1) // block_size, 'foo')) # Fail, src extents founds. op.src_extents = ['bar'] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, data_length, (data_length + block_size - 1) / block_size, 'foo') + op, data_length, (data_length + block_size - 1) // block_size, 'foo') # Fail, missing data. op.src_extents = [] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, None, (data_length + block_size - 1) / block_size, 'foo') + op, None, (data_length + block_size - 1) // block_size, 'foo') # Fail, length / block number mismatch. op.src_extents = ['bar'] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, data_length, (data_length + block_size - 1) / block_size + 1, 'foo') + op, data_length, (data_length + block_size - 1) // block_size + 1, + 'foo') def testCheckReplaceBzOperation(self): """Tests _CheckReplaceOperation() where op.type == REPLACE_BZ.""" @@ -605,7 +620,7 @@ class PayloadCheckerTest(mox.MoxTestBase): block_size = payload_checker.block_size data_length = block_size * 3 - op = self.mox.CreateMock( + op = mock.create_autospec( update_metadata_pb2.InstallOperation) op.type = common.OpType.REPLACE_BZ @@ -613,25 +628,32 @@ class PayloadCheckerTest(mox.MoxTestBase): op.src_extents = [] self.assertIsNone( payload_checker._CheckReplaceOperation( - op, data_length, (data_length + block_size - 1) / block_size + 5, + op, data_length, (data_length + block_size - 1) // block_size + 5, 'foo')) # Fail, src extents founds. op.src_extents = ['bar'] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo') + op, data_length, (data_length + block_size - 1) // block_size + 5, + 'foo') # Fail, missing data. op.src_extents = [] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, None, (data_length + block_size - 1) / block_size, 'foo') + op, None, (data_length + block_size - 1) // block_size, 'foo') # Fail, too few blocks to justify BZ. op.src_extents = [] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, + op, data_length, (data_length + block_size - 1) // block_size, 'foo') + + # Fail, total_dst_blocks is a floating point value. + op.src_extents = [] + self.assertRaises( + PayloadError, payload_checker._CheckReplaceOperation, op, data_length, (data_length + block_size - 1) / block_size, 'foo') def testCheckReplaceXzOperation(self): @@ -640,7 +662,7 @@ class PayloadCheckerTest(mox.MoxTestBase): block_size = payload_checker.block_size data_length = block_size * 3 - op = self.mox.CreateMock( + op = mock.create_autospec( update_metadata_pb2.InstallOperation) op.type = common.OpType.REPLACE_XZ @@ -648,152 +670,33 @@ class PayloadCheckerTest(mox.MoxTestBase): op.src_extents = [] self.assertIsNone( payload_checker._CheckReplaceOperation( - op, data_length, (data_length + block_size - 1) / block_size + 5, + op, data_length, (data_length + block_size - 1) // block_size + 5, 'foo')) # Fail, src extents founds. op.src_extents = ['bar'] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo') + op, data_length, (data_length + block_size - 1) // block_size + 5, + 'foo') # Fail, missing data. op.src_extents = [] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, None, (data_length + block_size - 1) / block_size, 'foo') + op, None, (data_length + block_size - 1) // block_size, 'foo') # Fail, too few blocks to justify XZ. op.src_extents = [] self.assertRaises( PayloadError, payload_checker._CheckReplaceOperation, - op, data_length, (data_length + block_size - 1) / block_size, 'foo') - - def testCheckMoveOperation_Pass(self): - """Tests _CheckMoveOperation(); pass case.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE - - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((16, 128), (512, 6))) - self.assertIsNone( - payload_checker._CheckMoveOperation(op, None, 134, 134, 'foo')) - - def testCheckMoveOperation_FailContainsData(self): - """Tests _CheckMoveOperation(); fails, message contains data.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE - - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((16, 128), (512, 6))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, 1024, 134, 134, 'foo') - - def testCheckMoveOperation_FailInsufficientSrcBlocks(self): - """Tests _CheckMoveOperation(); fails, not enough actual src blocks.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE + op, data_length, (data_length + block_size - 1) // block_size, 'foo') - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 127))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((16, 128), (512, 6))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') - - def testCheckMoveOperation_FailInsufficientDstBlocks(self): - """Tests _CheckMoveOperation(); fails, not enough actual dst blocks.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE - - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((16, 128), (512, 5))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') - - def testCheckMoveOperation_FailExcessSrcBlocks(self): - """Tests _CheckMoveOperation(); fails, too many actual src blocks.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE - - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((16, 128), (512, 5))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 129))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((16, 128), (512, 6))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') - - def testCheckMoveOperation_FailExcessDstBlocks(self): - """Tests _CheckMoveOperation(); fails, too many actual dst blocks.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE - - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((16, 128), (512, 7))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') - - def testCheckMoveOperation_FailStagnantBlocks(self): - """Tests _CheckMoveOperation(); fails, there are blocks that do not move.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE - - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((8, 128), (512, 6))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') - - def testCheckMoveOperation_FailZeroStartBlock(self): - """Tests _CheckMoveOperation(); fails, has extent with start block 0.""" - payload_checker = checker.PayloadChecker(self.MockPayload()) - op = update_metadata_pb2.InstallOperation() - op.type = common.OpType.MOVE - - self.AddToMessage(op.src_extents, - self.NewExtentList((0, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((8, 128), (512, 6))) - self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') - - self.AddToMessage(op.src_extents, - self.NewExtentList((1, 4), (12, 2), (1024, 128))) - self.AddToMessage(op.dst_extents, - self.NewExtentList((0, 128), (512, 6))) + # Fail, total_dst_blocks is a floating point value. + op.src_extents = [] self.assertRaises( - PayloadError, payload_checker._CheckMoveOperation, - op, None, 134, 134, 'foo') + PayloadError, payload_checker._CheckReplaceOperation, + op, data_length, (data_length + block_size - 1) / block_size, 'foo') def testCheckAnyDiff(self): """Tests _CheckAnyDiffOperation().""" @@ -832,8 +735,8 @@ class PayloadCheckerTest(mox.MoxTestBase): self.assertRaises(PayloadError, payload_checker._CheckSourceCopyOperation, None, 0, 1, 'foo') - def DoCheckOperationTest(self, op_type_name, is_last, allow_signature, - allow_unhashed, fail_src_extents, fail_dst_extents, + def DoCheckOperationTest(self, op_type_name, allow_unhashed, + fail_src_extents, fail_dst_extents, fail_mismatched_data_offset_length, fail_missing_dst_extents, fail_src_length, fail_dst_length, fail_data_hash, @@ -841,10 +744,8 @@ class PayloadCheckerTest(mox.MoxTestBase): """Parametric testing of _CheckOperation(). Args: - op_type_name: 'REPLACE', 'REPLACE_BZ', 'REPLACE_XZ', 'MOVE', 'BSDIFF', + op_type_name: 'REPLACE', 'REPLACE_BZ', 'REPLACE_XZ', 'SOURCE_COPY', 'SOURCE_BSDIFF', BROTLI_BSDIFF or 'PUFFDIFF'. - is_last: Whether we're testing the last operation in a sequence. - allow_signature: Whether we're testing a signature-capable operation. allow_unhashed: Whether we're allowing to not hash the data. fail_src_extents: Tamper with src extents. fail_dst_extents: Tamper with dst extents. @@ -869,9 +770,9 @@ class PayloadCheckerTest(mox.MoxTestBase): old_part_size = test_utils.MiB(4) new_part_size = test_utils.MiB(8) old_block_counters = array.array( - 'B', [0] * ((old_part_size + block_size - 1) / block_size)) + 'B', [0] * ((old_part_size + block_size - 1) // block_size)) new_block_counters = array.array( - 'B', [0] * ((new_part_size + block_size - 1) / block_size)) + 'B', [0] * ((new_part_size + block_size - 1) // block_size)) prev_data_offset = 1876 blob_hash_counts = collections.defaultdict(int) @@ -880,8 +781,7 @@ class PayloadCheckerTest(mox.MoxTestBase): op.type = op_type total_src_blocks = 0 - if op_type in (common.OpType.MOVE, common.OpType.BSDIFF, - common.OpType.SOURCE_COPY, common.OpType.SOURCE_BSDIFF, + if op_type in (common.OpType.SOURCE_COPY, common.OpType.SOURCE_BSDIFF, common.OpType.PUFFDIFF, common.OpType.BROTLI_BSDIFF): if fail_src_extents: self.AddToMessage(op.src_extents, @@ -891,12 +791,9 @@ class PayloadCheckerTest(mox.MoxTestBase): self.NewExtentList((1, 16))) total_src_blocks = 16 - # TODO(tbrindus): add major version 2 tests. - payload_checker.major_version = common.CHROMEOS_MAJOR_PAYLOAD_VERSION + payload_checker.major_version = common.BRILLO_MAJOR_PAYLOAD_VERSION if op_type in (common.OpType.REPLACE, common.OpType.REPLACE_BZ): payload_checker.minor_version = 0 - elif op_type in (common.OpType.MOVE, common.OpType.BSDIFF): - payload_checker.minor_version = 2 if fail_bad_minor_version else 1 elif op_type in (common.OpType.SOURCE_COPY, common.OpType.SOURCE_BSDIFF): payload_checker.minor_version = 1 if fail_bad_minor_version else 2 if op_type == common.OpType.REPLACE_XZ: @@ -907,7 +804,7 @@ class PayloadCheckerTest(mox.MoxTestBase): elif op_type == common.OpType.PUFFDIFF: payload_checker.minor_version = 4 if fail_bad_minor_version else 5 - if op_type not in (common.OpType.MOVE, common.OpType.SOURCE_COPY): + if op_type != common.OpType.SOURCE_COPY: if not fail_mismatched_data_offset_length: op.data_length = 16 * block_size - 8 if fail_prev_data_offset: @@ -916,20 +813,16 @@ class PayloadCheckerTest(mox.MoxTestBase): op.data_offset = prev_data_offset fake_data = 'fake-data'.ljust(op.data_length) - if not (allow_unhashed or (is_last and allow_signature and - op_type == common.OpType.REPLACE)): - if not fail_data_hash: - # Create a valid data blob hash. - op.data_sha256_hash = hashlib.sha256(fake_data).digest() - payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn( - fake_data) + if not allow_unhashed and not fail_data_hash: + # Create a valid data blob hash. + op.data_sha256_hash = hashlib.sha256(fake_data.encode('utf-8')).digest() + payload.ReadDataBlob.return_value = fake_data.encode('utf-8') elif fail_data_hash: # Create an invalid data blob hash. op.data_sha256_hash = hashlib.sha256( - fake_data.replace(' ', '-')).digest() - payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn( - fake_data) + fake_data.replace(' ', '-').encode('utf-8')).digest() + payload.ReadDataBlob.return_value = fake_data.encode('utf-8') total_dst_blocks = 0 if not fail_missing_dst_extents: @@ -944,8 +837,7 @@ class PayloadCheckerTest(mox.MoxTestBase): if total_src_blocks: if fail_src_length: op.src_length = total_src_blocks * block_size + 8 - elif (op_type in (common.OpType.MOVE, common.OpType.BSDIFF, - common.OpType.SOURCE_BSDIFF) and + elif (op_type == common.OpType.SOURCE_BSDIFF and payload_checker.minor_version <= 3): op.src_length = total_src_blocks * block_size elif fail_src_length: @@ -955,19 +847,17 @@ class PayloadCheckerTest(mox.MoxTestBase): if total_dst_blocks: if fail_dst_length: op.dst_length = total_dst_blocks * block_size + 8 - elif (op_type in (common.OpType.MOVE, common.OpType.BSDIFF, - common.OpType.SOURCE_BSDIFF) and + elif (op_type == common.OpType.SOURCE_BSDIFF and payload_checker.minor_version <= 3): op.dst_length = total_dst_blocks * block_size - self.mox.ReplayAll() should_fail = (fail_src_extents or fail_dst_extents or fail_mismatched_data_offset_length or fail_missing_dst_extents or fail_src_length or fail_dst_length or fail_data_hash or fail_prev_data_offset or fail_bad_minor_version) - args = (op, 'foo', is_last, old_block_counters, new_block_counters, - old_part_size, new_part_size, prev_data_offset, allow_signature, + args = (op, 'foo', old_block_counters, new_block_counters, + old_part_size, new_part_size, prev_data_offset, blob_hash_counts) if should_fail: self.assertRaises(PayloadError, payload_checker._CheckOperation, *args) @@ -1009,8 +899,9 @@ class PayloadCheckerTest(mox.MoxTestBase): if fail_nonexhaustive_full_update: rootfs_data_length -= block_size - payload_gen.AddOperation(False, rootfs_op_type, - dst_extents=[(0, rootfs_data_length / block_size)], + payload_gen.AddOperation(common.ROOTFS, rootfs_op_type, + dst_extents= + [(0, rootfs_data_length // block_size)], data_offset=0, data_length=rootfs_data_length) @@ -1020,17 +911,17 @@ class PayloadCheckerTest(mox.MoxTestBase): 'allow_unhashed': True}) payload_checker.payload_type = checker._TYPE_FULL report = checker._PayloadReport() - - args = (payload_checker.payload.manifest.install_operations, report, 'foo', - 0, rootfs_part_size, rootfs_part_size, rootfs_part_size, 0, False) + partition = next((p for p in payload_checker.payload.manifest.partitions + if p.partition_name == common.ROOTFS), None) + args = (partition.operations, report, 'foo', + 0, rootfs_part_size, rootfs_part_size, rootfs_part_size, 0) if fail_nonexhaustive_full_update: self.assertRaises(PayloadError, payload_checker._CheckOperations, *args) else: self.assertEqual(rootfs_data_length, payload_checker._CheckOperations(*args)) - def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op, - fail_mismatched_pseudo_op, fail_sig_missing_fields, + def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_sig_missing_fields, fail_unknown_sig_version, fail_incorrect_sig): """Tests _CheckSignatures().""" # Generate a test payload. For this test, we only care about the signature @@ -1041,20 +932,18 @@ class PayloadCheckerTest(mox.MoxTestBase): payload_gen.SetBlockSize(block_size) rootfs_part_size = test_utils.MiB(2) kernel_part_size = test_utils.KiB(16) - payload_gen.SetPartInfo(False, True, rootfs_part_size, - hashlib.sha256('fake-new-rootfs-content').digest()) - payload_gen.SetPartInfo(True, True, kernel_part_size, - hashlib.sha256('fake-new-kernel-content').digest()) + payload_gen.SetPartInfo(common.ROOTFS, True, rootfs_part_size, + hashlib.sha256(b'fake-new-rootfs-content').digest()) + payload_gen.SetPartInfo(common.KERNEL, True, kernel_part_size, + hashlib.sha256(b'fake-new-kernel-content').digest()) payload_gen.SetMinorVersion(0) payload_gen.AddOperationWithData( - False, common.OpType.REPLACE, - dst_extents=[(0, rootfs_part_size / block_size)], + common.ROOTFS, common.OpType.REPLACE, + dst_extents=[(0, rootfs_part_size // block_size)], data_blob=os.urandom(rootfs_part_size)) - do_forge_pseudo_op = (fail_missing_pseudo_op or fail_mismatched_pseudo_op) - do_forge_sigs_data = (do_forge_pseudo_op or fail_empty_sigs_blob or - fail_sig_missing_fields or fail_unknown_sig_version - or fail_incorrect_sig) + do_forge_sigs_data = (fail_empty_sigs_blob or fail_sig_missing_fields or + fail_unknown_sig_version or fail_incorrect_sig) sigs_data = None if do_forge_sigs_data: @@ -1063,29 +952,19 @@ class PayloadCheckerTest(mox.MoxTestBase): if fail_sig_missing_fields: sig_data = None else: - sig_data = test_utils.SignSha256('fake-payload-content', + sig_data = test_utils.SignSha256(b'fake-payload-content', test_utils._PRIVKEY_FILE_NAME) sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data) sigs_data = sigs_gen.ToBinary() payload_gen.SetSignatures(payload_gen.curr_offset, len(sigs_data)) - if do_forge_pseudo_op: - assert sigs_data is not None, 'should have forged signatures blob by now' - sigs_len = len(sigs_data) - payload_gen.AddOperation( - False, common.OpType.REPLACE, - data_offset=payload_gen.curr_offset / 2, - data_length=sigs_len / 2, - dst_extents=[(0, (sigs_len / 2 + block_size - 1) / block_size)]) - # Generate payload (complete w/ signature) and create the test object. payload_checker = _GetPayloadChecker( payload_gen.WriteToFileWithData, payload_gen_dargs={ 'sigs_data': sigs_data, - 'privkey_file_name': test_utils._PRIVKEY_FILE_NAME, - 'do_add_pseudo_operation': not do_forge_pseudo_op}) + 'privkey_file_name': test_utils._PRIVKEY_FILE_NAME}) payload_checker.payload_type = checker._TYPE_FULL report = checker._PayloadReport() @@ -1095,8 +974,7 @@ class PayloadCheckerTest(mox.MoxTestBase): common.KERNEL: kernel_part_size }) - should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or - fail_mismatched_pseudo_op or fail_sig_missing_fields or + should_fail = (fail_empty_sigs_blob or fail_sig_missing_fields or fail_unknown_sig_version or fail_incorrect_sig) args = (report, test_utils._PUBKEY_FILE_NAME) if should_fail: @@ -1120,7 +998,6 @@ class PayloadCheckerTest(mox.MoxTestBase): should_succeed = ( (minor_version == 0 and payload_type == checker._TYPE_FULL) or - (minor_version == 1 and payload_type == checker._TYPE_DELTA) or (minor_version == 2 and payload_type == checker._TYPE_DELTA) or (minor_version == 3 and payload_type == checker._TYPE_DELTA) or (minor_version == 4 and payload_type == checker._TYPE_DELTA) or @@ -1150,10 +1027,10 @@ class PayloadCheckerTest(mox.MoxTestBase): payload_gen.SetBlockSize(block_size) kernel_filesystem_size = test_utils.KiB(16) rootfs_filesystem_size = test_utils.MiB(2) - payload_gen.SetPartInfo(False, True, rootfs_filesystem_size, - hashlib.sha256('fake-new-rootfs-content').digest()) - payload_gen.SetPartInfo(True, True, kernel_filesystem_size, - hashlib.sha256('fake-new-kernel-content').digest()) + payload_gen.SetPartInfo(common.ROOTFS, True, rootfs_filesystem_size, + hashlib.sha256(b'fake-new-rootfs-content').digest()) + payload_gen.SetPartInfo(common.KERNEL, True, kernel_filesystem_size, + hashlib.sha256(b'fake-new-kernel-content').digest()) payload_gen.SetMinorVersion(0) rootfs_part_size = 0 @@ -1163,8 +1040,8 @@ class PayloadCheckerTest(mox.MoxTestBase): if fail_rootfs_part_size_exceeded: rootfs_op_size += block_size payload_gen.AddOperationWithData( - False, common.OpType.REPLACE, - dst_extents=[(0, rootfs_op_size / block_size)], + common.ROOTFS, common.OpType.REPLACE, + dst_extents=[(0, rootfs_op_size // block_size)], data_blob=os.urandom(rootfs_op_size)) kernel_part_size = 0 @@ -1174,8 +1051,8 @@ class PayloadCheckerTest(mox.MoxTestBase): if fail_kernel_part_size_exceeded: kernel_op_size += block_size payload_gen.AddOperationWithData( - True, common.OpType.REPLACE, - dst_extents=[(0, kernel_op_size / block_size)], + common.KERNEL, common.OpType.REPLACE, + dst_extents=[(0, kernel_op_size // block_size)], data_blob=os.urandom(kernel_op_size)) # Generate payload (complete w/ signature) and create the test object. @@ -1186,16 +1063,14 @@ class PayloadCheckerTest(mox.MoxTestBase): else: use_block_size = block_size - # For the unittests 246 is the value that generated for the payload. - metadata_size = 246 + # For the unittests 237 is the value that generated for the payload. + metadata_size = 237 if fail_mismatched_metadata_size: metadata_size += 1 kwargs = { 'payload_gen_dargs': { 'privkey_file_name': test_utils._PRIVKEY_FILE_NAME, - 'do_add_pseudo_operation': True, - 'is_pseudo_in_kernel': True, 'padding': os.urandom(1024) if fail_excess_data else None}, 'checker_init_dargs': { 'assert_type': 'delta' if fail_wrong_payload_type else 'full', @@ -1207,7 +1082,7 @@ class PayloadCheckerTest(mox.MoxTestBase): payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData, **kwargs) - kwargs = { + kwargs2 = { 'pubkey_file_name': test_utils._PUBKEY_FILE_NAME, 'metadata_size': metadata_size, 'part_sizes': { @@ -1219,15 +1094,15 @@ class PayloadCheckerTest(mox.MoxTestBase): fail_rootfs_part_size_exceeded or fail_kernel_part_size_exceeded) if should_fail: - self.assertRaises(PayloadError, payload_checker.Run, **kwargs) + self.assertRaises(PayloadError, payload_checker.Run, **kwargs2) else: - self.assertIsNone(payload_checker.Run(**kwargs)) + self.assertIsNone(payload_checker.Run(**kwargs2)) + # This implements a generic API, hence the occasional unused args. # pylint: disable=W0613 -def ValidateCheckOperationTest(op_type_name, is_last, allow_signature, - allow_unhashed, fail_src_extents, - fail_dst_extents, +def ValidateCheckOperationTest(op_type_name, allow_unhashed, + fail_src_extents, fail_dst_extents, fail_mismatched_data_offset_length, fail_missing_dst_extents, fail_src_length, fail_dst_length, fail_data_hash, @@ -1244,8 +1119,8 @@ def ValidateCheckOperationTest(op_type_name, is_last, allow_signature, fail_bad_minor_version)): return False - # MOVE and SOURCE_COPY operations don't carry data. - if (op_type in (common.OpType.MOVE, common.OpType.SOURCE_COPY) and ( + # SOURCE_COPY operation does not carry data. + if (op_type == common.OpType.SOURCE_COPY and ( fail_mismatched_data_offset_length or fail_data_hash or fail_prev_data_offset)): return False @@ -1274,14 +1149,14 @@ def AddParametricTests(tested_method_name, arg_space, validate_func=None): (values) associated with them. validate_func: A function used for validating test argument combinations. """ - for value_tuple in itertools.product(*arg_space.itervalues()): - run_dargs = dict(zip(arg_space.iterkeys(), value_tuple)) + for value_tuple in itertools.product(*iter(arg_space.values())): + run_dargs = dict(zip(iter(arg_space.keys()), value_tuple)) if validate_func and not validate_func(**run_dargs): continue run_method_name = 'Do%sTest' % tested_method_name test_method_name = 'test%s' % tested_method_name - for arg_key, arg_val in run_dargs.iteritems(): - if arg_val or type(arg_val) is int: + for arg_key, arg_val in run_dargs.items(): + if arg_val or isinstance(arg_val, int): test_method_name += '__%s=%s' % (arg_key, arg_val) setattr(PayloadCheckerTest, test_method_name, TestMethodBody(run_method_name, run_dargs)) @@ -1328,11 +1203,8 @@ def AddAllParametricTests(): # Add all _CheckOperation() test cases. AddParametricTests('CheckOperation', {'op_type_name': ('REPLACE', 'REPLACE_BZ', 'REPLACE_XZ', - 'MOVE', 'BSDIFF', 'SOURCE_COPY', - 'SOURCE_BSDIFF', 'PUFFDIFF', - 'BROTLI_BSDIFF'), - 'is_last': (True, False), - 'allow_signature': (True, False), + 'SOURCE_COPY', 'SOURCE_BSDIFF', + 'PUFFDIFF', 'BROTLI_BSDIFF'), 'allow_unhashed': (True, False), 'fail_src_extents': (True, False), 'fail_dst_extents': (True, False), @@ -1352,15 +1224,13 @@ def AddAllParametricTests(): # Add all _CheckOperations() test cases. AddParametricTests('CheckSignatures', {'fail_empty_sigs_blob': (True, False), - 'fail_missing_pseudo_op': (True, False), - 'fail_mismatched_pseudo_op': (True, False), 'fail_sig_missing_fields': (True, False), 'fail_unknown_sig_version': (True, False), 'fail_incorrect_sig': (True, False)}) # Add all _CheckManifestMinorVersion() test cases. AddParametricTests('CheckManifestMinorVersion', - {'minor_version': (None, 0, 1, 2, 3, 4, 5, 555), + {'minor_version': (None, 0, 2, 3, 4, 5, 555), 'payload_type': (checker._TYPE_FULL, checker._TYPE_DELTA)}) |