blob: 2a28db4f4098aaab5bbe552a8636a28f76f3ab3e [file] [log] [blame]
Dan Albert8e0178d2015-01-27 15:53:15 -08001#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Tao Baofc7e0e02018-02-13 13:54:02 -080016
Dan Albert8e0178d2015-01-27 15:53:15 -080017import os
Tao Bao17e4e612018-02-16 17:12:54 -080018import subprocess
Dan Albert8e0178d2015-01-27 15:53:15 -080019import tempfile
20import time
21import unittest
22import zipfile
Tao Bao31b08072017-11-08 15:50:59 -080023from hashlib import sha1
24
Dan Albert8e0178d2015-01-27 15:53:15 -080025import common
Tao Bao04e1f012018-02-04 12:13:35 -080026import test_utils
Tianjie Xu9c384d22017-06-20 17:00:55 -070027import validate_target_files
Tao Baofc7e0e02018-02-13 13:54:02 -080028from rangelib import RangeSet
Dan Albert8e0178d2015-01-27 15:53:15 -080029
Tao Bao04e1f012018-02-04 12:13:35 -080030
Tao Bao31b08072017-11-08 15:50:59 -080031KiB = 1024
32MiB = 1024 * KiB
33GiB = 1024 * MiB
Dan Albert8e0178d2015-01-27 15:53:15 -080034
Tao Bao1c830bf2017-12-25 10:43:47 -080035
Tao Baof3282b42015-04-01 11:21:55 -070036def get_2gb_string():
Tao Bao31b08072017-11-08 15:50:59 -080037 size = int(2 * GiB + 1)
38 block_size = 4 * KiB
39 step_size = 4 * MiB
40 # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
41 for _ in range(0, size, step_size):
42 yield os.urandom(block_size)
43 yield '\0' * (step_size - block_size)
Tao Baof3282b42015-04-01 11:21:55 -070044
Dan Albert8e0178d2015-01-27 15:53:15 -080045
46class CommonZipTest(unittest.TestCase):
Tao Bao31b08072017-11-08 15:50:59 -080047 def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
Tao Baof3282b42015-04-01 11:21:55 -070048 test_file_name=None, expected_stat=None, expected_mode=0o644,
49 expected_compress_type=zipfile.ZIP_STORED):
50 # Verify the stat if present.
51 if test_file_name is not None:
52 new_stat = os.stat(test_file_name)
53 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
54 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
55
56 # Reopen the zip file to verify.
57 zip_file = zipfile.ZipFile(zip_file_name, "r")
58
59 # Verify the timestamp.
60 info = zip_file.getinfo(arcname)
61 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
62
63 # Verify the file mode.
64 mode = (info.external_attr >> 16) & 0o777
65 self.assertEqual(mode, expected_mode)
66
67 # Verify the compress type.
68 self.assertEqual(info.compress_type, expected_compress_type)
69
70 # Verify the zip contents.
Tao Bao31b08072017-11-08 15:50:59 -080071 entry = zip_file.open(arcname)
72 sha1_hash = sha1()
73 for chunk in iter(lambda: entry.read(4 * MiB), ''):
74 sha1_hash.update(chunk)
75 self.assertEqual(expected_hash, sha1_hash.hexdigest())
Tao Baof3282b42015-04-01 11:21:55 -070076 self.assertIsNone(zip_file.testzip())
77
Dan Albert8e0178d2015-01-27 15:53:15 -080078 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
79 extra_zipwrite_args = dict(extra_zipwrite_args or {})
80
81 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080082 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070083
84 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080085 zip_file_name = zip_file.name
86
87 # File names within an archive strip the leading slash.
88 arcname = extra_zipwrite_args.get("arcname", test_file_name)
89 if arcname[0] == "/":
90 arcname = arcname[1:]
91
92 zip_file.close()
93 zip_file = zipfile.ZipFile(zip_file_name, "w")
94
95 try:
Tao Bao31b08072017-11-08 15:50:59 -080096 sha1_hash = sha1()
97 for data in contents:
98 sha1_hash.update(data)
99 test_file.write(data)
Dan Albert8e0178d2015-01-27 15:53:15 -0800100 test_file.close()
101
Tao Baof3282b42015-04-01 11:21:55 -0700102 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -0800103 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700104 expected_compress_type = extra_zipwrite_args.get("compress_type",
105 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800106 time.sleep(5) # Make sure the atime/mtime will change measurably.
107
108 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700109 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800110
Tao Bao31b08072017-11-08 15:50:59 -0800111 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
112 test_file_name, expected_stat, expected_mode,
113 expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800114 finally:
115 os.remove(test_file_name)
116 os.remove(zip_file_name)
117
Tao Baof3282b42015-04-01 11:21:55 -0700118 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
119 extra_args = dict(extra_args or {})
120
121 zip_file = tempfile.NamedTemporaryFile(delete=False)
122 zip_file_name = zip_file.name
123 zip_file.close()
124
125 zip_file = zipfile.ZipFile(zip_file_name, "w")
126
127 try:
128 expected_compress_type = extra_args.get("compress_type",
129 zipfile.ZIP_STORED)
130 time.sleep(5) # Make sure the atime/mtime will change measurably.
131
132 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700133 arcname = zinfo_or_arcname
134 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700135 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700136 arcname = zinfo_or_arcname.filename
137 expected_mode = extra_args.get("perms",
138 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700139
Tao Bao58c1b962015-05-20 09:32:18 -0700140 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700141 common.ZipClose(zip_file)
142
Tao Bao31b08072017-11-08 15:50:59 -0800143 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700144 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700145 expected_compress_type=expected_compress_type)
146 finally:
147 os.remove(zip_file_name)
148
149 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
150 extra_args = dict(extra_args or {})
151
152 zip_file = tempfile.NamedTemporaryFile(delete=False)
153 zip_file_name = zip_file.name
154
155 test_file = tempfile.NamedTemporaryFile(delete=False)
156 test_file_name = test_file.name
157
158 arcname_large = test_file_name
159 arcname_small = "bar"
160
161 # File names within an archive strip the leading slash.
162 if arcname_large[0] == "/":
163 arcname_large = arcname_large[1:]
164
165 zip_file.close()
166 zip_file = zipfile.ZipFile(zip_file_name, "w")
167
168 try:
Tao Bao31b08072017-11-08 15:50:59 -0800169 sha1_hash = sha1()
170 for data in large:
171 sha1_hash.update(data)
172 test_file.write(data)
Tao Baof3282b42015-04-01 11:21:55 -0700173 test_file.close()
174
175 expected_stat = os.stat(test_file_name)
176 expected_mode = 0o644
177 expected_compress_type = extra_args.get("compress_type",
178 zipfile.ZIP_STORED)
179 time.sleep(5) # Make sure the atime/mtime will change measurably.
180
181 common.ZipWrite(zip_file, test_file_name, **extra_args)
182 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
183 common.ZipClose(zip_file)
184
185 # Verify the contents written by ZipWrite().
Tao Bao31b08072017-11-08 15:50:59 -0800186 self._verify(zip_file, zip_file_name, arcname_large,
187 sha1_hash.hexdigest(), test_file_name, expected_stat,
188 expected_mode, expected_compress_type)
Tao Baof3282b42015-04-01 11:21:55 -0700189
190 # Verify the contents written by ZipWriteStr().
Tao Bao31b08072017-11-08 15:50:59 -0800191 self._verify(zip_file, zip_file_name, arcname_small,
192 sha1(small).hexdigest(),
Tao Baof3282b42015-04-01 11:21:55 -0700193 expected_compress_type=expected_compress_type)
194 finally:
195 os.remove(zip_file_name)
196 os.remove(test_file_name)
197
198 def _test_reset_ZIP64_LIMIT(self, func, *args):
199 default_limit = (1 << 31) - 1
200 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
201 func(*args)
202 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
203
Dan Albert8e0178d2015-01-27 15:53:15 -0800204 def test_ZipWrite(self):
205 file_contents = os.urandom(1024)
206 self._test_ZipWrite(file_contents)
207
208 def test_ZipWrite_with_opts(self):
209 file_contents = os.urandom(1024)
210 self._test_ZipWrite(file_contents, {
211 "arcname": "foobar",
212 "perms": 0o777,
213 "compress_type": zipfile.ZIP_DEFLATED,
214 })
Tao Baof3282b42015-04-01 11:21:55 -0700215 self._test_ZipWrite(file_contents, {
216 "arcname": "foobar",
217 "perms": 0o700,
218 "compress_type": zipfile.ZIP_STORED,
219 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800220
221 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700222 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800223 self._test_ZipWrite(file_contents, {
224 "compress_type": zipfile.ZIP_DEFLATED,
225 })
226
227 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700228 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
229
230 def test_ZipWriteStr(self):
231 random_string = os.urandom(1024)
232 # Passing arcname
233 self._test_ZipWriteStr("foo", random_string)
234
235 # Passing zinfo
236 zinfo = zipfile.ZipInfo(filename="foo")
237 self._test_ZipWriteStr(zinfo, random_string)
238
239 # Timestamp in the zinfo should be overwritten.
240 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
241 self._test_ZipWriteStr(zinfo, random_string)
242
243 def test_ZipWriteStr_with_opts(self):
244 random_string = os.urandom(1024)
245 # Passing arcname
246 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700247 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700248 "compress_type": zipfile.ZIP_DEFLATED,
249 })
Tao Bao58c1b962015-05-20 09:32:18 -0700250 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700251 "compress_type": zipfile.ZIP_STORED,
252 })
253
254 # Passing zinfo
255 zinfo = zipfile.ZipInfo(filename="foo")
256 self._test_ZipWriteStr(zinfo, random_string, {
257 "compress_type": zipfile.ZIP_DEFLATED,
258 })
259 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700260 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700261 "compress_type": zipfile.ZIP_STORED,
262 })
263
264 def test_ZipWriteStr_large_file(self):
265 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
266 # the workaround. We will only test the case of writing a string into a
267 # large archive.
268 long_string = get_2gb_string()
269 short_string = os.urandom(1024)
270 self._test_ZipWriteStr_large_file(long_string, short_string, {
271 "compress_type": zipfile.ZIP_DEFLATED,
272 })
273
274 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
275 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
276 zinfo = zipfile.ZipInfo(filename="foo")
277 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700278
279 def test_bug21309935(self):
280 zip_file = tempfile.NamedTemporaryFile(delete=False)
281 zip_file_name = zip_file.name
282 zip_file.close()
283
284 try:
285 random_string = os.urandom(1024)
286 zip_file = zipfile.ZipFile(zip_file_name, "w")
287 # Default perms should be 0o644 when passing the filename.
288 common.ZipWriteStr(zip_file, "foo", random_string)
289 # Honor the specified perms.
290 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
291 # The perms in zinfo should be untouched.
292 zinfo = zipfile.ZipInfo(filename="baz")
293 zinfo.external_attr = 0o740 << 16
294 common.ZipWriteStr(zip_file, zinfo, random_string)
295 # Explicitly specified perms has the priority.
296 zinfo = zipfile.ZipInfo(filename="qux")
297 zinfo.external_attr = 0o700 << 16
298 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
299 common.ZipClose(zip_file)
300
Tao Bao31b08072017-11-08 15:50:59 -0800301 self._verify(zip_file, zip_file_name, "foo",
302 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700303 expected_mode=0o644)
Tao Bao31b08072017-11-08 15:50:59 -0800304 self._verify(zip_file, zip_file_name, "bar",
305 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700306 expected_mode=0o755)
Tao Bao31b08072017-11-08 15:50:59 -0800307 self._verify(zip_file, zip_file_name, "baz",
308 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700309 expected_mode=0o740)
Tao Bao31b08072017-11-08 15:50:59 -0800310 self._verify(zip_file, zip_file_name, "qux",
311 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700312 expected_mode=0o400)
313 finally:
314 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700315
Tao Bao89d7ab22017-12-14 17:05:33 -0800316 def test_ZipDelete(self):
317 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
318 output_zip = zipfile.ZipFile(zip_file.name, 'w',
319 compression=zipfile.ZIP_DEFLATED)
320 with tempfile.NamedTemporaryFile() as entry_file:
321 entry_file.write(os.urandom(1024))
322 common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
323 common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
324 common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
325 common.ZipClose(output_zip)
326 zip_file.close()
327
328 try:
329 common.ZipDelete(zip_file.name, 'Test2')
330 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
331 entries = check_zip.namelist()
332 self.assertTrue('Test1' in entries)
333 self.assertFalse('Test2' in entries)
334 self.assertTrue('Test3' in entries)
335
336 self.assertRaises(AssertionError, common.ZipDelete, zip_file.name,
337 'Test2')
338 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
339 entries = check_zip.namelist()
340 self.assertTrue('Test1' in entries)
341 self.assertFalse('Test2' in entries)
342 self.assertTrue('Test3' in entries)
343
344 common.ZipDelete(zip_file.name, ['Test3'])
345 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
346 entries = check_zip.namelist()
347 self.assertTrue('Test1' in entries)
348 self.assertFalse('Test2' in entries)
349 self.assertFalse('Test3' in entries)
350
351 common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
352 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
353 entries = check_zip.namelist()
354 self.assertFalse('Test1' in entries)
355 self.assertFalse('Test2' in entries)
356 self.assertFalse('Test3' in entries)
357 finally:
358 os.remove(zip_file.name)
359
360
Tao Bao818ddf52018-01-05 11:17:34 -0800361class CommonApkUtilsTest(unittest.TestCase):
362 """Tests the APK utils related functions."""
363
364 APKCERTS_TXT1 = (
365 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
366 ' private_key="certs/devkey.pk8"\n'
367 'name="Settings.apk"'
368 ' certificate="build/target/product/security/platform.x509.pem"'
369 ' private_key="build/target/product/security/platform.pk8"\n'
370 'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
371 )
372
373 APKCERTS_CERTMAP1 = {
374 'RecoveryLocalizer.apk' : 'certs/devkey',
375 'Settings.apk' : 'build/target/product/security/platform',
376 'TV.apk' : 'PRESIGNED',
377 }
378
379 APKCERTS_TXT2 = (
380 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
381 ' private_key="certs/compressed1.pk8" compressed="gz"\n'
382 'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
383 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
384 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
385 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
386 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
387 ' private_key="certs/compressed3.pk8" compressed="gz"\n'
388 )
389
390 APKCERTS_CERTMAP2 = {
391 'Compressed1.apk' : 'certs/compressed1',
392 'Compressed2a.apk' : 'certs/compressed2',
393 'Compressed2b.apk' : 'certs/compressed2',
394 'Compressed3.apk' : 'certs/compressed3',
395 }
396
397 APKCERTS_TXT3 = (
398 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
399 ' private_key="certs/compressed4.pk8" compressed="xz"\n'
400 )
401
402 APKCERTS_CERTMAP3 = {
403 'Compressed4.apk' : 'certs/compressed4',
404 }
405
Tao Bao17e4e612018-02-16 17:12:54 -0800406 def setUp(self):
407 self.testdata_dir = test_utils.get_testdata_dir()
408
Tao Bao818ddf52018-01-05 11:17:34 -0800409 def tearDown(self):
410 common.Cleanup()
411
412 @staticmethod
413 def _write_apkcerts_txt(apkcerts_txt, additional=None):
414 if additional is None:
415 additional = []
416 target_files = common.MakeTempFile(suffix='.zip')
417 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
418 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
419 for entry in additional:
420 target_files_zip.writestr(entry, '')
421 return target_files
422
423 def test_ReadApkCerts_NoncompressedApks(self):
424 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
425 with zipfile.ZipFile(target_files, 'r') as input_zip:
426 certmap, ext = common.ReadApkCerts(input_zip)
427
428 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
429 self.assertIsNone(ext)
430
431 def test_ReadApkCerts_CompressedApks(self):
432 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
433 # not stored in '.gz' format, so it shouldn't be considered as installed.
434 target_files = self._write_apkcerts_txt(
435 self.APKCERTS_TXT2,
436 ['Compressed1.apk.gz', 'Compressed3.apk'])
437
438 with zipfile.ZipFile(target_files, 'r') as input_zip:
439 certmap, ext = common.ReadApkCerts(input_zip)
440
441 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
442 self.assertEqual('.gz', ext)
443
444 # Alternative case with '.xz'.
445 target_files = self._write_apkcerts_txt(
446 self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
447
448 with zipfile.ZipFile(target_files, 'r') as input_zip:
449 certmap, ext = common.ReadApkCerts(input_zip)
450
451 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
452 self.assertEqual('.xz', ext)
453
454 def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
455 target_files = self._write_apkcerts_txt(
456 self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
457 ['Compressed1.apk.gz', 'Compressed3.apk'])
458
459 with zipfile.ZipFile(target_files, 'r') as input_zip:
460 certmap, ext = common.ReadApkCerts(input_zip)
461
462 certmap_merged = self.APKCERTS_CERTMAP1.copy()
463 certmap_merged.update(self.APKCERTS_CERTMAP2)
464 self.assertDictEqual(certmap_merged, certmap)
465 self.assertEqual('.gz', ext)
466
467 def test_ReadApkCerts_MultipleCompressionMethods(self):
468 target_files = self._write_apkcerts_txt(
469 self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
470 ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
471
472 with zipfile.ZipFile(target_files, 'r') as input_zip:
473 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
474
475 def test_ReadApkCerts_MismatchingKeys(self):
476 malformed_apkcerts_txt = (
477 'name="App1.apk" certificate="certs/cert1.x509.pem"'
478 ' private_key="certs/cert2.pk8"\n'
479 )
480 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
481
482 with zipfile.ZipFile(target_files, 'r') as input_zip:
483 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
484
Tao Bao04e1f012018-02-04 12:13:35 -0800485 def test_ExtractPublicKey(self):
Tao Bao17e4e612018-02-16 17:12:54 -0800486 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
487 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
Tao Bao04e1f012018-02-04 12:13:35 -0800488 with open(pubkey, 'rb') as pubkey_fp:
489 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
490
491 def test_ExtractPublicKey_invalidInput(self):
Tao Bao17e4e612018-02-16 17:12:54 -0800492 wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
Tao Bao04e1f012018-02-04 12:13:35 -0800493 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
494
Tao Bao17e4e612018-02-16 17:12:54 -0800495 def test_ParseCertificate(self):
496 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
497
498 cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
499 proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
500 expected, _ = proc.communicate()
501 self.assertEqual(0, proc.returncode)
502
503 with open(cert) as cert_fp:
504 actual = common.ParseCertificate(cert_fp.read())
505 self.assertEqual(expected, actual)
506
Tao Baof47bf0f2018-03-21 23:28:51 -0700507 def test_GetMinSdkVersion(self):
508 test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
509 self.assertEqual('24', common.GetMinSdkVersion(test_app))
510
511 def test_GetMinSdkVersion_invalidInput(self):
512 self.assertRaises(
513 common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')
514
515 def test_GetMinSdkVersionInt(self):
516 test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
517 self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))
518
519 def test_GetMinSdkVersionInt_invalidInput(self):
520 self.assertRaises(
521 common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
522 {})
523
Tao Bao818ddf52018-01-05 11:17:34 -0800524
Tao Baofc7e0e02018-02-13 13:54:02 -0800525class CommonUtilsTest(unittest.TestCase):
526
Tao Bao02a08592018-07-22 12:40:45 -0700527 def setUp(self):
528 self.testdata_dir = test_utils.get_testdata_dir()
529
Tao Baofc7e0e02018-02-13 13:54:02 -0800530 def tearDown(self):
531 common.Cleanup()
532
533 def test_GetSparseImage_emptyBlockMapFile(self):
534 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
535 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
536 target_files_zip.write(
537 test_utils.construct_sparse_image([
538 (0xCAC1, 6),
539 (0xCAC3, 3),
540 (0xCAC1, 4)]),
541 arcname='IMAGES/system.img')
542 target_files_zip.writestr('IMAGES/system.map', '')
543 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
544 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
545
Tao Baodba59ee2018-01-09 13:21:02 -0800546 tempdir = common.UnzipTemp(target_files)
547 with zipfile.ZipFile(target_files, 'r') as input_zip:
548 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
Tao Baofc7e0e02018-02-13 13:54:02 -0800549
550 self.assertDictEqual(
551 {
552 '__COPY': RangeSet("0"),
553 '__NONZERO-0': RangeSet("1-5 9-12"),
554 },
555 sparse_image.file_map)
556
557 def test_GetSparseImage_invalidImageName(self):
558 self.assertRaises(
559 AssertionError, common.GetSparseImage, 'system2', None, None, False)
560 self.assertRaises(
561 AssertionError, common.GetSparseImage, 'unknown', None, None, False)
562
563 def test_GetSparseImage_missingBlockMapFile(self):
564 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
565 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
566 target_files_zip.write(
567 test_utils.construct_sparse_image([
568 (0xCAC1, 6),
569 (0xCAC3, 3),
570 (0xCAC1, 4)]),
571 arcname='IMAGES/system.img')
572 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
573 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
574
Tao Baodba59ee2018-01-09 13:21:02 -0800575 tempdir = common.UnzipTemp(target_files)
576 with zipfile.ZipFile(target_files, 'r') as input_zip:
577 self.assertRaises(
578 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
579 False)
Tao Baofc7e0e02018-02-13 13:54:02 -0800580
581 def test_GetSparseImage_sharedBlocks_notAllowed(self):
582 """Tests the case of having overlapping blocks but disallowed."""
583 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
584 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
585 target_files_zip.write(
586 test_utils.construct_sparse_image([(0xCAC2, 16)]),
587 arcname='IMAGES/system.img')
588 # Block 10 is shared between two files.
589 target_files_zip.writestr(
590 'IMAGES/system.map',
591 '\n'.join([
592 '/system/file1 1-5 9-10',
593 '/system/file2 10-12']))
594 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
595 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
596
Tao Baodba59ee2018-01-09 13:21:02 -0800597 tempdir = common.UnzipTemp(target_files)
598 with zipfile.ZipFile(target_files, 'r') as input_zip:
599 self.assertRaises(
600 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
601 False)
Tao Baofc7e0e02018-02-13 13:54:02 -0800602
603 def test_GetSparseImage_sharedBlocks_allowed(self):
604 """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
605 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
606 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
607 # Construct an image with a care_map of "0-5 9-12".
608 target_files_zip.write(
609 test_utils.construct_sparse_image([(0xCAC2, 16)]),
610 arcname='IMAGES/system.img')
611 # Block 10 is shared between two files.
612 target_files_zip.writestr(
613 'IMAGES/system.map',
614 '\n'.join([
615 '/system/file1 1-5 9-10',
616 '/system/file2 10-12']))
617 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
618 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
619
Tao Baodba59ee2018-01-09 13:21:02 -0800620 tempdir = common.UnzipTemp(target_files)
621 with zipfile.ZipFile(target_files, 'r') as input_zip:
622 sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
Tao Baofc7e0e02018-02-13 13:54:02 -0800623
624 self.assertDictEqual(
625 {
626 '__COPY': RangeSet("0"),
627 '__NONZERO-0': RangeSet("6-8 13-15"),
628 '/system/file1': RangeSet("1-5 9-10"),
629 '/system/file2': RangeSet("11-12"),
630 },
631 sparse_image.file_map)
632
633 # '/system/file2' should be marked with 'uses_shared_blocks', but not with
634 # 'incomplete'.
635 self.assertTrue(
636 sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
637 self.assertNotIn(
638 'incomplete', sparse_image.file_map['/system/file2'].extra)
639
640 # All other entries should look normal without any tags.
641 self.assertFalse(sparse_image.file_map['__COPY'].extra)
642 self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
643 self.assertFalse(sparse_image.file_map['/system/file1'].extra)
644
645 def test_GetSparseImage_incompleteRanges(self):
646 """Tests the case of ext4 images with holes."""
647 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
648 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
649 target_files_zip.write(
650 test_utils.construct_sparse_image([(0xCAC2, 16)]),
651 arcname='IMAGES/system.img')
652 target_files_zip.writestr(
653 'IMAGES/system.map',
654 '\n'.join([
655 '/system/file1 1-5 9-10',
656 '/system/file2 11-12']))
657 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
658 # '/system/file2' has less blocks listed (2) than actual (3).
659 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
660
Tao Baodba59ee2018-01-09 13:21:02 -0800661 tempdir = common.UnzipTemp(target_files)
662 with zipfile.ZipFile(target_files, 'r') as input_zip:
663 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
Tao Baofc7e0e02018-02-13 13:54:02 -0800664
665 self.assertFalse(sparse_image.file_map['/system/file1'].extra)
666 self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
667
Tao Baod3554e62018-07-10 15:31:22 -0700668 def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
669 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
670 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
671 target_files_zip.write(
672 test_utils.construct_sparse_image([(0xCAC2, 16)]),
673 arcname='IMAGES/system.img')
674 target_files_zip.writestr(
675 'IMAGES/system.map',
676 '\n'.join([
677 '//system/file1 1-5 9-10',
678 '//system/file2 11-12',
679 '/system/app/file3 13-15']))
680 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
681 # '/system/file2' has less blocks listed (2) than actual (3).
682 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
683 # '/system/app/file3' has less blocks listed (3) than actual (4).
684 target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
685
686 tempdir = common.UnzipTemp(target_files)
687 with zipfile.ZipFile(target_files, 'r') as input_zip:
688 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
689
690 self.assertFalse(sparse_image.file_map['//system/file1'].extra)
691 self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
692 self.assertTrue(
693 sparse_image.file_map['/system/app/file3'].extra['incomplete'])
694
695 def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
696 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
697 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
698 target_files_zip.write(
699 test_utils.construct_sparse_image([(0xCAC2, 16)]),
700 arcname='IMAGES/system.img')
701 target_files_zip.writestr(
702 'IMAGES/system.map',
703 '\n'.join([
704 '//system/file1 1-5 9-10',
705 '//init.rc 13-15']))
706 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
707 # '/init.rc' has less blocks listed (3) than actual (4).
708 target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
709
710 tempdir = common.UnzipTemp(target_files)
711 with zipfile.ZipFile(target_files, 'r') as input_zip:
712 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
713
714 self.assertFalse(sparse_image.file_map['//system/file1'].extra)
715 self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
716
717 def test_GetSparseImage_fileNotFound(self):
718 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
719 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
720 target_files_zip.write(
721 test_utils.construct_sparse_image([(0xCAC2, 16)]),
722 arcname='IMAGES/system.img')
723 target_files_zip.writestr(
724 'IMAGES/system.map',
725 '\n'.join([
726 '//system/file1 1-5 9-10',
727 '//system/file2 11-12']))
728 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
729
730 tempdir = common.UnzipTemp(target_files)
731 with zipfile.ZipFile(target_files, 'r') as input_zip:
732 self.assertRaises(
733 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
734 False)
735
Tao Bao02a08592018-07-22 12:40:45 -0700736 def test_GetAvbChainedPartitionArg(self):
737 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
738 info_dict = {
739 'avb_avbtool': 'avbtool',
740 'avb_system_key_path': pubkey,
741 'avb_system_rollback_index_location': 2,
742 }
743 args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
744 self.assertEqual(3, len(args))
745 self.assertEqual('system', args[0])
746 self.assertEqual('2', args[1])
747 self.assertTrue(os.path.exists(args[2]))
748
749 def test_GetAvbChainedPartitionArg_withPrivateKey(self):
750 key = os.path.join(self.testdata_dir, 'testkey.key')
751 info_dict = {
752 'avb_avbtool': 'avbtool',
753 'avb_product_key_path': key,
754 'avb_product_rollback_index_location': 2,
755 }
756 args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
757 self.assertEqual(3, len(args))
758 self.assertEqual('product', args[0])
759 self.assertEqual('2', args[1])
760 self.assertTrue(os.path.exists(args[2]))
761
762 def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
763 info_dict = {
764 'avb_avbtool': 'avbtool',
765 'avb_system_key_path': 'does-not-exist',
766 'avb_system_rollback_index_location': 2,
767 }
768 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
769 args = common.GetAvbChainedPartitionArg(
770 'system', info_dict, pubkey).split(':')
771 self.assertEqual(3, len(args))
772 self.assertEqual('system', args[0])
773 self.assertEqual('2', args[1])
774 self.assertTrue(os.path.exists(args[2]))
775
776 def test_GetAvbChainedPartitionArg_invalidKey(self):
777 pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
778 info_dict = {
779 'avb_avbtool': 'avbtool',
780 'avb_system_key_path': pubkey,
781 'avb_system_rollback_index_location': 2,
782 }
783 self.assertRaises(
784 AssertionError, common.GetAvbChainedPartitionArg, 'system', info_dict)
785
Tao Baofc7e0e02018-02-13 13:54:02 -0800786
Tianjie Xu9c384d22017-06-20 17:00:55 -0700787class InstallRecoveryScriptFormatTest(unittest.TestCase):
Tao Bao1c830bf2017-12-25 10:43:47 -0800788 """Checks the format of install-recovery.sh.
Tianjie Xu9c384d22017-06-20 17:00:55 -0700789
Tao Bao1c830bf2017-12-25 10:43:47 -0800790 Its format should match between common.py and validate_target_files.py.
791 """
Tianjie Xu9c384d22017-06-20 17:00:55 -0700792
793 def setUp(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800794 self._tempdir = common.MakeTempDir()
Tianjie Xu9c384d22017-06-20 17:00:55 -0700795 # Create a dummy dict that contains the fstab info for boot&recovery.
796 self._info = {"fstab" : {}}
Tao Bao1c830bf2017-12-25 10:43:47 -0800797 dummy_fstab = [
798 "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
799 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
Tao Bao31b08072017-11-08 15:50:59 -0800800 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
Tianjie Xudf055582017-11-07 12:22:58 -0800801 # Construct the gzipped recovery.img and boot.img
802 self.recovery_data = bytearray([
803 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
804 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
805 0x08, 0x00, 0x00, 0x00
806 ])
807 # echo -n "boot" | gzip -f | hd
808 self.boot_data = bytearray([
809 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
810 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
811 ])
Tianjie Xu9c384d22017-06-20 17:00:55 -0700812
813 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
814 loc = os.path.join(self._tempdir, prefix, name)
815 if not os.path.exists(os.path.dirname(loc)):
816 os.makedirs(os.path.dirname(loc))
817 with open(loc, "w+") as f:
818 f.write(data)
819
820 def test_full_recovery(self):
Tao Bao31b08072017-11-08 15:50:59 -0800821 recovery_image = common.File("recovery.img", self.recovery_data)
822 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700823 self._info["full_recovery_image"] = "true"
824
825 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
826 recovery_image, boot_image, self._info)
827 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
828 self._info)
829
830 def test_recovery_from_boot(self):
Tao Bao31b08072017-11-08 15:50:59 -0800831 recovery_image = common.File("recovery.img", self.recovery_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700832 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
Tao Bao31b08072017-11-08 15:50:59 -0800833 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700834 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
835
836 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
837 recovery_image, boot_image, self._info)
838 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
839 self._info)
840 # Validate 'recovery-from-boot' with bonus argument.
841 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
842 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
843 recovery_image, boot_image, self._info)
844 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
845 self._info)
846
847 def tearDown(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800848 common.Cleanup()