blob: f28934d257ed5e0d3f6065d2326c0b6fe30e1dfa [file] [log] [blame]
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16import os
17import tempfile
18import time
19import unittest
20import zipfile
21
22import common
23
24
def random_string_with_holes(size, block_size, step_size):
  """Returns `size` zero bytes with random blocks written at regular steps.

  Starting at offset 0 and then every `step_size` bytes, up to `block_size`
  bytes of os.urandom() output overwrite the zero filler. The bytes between
  two blocks stay zero (the "holes").

  Args:
    size: Total length of the returned byte string.
    block_size: Length of each random block.
    step_size: Distance between the starts of consecutive random blocks.

  Returns:
    A byte string (str on Python 2, bytes on Python 3) of exactly `size`
    bytes.

  Raises:
    ValueError: If step_size is zero (raised by range()).
  """
  # A bytearray stores one byte per element, unlike a list of one-character
  # strings, which matters when the caller asks for gigabytes of data.
  data = bytearray(size)
  for begin in range(0, size, step_size):
    # Clamp the last block so the slice assignment cannot grow the buffer
    # beyond `size`.
    end = min(begin + block_size, size)
    data[begin:end] = os.urandom(end - begin)
  return bytes(data)
31
def get_2gb_string():
  """Builds the payload used by the large-file tests.

  Asks random_string_with_holes() for 2 GiB + 1 byte of data, with a 4 KiB
  random block placed every 4 MiB.

  Returns:
    Whatever random_string_with_holes() returns for those parameters.
  """
  kib = 1 << 10
  mib = kib << 10
  gib = mib << 10
  return random_string_with_holes(
      size=2 * gib + 1, block_size=4 * kib, step_size=4 * mib)
43
Dan Albert8e0178d2015-01-27 15:53:15 -080044
class CommonZipTest(unittest.TestCase):
  """Tests for common.ZipWrite(), common.ZipWriteStr() and common.ZipClose().

  Each helper below builds a throwaway archive in a temporary file, writes
  into it through the functions under test, and then reopens the archive to
  check the entry's timestamp, permission bits, compression type and payload.
  """

  def _verify(self, zip_file, zip_file_name, arcname, contents,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    """Checks one archive entry and, optionally, the stat of its source file.

    Args:
      zip_file: Unused; the archive is always reopened from zip_file_name.
          Kept so that existing call sites keep working.
      zip_file_name: Path of the archive to inspect.
      arcname: Name of the entry inside the archive.
      contents: Expected payload of the entry.
      test_file_name: Path of the source file. When not None, its current
          st_mode and st_mtime must equal those in expected_stat.
      expected_stat: os.stat() result captured before the archive was
          written. Only read when test_file_name is given.
      expected_mode: Expected permission bits of the entry.
      expected_compress_type: Expected zipfile compression constant.
    """
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify. The context manager releases the file
    # handle even when one of the assertions below fails.
    with zipfile.ZipFile(zip_file_name, "r") as reopened:
      info = reopened.getinfo(arcname)

      # Verify the timestamp.
      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

      # Verify the file mode; the permission bits live in the upper 16 bits
      # of external_attr.
      mode = (info.external_attr >> 16) & 0o777
      self.assertEqual(mode, expected_mode)

      # Verify the compress type.
      self.assertEqual(info.compress_type, expected_compress_type)

      # Verify the zip contents.
      self.assertEqual(reopened.read(arcname), contents)
      self.assertIsNone(reopened.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    """Writes `contents` via common.ZipWrite() and verifies the archive.

    Args:
      contents: Payload written to a temporary source file before zipping.
      extra_zipwrite_args: Optional dict of keyword arguments forwarded to
          common.ZipWrite(). The keys "arcname", "perms" and "compress_type"
          also determine the expected values.
    """
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname.startswith("/"):
      arcname = arcname[1:]

    # Only the temporary path is needed; reopen it as a writable archive.
    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      test_file.write(contents)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
                   expected_stat, expected_mode, expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    """Writes `contents` via common.ZipWriteStr() and verifies the archive.

    Args:
      zinfo_or_arcname: Either a zipfile.ZipInfo or an entry name. A name is
          wrapped in a fresh ZipInfo before the call.
      contents: Payload to store in the entry.
      extra_args: Optional dict of keyword arguments forwarded to
          common.ZipWriteStr(); "compress_type" also sets the expected value.
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
      else:
        zinfo = zinfo_or_arcname
      arcname = zinfo.filename

      common.ZipWriteStr(zip_file, zinfo, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, contents,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    """Adds a large file and then a small string to the same archive.

    Args:
      large: Payload written to a temporary file and added through
          common.ZipWrite().
      small: Payload added afterwards through common.ZipWriteStr() under the
          entry name "bar".
      extra_args: Optional dict of keyword arguments forwarded to both
          calls; "compress_type" also sets the expected value.
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large.startswith("/"):
      arcname_large = arcname_large[1:]

    # Only the temporary path is needed; reopen it as a writable archive.
    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      test_file.write(large)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large, large,
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small, small,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    """Asserts zipfile.ZIP64_LIMIT has its default value around func(*args).

    Args:
      func: Callable to run between the two checks.
      *args: Positional arguments passed to func.
    """
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    """ZipWrite() with default arguments."""
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    """ZipWrite() with explicit arcname, perms and compress_type."""
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    """ZipWrite() with the payload produced by get_2gb_string()."""
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    """zipfile.ZIP64_LIMIT is back at its default after a ZipWrite() run."""
    # The payload goes into a binary-mode temp file and is compared against
    # ZipFile.read(), so it has to be a byte string.
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, b"")

  def test_ZipWriteStr(self):
    """ZipWriteStr() with an entry name and with a ZipInfo."""
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    """ZipWriteStr() with an explicit compress_type."""
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("foo", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    """ZipWriteStr() of a short string into an archive holding a large file."""
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    """zipfile.ZIP64_LIMIT is back at its default after ZipWriteStr() runs."""
    # The payload is compared against ZipFile.read(), which returns a byte
    # string, so pass one here.
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", b"")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b"")