blob: 10ec0d3ed6839d929d7d9e4e83e692fb47b66fe0 [file] [log] [blame]
Dan Albert8e0178d2015-01-27 15:53:15 -08001#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
Tianjie Xu9c384d22017-06-20 17:00:55 -070017import shutil
Dan Albert8e0178d2015-01-27 15:53:15 -080018import tempfile
19import time
20import unittest
21import zipfile
22
Tao Bao31b08072017-11-08 15:50:59 -080023from hashlib import sha1
24
Dan Albert8e0178d2015-01-27 15:53:15 -080025import common
Tianjie Xu9c384d22017-06-20 17:00:55 -070026import validate_target_files
Dan Albert8e0178d2015-01-27 15:53:15 -080027
Tao Bao31b08072017-11-08 15:50:59 -080028KiB = 1024
29MiB = 1024 * KiB
30GiB = 1024 * MiB
Dan Albert8e0178d2015-01-27 15:53:15 -080031
Tao Baof3282b42015-04-01 11:21:55 -070032def get_2gb_string():
Tao Bao31b08072017-11-08 15:50:59 -080033 size = int(2 * GiB + 1)
34 block_size = 4 * KiB
35 step_size = 4 * MiB
36 # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
37 for _ in range(0, size, step_size):
38 yield os.urandom(block_size)
39 yield '\0' * (step_size - block_size)
Tao Baof3282b42015-04-01 11:21:55 -070040
Dan Albert8e0178d2015-01-27 15:53:15 -080041
42class CommonZipTest(unittest.TestCase):
Tao Bao31b08072017-11-08 15:50:59 -080043 def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
Tao Baof3282b42015-04-01 11:21:55 -070044 test_file_name=None, expected_stat=None, expected_mode=0o644,
45 expected_compress_type=zipfile.ZIP_STORED):
46 # Verify the stat if present.
47 if test_file_name is not None:
48 new_stat = os.stat(test_file_name)
49 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
50 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
51
52 # Reopen the zip file to verify.
53 zip_file = zipfile.ZipFile(zip_file_name, "r")
54
55 # Verify the timestamp.
56 info = zip_file.getinfo(arcname)
57 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
58
59 # Verify the file mode.
60 mode = (info.external_attr >> 16) & 0o777
61 self.assertEqual(mode, expected_mode)
62
63 # Verify the compress type.
64 self.assertEqual(info.compress_type, expected_compress_type)
65
66 # Verify the zip contents.
Tao Bao31b08072017-11-08 15:50:59 -080067 entry = zip_file.open(arcname)
68 sha1_hash = sha1()
69 for chunk in iter(lambda: entry.read(4 * MiB), ''):
70 sha1_hash.update(chunk)
71 self.assertEqual(expected_hash, sha1_hash.hexdigest())
Tao Baof3282b42015-04-01 11:21:55 -070072 self.assertIsNone(zip_file.testzip())
73
Dan Albert8e0178d2015-01-27 15:53:15 -080074 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
75 extra_zipwrite_args = dict(extra_zipwrite_args or {})
76
77 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080078 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070079
80 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080081 zip_file_name = zip_file.name
82
83 # File names within an archive strip the leading slash.
84 arcname = extra_zipwrite_args.get("arcname", test_file_name)
85 if arcname[0] == "/":
86 arcname = arcname[1:]
87
88 zip_file.close()
89 zip_file = zipfile.ZipFile(zip_file_name, "w")
90
91 try:
Tao Bao31b08072017-11-08 15:50:59 -080092 sha1_hash = sha1()
93 for data in contents:
94 sha1_hash.update(data)
95 test_file.write(data)
Dan Albert8e0178d2015-01-27 15:53:15 -080096 test_file.close()
97
Tao Baof3282b42015-04-01 11:21:55 -070098 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -080099 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700100 expected_compress_type = extra_zipwrite_args.get("compress_type",
101 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800102 time.sleep(5) # Make sure the atime/mtime will change measurably.
103
104 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700105 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800106
Tao Bao31b08072017-11-08 15:50:59 -0800107 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
108 test_file_name, expected_stat, expected_mode,
109 expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800110 finally:
111 os.remove(test_file_name)
112 os.remove(zip_file_name)
113
Tao Baof3282b42015-04-01 11:21:55 -0700114 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
115 extra_args = dict(extra_args or {})
116
117 zip_file = tempfile.NamedTemporaryFile(delete=False)
118 zip_file_name = zip_file.name
119 zip_file.close()
120
121 zip_file = zipfile.ZipFile(zip_file_name, "w")
122
123 try:
124 expected_compress_type = extra_args.get("compress_type",
125 zipfile.ZIP_STORED)
126 time.sleep(5) # Make sure the atime/mtime will change measurably.
127
128 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700129 arcname = zinfo_or_arcname
130 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700131 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700132 arcname = zinfo_or_arcname.filename
133 expected_mode = extra_args.get("perms",
134 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700135
Tao Bao58c1b962015-05-20 09:32:18 -0700136 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700137 common.ZipClose(zip_file)
138
Tao Bao31b08072017-11-08 15:50:59 -0800139 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700140 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700141 expected_compress_type=expected_compress_type)
142 finally:
143 os.remove(zip_file_name)
144
145 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
146 extra_args = dict(extra_args or {})
147
148 zip_file = tempfile.NamedTemporaryFile(delete=False)
149 zip_file_name = zip_file.name
150
151 test_file = tempfile.NamedTemporaryFile(delete=False)
152 test_file_name = test_file.name
153
154 arcname_large = test_file_name
155 arcname_small = "bar"
156
157 # File names within an archive strip the leading slash.
158 if arcname_large[0] == "/":
159 arcname_large = arcname_large[1:]
160
161 zip_file.close()
162 zip_file = zipfile.ZipFile(zip_file_name, "w")
163
164 try:
Tao Bao31b08072017-11-08 15:50:59 -0800165 sha1_hash = sha1()
166 for data in large:
167 sha1_hash.update(data)
168 test_file.write(data)
Tao Baof3282b42015-04-01 11:21:55 -0700169 test_file.close()
170
171 expected_stat = os.stat(test_file_name)
172 expected_mode = 0o644
173 expected_compress_type = extra_args.get("compress_type",
174 zipfile.ZIP_STORED)
175 time.sleep(5) # Make sure the atime/mtime will change measurably.
176
177 common.ZipWrite(zip_file, test_file_name, **extra_args)
178 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
179 common.ZipClose(zip_file)
180
181 # Verify the contents written by ZipWrite().
Tao Bao31b08072017-11-08 15:50:59 -0800182 self._verify(zip_file, zip_file_name, arcname_large,
183 sha1_hash.hexdigest(), test_file_name, expected_stat,
184 expected_mode, expected_compress_type)
Tao Baof3282b42015-04-01 11:21:55 -0700185
186 # Verify the contents written by ZipWriteStr().
Tao Bao31b08072017-11-08 15:50:59 -0800187 self._verify(zip_file, zip_file_name, arcname_small,
188 sha1(small).hexdigest(),
Tao Baof3282b42015-04-01 11:21:55 -0700189 expected_compress_type=expected_compress_type)
190 finally:
191 os.remove(zip_file_name)
192 os.remove(test_file_name)
193
194 def _test_reset_ZIP64_LIMIT(self, func, *args):
195 default_limit = (1 << 31) - 1
196 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
197 func(*args)
198 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
199
Dan Albert8e0178d2015-01-27 15:53:15 -0800200 def test_ZipWrite(self):
201 file_contents = os.urandom(1024)
202 self._test_ZipWrite(file_contents)
203
204 def test_ZipWrite_with_opts(self):
205 file_contents = os.urandom(1024)
206 self._test_ZipWrite(file_contents, {
207 "arcname": "foobar",
208 "perms": 0o777,
209 "compress_type": zipfile.ZIP_DEFLATED,
210 })
Tao Baof3282b42015-04-01 11:21:55 -0700211 self._test_ZipWrite(file_contents, {
212 "arcname": "foobar",
213 "perms": 0o700,
214 "compress_type": zipfile.ZIP_STORED,
215 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800216
217 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700218 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800219 self._test_ZipWrite(file_contents, {
220 "compress_type": zipfile.ZIP_DEFLATED,
221 })
222
223 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700224 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
225
226 def test_ZipWriteStr(self):
227 random_string = os.urandom(1024)
228 # Passing arcname
229 self._test_ZipWriteStr("foo", random_string)
230
231 # Passing zinfo
232 zinfo = zipfile.ZipInfo(filename="foo")
233 self._test_ZipWriteStr(zinfo, random_string)
234
235 # Timestamp in the zinfo should be overwritten.
236 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
237 self._test_ZipWriteStr(zinfo, random_string)
238
239 def test_ZipWriteStr_with_opts(self):
240 random_string = os.urandom(1024)
241 # Passing arcname
242 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700243 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700244 "compress_type": zipfile.ZIP_DEFLATED,
245 })
Tao Bao58c1b962015-05-20 09:32:18 -0700246 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700247 "compress_type": zipfile.ZIP_STORED,
248 })
249
250 # Passing zinfo
251 zinfo = zipfile.ZipInfo(filename="foo")
252 self._test_ZipWriteStr(zinfo, random_string, {
253 "compress_type": zipfile.ZIP_DEFLATED,
254 })
255 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700256 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700257 "compress_type": zipfile.ZIP_STORED,
258 })
259
260 def test_ZipWriteStr_large_file(self):
261 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
262 # the workaround. We will only test the case of writing a string into a
263 # large archive.
264 long_string = get_2gb_string()
265 short_string = os.urandom(1024)
266 self._test_ZipWriteStr_large_file(long_string, short_string, {
267 "compress_type": zipfile.ZIP_DEFLATED,
268 })
269
270 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
271 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
272 zinfo = zipfile.ZipInfo(filename="foo")
273 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700274
275 def test_bug21309935(self):
276 zip_file = tempfile.NamedTemporaryFile(delete=False)
277 zip_file_name = zip_file.name
278 zip_file.close()
279
280 try:
281 random_string = os.urandom(1024)
282 zip_file = zipfile.ZipFile(zip_file_name, "w")
283 # Default perms should be 0o644 when passing the filename.
284 common.ZipWriteStr(zip_file, "foo", random_string)
285 # Honor the specified perms.
286 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
287 # The perms in zinfo should be untouched.
288 zinfo = zipfile.ZipInfo(filename="baz")
289 zinfo.external_attr = 0o740 << 16
290 common.ZipWriteStr(zip_file, zinfo, random_string)
291 # Explicitly specified perms has the priority.
292 zinfo = zipfile.ZipInfo(filename="qux")
293 zinfo.external_attr = 0o700 << 16
294 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
295 common.ZipClose(zip_file)
296
Tao Bao31b08072017-11-08 15:50:59 -0800297 self._verify(zip_file, zip_file_name, "foo",
298 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700299 expected_mode=0o644)
Tao Bao31b08072017-11-08 15:50:59 -0800300 self._verify(zip_file, zip_file_name, "bar",
301 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700302 expected_mode=0o755)
Tao Bao31b08072017-11-08 15:50:59 -0800303 self._verify(zip_file, zip_file_name, "baz",
304 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700305 expected_mode=0o740)
Tao Bao31b08072017-11-08 15:50:59 -0800306 self._verify(zip_file, zip_file_name, "qux",
307 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700308 expected_mode=0o400)
309 finally:
310 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700311
312class InstallRecoveryScriptFormatTest(unittest.TestCase):
313 """Check the format of install-recovery.sh
314
315 Its format should match between common.py and validate_target_files.py."""
316
317 def setUp(self):
318 self._tempdir = tempfile.mkdtemp()
319 # Create a dummy dict that contains the fstab info for boot&recovery.
320 self._info = {"fstab" : {}}
321 dummy_fstab = \
322 ["/dev/soc.0/by-name/boot /boot emmc defaults defaults",
323 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
Tao Bao31b08072017-11-08 15:50:59 -0800324 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
Tianjie Xudf055582017-11-07 12:22:58 -0800325 # Construct the gzipped recovery.img and boot.img
326 self.recovery_data = bytearray([
327 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
328 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
329 0x08, 0x00, 0x00, 0x00
330 ])
331 # echo -n "boot" | gzip -f | hd
332 self.boot_data = bytearray([
333 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
334 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
335 ])
Tianjie Xu9c384d22017-06-20 17:00:55 -0700336
337 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
338 loc = os.path.join(self._tempdir, prefix, name)
339 if not os.path.exists(os.path.dirname(loc)):
340 os.makedirs(os.path.dirname(loc))
341 with open(loc, "w+") as f:
342 f.write(data)
343
344 def test_full_recovery(self):
Tao Bao31b08072017-11-08 15:50:59 -0800345 recovery_image = common.File("recovery.img", self.recovery_data)
346 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700347 self._info["full_recovery_image"] = "true"
348
349 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
350 recovery_image, boot_image, self._info)
351 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
352 self._info)
353
354 def test_recovery_from_boot(self):
Tao Bao31b08072017-11-08 15:50:59 -0800355 recovery_image = common.File("recovery.img", self.recovery_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700356 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
Tao Bao31b08072017-11-08 15:50:59 -0800357 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700358 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
359
360 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
361 recovery_image, boot_image, self._info)
362 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
363 self._info)
364 # Validate 'recovery-from-boot' with bonus argument.
365 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
366 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
367 recovery_image, boot_image, self._info)
368 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
369 self._info)
370
371 def tearDown(self):
372 shutil.rmtree(self._tempdir)