blob: fb26b6660a3b8755e4c98a1b705723d0e1bc0425 [file] [log] [blame]
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Tao Baofc7e0e02018-02-13 13:54:02 -080016
import os
import subprocess
import tempfile
import time
import unittest
import zipfile
from hashlib import sha1

import common
import test_utils
import validate_target_files
from rangelib import RangeSet
Dan Albert8e0178d2015-01-27 15:53:15 -080029
Tao Bao04e1f012018-02-04 12:13:35 -080030
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB


def get_2gb_string():
  """Yields the chunks of a sparse byte stream slightly larger than 2 GiB.

  The stream is produced in 4 MiB steps. Each step is a 4 KiB block of random
  data followed by NUL bytes that fill the rest of the step, i.e. a long
  string with holes. Every chunk is a bytes object, so the chunks can be
  hashed and written to a binary file without relying on str and bytes being
  the same type.

  Yields:
    Alternating bytes chunks: 4 KiB of random data, then the NUL padding.
  """
  size = 2 * GiB + 1
  block_size = 4 * KiB
  step_size = 4 * MiB
  # The hole is identical in every step; build it once rather than
  # allocating a fresh ~4 MiB buffer per iteration.
  hole = b'\0' * (step_size - block_size)
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield hole


46class CommonZipTest(unittest.TestCase):
Tao Bao31b08072017-11-08 15:50:59 -080047 def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
Tao Baof3282b42015-04-01 11:21:55 -070048 test_file_name=None, expected_stat=None, expected_mode=0o644,
49 expected_compress_type=zipfile.ZIP_STORED):
50 # Verify the stat if present.
51 if test_file_name is not None:
52 new_stat = os.stat(test_file_name)
53 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
54 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
55
56 # Reopen the zip file to verify.
57 zip_file = zipfile.ZipFile(zip_file_name, "r")
58
59 # Verify the timestamp.
60 info = zip_file.getinfo(arcname)
61 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
62
63 # Verify the file mode.
64 mode = (info.external_attr >> 16) & 0o777
65 self.assertEqual(mode, expected_mode)
66
67 # Verify the compress type.
68 self.assertEqual(info.compress_type, expected_compress_type)
69
70 # Verify the zip contents.
Tao Bao31b08072017-11-08 15:50:59 -080071 entry = zip_file.open(arcname)
72 sha1_hash = sha1()
73 for chunk in iter(lambda: entry.read(4 * MiB), ''):
74 sha1_hash.update(chunk)
75 self.assertEqual(expected_hash, sha1_hash.hexdigest())
Tao Baof3282b42015-04-01 11:21:55 -070076 self.assertIsNone(zip_file.testzip())
77
Dan Albert8e0178d2015-01-27 15:53:15 -080078 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
79 extra_zipwrite_args = dict(extra_zipwrite_args or {})
80
81 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080082 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070083
84 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080085 zip_file_name = zip_file.name
86
87 # File names within an archive strip the leading slash.
88 arcname = extra_zipwrite_args.get("arcname", test_file_name)
89 if arcname[0] == "/":
90 arcname = arcname[1:]
91
92 zip_file.close()
93 zip_file = zipfile.ZipFile(zip_file_name, "w")
94
95 try:
Tao Bao31b08072017-11-08 15:50:59 -080096 sha1_hash = sha1()
97 for data in contents:
98 sha1_hash.update(data)
99 test_file.write(data)
Dan Albert8e0178d2015-01-27 15:53:15 -0800100 test_file.close()
101
Tao Baof3282b42015-04-01 11:21:55 -0700102 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -0800103 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700104 expected_compress_type = extra_zipwrite_args.get("compress_type",
105 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800106 time.sleep(5) # Make sure the atime/mtime will change measurably.
107
108 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700109 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800110
Tao Bao31b08072017-11-08 15:50:59 -0800111 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
112 test_file_name, expected_stat, expected_mode,
113 expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800114 finally:
115 os.remove(test_file_name)
116 os.remove(zip_file_name)
117
Tao Baof3282b42015-04-01 11:21:55 -0700118 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
119 extra_args = dict(extra_args or {})
120
121 zip_file = tempfile.NamedTemporaryFile(delete=False)
122 zip_file_name = zip_file.name
123 zip_file.close()
124
125 zip_file = zipfile.ZipFile(zip_file_name, "w")
126
127 try:
128 expected_compress_type = extra_args.get("compress_type",
129 zipfile.ZIP_STORED)
130 time.sleep(5) # Make sure the atime/mtime will change measurably.
131
132 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700133 arcname = zinfo_or_arcname
134 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700135 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700136 arcname = zinfo_or_arcname.filename
137 expected_mode = extra_args.get("perms",
138 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700139
Tao Bao58c1b962015-05-20 09:32:18 -0700140 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700141 common.ZipClose(zip_file)
142
Tao Bao31b08072017-11-08 15:50:59 -0800143 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700144 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700145 expected_compress_type=expected_compress_type)
146 finally:
147 os.remove(zip_file_name)
148
149 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
150 extra_args = dict(extra_args or {})
151
152 zip_file = tempfile.NamedTemporaryFile(delete=False)
153 zip_file_name = zip_file.name
154
155 test_file = tempfile.NamedTemporaryFile(delete=False)
156 test_file_name = test_file.name
157
158 arcname_large = test_file_name
159 arcname_small = "bar"
160
161 # File names within an archive strip the leading slash.
162 if arcname_large[0] == "/":
163 arcname_large = arcname_large[1:]
164
165 zip_file.close()
166 zip_file = zipfile.ZipFile(zip_file_name, "w")
167
168 try:
Tao Bao31b08072017-11-08 15:50:59 -0800169 sha1_hash = sha1()
170 for data in large:
171 sha1_hash.update(data)
172 test_file.write(data)
Tao Baof3282b42015-04-01 11:21:55 -0700173 test_file.close()
174
175 expected_stat = os.stat(test_file_name)
176 expected_mode = 0o644
177 expected_compress_type = extra_args.get("compress_type",
178 zipfile.ZIP_STORED)
179 time.sleep(5) # Make sure the atime/mtime will change measurably.
180
181 common.ZipWrite(zip_file, test_file_name, **extra_args)
182 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
183 common.ZipClose(zip_file)
184
185 # Verify the contents written by ZipWrite().
Tao Bao31b08072017-11-08 15:50:59 -0800186 self._verify(zip_file, zip_file_name, arcname_large,
187 sha1_hash.hexdigest(), test_file_name, expected_stat,
188 expected_mode, expected_compress_type)
Tao Baof3282b42015-04-01 11:21:55 -0700189
190 # Verify the contents written by ZipWriteStr().
Tao Bao31b08072017-11-08 15:50:59 -0800191 self._verify(zip_file, zip_file_name, arcname_small,
192 sha1(small).hexdigest(),
Tao Baof3282b42015-04-01 11:21:55 -0700193 expected_compress_type=expected_compress_type)
194 finally:
195 os.remove(zip_file_name)
196 os.remove(test_file_name)
197
198 def _test_reset_ZIP64_LIMIT(self, func, *args):
199 default_limit = (1 << 31) - 1
200 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
201 func(*args)
202 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
203
Dan Albert8e0178d2015-01-27 15:53:15 -0800204 def test_ZipWrite(self):
205 file_contents = os.urandom(1024)
206 self._test_ZipWrite(file_contents)
207
208 def test_ZipWrite_with_opts(self):
209 file_contents = os.urandom(1024)
210 self._test_ZipWrite(file_contents, {
211 "arcname": "foobar",
212 "perms": 0o777,
213 "compress_type": zipfile.ZIP_DEFLATED,
214 })
Tao Baof3282b42015-04-01 11:21:55 -0700215 self._test_ZipWrite(file_contents, {
216 "arcname": "foobar",
217 "perms": 0o700,
218 "compress_type": zipfile.ZIP_STORED,
219 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800220
221 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700222 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800223 self._test_ZipWrite(file_contents, {
224 "compress_type": zipfile.ZIP_DEFLATED,
225 })
226
227 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700228 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
229
230 def test_ZipWriteStr(self):
231 random_string = os.urandom(1024)
232 # Passing arcname
233 self._test_ZipWriteStr("foo", random_string)
234
235 # Passing zinfo
236 zinfo = zipfile.ZipInfo(filename="foo")
237 self._test_ZipWriteStr(zinfo, random_string)
238
239 # Timestamp in the zinfo should be overwritten.
240 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
241 self._test_ZipWriteStr(zinfo, random_string)
242
243 def test_ZipWriteStr_with_opts(self):
244 random_string = os.urandom(1024)
245 # Passing arcname
246 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700247 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700248 "compress_type": zipfile.ZIP_DEFLATED,
249 })
Tao Bao58c1b962015-05-20 09:32:18 -0700250 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700251 "compress_type": zipfile.ZIP_STORED,
252 })
253
254 # Passing zinfo
255 zinfo = zipfile.ZipInfo(filename="foo")
256 self._test_ZipWriteStr(zinfo, random_string, {
257 "compress_type": zipfile.ZIP_DEFLATED,
258 })
259 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700260 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700261 "compress_type": zipfile.ZIP_STORED,
262 })
263
264 def test_ZipWriteStr_large_file(self):
265 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
266 # the workaround. We will only test the case of writing a string into a
267 # large archive.
268 long_string = get_2gb_string()
269 short_string = os.urandom(1024)
270 self._test_ZipWriteStr_large_file(long_string, short_string, {
271 "compress_type": zipfile.ZIP_DEFLATED,
272 })
273
274 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
275 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
276 zinfo = zipfile.ZipInfo(filename="foo")
277 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700278
279 def test_bug21309935(self):
280 zip_file = tempfile.NamedTemporaryFile(delete=False)
281 zip_file_name = zip_file.name
282 zip_file.close()
283
284 try:
285 random_string = os.urandom(1024)
286 zip_file = zipfile.ZipFile(zip_file_name, "w")
287 # Default perms should be 0o644 when passing the filename.
288 common.ZipWriteStr(zip_file, "foo", random_string)
289 # Honor the specified perms.
290 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
291 # The perms in zinfo should be untouched.
292 zinfo = zipfile.ZipInfo(filename="baz")
293 zinfo.external_attr = 0o740 << 16
294 common.ZipWriteStr(zip_file, zinfo, random_string)
295 # Explicitly specified perms has the priority.
296 zinfo = zipfile.ZipInfo(filename="qux")
297 zinfo.external_attr = 0o700 << 16
298 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
299 common.ZipClose(zip_file)
300
Tao Bao31b08072017-11-08 15:50:59 -0800301 self._verify(zip_file, zip_file_name, "foo",
302 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700303 expected_mode=0o644)
Tao Bao31b08072017-11-08 15:50:59 -0800304 self._verify(zip_file, zip_file_name, "bar",
305 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700306 expected_mode=0o755)
Tao Bao31b08072017-11-08 15:50:59 -0800307 self._verify(zip_file, zip_file_name, "baz",
308 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700309 expected_mode=0o740)
Tao Bao31b08072017-11-08 15:50:59 -0800310 self._verify(zip_file, zip_file_name, "qux",
311 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700312 expected_mode=0o400)
313 finally:
314 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700315
Tao Bao89d7ab22017-12-14 17:05:33 -0800316 def test_ZipDelete(self):
317 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
318 output_zip = zipfile.ZipFile(zip_file.name, 'w',
319 compression=zipfile.ZIP_DEFLATED)
320 with tempfile.NamedTemporaryFile() as entry_file:
321 entry_file.write(os.urandom(1024))
322 common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
323 common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
324 common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
325 common.ZipClose(output_zip)
326 zip_file.close()
327
328 try:
329 common.ZipDelete(zip_file.name, 'Test2')
330 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
331 entries = check_zip.namelist()
332 self.assertTrue('Test1' in entries)
333 self.assertFalse('Test2' in entries)
334 self.assertTrue('Test3' in entries)
335
336 self.assertRaises(AssertionError, common.ZipDelete, zip_file.name,
337 'Test2')
338 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
339 entries = check_zip.namelist()
340 self.assertTrue('Test1' in entries)
341 self.assertFalse('Test2' in entries)
342 self.assertTrue('Test3' in entries)
343
344 common.ZipDelete(zip_file.name, ['Test3'])
345 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
346 entries = check_zip.namelist()
347 self.assertTrue('Test1' in entries)
348 self.assertFalse('Test2' in entries)
349 self.assertFalse('Test3' in entries)
350
351 common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
352 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
353 entries = check_zip.namelist()
354 self.assertFalse('Test1' in entries)
355 self.assertFalse('Test2' in entries)
356 self.assertFalse('Test3' in entries)
357 finally:
358 os.remove(zip_file.name)
359
360
Tao Bao818ddf52018-01-05 11:17:34 -0800361class CommonApkUtilsTest(unittest.TestCase):
362 """Tests the APK utils related functions."""
363
364 APKCERTS_TXT1 = (
365 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
366 ' private_key="certs/devkey.pk8"\n'
367 'name="Settings.apk"'
368 ' certificate="build/target/product/security/platform.x509.pem"'
369 ' private_key="build/target/product/security/platform.pk8"\n'
370 'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
371 )
372
373 APKCERTS_CERTMAP1 = {
374 'RecoveryLocalizer.apk' : 'certs/devkey',
375 'Settings.apk' : 'build/target/product/security/platform',
376 'TV.apk' : 'PRESIGNED',
377 }
378
379 APKCERTS_TXT2 = (
380 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
381 ' private_key="certs/compressed1.pk8" compressed="gz"\n'
382 'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
383 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
384 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
385 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
386 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
387 ' private_key="certs/compressed3.pk8" compressed="gz"\n'
388 )
389
390 APKCERTS_CERTMAP2 = {
391 'Compressed1.apk' : 'certs/compressed1',
392 'Compressed2a.apk' : 'certs/compressed2',
393 'Compressed2b.apk' : 'certs/compressed2',
394 'Compressed3.apk' : 'certs/compressed3',
395 }
396
397 APKCERTS_TXT3 = (
398 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
399 ' private_key="certs/compressed4.pk8" compressed="xz"\n'
400 )
401
402 APKCERTS_CERTMAP3 = {
403 'Compressed4.apk' : 'certs/compressed4',
404 }
405
Tao Bao17e4e612018-02-16 17:12:54 -0800406 def setUp(self):
407 self.testdata_dir = test_utils.get_testdata_dir()
408
Tao Bao818ddf52018-01-05 11:17:34 -0800409 def tearDown(self):
410 common.Cleanup()
411
412 @staticmethod
413 def _write_apkcerts_txt(apkcerts_txt, additional=None):
414 if additional is None:
415 additional = []
416 target_files = common.MakeTempFile(suffix='.zip')
417 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
418 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
419 for entry in additional:
420 target_files_zip.writestr(entry, '')
421 return target_files
422
423 def test_ReadApkCerts_NoncompressedApks(self):
424 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
425 with zipfile.ZipFile(target_files, 'r') as input_zip:
426 certmap, ext = common.ReadApkCerts(input_zip)
427
428 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
429 self.assertIsNone(ext)
430
431 def test_ReadApkCerts_CompressedApks(self):
432 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
433 # not stored in '.gz' format, so it shouldn't be considered as installed.
434 target_files = self._write_apkcerts_txt(
435 self.APKCERTS_TXT2,
436 ['Compressed1.apk.gz', 'Compressed3.apk'])
437
438 with zipfile.ZipFile(target_files, 'r') as input_zip:
439 certmap, ext = common.ReadApkCerts(input_zip)
440
441 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
442 self.assertEqual('.gz', ext)
443
444 # Alternative case with '.xz'.
445 target_files = self._write_apkcerts_txt(
446 self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
447
448 with zipfile.ZipFile(target_files, 'r') as input_zip:
449 certmap, ext = common.ReadApkCerts(input_zip)
450
451 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
452 self.assertEqual('.xz', ext)
453
454 def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
455 target_files = self._write_apkcerts_txt(
456 self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
457 ['Compressed1.apk.gz', 'Compressed3.apk'])
458
459 with zipfile.ZipFile(target_files, 'r') as input_zip:
460 certmap, ext = common.ReadApkCerts(input_zip)
461
462 certmap_merged = self.APKCERTS_CERTMAP1.copy()
463 certmap_merged.update(self.APKCERTS_CERTMAP2)
464 self.assertDictEqual(certmap_merged, certmap)
465 self.assertEqual('.gz', ext)
466
467 def test_ReadApkCerts_MultipleCompressionMethods(self):
468 target_files = self._write_apkcerts_txt(
469 self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
470 ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
471
472 with zipfile.ZipFile(target_files, 'r') as input_zip:
473 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
474
475 def test_ReadApkCerts_MismatchingKeys(self):
476 malformed_apkcerts_txt = (
477 'name="App1.apk" certificate="certs/cert1.x509.pem"'
478 ' private_key="certs/cert2.pk8"\n'
479 )
480 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
481
482 with zipfile.ZipFile(target_files, 'r') as input_zip:
483 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
484
Tao Bao04e1f012018-02-04 12:13:35 -0800485 def test_ExtractPublicKey(self):
Tao Bao17e4e612018-02-16 17:12:54 -0800486 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
487 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
Tao Bao04e1f012018-02-04 12:13:35 -0800488 with open(pubkey, 'rb') as pubkey_fp:
489 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
490
491 def test_ExtractPublicKey_invalidInput(self):
Tao Bao17e4e612018-02-16 17:12:54 -0800492 wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
Tao Bao04e1f012018-02-04 12:13:35 -0800493 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
494
Tao Bao17e4e612018-02-16 17:12:54 -0800495 def test_ParseCertificate(self):
496 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
497
498 cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
499 proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
500 expected, _ = proc.communicate()
501 self.assertEqual(0, proc.returncode)
502
503 with open(cert) as cert_fp:
504 actual = common.ParseCertificate(cert_fp.read())
505 self.assertEqual(expected, actual)
506
Tao Bao818ddf52018-01-05 11:17:34 -0800507
class CommonUtilsTest(unittest.TestCase):
  """Exercises common.GetSparseImage() against synthetic target_files zips."""

  def tearDown(self):
    common.Cleanup()

  @staticmethod
  def _make_target_files(image_chunks, file1_blocks, block_map=None):
    """Builds a temporary target_files zip with a system image.

    Entries are added in this order: IMAGES/system.img, IMAGES/system.map
    (only when block_map is given), SYSTEM/file1 and SYSTEM/file2.

    Args:
      image_chunks: The list handed to test_utils.construct_sparse_image().
      file1_blocks: Size of SYSTEM/file1, in units of 4096 random bytes.
          SYSTEM/file2 always gets 3 such units.
      block_map: Contents of IMAGES/system.map, or None to leave the entry
          out entirely.

    Returns:
      The path of the zip, as allocated by common.MakeTempFile().
    """
    zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(zip_path, 'w') as output_zip:
      output_zip.write(
          test_utils.construct_sparse_image(image_chunks),
          arcname='IMAGES/system.img')
      if block_map is not None:
        output_zip.writestr('IMAGES/system.map', block_map)
      output_zip.writestr('SYSTEM/file1', os.urandom(4096 * file1_blocks))
      output_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    return zip_path

  def test_GetSparseImage_emptyBlockMapFile(self):
    zip_path = self._make_target_files(
        [
            (0xCAC1, 6),
            (0xCAC3, 3),
            (0xCAC1, 4)],
        file1_blocks=8,
        block_map='')

    unzipped_dir = common.UnzipTemp(zip_path)
    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      image = common.GetSparseImage('system', unzipped_dir, input_zip, False)

    expected_file_map = {
        '__COPY': RangeSet("0"),
        '__NONZERO-0': RangeSet("1-5 9-12"),
    }
    self.assertDictEqual(expected_file_map, image.file_map)

  def test_GetSparseImage_invalidImageName(self):
    with self.assertRaises(AssertionError):
      common.GetSparseImage('system2', None, None, False)
    with self.assertRaises(AssertionError):
      common.GetSparseImage('unknown', None, None, False)

  def test_GetSparseImage_missingBlockMapFile(self):
    # No IMAGES/system.map entry is written at all.
    zip_path = self._make_target_files(
        [
            (0xCAC1, 6),
            (0xCAC3, 3),
            (0xCAC1, 4)],
        file1_blocks=8)

    unzipped_dir = common.UnzipTemp(zip_path)
    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      with self.assertRaises(AssertionError):
        common.GetSparseImage('system', unzipped_dir, input_zip, False)

  def test_GetSparseImage_sharedBlocks_notAllowed(self):
    """Overlapping blocks must be rejected when sharing is disallowed."""
    # Block 10 is shared between two files.
    shared_block_map = '\n'.join([
        '/system/file1 1-5 9-10',
        '/system/file2 10-12'])
    zip_path = self._make_target_files(
        [(0xCAC2, 16)], file1_blocks=7, block_map=shared_block_map)

    unzipped_dir = common.UnzipTemp(zip_path)
    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      with self.assertRaises(AssertionError):
        common.GetSparseImage('system', unzipped_dir, input_zip, False)

  def test_GetSparseImage_sharedBlocks_allowed(self):
    """Covers targets that set BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
    # Block 10 is shared between two files.
    shared_block_map = '\n'.join([
        '/system/file1 1-5 9-10',
        '/system/file2 10-12'])
    zip_path = self._make_target_files(
        [(0xCAC2, 16)], file1_blocks=7, block_map=shared_block_map)

    unzipped_dir = common.UnzipTemp(zip_path)
    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      image = common.GetSparseImage('system', unzipped_dir, input_zip, True)

    expected_file_map = {
        '__COPY': RangeSet("0"),
        '__NONZERO-0': RangeSet("6-8 13-15"),
        '/system/file1': RangeSet("1-5 9-10"),
        '/system/file2': RangeSet("11-12"),
    }
    self.assertDictEqual(expected_file_map, image.file_map)

    # '/system/file2' is expected to carry the 'uses_shared_blocks' tag, but
    # not the 'incomplete' one.
    file2_extra = image.file_map['/system/file2'].extra
    self.assertTrue(file2_extra['uses_shared_blocks'])
    self.assertNotIn('incomplete', file2_extra)

    # Every other entry should come without any tags.
    self.assertFalse(image.file_map['__COPY'].extra)
    self.assertFalse(image.file_map['__NONZERO-0'].extra)
    self.assertFalse(image.file_map['/system/file1'].extra)

  def test_GetSparseImage_incompleteRanges(self):
    """Covers ext4 images with holes."""
    # '/system/file2' has less blocks listed (2) than actual (3).
    block_map = '\n'.join([
        '/system/file1 1-5 9-10',
        '/system/file2 11-12'])
    zip_path = self._make_target_files(
        [(0xCAC2, 16)], file1_blocks=7, block_map=block_map)

    unzipped_dir = common.UnzipTemp(zip_path)
    with zipfile.ZipFile(zip_path, 'r') as input_zip:
      image = common.GetSparseImage('system', unzipped_dir, input_zip, False)

    self.assertFalse(image.file_map['/system/file1'].extra)
    self.assertTrue(image.file_map['/system/file2'].extra['incomplete'])


Tianjie Xu9c384d22017-06-20 17:00:55 -0700649class InstallRecoveryScriptFormatTest(unittest.TestCase):
Tao Bao1c830bf2017-12-25 10:43:47 -0800650 """Checks the format of install-recovery.sh.
Tianjie Xu9c384d22017-06-20 17:00:55 -0700651
Tao Bao1c830bf2017-12-25 10:43:47 -0800652 Its format should match between common.py and validate_target_files.py.
653 """
Tianjie Xu9c384d22017-06-20 17:00:55 -0700654
655 def setUp(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800656 self._tempdir = common.MakeTempDir()
Tianjie Xu9c384d22017-06-20 17:00:55 -0700657 # Create a dummy dict that contains the fstab info for boot&recovery.
658 self._info = {"fstab" : {}}
Tao Bao1c830bf2017-12-25 10:43:47 -0800659 dummy_fstab = [
660 "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
661 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
Tao Bao31b08072017-11-08 15:50:59 -0800662 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
Tianjie Xudf055582017-11-07 12:22:58 -0800663 # Construct the gzipped recovery.img and boot.img
664 self.recovery_data = bytearray([
665 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
666 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
667 0x08, 0x00, 0x00, 0x00
668 ])
669 # echo -n "boot" | gzip -f | hd
670 self.boot_data = bytearray([
671 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
672 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
673 ])
Tianjie Xu9c384d22017-06-20 17:00:55 -0700674
675 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
676 loc = os.path.join(self._tempdir, prefix, name)
677 if not os.path.exists(os.path.dirname(loc)):
678 os.makedirs(os.path.dirname(loc))
679 with open(loc, "w+") as f:
680 f.write(data)
681
682 def test_full_recovery(self):
Tao Bao31b08072017-11-08 15:50:59 -0800683 recovery_image = common.File("recovery.img", self.recovery_data)
684 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700685 self._info["full_recovery_image"] = "true"
686
687 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
688 recovery_image, boot_image, self._info)
689 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
690 self._info)
691
692 def test_recovery_from_boot(self):
Tao Bao31b08072017-11-08 15:50:59 -0800693 recovery_image = common.File("recovery.img", self.recovery_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700694 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
Tao Bao31b08072017-11-08 15:50:59 -0800695 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700696 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
697
698 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
699 recovery_image, boot_image, self._info)
700 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
701 self._info)
702 # Validate 'recovery-from-boot' with bonus argument.
703 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
704 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
705 recovery_image, boot_image, self._info)
706 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
707 self._info)
708
709 def tearDown(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800710 common.Cleanup()