blob: 36f256dc5c54d8daf46d5a67ac272ddf37940ceb [file] [log] [blame]
Dan Albert8e0178d2015-01-27 15:53:15 -08001#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
Tianjie Xu9c384d22017-06-20 17:00:55 -070017import shutil
Dan Albert8e0178d2015-01-27 15:53:15 -080018import tempfile
19import time
20import unittest
21import zipfile
22
23import common
Tianjie Xu9c384d22017-06-20 17:00:55 -070024import validate_target_files
Dan Albert8e0178d2015-01-27 15:53:15 -080025
26
27def random_string_with_holes(size, block_size, step_size):
28 data = ["\0"] * size
29 for begin in range(0, size, step_size):
30 end = begin + block_size
31 data[begin:end] = os.urandom(block_size)
32 return "".join(data)
33
Tao Baof3282b42015-04-01 11:21:55 -070034def get_2gb_string():
35 kilobytes = 1024
36 megabytes = 1024 * kilobytes
37 gigabytes = 1024 * megabytes
38
39 size = int(2 * gigabytes + 1)
40 block_size = 4 * kilobytes
41 step_size = 4 * megabytes
42 two_gb_string = random_string_with_holes(
43 size, block_size, step_size)
44 return two_gb_string
45
Dan Albert8e0178d2015-01-27 15:53:15 -080046
47class CommonZipTest(unittest.TestCase):
Tao Baof3282b42015-04-01 11:21:55 -070048 def _verify(self, zip_file, zip_file_name, arcname, contents,
49 test_file_name=None, expected_stat=None, expected_mode=0o644,
50 expected_compress_type=zipfile.ZIP_STORED):
51 # Verify the stat if present.
52 if test_file_name is not None:
53 new_stat = os.stat(test_file_name)
54 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
55 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
56
57 # Reopen the zip file to verify.
58 zip_file = zipfile.ZipFile(zip_file_name, "r")
59
60 # Verify the timestamp.
61 info = zip_file.getinfo(arcname)
62 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
63
64 # Verify the file mode.
65 mode = (info.external_attr >> 16) & 0o777
66 self.assertEqual(mode, expected_mode)
67
68 # Verify the compress type.
69 self.assertEqual(info.compress_type, expected_compress_type)
70
71 # Verify the zip contents.
72 self.assertEqual(zip_file.read(arcname), contents)
73 self.assertIsNone(zip_file.testzip())
74
Dan Albert8e0178d2015-01-27 15:53:15 -080075 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
76 extra_zipwrite_args = dict(extra_zipwrite_args or {})
77
78 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080079 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070080
81 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080082 zip_file_name = zip_file.name
83
84 # File names within an archive strip the leading slash.
85 arcname = extra_zipwrite_args.get("arcname", test_file_name)
86 if arcname[0] == "/":
87 arcname = arcname[1:]
88
89 zip_file.close()
90 zip_file = zipfile.ZipFile(zip_file_name, "w")
91
92 try:
93 test_file.write(contents)
94 test_file.close()
95
Tao Baof3282b42015-04-01 11:21:55 -070096 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -080097 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -070098 expected_compress_type = extra_zipwrite_args.get("compress_type",
99 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800100 time.sleep(5) # Make sure the atime/mtime will change measurably.
101
102 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700103 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800104
Tao Baof3282b42015-04-01 11:21:55 -0700105 self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
106 expected_stat, expected_mode, expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800107 finally:
108 os.remove(test_file_name)
109 os.remove(zip_file_name)
110
Tao Baof3282b42015-04-01 11:21:55 -0700111 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
112 extra_args = dict(extra_args or {})
113
114 zip_file = tempfile.NamedTemporaryFile(delete=False)
115 zip_file_name = zip_file.name
116 zip_file.close()
117
118 zip_file = zipfile.ZipFile(zip_file_name, "w")
119
120 try:
121 expected_compress_type = extra_args.get("compress_type",
122 zipfile.ZIP_STORED)
123 time.sleep(5) # Make sure the atime/mtime will change measurably.
124
125 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700126 arcname = zinfo_or_arcname
127 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700128 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700129 arcname = zinfo_or_arcname.filename
130 expected_mode = extra_args.get("perms",
131 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700132
Tao Bao58c1b962015-05-20 09:32:18 -0700133 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700134 common.ZipClose(zip_file)
135
136 self._verify(zip_file, zip_file_name, arcname, contents,
Tao Bao58c1b962015-05-20 09:32:18 -0700137 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700138 expected_compress_type=expected_compress_type)
139 finally:
140 os.remove(zip_file_name)
141
142 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
143 extra_args = dict(extra_args or {})
144
145 zip_file = tempfile.NamedTemporaryFile(delete=False)
146 zip_file_name = zip_file.name
147
148 test_file = tempfile.NamedTemporaryFile(delete=False)
149 test_file_name = test_file.name
150
151 arcname_large = test_file_name
152 arcname_small = "bar"
153
154 # File names within an archive strip the leading slash.
155 if arcname_large[0] == "/":
156 arcname_large = arcname_large[1:]
157
158 zip_file.close()
159 zip_file = zipfile.ZipFile(zip_file_name, "w")
160
161 try:
162 test_file.write(large)
163 test_file.close()
164
165 expected_stat = os.stat(test_file_name)
166 expected_mode = 0o644
167 expected_compress_type = extra_args.get("compress_type",
168 zipfile.ZIP_STORED)
169 time.sleep(5) # Make sure the atime/mtime will change measurably.
170
171 common.ZipWrite(zip_file, test_file_name, **extra_args)
172 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
173 common.ZipClose(zip_file)
174
175 # Verify the contents written by ZipWrite().
176 self._verify(zip_file, zip_file_name, arcname_large, large,
177 test_file_name, expected_stat, expected_mode,
178 expected_compress_type)
179
180 # Verify the contents written by ZipWriteStr().
181 self._verify(zip_file, zip_file_name, arcname_small, small,
182 expected_compress_type=expected_compress_type)
183 finally:
184 os.remove(zip_file_name)
185 os.remove(test_file_name)
186
187 def _test_reset_ZIP64_LIMIT(self, func, *args):
188 default_limit = (1 << 31) - 1
189 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
190 func(*args)
191 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
192
Dan Albert8e0178d2015-01-27 15:53:15 -0800193 def test_ZipWrite(self):
194 file_contents = os.urandom(1024)
195 self._test_ZipWrite(file_contents)
196
197 def test_ZipWrite_with_opts(self):
198 file_contents = os.urandom(1024)
199 self._test_ZipWrite(file_contents, {
200 "arcname": "foobar",
201 "perms": 0o777,
202 "compress_type": zipfile.ZIP_DEFLATED,
203 })
Tao Baof3282b42015-04-01 11:21:55 -0700204 self._test_ZipWrite(file_contents, {
205 "arcname": "foobar",
206 "perms": 0o700,
207 "compress_type": zipfile.ZIP_STORED,
208 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800209
210 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700211 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800212 self._test_ZipWrite(file_contents, {
213 "compress_type": zipfile.ZIP_DEFLATED,
214 })
215
216 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700217 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
218
219 def test_ZipWriteStr(self):
220 random_string = os.urandom(1024)
221 # Passing arcname
222 self._test_ZipWriteStr("foo", random_string)
223
224 # Passing zinfo
225 zinfo = zipfile.ZipInfo(filename="foo")
226 self._test_ZipWriteStr(zinfo, random_string)
227
228 # Timestamp in the zinfo should be overwritten.
229 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
230 self._test_ZipWriteStr(zinfo, random_string)
231
232 def test_ZipWriteStr_with_opts(self):
233 random_string = os.urandom(1024)
234 # Passing arcname
235 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700236 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700237 "compress_type": zipfile.ZIP_DEFLATED,
238 })
Tao Bao58c1b962015-05-20 09:32:18 -0700239 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700240 "compress_type": zipfile.ZIP_STORED,
241 })
242
243 # Passing zinfo
244 zinfo = zipfile.ZipInfo(filename="foo")
245 self._test_ZipWriteStr(zinfo, random_string, {
246 "compress_type": zipfile.ZIP_DEFLATED,
247 })
248 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700249 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700250 "compress_type": zipfile.ZIP_STORED,
251 })
252
253 def test_ZipWriteStr_large_file(self):
254 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
255 # the workaround. We will only test the case of writing a string into a
256 # large archive.
257 long_string = get_2gb_string()
258 short_string = os.urandom(1024)
259 self._test_ZipWriteStr_large_file(long_string, short_string, {
260 "compress_type": zipfile.ZIP_DEFLATED,
261 })
262
263 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
264 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
265 zinfo = zipfile.ZipInfo(filename="foo")
266 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700267
268 def test_bug21309935(self):
269 zip_file = tempfile.NamedTemporaryFile(delete=False)
270 zip_file_name = zip_file.name
271 zip_file.close()
272
273 try:
274 random_string = os.urandom(1024)
275 zip_file = zipfile.ZipFile(zip_file_name, "w")
276 # Default perms should be 0o644 when passing the filename.
277 common.ZipWriteStr(zip_file, "foo", random_string)
278 # Honor the specified perms.
279 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
280 # The perms in zinfo should be untouched.
281 zinfo = zipfile.ZipInfo(filename="baz")
282 zinfo.external_attr = 0o740 << 16
283 common.ZipWriteStr(zip_file, zinfo, random_string)
284 # Explicitly specified perms has the priority.
285 zinfo = zipfile.ZipInfo(filename="qux")
286 zinfo.external_attr = 0o700 << 16
287 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
288 common.ZipClose(zip_file)
289
290 self._verify(zip_file, zip_file_name, "foo", random_string,
291 expected_mode=0o644)
292 self._verify(zip_file, zip_file_name, "bar", random_string,
293 expected_mode=0o755)
294 self._verify(zip_file, zip_file_name, "baz", random_string,
295 expected_mode=0o740)
296 self._verify(zip_file, zip_file_name, "qux", random_string,
297 expected_mode=0o400)
298 finally:
299 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700300
301class InstallRecoveryScriptFormatTest(unittest.TestCase):
302 """Check the format of install-recovery.sh
303
304 Its format should match between common.py and validate_target_files.py."""
305
306 def setUp(self):
307 self._tempdir = tempfile.mkdtemp()
308 # Create a dummy dict that contains the fstab info for boot&recovery.
309 self._info = {"fstab" : {}}
310 dummy_fstab = \
311 ["/dev/soc.0/by-name/boot /boot emmc defaults defaults",
312 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
313 self._info["fstab"] = common.LoadRecoveryFSTab(lambda x : "\n".join(x),
314 2, dummy_fstab)
315
316 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
317 loc = os.path.join(self._tempdir, prefix, name)
318 if not os.path.exists(os.path.dirname(loc)):
319 os.makedirs(os.path.dirname(loc))
320 with open(loc, "w+") as f:
321 f.write(data)
322
323 def test_full_recovery(self):
324 recovery_image = common.File("recovery.img", "recovery");
325 boot_image = common.File("boot.img", "boot");
326 self._info["full_recovery_image"] = "true"
327
328 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
329 recovery_image, boot_image, self._info)
330 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
331 self._info)
332
333 def test_recovery_from_boot(self):
334 recovery_image = common.File("recovery.img", "recovery");
335 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
336 boot_image = common.File("boot.img", "boot");
337 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
338
339 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
340 recovery_image, boot_image, self._info)
341 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
342 self._info)
343 # Validate 'recovery-from-boot' with bonus argument.
344 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
345 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
346 recovery_image, boot_image, self._info)
347 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
348 self._info)
349
350 def tearDown(self):
351 shutil.rmtree(self._tempdir)