Wrap zipfile.write(), writestr() and close()

To work around the zip 2GiB limit, we need to wrap the related zipfile
functions: write(), writestr() and close(). Calls to those functions
should always be replaced with calls to the corresponding wrappers.

Bug: 18015246
Change-Id: I499574cee51ec4804bc10cbefe0b17940afed918
(cherry picked from commit 2ed665a033c587b276b1615516e5354e2ace47cd)
diff --git a/tools/releasetools/test_common.py b/tools/releasetools/test_common.py
index 5fdc132..f28934d 100644
--- a/tools/releasetools/test_common.py
+++ b/tools/releasetools/test_common.py
@@ -29,15 +29,54 @@
     data[begin:end] = os.urandom(block_size)
   return "".join(data)
 
+def get_2gb_string():
+  """Returns random_string_with_holes() output for a size of 2 GiB + 1.
+
+  The helper is called with a 4 KiB block size and a 4 MiB step size.
+  """
+  kib = 1024
+  mib = 1024 * kib
+  gib = 1024 * mib
+
+  total_size = int(2 * gib + 1)
+  return random_string_with_holes(total_size, 4 * kib, 4 * mib)
+
 
 class CommonZipTest(unittest.TestCase):
+  def _verify(self, zip_file, zip_file_name, arcname, contents,
+              test_file_name=None, expected_stat=None, expected_mode=0o644,
+              expected_compress_type=zipfile.ZIP_STORED):
+    """Checks the entry arcname of the archive stored at zip_file_name.
+
+    The zip_file argument is not read; the archive is reopened by name. If
+    test_file_name is given, its mode and mtime must equal expected_stat's.
+    """
+    if test_file_name is not None:
+      new_stat = os.stat(test_file_name)
+      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
+      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
+
+    # Reopen inside a context manager so the handle is closed even when an
+    # assertion fails; the callers remove the file right afterwards.
+    with zipfile.ZipFile(zip_file_name, "r") as zip_file:
+      info = zip_file.getinfo(arcname)
+      # Timestamp, permission bits and compression type of the entry.
+      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
+      mode = (info.external_attr >> 16) & 0o777
+      self.assertEqual(mode, expected_mode)
+      self.assertEqual(info.compress_type, expected_compress_type)
+
+      # Entry payload, then zipfile's own integrity check of the archive.
+      self.assertEqual(zip_file.read(arcname), contents)
+      self.assertIsNone(zip_file.testzip())
+
   def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
     extra_zipwrite_args = dict(extra_zipwrite_args or {})
 
     test_file = tempfile.NamedTemporaryFile(delete=False)
-    zip_file = tempfile.NamedTemporaryFile(delete=False)
-
     test_file_name = test_file.name
+
+    zip_file = tempfile.NamedTemporaryFile(delete=False)
     zip_file_name = zip_file.name
 
     # File names within an archive strip the leading slash.
@@ -52,31 +91,100 @@
       test_file.write(contents)
       test_file.close()
 
-      old_stat = os.stat(test_file_name)
+      expected_stat = os.stat(test_file_name)
       expected_mode = extra_zipwrite_args.get("perms", 0o644)
-
+      expected_compress_type = extra_zipwrite_args.get("compress_type",
+                                                       zipfile.ZIP_STORED)
       time.sleep(5)  # Make sure the atime/mtime will change measurably.
 
       common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
+      common.ZipClose(zip_file)
 
-      new_stat = os.stat(test_file_name)
-      self.assertEqual(int(old_stat.st_mode), int(new_stat.st_mode))
-      self.assertEqual(int(old_stat.st_mtime), int(new_stat.st_mtime))
-      self.assertIsNone(zip_file.testzip())
-
-      zip_file.close()
-      zip_file = zipfile.ZipFile(zip_file_name, "r")
-      info = zip_file.getinfo(arcname)
-
-      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
-      mode = (info.external_attr >> 16) & 0o777
-      self.assertEqual(mode, expected_mode)
-      self.assertEqual(zip_file.read(arcname), contents)
-      self.assertIsNone(zip_file.testzip())
+      self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
+                   expected_stat, expected_mode, expected_compress_type)
     finally:
       os.remove(test_file_name)
       os.remove(zip_file_name)
 
+  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
+    """Writes contents via common.ZipWriteStr() and verifies the archive.
+
+    zinfo_or_arcname is a ZipInfo, or an entry name to wrap in a new ZipInfo.
+    """
+    extra_args = dict(extra_args or {})
+
+    placeholder = tempfile.NamedTemporaryFile(delete=False)
+    zip_file_name = placeholder.name
+    placeholder.close()
+    zip_file = zipfile.ZipFile(zip_file_name, "w")
+
+    try:
+      expected_compress_type = extra_args.get("compress_type",
+                                              zipfile.ZIP_STORED)
+      time.sleep(5)  # Make sure the atime/mtime will change measurably.
+      zinfo = zinfo_or_arcname
+      if not isinstance(zinfo, zipfile.ZipInfo):
+        zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
+      arcname = zinfo.filename
+
+      common.ZipWriteStr(zip_file, zinfo, contents, **extra_args)
+      common.ZipClose(zip_file)
+      self._verify(zip_file, zip_file_name, arcname, contents,
+                   expected_compress_type=expected_compress_type)
+    finally:
+      os.remove(zip_file_name)
+
+  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
+    """Adds large via ZipWrite() and small via ZipWriteStr() to one archive.
+
+    large is first written to a temporary file, which ZipWrite() archives;
+    small is stored as "bar". Both entries are then checked with _verify().
+    """
+    extra_args = dict(extra_args or {})
+    expected_mode = 0o644
+    expected_compress_type = extra_args.get("compress_type",
+                                            zipfile.ZIP_STORED)
+
+    placeholder = tempfile.NamedTemporaryFile(delete=False)
+    zip_file_name = placeholder.name
+    test_file = tempfile.NamedTemporaryFile(delete=False)
+    test_file_name = test_file.name
+
+    # File names within an archive strip the leading slash.
+    arcname_large = test_file_name
+    if arcname_large.startswith("/"):
+      arcname_large = arcname_large[1:]
+    arcname_small = "bar"
+
+    placeholder.close()
+    zip_file = zipfile.ZipFile(zip_file_name, "w")
+
+    try:
+      test_file.write(large)
+      test_file.close()
+      expected_stat = os.stat(test_file_name)
+      time.sleep(5)  # Make sure the atime/mtime will change measurably.
+      common.ZipWrite(zip_file, test_file_name, **extra_args)
+      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
+      common.ZipClose(zip_file)
+
+      # Verify the contents written by ZipWrite().
+      self._verify(zip_file, zip_file_name, arcname_large, large,
+                   test_file_name, expected_stat, expected_mode,
+                   expected_compress_type)
+      # Verify the contents written by ZipWriteStr().
+      self._verify(zip_file, zip_file_name, arcname_small, small,
+                   expected_compress_type=expected_compress_type)
+    finally:
+      os.remove(zip_file_name)
+      os.remove(test_file_name)
+
+  def _test_reset_ZIP64_LIMIT(self, func, *args):
+    """Checks ZIP64_LIMIT equals (1 << 31) - 1 before and after func(*args)."""
+    self.assertEqual((1 << 31) - 1, zipfile.ZIP64_LIMIT)
+    func(*args)
+    self.assertEqual((1 << 31) - 1, zipfile.ZIP64_LIMIT)
+
   def test_ZipWrite(self):
     file_contents = os.urandom(1024)
     self._test_ZipWrite(file_contents)
@@ -88,23 +196,64 @@
         "perms": 0o777,
         "compress_type": zipfile.ZIP_DEFLATED,
     })
+    self._test_ZipWrite(file_contents, {
+        "arcname": "foobar",
+        "perms": 0o700,
+        "compress_type": zipfile.ZIP_STORED,
+    })
 
   def test_ZipWrite_large_file(self):
-    kilobytes = 1024
-    megabytes = 1024 * kilobytes
-    gigabytes = 1024 * megabytes
-
-    size = int(2 * gigabytes + 1)
-    block_size = 4 * kilobytes
-    step_size = 4 * megabytes
-    file_contents = random_string_with_holes(
-        size, block_size, step_size)
+    file_contents = get_2gb_string()
     self._test_ZipWrite(file_contents, {
         "compress_type": zipfile.ZIP_DEFLATED,
     })
 
   def test_ZipWrite_resets_ZIP64_LIMIT(self):
-    default_limit = (1 << 31) - 1
-    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
-    self._test_ZipWrite('')
-    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
+    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
+
+  def test_ZipWriteStr(self):
+    """Runs the ZipWriteStr() round trip for an arcname and for a ZipInfo."""
+    random_string = os.urandom(1024)
+    # Passing arcname
+    self._test_ZipWriteStr("foo", random_string)
+
+    # Passing zinfo
+    zinfo = zipfile.ZipInfo(filename="foo")
+    self._test_ZipWriteStr(zinfo, random_string)
+    # The zinfo timestamp should be overwritten; _verify() expects 2009-01-01.
+    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
+    self._test_ZipWriteStr(zinfo, random_string)
+
+  def test_ZipWriteStr_with_opts(self):
+    """Runs the ZipWriteStr() round trip with an explicit compress_type.
+
+    ZIP_DEFLATED and then ZIP_STORED are requested through extra_args,
+    first with a plain arcname and then with a single ZipInfo object that
+    both of its calls share.
+    """
+    random_string = os.urandom(1024)
+    compress_types = (zipfile.ZIP_DEFLATED, zipfile.ZIP_STORED)
+
+    # Passing arcname
+    for kind in compress_types:
+      self._test_ZipWriteStr("foo", random_string, {"compress_type": kind})
+
+    # Passing zinfo
+    zinfo = zipfile.ZipInfo(filename="foo")
+    for kind in compress_types:
+      self._test_ZipWriteStr(zinfo, random_string, {"compress_type": kind})
+
+  def test_ZipWriteStr_large_file(self):
+    """Writes a short string into an archive holding get_2gb_string() data.
+
+    zipfile.writestr() doesn't work when the str size is over 2GiB even with
+    the workaround, so only the string-into-large-archive case is tested.
+    """
+    extra_args = {"compress_type": zipfile.ZIP_DEFLATED}
+    self._test_ZipWriteStr_large_file(
+        get_2gb_string(), os.urandom(1024), extra_args)
+
+  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
+    # Covers both entry-name forms: a plain arcname and a ZipInfo object.
+    for name in ("foo", zipfile.ZipInfo(filename="foo")):
+      self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, name, "")