blob: dce27fe1c395366e19eda0cf162468fa96ee21b7 [file] [log] [blame]
Dan Albert8e0178d2015-01-27 15:53:15 -08001#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Tao Baofc7e0e02018-02-13 13:54:02 -080016
Tao Baoa57ab9f2018-08-24 12:08:38 -070017import copy
Dan Albert8e0178d2015-01-27 15:53:15 -080018import os
Tao Bao17e4e612018-02-16 17:12:54 -080019import subprocess
Dan Albert8e0178d2015-01-27 15:53:15 -080020import tempfile
21import time
Dan Albert8e0178d2015-01-27 15:53:15 -080022import zipfile
Tao Bao31b08072017-11-08 15:50:59 -080023from hashlib import sha1
24
Dan Albert8e0178d2015-01-27 15:53:15 -080025import common
Tao Bao04e1f012018-02-04 12:13:35 -080026import test_utils
Tianjie Xu9c384d22017-06-20 17:00:55 -070027import validate_target_files
Tao Baofc7e0e02018-02-13 13:54:02 -080028from rangelib import RangeSet
Dan Albert8e0178d2015-01-27 15:53:15 -080029
Yifan Hongbb2658d2019-01-25 12:30:58 -080030from blockimgdiff import EmptyImage, DataImage
Tao Bao04e1f012018-02-04 12:13:35 -080031
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB


def get_2gb_string():
  """Yields binary chunks whose total size is slightly over 2 GiB.

  The data is produced in 4 MiB steps: each step yields 4 KiB of random bytes
  followed by a run of NUL bytes filling the rest of the step, e.g.
  b'xyz\\x00abc\\x00...'. The range below covers 0 to 2 GiB inclusive, so 513
  steps are generated.

  Yields:
    bytes objects, alternating between a random block and a zero-filled hole.
  """
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # The hole must be bytes, not text: the chunks get fed to sha1.update() and
  # written to a file opened in binary mode, both of which reject str on
  # Python 3. It is identical for every step, so build it only once.
  hole = b'\0' * (step_size - block_size)
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield hole
Tao Baof3282b42015-04-01 11:21:55 -070045
Dan Albert8e0178d2015-01-27 15:53:15 -080046
class CommonZipTest(test_utils.ReleaseToolsTestCase):
  """Tests for the zip helpers in common.

  Covers ZipWrite(), ZipWriteStr(), ZipClose(), ZipDelete() and UnzipTemp().
  """

  # Names of the entries written by _test_UnzipTemp_createZipFile().
  UNZIP_TEMP_ENTRIES = ('Test1', 'Test2', 'Foo3', 'Bar4', 'Dir5/Baz5')

  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    """Checks one archive entry against the expected metadata and contents.

    Args:
      zip_file: Unused. The archive is always reopened from zip_file_name; the
          parameter is kept so that existing callers keep working.
      zip_file_name: Path of the archive to reopen and inspect.
      arcname: Name of the entry to check.
      expected_hash: Expected SHA-1 hex digest of the entry's contents.
      test_file_name: Optional path of the source file. When given, its mode
          and mtime must still match expected_stat.
      expected_stat: os.stat() result of the source file taken before zipping.
          Required when test_file_name is given.
      expected_mode: Expected permission bits of the entry.
      expected_compress_type: Expected zipfile compression constant.
    """
    del zip_file  # Unused; see the docstring.

    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify. The context managers make sure that
    # neither the archive nor the entry stays open after the check.
    with zipfile.ZipFile(zip_file_name, "r") as reopened_zip:
      # Verify the timestamp.
      info = reopened_zip.getinfo(arcname)
      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

      # Verify the file mode.
      mode = (info.external_attr >> 16) & 0o777
      self.assertEqual(mode, expected_mode)

      # Verify the compress type.
      self.assertEqual(info.compress_type, expected_compress_type)

      # Verify the zip contents. read() returns bytes, so the end-of-data
      # sentinel must be b''; a text '' never compares equal on Python 3 and
      # the loop would not terminate.
      sha1_hash = sha1()
      with reopened_zip.open(arcname) as entry:
        for chunk in iter(lambda: entry.read(4 * MiB), b''):
          sha1_hash.update(chunk)
      self.assertEqual(expected_hash, sha1_hash.hexdigest())
      self.assertIsNone(reopened_zip.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    """Writes contents to a temp file, zips it with ZipWrite() and verifies.

    Args:
      contents: A bytes object, or an iterable of bytes chunks.
      extra_zipwrite_args: Optional dict of keyword arguments forwarded to
          common.ZipWrite(). "arcname", "perms" and "compress_type" also
          determine the expected values for the verification.
    """
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    # Iterating a bytes object yields ints on Python 3, which can be neither
    # hashed nor written. Treat it as a single chunk instead.
    if isinstance(contents, bytes):
      contents = [contents]

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in contents:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    """Writes contents with ZipWriteStr() and verifies the resulting entry.

    Args:
      zinfo_or_arcname: Either an entry name or a zipfile.ZipInfo.
      contents: The bytes to store in the entry.
      extra_args: Optional dict of keyword arguments forwarded to
          common.ZipWriteStr(). "perms" and "compress_type" also determine the
          expected values for the verification.
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        arcname = zinfo_or_arcname.filename
        expected_mode = extra_args.get("perms",
                                       zinfo_or_arcname.external_attr >> 16)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    """Adds a large file via ZipWrite() and a small string via ZipWriteStr().

    Args:
      large: Iterable of bytes chunks making up the large file.
      small: The bytes to store under the entry name "bar".
      extra_args: Optional dict of keyword arguments forwarded to both
          common.ZipWrite() and common.ZipWriteStr().
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    """Asserts that zipfile.ZIP64_LIMIT is at its default around func(*args)."""
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def _assert_zip_entries(self, zip_file_name, present, absent):
    """Asserts which entry names the archive does and does not contain."""
    with zipfile.ZipFile(zip_file_name, 'r') as check_zip:
      entries = check_zip.namelist()
    for name in present:
      self.assertIn(name, entries)
    for name in absent:
      self.assertNotIn(name, entries)

  def _assert_unzipped(self, unzipped_dir, expected):
    """Asserts that exactly the entries in expected exist under unzipped_dir.

    Every name in UNZIP_TEMP_ENTRIES is checked: the ones listed in expected
    must exist, all the others must not.
    """
    for entry in self.UNZIP_TEMP_ENTRIES:
      extracted = os.path.exists(os.path.join(unzipped_dir, entry))
      if entry in expected:
        self.assertTrue(extracted, '%s should have been extracted' % (entry,))
      else:
        self.assertFalse(
            extracted, '%s should not have been extracted' % (entry,))

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    # Empty bytes rather than empty text, as the contents are binary data.
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, b"")

  def test_ZipWriteStr(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "perms": 0o700,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("bar", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o600,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    # Empty bytes rather than empty text: sha1() rejects str on Python 3.
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", b"")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b"")

  def test_bug21309935(self):
    """Checks how ZipWriteStr() picks the permission bits of an entry."""
    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    try:
      random_string = os.urandom(1024)
      zip_file = zipfile.ZipFile(zip_file_name, "w")
      # Default perms should be 0o644 when passing the filename.
      common.ZipWriteStr(zip_file, "foo", random_string)
      # Honor the specified perms.
      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
      # The perms in zinfo should be untouched.
      zinfo = zipfile.ZipInfo(filename="baz")
      zinfo.external_attr = 0o740 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string)
      # Explicitly specified perms has the priority.
      zinfo = zipfile.ZipInfo(filename="qux")
      zinfo.external_attr = 0o700 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      expected_hash = sha1(random_string).hexdigest()
      expected_modes = (
          ("foo", 0o644),
          ("bar", 0o755),
          ("baz", 0o740),
          ("qux", 0o400),
      )
      for arcname, expected_mode in expected_modes:
        self._verify(zip_file, zip_file_name, arcname, expected_hash,
                     expected_mode=expected_mode)
    finally:
      os.remove(zip_file_name)

  def test_ZipDelete(self):
    """Deletes entries by name and by list, including a missing one."""
    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    try:
      output_zip = zipfile.ZipFile(zip_file.name, 'w',
                                   compression=zipfile.ZIP_DEFLATED)
      with tempfile.NamedTemporaryFile() as entry_file:
        entry_file.write(os.urandom(1024))
        # ZipWrite() reads the file by name, so the buffered data has to be on
        # disk first; otherwise the entries may end up empty.
        entry_file.flush()
        common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
        common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
        common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
        common.ZipClose(output_zip)
      zip_file.close()

      common.ZipDelete(zip_file.name, 'Test2')
      self._assert_zip_entries(
          zip_file.name, present=['Test1', 'Test3'], absent=['Test2'])

      # Deleting an entry that is already gone must fail and leave the archive
      # as it was.
      self.assertRaises(
          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
      self._assert_zip_entries(
          zip_file.name, present=['Test1', 'Test3'], absent=['Test2'])

      common.ZipDelete(zip_file.name, ['Test3'])
      self._assert_zip_entries(
          zip_file.name, present=['Test1'], absent=['Test2', 'Test3'])

      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
      self._assert_zip_entries(
          zip_file.name, present=[], absent=['Test1', 'Test2', 'Test3'])
    finally:
      os.remove(zip_file.name)

  @staticmethod
  def _test_UnzipTemp_createZipFile():
    """Creates a zip with the UNZIP_TEMP_ENTRIES entries; returns its path."""
    zip_file = common.MakeTempFile(suffix='.zip')
    output_zip = zipfile.ZipFile(
        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
    contents = os.urandom(1024)
    with tempfile.NamedTemporaryFile() as entry_file:
      entry_file.write(contents)
      # ZipWrite() reads the file by name, so flush the buffered data first.
      entry_file.flush()
      for arcname in CommonZipTest.UNZIP_TEMP_ENTRIES:
        common.ZipWrite(output_zip, entry_file.name, arcname=arcname)
      # Close the archive exactly once; a second close would be redundant.
      common.ZipClose(output_zip)
    return zip_file

  def test_UnzipTemp(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file)
    self._assert_unzipped(unzipped_dir, self.UNZIP_TEMP_ENTRIES)

  def test_UnzipTemp_withPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()

    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
    self._assert_unzipped(unzipped_dir, ['Test1'])

    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
    self._assert_unzipped(unzipped_dir, ['Test1', 'Foo3'])

    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
    self._assert_unzipped(unzipped_dir, ['Test1', 'Test2', 'Foo3'])

    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
    self._assert_unzipped(unzipped_dir, ['Test1', 'Dir5/Baz5'])

  def test_UnzipTemp_withEmptyPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, [])
    self._assert_unzipped(unzipped_dir, [])

  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
    self._assert_unzipped(unzipped_dir, ['Test1', 'Test2'])

  def test_UnzipTemp_withNoMatchingPatterns(self):
    zip_file = self._test_UnzipTemp_createZipFile()
    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
    self._assert_unzipped(unzipped_dir, [])
445
Tao Bao89d7ab22017-12-14 17:05:33 -0800446
class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
  """Tests the APK utils related functions."""

  # apkcerts.txt contents without any compressed APKs, and the cert map that
  # ReadApkCerts() is expected to return for them.
  APKCERTS_TXT1 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/target/product/security/platform.x509.pem"'
      ' private_key="build/target/product/security/platform.pk8"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
  )

  APKCERTS_CERTMAP1 = {
      'RecoveryLocalizer.apk' : 'certs/devkey',
      'Settings.apk' : 'build/target/product/security/platform',
      'TV.apk' : 'PRESIGNED',
  }

  # Entries that are all marked as compressed="gz".
  APKCERTS_TXT2 = (
      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
  )

  APKCERTS_CERTMAP2 = {
      'Compressed1.apk' : 'certs/compressed1',
      'Compressed2a.apk' : 'certs/compressed2',
      'Compressed2b.apk' : 'certs/compressed2',
      'Compressed3.apk' : 'certs/compressed3',
  }

  # A single entry marked as compressed="xz".
  APKCERTS_TXT3 = (
      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
  )

  APKCERTS_CERTMAP3 = {
      'Compressed4.apk' : 'certs/compressed4',
  }

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()

  @staticmethod
  def _write_apkcerts_txt(apkcerts_txt, additional=None):
    """Creates a temp zip holding META/apkcerts.txt and returns its path.

    Args:
      apkcerts_txt: The text to store as META/apkcerts.txt.
      additional: Optional list of extra entry names; each one is added to the
          zip with empty contents.

    Returns:
      The path of the zip file that was created.
    """
    if additional is None:
      additional = []
    target_files = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
      for entry in additional:
        target_files_zip.writestr(entry, '')
    return target_files

  def test_ReadApkCerts_NoncompressedApks(self):
    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    self.assertIsNone(ext)

  def test_ReadApkCerts_CompressedApks(self):
    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
    # not stored in '.gz' format, so it shouldn't be considered as installed.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
    self.assertEqual('.gz', ext)

    # Alternative case with '.xz'.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
    self.assertEqual('.xz', ext)

  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    certmap_merged = self.APKCERTS_CERTMAP1.copy()
    certmap_merged.update(self.APKCERTS_CERTMAP2)
    self.assertDictEqual(certmap_merged, certmap)
    self.assertEqual('.gz', ext)

  def test_ReadApkCerts_MultipleCompressionMethods(self):
    """Mixing "gz" and "xz" entries is expected to raise ValueError."""
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ReadApkCerts_MismatchingKeys(self):
    """A certificate/private_key pair with differing names is rejected."""
    malformed_apkcerts_txt = (
        'name="App1.apk" certificate="certs/cert1.x509.pem"'
        ' private_key="certs/cert2.pk8"\n'
    )
    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ExtractPublicKey(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(pubkey, 'rb') as pubkey_fp:
      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))

  def test_ExtractPublicKey_invalidInput(self):
    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)

  def test_ExtractAvbPublicKey(self):
    """The key extracted from the private and the public key must match."""
    privkey = os.path.join(self.testdata_dir, 'testkey.key')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    # Compare the raw bytes, as test_ExtractPublicKey does. Text mode would try
    # to decode the files and can fail on Python 3 if the extracted key is not
    # valid text.
    with open(common.ExtractAvbPublicKey(privkey), 'rb') as privkey_fp, \
        open(common.ExtractAvbPublicKey(pubkey), 'rb') as pubkey_fp:
      self.assertEqual(privkey_fp.read(), pubkey_fp.read())

  def test_ParseCertificate(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')

    # Use openssl to produce the expected DER encoding of the certificate.
    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    expected, _ = proc.communicate()
    self.assertEqual(0, proc.returncode)

    with open(cert) as cert_fp:
      actual = common.ParseCertificate(cert_fp.read())
    self.assertEqual(expected, actual)

  def test_GetMinSdkVersion(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual('24', common.GetMinSdkVersion(test_app))

  def test_GetMinSdkVersion_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')

  def test_GetMinSdkVersionInt(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))

  def test_GetMinSdkVersionInt_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
        {})
613
Tao Bao818ddf52018-01-05 11:17:34 -0800614
Tao Bao65b94e92018-10-11 21:57:26 -0700615class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
Tao Baofc7e0e02018-02-13 13:54:02 -0800616
def setUp(self):
  """Looks up the test data directory and stores it on the instance."""
  testdata_dir = test_utils.get_testdata_dir()
  self.testdata_dir = testdata_dir
619
def test_GetSparseImage_emptyBlockMapFile(self):
  """Loads a system image whose IMAGES/system.map entry is empty.

  The resulting file_map is expected to hold only the '__COPY' and
  '__NONZERO-0' entries.
  """
  sparse_chunks = [
      (0xCAC1, 6),
      (0xCAC3, 3),
      (0xCAC1, 4)]
  archive_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(archive_path, 'w') as archive:
    archive.write(
        test_utils.construct_sparse_image(sparse_chunks),
        arcname='IMAGES/system.img')
    # The block map is present, but lists no files.
    archive.writestr('IMAGES/system.map', '')
    archive.writestr('SYSTEM/file1', os.urandom(4096 * 8))
    archive.writestr('SYSTEM/file2', os.urandom(4096 * 3))

  unpacked_dir = common.UnzipTemp(archive_path)
  with zipfile.ZipFile(archive_path, 'r') as input_zip:
    image = common.GetSparseImage('system', unpacked_dir, input_zip, False)

  expected_file_map = {
      '__COPY': RangeSet("0"),
      '__NONZERO-0': RangeSet("1-5 9-12"),
  }
  self.assertDictEqual(expected_file_map, image.file_map)
643
def test_GetSparseImage_missingImageFile(self):
  """Expects AssertionError for the 'system2' and 'unknown' image names."""
  for image_name in ('system2', 'unknown'):
    self.assertRaises(
        AssertionError, common.GetSparseImage, image_name, self.testdata_dir,
        None, False)
Tao Baofc7e0e02018-02-13 13:54:02 -0800651
def test_GetSparseImage_missingBlockMapFile(self):
  """Expects AssertionError when the zip has no IMAGES/system.map entry."""
  sparse_chunks = [
      (0xCAC1, 6),
      (0xCAC3, 3),
      (0xCAC1, 4)]
  archive_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(archive_path, 'w') as archive:
    archive.write(
        test_utils.construct_sparse_image(sparse_chunks),
        arcname='IMAGES/system.img')
    # Deliberately no 'IMAGES/system.map' entry.
    archive.writestr('SYSTEM/file1', os.urandom(4096 * 8))
    archive.writestr('SYSTEM/file2', os.urandom(4096 * 3))

  unpacked_dir = common.UnzipTemp(archive_path)
  with zipfile.ZipFile(archive_path, 'r') as input_zip:
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'system', unpacked_dir,
        input_zip, False)
Tao Baofc7e0e02018-02-13 13:54:02 -0800669
def test_GetSparseImage_sharedBlocks_notAllowed(self):
  """Tests the case of having overlapping blocks but disallowed.

  GetSparseImage() is called with False as its last argument and is expected
  to raise AssertionError.
  """
  # Block 10 is shared between two files.
  block_map = '\n'.join([
      '/system/file1 1-5 9-10',
      '/system/file2 10-12'])
  archive_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(archive_path, 'w') as archive:
    archive.write(
        test_utils.construct_sparse_image([(0xCAC2, 16)]),
        arcname='IMAGES/system.img')
    archive.writestr('IMAGES/system.map', block_map)
    archive.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    archive.writestr('SYSTEM/file2', os.urandom(4096 * 3))

  unpacked_dir = common.UnzipTemp(archive_path)
  with zipfile.ZipFile(archive_path, 'r') as input_zip:
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'system', unpacked_dir,
        input_zip, False)
Tao Baofc7e0e02018-02-13 13:54:02 -0800691
def test_GetSparseImage_sharedBlocks_allowed(self):
  """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true.

  GetSparseImage() is called with True as its last argument. The shared block
  is expected to stay with the first file, while the second file gets tagged
  with 'uses_shared_blocks'.
  """
  # Block 10 is shared between two files.
  block_map = '\n'.join([
      '/system/file1 1-5 9-10',
      '/system/file2 10-12'])
  archive_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(archive_path, 'w') as archive:
    # NOTE(review): an earlier comment here described a care_map of
    # "0-5 9-12", but the expected file_map below accounts for all of blocks
    # 0-15 -- confirm against construct_sparse_image().
    archive.write(
        test_utils.construct_sparse_image([(0xCAC2, 16)]),
        arcname='IMAGES/system.img')
    archive.writestr('IMAGES/system.map', block_map)
    archive.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    archive.writestr('SYSTEM/file2', os.urandom(4096 * 3))

  unpacked_dir = common.UnzipTemp(archive_path)
  with zipfile.ZipFile(archive_path, 'r') as input_zip:
    image = common.GetSparseImage('system', unpacked_dir, input_zip, True)

  expected_file_map = {
      '__COPY': RangeSet("0"),
      '__NONZERO-0': RangeSet("6-8 13-15"),
      '/system/file1': RangeSet("1-5 9-10"),
      '/system/file2': RangeSet("11-12"),
  }
  self.assertDictEqual(expected_file_map, image.file_map)

  # '/system/file2' should be marked with 'uses_shared_blocks', but not with
  # 'incomplete'.
  self.assertTrue(image.file_map['/system/file2'].extra['uses_shared_blocks'])
  self.assertNotIn('incomplete', image.file_map['/system/file2'].extra)

  # All other entries should look normal without any tags.
  for untagged_entry in ('__COPY', '__NONZERO-0', '/system/file1'):
    self.assertFalse(image.file_map[untagged_entry].extra)
733
def test_GetSparseImage_incompleteRanges(self):
  """Tests the case of ext4 images with holes."""
  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w') as output_zip:
    system_img = test_utils.construct_sparse_image([(0xCAC2, 16)])
    output_zip.write(system_img, arcname='IMAGES/system.img')
    block_map = '\n'.join([
        '/system/file1 1-5 9-10',
        '/system/file2 11-12'])
    output_zip.writestr('IMAGES/system.map', block_map)
    output_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    # '/system/file2' has fewer blocks listed (2) than actual (3).
    output_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

  unzipped_dir = common.UnzipTemp(zip_path)
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    image = common.GetSparseImage('system', unzipped_dir, input_zip, False)

  file_map = image.file_map
  # Only the entry whose listed ranges fall short gets the 'incomplete' tag.
  self.assertFalse(file_map['/system/file1'].extra)
  self.assertTrue(file_map['/system/file2'].extra['incomplete'])
756
def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
  """Tests block map entries written with an extra leading slash.

  Two of the three map entries are spelled '//system/...'; their data is
  stored under SYSTEM/ in the zip like the regular '/system/app/file3'.
  """
  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w') as output_zip:
    system_img = test_utils.construct_sparse_image([(0xCAC2, 16)])
    output_zip.write(system_img, arcname='IMAGES/system.img')
    block_map = '\n'.join([
        '//system/file1 1-5 9-10',
        '//system/file2 11-12',
        '/system/app/file3 13-15'])
    output_zip.writestr('IMAGES/system.map', block_map)
    output_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    # '/system/file2' has fewer blocks listed (2) than actual (3).
    output_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
    # '/system/app/file3' has fewer blocks listed (3) than actual (4).
    output_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))

  unzipped_dir = common.UnzipTemp(zip_path)
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    image = common.GetSparseImage('system', unzipped_dir, input_zip, False)

  file_map = image.file_map
  # The map keys keep the spelling used in system.map.
  self.assertFalse(file_map['//system/file1'].extra)
  self.assertTrue(file_map['//system/file2'].extra['incomplete'])
  self.assertTrue(file_map['/system/app/file3'].extra['incomplete'])
783
def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
  """Tests a block map entry whose data lives under ROOT/ in the zip.

  '//init.rc' appears in system.map, while its contents are stored as
  'ROOT/init.rc' rather than under SYSTEM/.
  """
  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w') as output_zip:
    system_img = test_utils.construct_sparse_image([(0xCAC2, 16)])
    output_zip.write(system_img, arcname='IMAGES/system.img')
    block_map = '\n'.join([
        '//system/file1 1-5 9-10',
        '//init.rc 13-15'])
    output_zip.writestr('IMAGES/system.map', block_map)
    output_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
    # '/init.rc' has fewer blocks listed (3) than actual (4).
    output_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))

  unzipped_dir = common.UnzipTemp(zip_path)
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    image = common.GetSparseImage('system', unzipped_dir, input_zip, False)

  file_map = image.file_map
  self.assertFalse(file_map['//system/file1'].extra)
  self.assertTrue(file_map['//init.rc'].extra['incomplete'])
805
def test_GetSparseImage_fileNotFound(self):
  """Tests a block map entry that has no matching file in the zip.

  system.map lists '//system/file2', but only SYSTEM/file1 is written, and
  GetSparseImage() is expected to raise AssertionError.
  """
  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w') as output_zip:
    system_img = test_utils.construct_sparse_image([(0xCAC2, 16)])
    output_zip.write(system_img, arcname='IMAGES/system.img')
    block_map = '\n'.join([
        '//system/file1 1-5 9-10',
        '//system/file2 11-12'])
    output_zip.writestr('IMAGES/system.map', block_map)
    output_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))

  unzipped_dir = common.UnzipTemp(zip_path)
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    with self.assertRaises(AssertionError):
      common.GetSparseImage('system', unzipped_dir, input_zip, False)
824
def test_GetAvbChainedPartitionArg(self):
  """Checks the three colon-separated fields of the chained partition arg.

  The result is expected to be '<partition>:<rollback index location>:<path>'
  where the path names an existing file.
  """
  info_dict = {
      'avb_avbtool': 'avbtool',
      'avb_system_key_path': os.path.join(
          self.testdata_dir, 'testkey.pubkey.pem'),
      'avb_system_rollback_index_location': 2,
  }
  chained_arg = common.GetAvbChainedPartitionArg('system', info_dict)
  fields = chained_arg.split(':')
  self.assertEqual(3, len(fields))
  partition, rollback_location, key_path = fields
  self.assertEqual('system', partition)
  self.assertEqual('2', rollback_location)
  self.assertTrue(os.path.exists(key_path))
837
def test_GetAvbChainedPartitionArg_withPrivateKey(self):
  """Same field check with the key path set to testdata 'testkey.key'."""
  info_dict = {
      'avb_avbtool': 'avbtool',
      'avb_product_key_path': os.path.join(self.testdata_dir, 'testkey.key'),
      'avb_product_rollback_index_location': 2,
  }
  chained_arg = common.GetAvbChainedPartitionArg('product', info_dict)
  fields = chained_arg.split(':')
  self.assertEqual(3, len(fields))
  partition, rollback_location, key_path = fields
  self.assertEqual('product', partition)
  self.assertEqual('2', rollback_location)
  self.assertTrue(os.path.exists(key_path))
850
def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
  """Checks the result when a key is passed as the third argument.

  info_dict's own key path is 'does-not-exist', so the final existence check
  can only pass if the explicitly passed key is the one that gets used.
  """
  explicit_key = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
  info_dict = {
      'avb_avbtool': 'avbtool',
      'avb_system_key_path': 'does-not-exist',
      'avb_system_rollback_index_location': 2,
  }
  chained_arg = common.GetAvbChainedPartitionArg(
      'system', info_dict, explicit_key)
  fields = chained_arg.split(':')
  self.assertEqual(3, len(fields))
  partition, rollback_location, key_path = fields
  self.assertEqual('system', partition)
  self.assertEqual('2', rollback_location)
  self.assertTrue(os.path.exists(key_path))
864
def test_GetAvbChainedPartitionArg_invalidKey(self):
  """Expects common.ExternalError when the key path is an x509 PEM file."""
  info_dict = {
      'avb_avbtool': 'avbtool',
      'avb_system_key_path': os.path.join(
          self.testdata_dir, 'testkey_with_passwd.x509.pem'),
      'avb_system_rollback_index_location': 2,
  }
  with self.assertRaises(common.ExternalError):
    common.GetAvbChainedPartitionArg('system', info_dict)
Tao Bao02a08592018-07-22 12:40:45 -0700875
# Baseline misc_info properties shared by the LoadInfoDict tests; individual
# tests copy this dict and delete keys to model other configurations.
INFO_DICT_DEFAULT = {
    'recovery_api_version': 3,
    'fstab_version': 2,
    'system_root_image': 'true',
    'no_recovery': 'true',
    'recovery_as_boot': 'true',
}
883
@staticmethod
def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
  """Creates a minimal target_files zip for the LoadInfoDict tests.

  Args:
    info_dict: Properties written to META/misc_info.txt, one 'key=value'
        line per entry, sorted by key.
    fstab_path: Zip entry name under which the one-line fstab is stored.

  Returns:
    The path of the zip file that was written.
  """
  target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    # items() rather than the Python-2-only iteritems(); the entries are
    # sorted right away, so the output is the same on either version.
    info_values = ''.join(
        ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
    common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)

    # The system block device is mounted at '/' when system_root_image is
    # 'true', and at '/system' otherwise.
    FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
    if info_dict.get('system_root_image') == 'true':
      fstab_values = FSTAB_TEMPLATE.format('/')
    else:
      fstab_values = FSTAB_TEMPLATE.format('/system')
    common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)

    common.ZipWriteStr(
        target_files_zip, 'META/file_contexts', 'file-contexts')
  return target_files
902
def test_LoadInfoDict(self):
  """Loads the default info dict from a zip file.

  The fstab is stored at BOOT/RAMDISK/system/etc/recovery.fstab.
  """
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      self.INFO_DICT_DEFAULT, 'BOOT/RAMDISK/system/etc/recovery.fstab')
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    info = common.LoadInfoDict(input_zip)

  # The two version values are expected back as integers.
  self.assertEqual(3, info['recovery_api_version'])
  self.assertEqual(2, info['fstab_version'])
  fstab = info['fstab']
  self.assertIn('/', fstab)
  self.assertIn('/system', fstab)
913
def test_LoadInfoDict_legacyRecoveryFstabPath(self):
  """Loads the default info dict from a zip using the legacy fstab path.

  The fstab is stored at BOOT/RAMDISK/etc/recovery.fstab (no 'system/').
  """
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      self.INFO_DICT_DEFAULT, 'BOOT/RAMDISK/etc/recovery.fstab')
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    info = common.LoadInfoDict(input_zip)

  self.assertEqual(3, info['recovery_api_version'])
  self.assertEqual(2, info['fstab_version'])
  fstab = info['fstab']
  self.assertIn('/', fstab)
  self.assertIn('/system', fstab)
924
def test_LoadInfoDict_dirInput(self):
  """Loads the default info dict from an unzipped directory."""
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      self.INFO_DICT_DEFAULT, 'BOOT/RAMDISK/system/etc/recovery.fstab')
  # LoadInfoDict() is given the extracted directory instead of a ZipFile.
  input_dir = common.UnzipTemp(zip_path)
  info = common.LoadInfoDict(input_dir)

  self.assertEqual(3, info['recovery_api_version'])
  self.assertEqual(2, info['fstab_version'])
  fstab = info['fstab']
  self.assertIn('/', fstab)
  self.assertIn('/system', fstab)
935
def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
  """Loads the info dict from an unzipped directory with the legacy fstab.

  The fstab is stored at BOOT/RAMDISK/etc/recovery.fstab (no 'system/'), the
  same legacy location used by test_LoadInfoDict_legacyRecoveryFstabPath for
  zip input. With the non-legacy path this test would merely repeat
  test_LoadInfoDict_dirInput.
  """
  target_files = self._test_LoadInfoDict_createTargetFiles(
      self.INFO_DICT_DEFAULT,
      'BOOT/RAMDISK/etc/recovery.fstab')
  unzipped = common.UnzipTemp(target_files)
  loaded_dict = common.LoadInfoDict(unzipped)
  self.assertEqual(3, loaded_dict['recovery_api_version'])
  self.assertEqual(2, loaded_dict['fstab_version'])
  self.assertIn('/', loaded_dict['fstab'])
  self.assertIn('/system', loaded_dict['fstab'])
946
def test_LoadInfoDict_systemRootImageFalse(self):
  """Loads an info dict without system-as-root and recovery-as-boot."""
  # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
  # launched prior to P will likely have this config.
  props = copy.copy(self.INFO_DICT_DEFAULT)
  for dropped in ('no_recovery', 'system_root_image', 'recovery_as_boot'):
    del props[dropped]
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      props, 'RECOVERY/RAMDISK/system/etc/recovery.fstab')
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    info = common.LoadInfoDict(input_zip)

  self.assertEqual(3, info['recovery_api_version'])
  self.assertEqual(2, info['fstab_version'])
  fstab = info['fstab']
  # Without system_root_image the helper mounts the device at '/system'.
  self.assertNotIn('/', fstab)
  self.assertIn('/system', fstab)
963
def test_LoadInfoDict_recoveryAsBootFalse(self):
  """Loads an info dict with system-as-root and a standalone recovery."""
  # Devices using system-as-root, but with standalone recovery image. Non-A/B
  # devices launched since P will likely have this config.
  props = copy.copy(self.INFO_DICT_DEFAULT)
  for dropped in ('no_recovery', 'recovery_as_boot'):
    del props[dropped]
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      props, 'RECOVERY/RAMDISK/system/etc/recovery.fstab')
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    info = common.LoadInfoDict(input_zip)

  self.assertEqual(3, info['recovery_api_version'])
  self.assertEqual(2, info['fstab_version'])
  fstab = info['fstab']
  self.assertIn('/', fstab)
  self.assertIn('/system', fstab)
979
def test_LoadInfoDict_noRecoveryTrue(self):
  """Loads an info dict that keeps no_recovery but not recovery_as_boot."""
  # Device doesn't have a recovery partition at all.
  props = copy.copy(self.INFO_DICT_DEFAULT)
  del props['recovery_as_boot']
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      props, 'RECOVERY/RAMDISK/system/etc/recovery.fstab')
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    info = common.LoadInfoDict(input_zip)

  self.assertEqual(3, info['recovery_api_version'])
  self.assertEqual(2, info['fstab_version'])
  # No fstab is expected to be loaded in this configuration.
  self.assertIsNone(info['fstab'])
992
def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
  """Expects ValueError once META/misc_info.txt is deleted from the zip."""
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      self.INFO_DICT_DEFAULT, 'BOOT/RAMDISK/system/etc/recovery.fstab')
  common.ZipDelete(zip_path, 'META/misc_info.txt')
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    with self.assertRaises(ValueError):
      common.LoadInfoDict(input_zip)
1000
def test_LoadInfoDict_repacking(self):
  """Loads the info dict from a directory with the second argument True.

  Besides the usual values, 'root_dir' and 'root_fs_config' are expected to
  point into the unzipped directory.
  """
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      self.INFO_DICT_DEFAULT, 'BOOT/RAMDISK/system/etc/recovery.fstab')
  input_dir = common.UnzipTemp(zip_path)
  info = common.LoadInfoDict(input_dir, True)

  self.assertEqual(3, info['recovery_api_version'])
  self.assertEqual(2, info['fstab_version'])
  fstab = info['fstab']
  self.assertIn('/', fstab)
  self.assertIn('/system', fstab)

  expected_root_dir = os.path.join(input_dir, 'ROOT')
  self.assertEqual(expected_root_dir, info['root_dir'])
  expected_fs_config = os.path.join(
      input_dir, 'META', 'root_filesystem_config.txt')
  self.assertEqual(expected_fs_config, info['root_fs_config'])
1016
def test_LoadInfoDict_repackingWithZipFileInput(self):
  """Expects AssertionError for ZipFile input with the second arg True."""
  zip_path = self._test_LoadInfoDict_createTargetFiles(
      self.INFO_DICT_DEFAULT, 'BOOT/RAMDISK/system/etc/recovery.fstab')
  with zipfile.ZipFile(zip_path, 'r') as input_zip:
    with self.assertRaises(AssertionError):
      common.LoadInfoDict(input_zip, True)
1024
Tao Baofc7e0e02018-02-13 13:54:02 -08001025
class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
  """Checks the format of install-recovery.sh.

  Its format should match between common.py and validate_target_files.py.
  """

  def setUp(self):
    self._tempdir = common.MakeTempDir()
    # Create a dummy dict that contains the fstab info for boot&recovery.
    dummy_fstab = [
        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    # "\n".join goes into the first argument slot and the list of fstab lines
    # into the third; presumably LoadRecoveryFSTab() applies the former to
    # the latter to obtain the fstab text -- confirm against common.py.
    self._info = {
        "fstab": common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)}
    # Construct the gzipped recovery.img and boot.img
    # (presumably produced like boot_data below, from "recovery"; the gzip
    # trailer records an 8-byte payload.)
    self.recovery_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
        0x08, 0x00, 0x00, 0x00
    ])
    # echo -n "boot" | gzip -f | hd
    self.boot_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
    ])

  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
    """Writes `data` to <tempdir>/<prefix>/<name>, creating parent dirs.

    Passed to common.MakeRecoveryPatch() as its output sink, and also used
    directly by the tests to place image files.

    Args:
      name: Path of the file relative to `prefix`.
      data: The payload to write.
      prefix: Top-level directory under the temp dir.
    """
    loc = os.path.join(self._tempdir, prefix, name)
    if not os.path.exists(os.path.dirname(loc)):
      os.makedirs(os.path.dirname(loc))
    # Binary mode: the payloads include gzip image bytes, which must reach
    # the disk untranslated.
    with open(loc, "wb") as f:
      f.write(data)

  def test_full_recovery(self):
    """Validates the script generated with full_recovery_image set."""
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)
    self._info["full_recovery_image"] = "true"

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def test_recovery_from_boot(self):
    """Validates the script generated from boot and recovery images.

    Both images are first written under IMAGES/ in the temp dir. The check
    runs twice: without and with SYSTEM/etc/recovery-resource.dat present.
    """
    recovery_image = common.File("recovery.img", self.recovery_data)
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    boot_image = common.File("boot.img", self.boot_data)
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
    # Validate 'recovery-from-boot' with bonus argument.
    self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
Yifan Hong45433e42019-01-18 13:55:25 -08001085
1086
class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.

  Every emitted script line is collected in `lines`; str() of the writer is
  those lines joined with newlines.
  """

  def __init__(self, enable_comments=False):
    # Comment() records nothing unless enable_comments is truthy.
    self.enable_comments = enable_comments
    self.lines = []

  def Comment(self, comment):
    """Records `comment` as a '# ...' line when comments are enabled."""
    if not self.enable_comments:
      return
    self.lines.append("# {}".format(comment))

  def AppendExtra(self, extra):
    """Records `extra` verbatim as the next script line."""
    self.lines.append(extra)

  def __str__(self):
    return "\n".join(self.lines)
1100
1101
class MockBlockDifference(object):
  """Test double that emits marker lines instead of real patch commands.

  It stores the partition name plus target/source images and writes
  'progress(...)', 'patch(...);' and 'verify(...);' lines to the script.
  """

  def __init__(self, partition, tgt, src=None):
    self.partition, self.tgt, self.src = partition, tgt, src

  def WriteScript(self, script, _, progress=None,
                  write_verify_script=False):
    """Appends the patch line, with optional progress and verify lines.

    The second positional argument is accepted and ignored.
    """
    emit = script.AppendExtra
    if progress:
      emit("progress({})".format(progress))
    emit("patch({});".format(self.partition))
    if not write_verify_script:
      return
    self.WritePostInstallVerifyScript(script)

  def WritePostInstallVerifyScript(self, script):
    """Appends the 'verify(<partition>);' line."""
    verify_line = "verify({});".format(self.partition)
    script.AppendExtra(verify_line)
1116
1117
class FakeSparseImage(object):
  """Stand-in image exposing only `blocksize` and `total_blocks`."""

  def __init__(self, size):
    # `size` is in bytes and has to be a whole number of 4096-byte blocks.
    self.blocksize = 4096
    whole_blocks, leftover = divmod(size, 4096)
    self.total_blocks = whole_blocks
    assert leftover == 0, "{} is not a multiple of 4096".format(size)
1123
1124
class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
  """Tests the script and op list from common.DynamicPartitionsDifference."""

  @staticmethod
  def get_op_list(output_path):
    """Returns the non-comment lines of dynamic_partitions_op_list.

    Args:
      output_path: Path of the zip that WriteScript() wrote into.

    Returns:
      The stripped lines of the 'dynamic_partitions_op_list' entry, leaving
      out lines that start with '#'.
    """
    with zipfile.ZipFile(output_path, 'r') as output_zip:
      with output_zip.open("dynamic_partitions_op_list") as op_list:
        return [line.strip() for line in op_list.readlines()
                if not line.startswith("#")]

  def setUp(self):
    self.script = MockScriptWriter()
    self.output_path = common.MakeTempFile(suffix='.zip')

  def test_full(self):
    """Checks script and op list when no source info dict is given."""
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor
super_partition_groups=group_foo
super_group_foo_group_size={group_size}
super_group_foo_partition_list=system vendor
""".format(group_size=4 * GiB).split("\n"))
    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertEqual(str(self.script).strip(), """
assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
patch(vendor);
verify(vendor);
unmap_partition("vendor");
patch(system);
verify(system);
unmap_partition("system");
""".strip())

    lines = self.get_op_list(self.output_path)

    # The sizes below are the byte values of 4 GiB, 1 GiB and 3 GiB.
    remove_all_groups = lines.index("remove_all_groups")
    add_group = lines.index("add_group group_foo 4294967296")
    add_vendor = lines.index("add vendor group_foo")
    add_system = lines.index("add system group_foo")
    resize_vendor = lines.index("resize vendor 1073741824")
    resize_system = lines.index("resize system 3221225472")

    self.assertLess(remove_all_groups, add_group,
                    "Should add groups after removing all groups")
    self.assertLess(add_group, min(add_vendor, add_system),
                    "Should add partitions after adding group")
    self.assertLess(add_system, resize_system,
                    "Should resize system after adding it")
    self.assertLess(add_vendor, resize_vendor,
                    "Should resize vendor after adding it")

  def test_inc_groups(self):
    """Checks the ordering of group ops when only the groups change.

    Between source and target, group_bar disappears, group_foo shrinks,
    group_baz gets a size of 4 GiB and group_qux is new.
    """
    source_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_bar group_baz
super_group_foo_group_size={group_foo_size}
super_group_bar_group_size={group_bar_size}
""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_baz group_qux
super_group_foo_group_size={group_foo_size}
super_group_baz_group_size={group_baz_size}
super_group_qux_group_size={group_qux_size}
""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
           group_qux_size=1 * GiB).split("\n"))

    dp_diff = common.DynamicPartitionsDifference(target_info,
                                                 block_diffs=[],
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    lines = self.get_op_list(self.output_path)

    removed = lines.index("remove_group group_bar")
    shrunk = lines.index("resize_group group_foo 3221225472")
    grown = lines.index("resize_group group_baz 4294967296")
    added = lines.index("add_group group_qux 1073741824")

    # Both indices go in as separate arguments so that assertLess() does the
    # comparison; the message is the third argument.
    self.assertLess(max(removed, shrunk),
                    min(grown, added),
                    "ops that remove / shrink partitions must precede ops that "
                    "grow / add partitions")

  def test_incremental(self):
    """Checks script and op list ordering for an incremental update.

    Compared with the source, the target drops product_services, adds odm,
    moves product into the new group_bar, shrinks vendor and group_foo, and
    grows system.
    """
    source_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product product_services
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor product product_services
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product odm
super_partition_groups=group_foo group_bar
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor odm
super_group_bar_group_size={group_bar_size}
super_group_bar_partition_list=product
""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))

    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product_services", None,
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
                                       src=None)]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    metadata_idx = self.script.lines.index(
        'assert(update_dynamic_partitions(package_extract_file('
        '"dynamic_partitions_op_list")));')
    # vendor, the shrinking partition, is patched before the metadata update
    # and verified after it.
    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
    for p in ("product", "system", "odm"):
      patch_idx = self.script.lines.index("patch({});".format(p))
      verify_idx = self.script.lines.index("verify({});".format(p))
      self.assertLess(metadata_idx, patch_idx,
                      "Should patch {} after updating metadata".format(p))
      self.assertLess(patch_idx, verify_idx,
                      "Should verify {} after patching".format(p))

    self.assertNotIn("patch(product_services);", self.script.lines)

    lines = self.get_op_list(self.output_path)

    remove = lines.index("remove product_services")
    move_product_out = lines.index("move product default")
    shrink = lines.index("resize vendor 536870912")
    shrink_group = lines.index("resize_group group_foo 3221225472")
    add_group_bar = lines.index("add_group group_bar 1073741824")
    add_odm = lines.index("add odm group_foo")
    grow_existing = lines.index("resize system 1610612736")
    grow_added = lines.index("resize odm 1073741824")
    move_product_in = lines.index("move product group_bar")

    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)

    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
                    "Must shrink group after partitions inside group are shrunk"
                    " / removed")

    self.assertLess(add_group_bar, move_product_in,
                    "Must add partitions to group after group is added")

    self.assertLess(max_idx_move_partition_out_foo,
                    min_idx_move_partition_in_foo,
                    "Must shrink partitions / remove partitions from group "
                    "before adding / moving partitions into group")

  def test_remove_partition(self):
    """Checks that a partition missing from the target is only removed.

    Partition 'foo' exists in the source info only. The script must not
    contain 'block_image_update', and the op list must be exactly
    ['remove foo'].
    """
    source_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
dynamic_partition_list=foo
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=foo
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
""".format(group_foo_size=4 * GiB).split("\n"))

    # This test uses the real common.BlockDifference rather than the mock;
    # presumably it reads these OPTIONS fields -- confirm against common.py.
    common.OPTIONS.info_dict = target_info
    common.OPTIONS.target_info_dict = target_info
    common.OPTIONS.source_info_dict = source_info
    common.OPTIONS.cache_size = 4 * 4096

    block_diffs = [common.BlockDifference("foo", EmptyImage(),
                                          src=DataImage("source", pad=True))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertNotIn("block_image_update", str(self.script),
                     "Removed partition should not be patched.")

    lines = self.get_op_list(self.output_path)
    self.assertEqual(lines, ["remove foo"])