blob: 36dc6198a927993081524ebb97d48158038156ae [file] [log] [blame]
Dan Albert8e0178d2015-01-27 15:53:15 -08001#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Tao Bao17e4e612018-02-16 17:12:54 -080016
Dan Albert8e0178d2015-01-27 15:53:15 -080017import os
Tao Bao17e4e612018-02-16 17:12:54 -080018import subprocess
Dan Albert8e0178d2015-01-27 15:53:15 -080019import tempfile
20import time
21import unittest
22import zipfile
Tao Bao31b08072017-11-08 15:50:59 -080023from hashlib import sha1
24
Dan Albert8e0178d2015-01-27 15:53:15 -080025import common
Tao Bao04e1f012018-02-04 12:13:35 -080026import test_utils
Tianjie Xu9c384d22017-06-20 17:00:55 -070027import validate_target_files
Dan Albert8e0178d2015-01-27 15:53:15 -080028
Tao Bao04e1f012018-02-04 12:13:35 -080029
Tao Bao31b08072017-11-08 15:50:59 -080030KiB = 1024
31MiB = 1024 * KiB
32GiB = 1024 * MiB
Dan Albert8e0178d2015-01-27 15:53:15 -080033
Tao Bao1c830bf2017-12-25 10:43:47 -080034
Tao Baof3282b42015-04-01 11:21:55 -070035def get_2gb_string():
Tao Bao31b08072017-11-08 15:50:59 -080036 size = int(2 * GiB + 1)
37 block_size = 4 * KiB
38 step_size = 4 * MiB
39 # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
40 for _ in range(0, size, step_size):
41 yield os.urandom(block_size)
42 yield '\0' * (step_size - block_size)
Tao Baof3282b42015-04-01 11:21:55 -070043
Dan Albert8e0178d2015-01-27 15:53:15 -080044
45class CommonZipTest(unittest.TestCase):
Tao Bao31b08072017-11-08 15:50:59 -080046 def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
Tao Baof3282b42015-04-01 11:21:55 -070047 test_file_name=None, expected_stat=None, expected_mode=0o644,
48 expected_compress_type=zipfile.ZIP_STORED):
49 # Verify the stat if present.
50 if test_file_name is not None:
51 new_stat = os.stat(test_file_name)
52 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
53 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
54
55 # Reopen the zip file to verify.
56 zip_file = zipfile.ZipFile(zip_file_name, "r")
57
58 # Verify the timestamp.
59 info = zip_file.getinfo(arcname)
60 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
61
62 # Verify the file mode.
63 mode = (info.external_attr >> 16) & 0o777
64 self.assertEqual(mode, expected_mode)
65
66 # Verify the compress type.
67 self.assertEqual(info.compress_type, expected_compress_type)
68
69 # Verify the zip contents.
Tao Bao31b08072017-11-08 15:50:59 -080070 entry = zip_file.open(arcname)
71 sha1_hash = sha1()
72 for chunk in iter(lambda: entry.read(4 * MiB), ''):
73 sha1_hash.update(chunk)
74 self.assertEqual(expected_hash, sha1_hash.hexdigest())
Tao Baof3282b42015-04-01 11:21:55 -070075 self.assertIsNone(zip_file.testzip())
76
Dan Albert8e0178d2015-01-27 15:53:15 -080077 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
78 extra_zipwrite_args = dict(extra_zipwrite_args or {})
79
80 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080081 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070082
83 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080084 zip_file_name = zip_file.name
85
86 # File names within an archive strip the leading slash.
87 arcname = extra_zipwrite_args.get("arcname", test_file_name)
88 if arcname[0] == "/":
89 arcname = arcname[1:]
90
91 zip_file.close()
92 zip_file = zipfile.ZipFile(zip_file_name, "w")
93
94 try:
Tao Bao31b08072017-11-08 15:50:59 -080095 sha1_hash = sha1()
96 for data in contents:
97 sha1_hash.update(data)
98 test_file.write(data)
Dan Albert8e0178d2015-01-27 15:53:15 -080099 test_file.close()
100
Tao Baof3282b42015-04-01 11:21:55 -0700101 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -0800102 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700103 expected_compress_type = extra_zipwrite_args.get("compress_type",
104 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800105 time.sleep(5) # Make sure the atime/mtime will change measurably.
106
107 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700108 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800109
Tao Bao31b08072017-11-08 15:50:59 -0800110 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
111 test_file_name, expected_stat, expected_mode,
112 expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800113 finally:
114 os.remove(test_file_name)
115 os.remove(zip_file_name)
116
Tao Baof3282b42015-04-01 11:21:55 -0700117 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
118 extra_args = dict(extra_args or {})
119
120 zip_file = tempfile.NamedTemporaryFile(delete=False)
121 zip_file_name = zip_file.name
122 zip_file.close()
123
124 zip_file = zipfile.ZipFile(zip_file_name, "w")
125
126 try:
127 expected_compress_type = extra_args.get("compress_type",
128 zipfile.ZIP_STORED)
129 time.sleep(5) # Make sure the atime/mtime will change measurably.
130
131 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700132 arcname = zinfo_or_arcname
133 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700134 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700135 arcname = zinfo_or_arcname.filename
136 expected_mode = extra_args.get("perms",
137 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700138
Tao Bao58c1b962015-05-20 09:32:18 -0700139 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700140 common.ZipClose(zip_file)
141
Tao Bao31b08072017-11-08 15:50:59 -0800142 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700143 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700144 expected_compress_type=expected_compress_type)
145 finally:
146 os.remove(zip_file_name)
147
148 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
149 extra_args = dict(extra_args or {})
150
151 zip_file = tempfile.NamedTemporaryFile(delete=False)
152 zip_file_name = zip_file.name
153
154 test_file = tempfile.NamedTemporaryFile(delete=False)
155 test_file_name = test_file.name
156
157 arcname_large = test_file_name
158 arcname_small = "bar"
159
160 # File names within an archive strip the leading slash.
161 if arcname_large[0] == "/":
162 arcname_large = arcname_large[1:]
163
164 zip_file.close()
165 zip_file = zipfile.ZipFile(zip_file_name, "w")
166
167 try:
Tao Bao31b08072017-11-08 15:50:59 -0800168 sha1_hash = sha1()
169 for data in large:
170 sha1_hash.update(data)
171 test_file.write(data)
Tao Baof3282b42015-04-01 11:21:55 -0700172 test_file.close()
173
174 expected_stat = os.stat(test_file_name)
175 expected_mode = 0o644
176 expected_compress_type = extra_args.get("compress_type",
177 zipfile.ZIP_STORED)
178 time.sleep(5) # Make sure the atime/mtime will change measurably.
179
180 common.ZipWrite(zip_file, test_file_name, **extra_args)
181 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
182 common.ZipClose(zip_file)
183
184 # Verify the contents written by ZipWrite().
Tao Bao31b08072017-11-08 15:50:59 -0800185 self._verify(zip_file, zip_file_name, arcname_large,
186 sha1_hash.hexdigest(), test_file_name, expected_stat,
187 expected_mode, expected_compress_type)
Tao Baof3282b42015-04-01 11:21:55 -0700188
189 # Verify the contents written by ZipWriteStr().
Tao Bao31b08072017-11-08 15:50:59 -0800190 self._verify(zip_file, zip_file_name, arcname_small,
191 sha1(small).hexdigest(),
Tao Baof3282b42015-04-01 11:21:55 -0700192 expected_compress_type=expected_compress_type)
193 finally:
194 os.remove(zip_file_name)
195 os.remove(test_file_name)
196
197 def _test_reset_ZIP64_LIMIT(self, func, *args):
198 default_limit = (1 << 31) - 1
199 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
200 func(*args)
201 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
202
Dan Albert8e0178d2015-01-27 15:53:15 -0800203 def test_ZipWrite(self):
204 file_contents = os.urandom(1024)
205 self._test_ZipWrite(file_contents)
206
207 def test_ZipWrite_with_opts(self):
208 file_contents = os.urandom(1024)
209 self._test_ZipWrite(file_contents, {
210 "arcname": "foobar",
211 "perms": 0o777,
212 "compress_type": zipfile.ZIP_DEFLATED,
213 })
Tao Baof3282b42015-04-01 11:21:55 -0700214 self._test_ZipWrite(file_contents, {
215 "arcname": "foobar",
216 "perms": 0o700,
217 "compress_type": zipfile.ZIP_STORED,
218 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800219
220 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700221 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800222 self._test_ZipWrite(file_contents, {
223 "compress_type": zipfile.ZIP_DEFLATED,
224 })
225
226 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700227 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
228
229 def test_ZipWriteStr(self):
230 random_string = os.urandom(1024)
231 # Passing arcname
232 self._test_ZipWriteStr("foo", random_string)
233
234 # Passing zinfo
235 zinfo = zipfile.ZipInfo(filename="foo")
236 self._test_ZipWriteStr(zinfo, random_string)
237
238 # Timestamp in the zinfo should be overwritten.
239 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
240 self._test_ZipWriteStr(zinfo, random_string)
241
242 def test_ZipWriteStr_with_opts(self):
243 random_string = os.urandom(1024)
244 # Passing arcname
245 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700246 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700247 "compress_type": zipfile.ZIP_DEFLATED,
248 })
Tao Bao58c1b962015-05-20 09:32:18 -0700249 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700250 "compress_type": zipfile.ZIP_STORED,
251 })
252
253 # Passing zinfo
254 zinfo = zipfile.ZipInfo(filename="foo")
255 self._test_ZipWriteStr(zinfo, random_string, {
256 "compress_type": zipfile.ZIP_DEFLATED,
257 })
258 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700259 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700260 "compress_type": zipfile.ZIP_STORED,
261 })
262
263 def test_ZipWriteStr_large_file(self):
264 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
265 # the workaround. We will only test the case of writing a string into a
266 # large archive.
267 long_string = get_2gb_string()
268 short_string = os.urandom(1024)
269 self._test_ZipWriteStr_large_file(long_string, short_string, {
270 "compress_type": zipfile.ZIP_DEFLATED,
271 })
272
273 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
274 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
275 zinfo = zipfile.ZipInfo(filename="foo")
276 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700277
278 def test_bug21309935(self):
279 zip_file = tempfile.NamedTemporaryFile(delete=False)
280 zip_file_name = zip_file.name
281 zip_file.close()
282
283 try:
284 random_string = os.urandom(1024)
285 zip_file = zipfile.ZipFile(zip_file_name, "w")
286 # Default perms should be 0o644 when passing the filename.
287 common.ZipWriteStr(zip_file, "foo", random_string)
288 # Honor the specified perms.
289 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
290 # The perms in zinfo should be untouched.
291 zinfo = zipfile.ZipInfo(filename="baz")
292 zinfo.external_attr = 0o740 << 16
293 common.ZipWriteStr(zip_file, zinfo, random_string)
294 # Explicitly specified perms has the priority.
295 zinfo = zipfile.ZipInfo(filename="qux")
296 zinfo.external_attr = 0o700 << 16
297 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
298 common.ZipClose(zip_file)
299
Tao Bao31b08072017-11-08 15:50:59 -0800300 self._verify(zip_file, zip_file_name, "foo",
301 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700302 expected_mode=0o644)
Tao Bao31b08072017-11-08 15:50:59 -0800303 self._verify(zip_file, zip_file_name, "bar",
304 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700305 expected_mode=0o755)
Tao Bao31b08072017-11-08 15:50:59 -0800306 self._verify(zip_file, zip_file_name, "baz",
307 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700308 expected_mode=0o740)
Tao Bao31b08072017-11-08 15:50:59 -0800309 self._verify(zip_file, zip_file_name, "qux",
310 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700311 expected_mode=0o400)
312 finally:
313 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700314
Tao Bao89d7ab22017-12-14 17:05:33 -0800315 def test_ZipDelete(self):
316 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
317 output_zip = zipfile.ZipFile(zip_file.name, 'w',
318 compression=zipfile.ZIP_DEFLATED)
319 with tempfile.NamedTemporaryFile() as entry_file:
320 entry_file.write(os.urandom(1024))
321 common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
322 common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
323 common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
324 common.ZipClose(output_zip)
325 zip_file.close()
326
327 try:
328 common.ZipDelete(zip_file.name, 'Test2')
329 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
330 entries = check_zip.namelist()
331 self.assertTrue('Test1' in entries)
332 self.assertFalse('Test2' in entries)
333 self.assertTrue('Test3' in entries)
334
335 self.assertRaises(AssertionError, common.ZipDelete, zip_file.name,
336 'Test2')
337 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
338 entries = check_zip.namelist()
339 self.assertTrue('Test1' in entries)
340 self.assertFalse('Test2' in entries)
341 self.assertTrue('Test3' in entries)
342
343 common.ZipDelete(zip_file.name, ['Test3'])
344 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
345 entries = check_zip.namelist()
346 self.assertTrue('Test1' in entries)
347 self.assertFalse('Test2' in entries)
348 self.assertFalse('Test3' in entries)
349
350 common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
351 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
352 entries = check_zip.namelist()
353 self.assertFalse('Test1' in entries)
354 self.assertFalse('Test2' in entries)
355 self.assertFalse('Test3' in entries)
356 finally:
357 os.remove(zip_file.name)
358
359
Tao Bao818ddf52018-01-05 11:17:34 -0800360class CommonApkUtilsTest(unittest.TestCase):
361 """Tests the APK utils related functions."""
362
363 APKCERTS_TXT1 = (
364 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
365 ' private_key="certs/devkey.pk8"\n'
366 'name="Settings.apk"'
367 ' certificate="build/target/product/security/platform.x509.pem"'
368 ' private_key="build/target/product/security/platform.pk8"\n'
369 'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
370 )
371
372 APKCERTS_CERTMAP1 = {
373 'RecoveryLocalizer.apk' : 'certs/devkey',
374 'Settings.apk' : 'build/target/product/security/platform',
375 'TV.apk' : 'PRESIGNED',
376 }
377
378 APKCERTS_TXT2 = (
379 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
380 ' private_key="certs/compressed1.pk8" compressed="gz"\n'
381 'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
382 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
383 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
384 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
385 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
386 ' private_key="certs/compressed3.pk8" compressed="gz"\n'
387 )
388
389 APKCERTS_CERTMAP2 = {
390 'Compressed1.apk' : 'certs/compressed1',
391 'Compressed2a.apk' : 'certs/compressed2',
392 'Compressed2b.apk' : 'certs/compressed2',
393 'Compressed3.apk' : 'certs/compressed3',
394 }
395
396 APKCERTS_TXT3 = (
397 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
398 ' private_key="certs/compressed4.pk8" compressed="xz"\n'
399 )
400
401 APKCERTS_CERTMAP3 = {
402 'Compressed4.apk' : 'certs/compressed4',
403 }
404
Tao Bao17e4e612018-02-16 17:12:54 -0800405 def setUp(self):
406 self.testdata_dir = test_utils.get_testdata_dir()
407
Tao Bao818ddf52018-01-05 11:17:34 -0800408 def tearDown(self):
409 common.Cleanup()
410
411 @staticmethod
412 def _write_apkcerts_txt(apkcerts_txt, additional=None):
413 if additional is None:
414 additional = []
415 target_files = common.MakeTempFile(suffix='.zip')
416 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
417 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
418 for entry in additional:
419 target_files_zip.writestr(entry, '')
420 return target_files
421
422 def test_ReadApkCerts_NoncompressedApks(self):
423 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
424 with zipfile.ZipFile(target_files, 'r') as input_zip:
425 certmap, ext = common.ReadApkCerts(input_zip)
426
427 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
428 self.assertIsNone(ext)
429
430 def test_ReadApkCerts_CompressedApks(self):
431 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
432 # not stored in '.gz' format, so it shouldn't be considered as installed.
433 target_files = self._write_apkcerts_txt(
434 self.APKCERTS_TXT2,
435 ['Compressed1.apk.gz', 'Compressed3.apk'])
436
437 with zipfile.ZipFile(target_files, 'r') as input_zip:
438 certmap, ext = common.ReadApkCerts(input_zip)
439
440 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
441 self.assertEqual('.gz', ext)
442
443 # Alternative case with '.xz'.
444 target_files = self._write_apkcerts_txt(
445 self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
446
447 with zipfile.ZipFile(target_files, 'r') as input_zip:
448 certmap, ext = common.ReadApkCerts(input_zip)
449
450 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
451 self.assertEqual('.xz', ext)
452
453 def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
454 target_files = self._write_apkcerts_txt(
455 self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
456 ['Compressed1.apk.gz', 'Compressed3.apk'])
457
458 with zipfile.ZipFile(target_files, 'r') as input_zip:
459 certmap, ext = common.ReadApkCerts(input_zip)
460
461 certmap_merged = self.APKCERTS_CERTMAP1.copy()
462 certmap_merged.update(self.APKCERTS_CERTMAP2)
463 self.assertDictEqual(certmap_merged, certmap)
464 self.assertEqual('.gz', ext)
465
466 def test_ReadApkCerts_MultipleCompressionMethods(self):
467 target_files = self._write_apkcerts_txt(
468 self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
469 ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
470
471 with zipfile.ZipFile(target_files, 'r') as input_zip:
472 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
473
474 def test_ReadApkCerts_MismatchingKeys(self):
475 malformed_apkcerts_txt = (
476 'name="App1.apk" certificate="certs/cert1.x509.pem"'
477 ' private_key="certs/cert2.pk8"\n'
478 )
479 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
480
481 with zipfile.ZipFile(target_files, 'r') as input_zip:
482 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
483
Tao Bao04e1f012018-02-04 12:13:35 -0800484 def test_ExtractPublicKey(self):
Tao Bao17e4e612018-02-16 17:12:54 -0800485 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
486 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
Tao Bao04e1f012018-02-04 12:13:35 -0800487 with open(pubkey, 'rb') as pubkey_fp:
488 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
489
490 def test_ExtractPublicKey_invalidInput(self):
Tao Bao17e4e612018-02-16 17:12:54 -0800491 wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
Tao Bao04e1f012018-02-04 12:13:35 -0800492 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
493
Tao Bao17e4e612018-02-16 17:12:54 -0800494 def test_ParseCertificate(self):
495 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
496
497 cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
498 proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
499 expected, _ = proc.communicate()
500 self.assertEqual(0, proc.returncode)
501
502 with open(cert) as cert_fp:
503 actual = common.ParseCertificate(cert_fp.read())
504 self.assertEqual(expected, actual)
505
Tao Bao818ddf52018-01-05 11:17:34 -0800506
Tianjie Xu9c384d22017-06-20 17:00:55 -0700507class InstallRecoveryScriptFormatTest(unittest.TestCase):
Tao Bao1c830bf2017-12-25 10:43:47 -0800508 """Checks the format of install-recovery.sh.
Tianjie Xu9c384d22017-06-20 17:00:55 -0700509
Tao Bao1c830bf2017-12-25 10:43:47 -0800510 Its format should match between common.py and validate_target_files.py.
511 """
Tianjie Xu9c384d22017-06-20 17:00:55 -0700512
513 def setUp(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800514 self._tempdir = common.MakeTempDir()
Tianjie Xu9c384d22017-06-20 17:00:55 -0700515 # Create a dummy dict that contains the fstab info for boot&recovery.
516 self._info = {"fstab" : {}}
Tao Bao1c830bf2017-12-25 10:43:47 -0800517 dummy_fstab = [
518 "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
519 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
Tao Bao31b08072017-11-08 15:50:59 -0800520 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
Tianjie Xudf055582017-11-07 12:22:58 -0800521 # Construct the gzipped recovery.img and boot.img
522 self.recovery_data = bytearray([
523 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
524 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
525 0x08, 0x00, 0x00, 0x00
526 ])
527 # echo -n "boot" | gzip -f | hd
528 self.boot_data = bytearray([
529 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
530 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
531 ])
Tianjie Xu9c384d22017-06-20 17:00:55 -0700532
533 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
534 loc = os.path.join(self._tempdir, prefix, name)
535 if not os.path.exists(os.path.dirname(loc)):
536 os.makedirs(os.path.dirname(loc))
537 with open(loc, "w+") as f:
538 f.write(data)
539
540 def test_full_recovery(self):
Tao Bao31b08072017-11-08 15:50:59 -0800541 recovery_image = common.File("recovery.img", self.recovery_data)
542 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700543 self._info["full_recovery_image"] = "true"
544
545 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
546 recovery_image, boot_image, self._info)
547 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
548 self._info)
549
550 def test_recovery_from_boot(self):
Tao Bao31b08072017-11-08 15:50:59 -0800551 recovery_image = common.File("recovery.img", self.recovery_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700552 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
Tao Bao31b08072017-11-08 15:50:59 -0800553 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700554 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
555
556 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
557 recovery_image, boot_image, self._info)
558 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
559 self._info)
560 # Validate 'recovery-from-boot' with bonus argument.
561 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
562 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
563 recovery_image, boot_image, self._info)
564 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
565 self._info)
566
567 def tearDown(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800568 common.Cleanup()