blob: ed454ca726413fdef2de7826f2a784d23c25a356 [file] [log] [blame]
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16import os
17import tempfile
18import time
19import unittest
20import zipfile
Tao Bao31b08072017-11-08 15:50:59 -080021from hashlib import sha1
22
Dan Albert8e0178d2015-01-27 15:53:15 -080023import common
Tianjie Xu9c384d22017-06-20 17:00:55 -070024import validate_target_files
Dan Albert8e0178d2015-01-27 15:53:15 -080025
# Binary size units used to describe the test payloads.
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB


def get_2gb_string():
  """Generates the chunks of a sparse byte stream slightly larger than 2 GiB.

  The payload is produced lazily so that it never has to be held in memory
  at once. Every step contributes one 4 KiB block of random bytes followed by
  a run of NUL bytes that pads the step out to 4 MiB.

  Yields:
    Byte strings, alternating between a random block and a NUL-filled hole.
    Both kinds of chunk are bytes, so they can be fed directly to a hash
    object or written to a file opened in binary mode.
  """
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    # Must be a bytes literal: a text '\0' cannot be mixed with the random
    # bytes above when hashing or writing in binary mode on Python 3.
    yield b'\0' * (step_size - block_size)
Tao Baof3282b42015-04-01 11:21:55 -070039
Dan Albert8e0178d2015-01-27 15:53:15 -080040
41class CommonZipTest(unittest.TestCase):
Tao Bao31b08072017-11-08 15:50:59 -080042 def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
Tao Baof3282b42015-04-01 11:21:55 -070043 test_file_name=None, expected_stat=None, expected_mode=0o644,
44 expected_compress_type=zipfile.ZIP_STORED):
45 # Verify the stat if present.
46 if test_file_name is not None:
47 new_stat = os.stat(test_file_name)
48 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
49 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
50
51 # Reopen the zip file to verify.
52 zip_file = zipfile.ZipFile(zip_file_name, "r")
53
54 # Verify the timestamp.
55 info = zip_file.getinfo(arcname)
56 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
57
58 # Verify the file mode.
59 mode = (info.external_attr >> 16) & 0o777
60 self.assertEqual(mode, expected_mode)
61
62 # Verify the compress type.
63 self.assertEqual(info.compress_type, expected_compress_type)
64
65 # Verify the zip contents.
Tao Bao31b08072017-11-08 15:50:59 -080066 entry = zip_file.open(arcname)
67 sha1_hash = sha1()
68 for chunk in iter(lambda: entry.read(4 * MiB), ''):
69 sha1_hash.update(chunk)
70 self.assertEqual(expected_hash, sha1_hash.hexdigest())
Tao Baof3282b42015-04-01 11:21:55 -070071 self.assertIsNone(zip_file.testzip())
72
Dan Albert8e0178d2015-01-27 15:53:15 -080073 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
74 extra_zipwrite_args = dict(extra_zipwrite_args or {})
75
76 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080077 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070078
79 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080080 zip_file_name = zip_file.name
81
82 # File names within an archive strip the leading slash.
83 arcname = extra_zipwrite_args.get("arcname", test_file_name)
84 if arcname[0] == "/":
85 arcname = arcname[1:]
86
87 zip_file.close()
88 zip_file = zipfile.ZipFile(zip_file_name, "w")
89
90 try:
Tao Bao31b08072017-11-08 15:50:59 -080091 sha1_hash = sha1()
92 for data in contents:
93 sha1_hash.update(data)
94 test_file.write(data)
Dan Albert8e0178d2015-01-27 15:53:15 -080095 test_file.close()
96
Tao Baof3282b42015-04-01 11:21:55 -070097 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -080098 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -070099 expected_compress_type = extra_zipwrite_args.get("compress_type",
100 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800101 time.sleep(5) # Make sure the atime/mtime will change measurably.
102
103 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700104 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800105
Tao Bao31b08072017-11-08 15:50:59 -0800106 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
107 test_file_name, expected_stat, expected_mode,
108 expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800109 finally:
110 os.remove(test_file_name)
111 os.remove(zip_file_name)
112
Tao Baof3282b42015-04-01 11:21:55 -0700113 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
114 extra_args = dict(extra_args or {})
115
116 zip_file = tempfile.NamedTemporaryFile(delete=False)
117 zip_file_name = zip_file.name
118 zip_file.close()
119
120 zip_file = zipfile.ZipFile(zip_file_name, "w")
121
122 try:
123 expected_compress_type = extra_args.get("compress_type",
124 zipfile.ZIP_STORED)
125 time.sleep(5) # Make sure the atime/mtime will change measurably.
126
127 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700128 arcname = zinfo_or_arcname
129 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700130 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700131 arcname = zinfo_or_arcname.filename
132 expected_mode = extra_args.get("perms",
133 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700134
Tao Bao58c1b962015-05-20 09:32:18 -0700135 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700136 common.ZipClose(zip_file)
137
Tao Bao31b08072017-11-08 15:50:59 -0800138 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700139 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700140 expected_compress_type=expected_compress_type)
141 finally:
142 os.remove(zip_file_name)
143
144 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
145 extra_args = dict(extra_args or {})
146
147 zip_file = tempfile.NamedTemporaryFile(delete=False)
148 zip_file_name = zip_file.name
149
150 test_file = tempfile.NamedTemporaryFile(delete=False)
151 test_file_name = test_file.name
152
153 arcname_large = test_file_name
154 arcname_small = "bar"
155
156 # File names within an archive strip the leading slash.
157 if arcname_large[0] == "/":
158 arcname_large = arcname_large[1:]
159
160 zip_file.close()
161 zip_file = zipfile.ZipFile(zip_file_name, "w")
162
163 try:
Tao Bao31b08072017-11-08 15:50:59 -0800164 sha1_hash = sha1()
165 for data in large:
166 sha1_hash.update(data)
167 test_file.write(data)
Tao Baof3282b42015-04-01 11:21:55 -0700168 test_file.close()
169
170 expected_stat = os.stat(test_file_name)
171 expected_mode = 0o644
172 expected_compress_type = extra_args.get("compress_type",
173 zipfile.ZIP_STORED)
174 time.sleep(5) # Make sure the atime/mtime will change measurably.
175
176 common.ZipWrite(zip_file, test_file_name, **extra_args)
177 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
178 common.ZipClose(zip_file)
179
180 # Verify the contents written by ZipWrite().
Tao Bao31b08072017-11-08 15:50:59 -0800181 self._verify(zip_file, zip_file_name, arcname_large,
182 sha1_hash.hexdigest(), test_file_name, expected_stat,
183 expected_mode, expected_compress_type)
Tao Baof3282b42015-04-01 11:21:55 -0700184
185 # Verify the contents written by ZipWriteStr().
Tao Bao31b08072017-11-08 15:50:59 -0800186 self._verify(zip_file, zip_file_name, arcname_small,
187 sha1(small).hexdigest(),
Tao Baof3282b42015-04-01 11:21:55 -0700188 expected_compress_type=expected_compress_type)
189 finally:
190 os.remove(zip_file_name)
191 os.remove(test_file_name)
192
193 def _test_reset_ZIP64_LIMIT(self, func, *args):
194 default_limit = (1 << 31) - 1
195 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
196 func(*args)
197 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
198
Dan Albert8e0178d2015-01-27 15:53:15 -0800199 def test_ZipWrite(self):
200 file_contents = os.urandom(1024)
201 self._test_ZipWrite(file_contents)
202
203 def test_ZipWrite_with_opts(self):
204 file_contents = os.urandom(1024)
205 self._test_ZipWrite(file_contents, {
206 "arcname": "foobar",
207 "perms": 0o777,
208 "compress_type": zipfile.ZIP_DEFLATED,
209 })
Tao Baof3282b42015-04-01 11:21:55 -0700210 self._test_ZipWrite(file_contents, {
211 "arcname": "foobar",
212 "perms": 0o700,
213 "compress_type": zipfile.ZIP_STORED,
214 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800215
216 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700217 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800218 self._test_ZipWrite(file_contents, {
219 "compress_type": zipfile.ZIP_DEFLATED,
220 })
221
222 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700223 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
224
225 def test_ZipWriteStr(self):
226 random_string = os.urandom(1024)
227 # Passing arcname
228 self._test_ZipWriteStr("foo", random_string)
229
230 # Passing zinfo
231 zinfo = zipfile.ZipInfo(filename="foo")
232 self._test_ZipWriteStr(zinfo, random_string)
233
234 # Timestamp in the zinfo should be overwritten.
235 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
236 self._test_ZipWriteStr(zinfo, random_string)
237
238 def test_ZipWriteStr_with_opts(self):
239 random_string = os.urandom(1024)
240 # Passing arcname
241 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700242 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700243 "compress_type": zipfile.ZIP_DEFLATED,
244 })
Tao Bao58c1b962015-05-20 09:32:18 -0700245 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700246 "compress_type": zipfile.ZIP_STORED,
247 })
248
249 # Passing zinfo
250 zinfo = zipfile.ZipInfo(filename="foo")
251 self._test_ZipWriteStr(zinfo, random_string, {
252 "compress_type": zipfile.ZIP_DEFLATED,
253 })
254 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700255 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700256 "compress_type": zipfile.ZIP_STORED,
257 })
258
259 def test_ZipWriteStr_large_file(self):
260 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
261 # the workaround. We will only test the case of writing a string into a
262 # large archive.
263 long_string = get_2gb_string()
264 short_string = os.urandom(1024)
265 self._test_ZipWriteStr_large_file(long_string, short_string, {
266 "compress_type": zipfile.ZIP_DEFLATED,
267 })
268
269 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
270 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
271 zinfo = zipfile.ZipInfo(filename="foo")
272 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700273
274 def test_bug21309935(self):
275 zip_file = tempfile.NamedTemporaryFile(delete=False)
276 zip_file_name = zip_file.name
277 zip_file.close()
278
279 try:
280 random_string = os.urandom(1024)
281 zip_file = zipfile.ZipFile(zip_file_name, "w")
282 # Default perms should be 0o644 when passing the filename.
283 common.ZipWriteStr(zip_file, "foo", random_string)
284 # Honor the specified perms.
285 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
286 # The perms in zinfo should be untouched.
287 zinfo = zipfile.ZipInfo(filename="baz")
288 zinfo.external_attr = 0o740 << 16
289 common.ZipWriteStr(zip_file, zinfo, random_string)
290 # Explicitly specified perms has the priority.
291 zinfo = zipfile.ZipInfo(filename="qux")
292 zinfo.external_attr = 0o700 << 16
293 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
294 common.ZipClose(zip_file)
295
Tao Bao31b08072017-11-08 15:50:59 -0800296 self._verify(zip_file, zip_file_name, "foo",
297 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700298 expected_mode=0o644)
Tao Bao31b08072017-11-08 15:50:59 -0800299 self._verify(zip_file, zip_file_name, "bar",
300 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700301 expected_mode=0o755)
Tao Bao31b08072017-11-08 15:50:59 -0800302 self._verify(zip_file, zip_file_name, "baz",
303 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700304 expected_mode=0o740)
Tao Bao31b08072017-11-08 15:50:59 -0800305 self._verify(zip_file, zip_file_name, "qux",
306 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700307 expected_mode=0o400)
308 finally:
309 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700310
Tao Bao89d7ab22017-12-14 17:05:33 -0800311 def test_ZipDelete(self):
312 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
313 output_zip = zipfile.ZipFile(zip_file.name, 'w',
314 compression=zipfile.ZIP_DEFLATED)
315 with tempfile.NamedTemporaryFile() as entry_file:
316 entry_file.write(os.urandom(1024))
317 common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
318 common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
319 common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
320 common.ZipClose(output_zip)
321 zip_file.close()
322
323 try:
324 common.ZipDelete(zip_file.name, 'Test2')
325 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
326 entries = check_zip.namelist()
327 self.assertTrue('Test1' in entries)
328 self.assertFalse('Test2' in entries)
329 self.assertTrue('Test3' in entries)
330
331 self.assertRaises(AssertionError, common.ZipDelete, zip_file.name,
332 'Test2')
333 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
334 entries = check_zip.namelist()
335 self.assertTrue('Test1' in entries)
336 self.assertFalse('Test2' in entries)
337 self.assertTrue('Test3' in entries)
338
339 common.ZipDelete(zip_file.name, ['Test3'])
340 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
341 entries = check_zip.namelist()
342 self.assertTrue('Test1' in entries)
343 self.assertFalse('Test2' in entries)
344 self.assertFalse('Test3' in entries)
345
346 common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
347 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
348 entries = check_zip.namelist()
349 self.assertFalse('Test1' in entries)
350 self.assertFalse('Test2' in entries)
351 self.assertFalse('Test3' in entries)
352 finally:
353 os.remove(zip_file.name)
354
355
Tianjie Xu9c384d22017-06-20 17:00:55 -0700356class InstallRecoveryScriptFormatTest(unittest.TestCase):
Tao Bao1c830bf2017-12-25 10:43:47 -0800357 """Checks the format of install-recovery.sh.
Tianjie Xu9c384d22017-06-20 17:00:55 -0700358
Tao Bao1c830bf2017-12-25 10:43:47 -0800359 Its format should match between common.py and validate_target_files.py.
360 """
Tianjie Xu9c384d22017-06-20 17:00:55 -0700361
362 def setUp(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800363 self._tempdir = common.MakeTempDir()
Tianjie Xu9c384d22017-06-20 17:00:55 -0700364 # Create a dummy dict that contains the fstab info for boot&recovery.
365 self._info = {"fstab" : {}}
Tao Bao1c830bf2017-12-25 10:43:47 -0800366 dummy_fstab = [
367 "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
368 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
Tao Bao31b08072017-11-08 15:50:59 -0800369 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
Tianjie Xudf055582017-11-07 12:22:58 -0800370 # Construct the gzipped recovery.img and boot.img
371 self.recovery_data = bytearray([
372 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
373 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
374 0x08, 0x00, 0x00, 0x00
375 ])
376 # echo -n "boot" | gzip -f | hd
377 self.boot_data = bytearray([
378 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
379 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
380 ])
Tianjie Xu9c384d22017-06-20 17:00:55 -0700381
382 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
383 loc = os.path.join(self._tempdir, prefix, name)
384 if not os.path.exists(os.path.dirname(loc)):
385 os.makedirs(os.path.dirname(loc))
386 with open(loc, "w+") as f:
387 f.write(data)
388
389 def test_full_recovery(self):
Tao Bao31b08072017-11-08 15:50:59 -0800390 recovery_image = common.File("recovery.img", self.recovery_data)
391 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700392 self._info["full_recovery_image"] = "true"
393
394 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
395 recovery_image, boot_image, self._info)
396 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
397 self._info)
398
399 def test_recovery_from_boot(self):
Tao Bao31b08072017-11-08 15:50:59 -0800400 recovery_image = common.File("recovery.img", self.recovery_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700401 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
Tao Bao31b08072017-11-08 15:50:59 -0800402 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700403 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
404
405 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
406 recovery_image, boot_image, self._info)
407 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
408 self._info)
409 # Validate 'recovery-from-boot' with bonus argument.
410 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
411 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
412 recovery_image, boot_image, self._info)
413 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
414 self._info)
415
416 def tearDown(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800417 common.Cleanup()