[incfs] Make native library extraction async

IncrementalService can create the library files beforehand, but
delay filling in their data. As it takes quite a while in
general (over a second in cases when the phone is busy), it's
better to run the unzipping and filling in a separate thread
and only make sure it finishes before the whole installation
process is complete.
This speeds up the megacity.apk installation by ~250-300ms,
1000-1100ms -> 750-800ms

Bug: 153513507
Test: adb install megacity.apk

Change-Id: Ia44f7e45b9e0abaebdfb6fe5352f9dcf29ab4ece
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index eb65a2d..400388e 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -26,23 +26,18 @@
 #include <android-base/strings.h>
 #include <android/content/pm/IDataLoaderStatusListener.h>
 #include <android/os/IVold.h>
-#include <androidfw/ZipFileRO.h>
-#include <androidfw/ZipUtils.h>
 #include <binder/BinderService.h>
 #include <binder/Nullable.h>
 #include <binder/ParcelFileDescriptor.h>
 #include <binder/Status.h>
 #include <sys/stat.h>
 #include <uuid/uuid.h>
-#include <zlib.h>
 
 #include <charconv>
 #include <ctime>
 #include <filesystem>
 #include <iterator>
 #include <span>
-#include <stack>
-#include <thread>
 #include <type_traits>
 
 #include "Metadata.pb.h"
@@ -252,6 +247,10 @@
     if (!mAppOpsManager) {
         LOG(FATAL) << "AppOpsManager is unavailable";
     }
+
+    mJobQueue.reserve(16);
+    mJobProcessor = std::thread([this]() { runJobProcessing(); });
+
     mountExistingImages();
 }
 
@@ -259,7 +258,14 @@
     return IncFs_FileIdFromMetadata({(const char*)metadata.data(), metadata.size()});
 }
 
-IncrementalService::~IncrementalService() = default;
+IncrementalService::~IncrementalService() {
+    {
+        std::lock_guard lock(mJobMutex);
+        mRunning = false;
+    }
+    mJobCondition.notify_all();
+    mJobProcessor.join();
+}
 
 inline const char* toString(TimePoint t) {
     using SystemClock = std::chrono::system_clock;
@@ -1158,8 +1164,6 @@
 bool IncrementalService::configureNativeBinaries(StorageId storage, std::string_view apkFullPath,
                                                  std::string_view libDirRelativePath,
                                                  std::string_view abi) {
-    namespace sc = std::chrono;
-    using Clock = sc::steady_clock;
     auto start = Clock::now();
 
     const auto ifs = getIfs(storage);
@@ -1176,33 +1180,35 @@
     }
 
     auto mkDirsTs = Clock::now();
-
-    std::unique_ptr<ZipFileRO> zipFile(ZipFileRO::open(path::c_str(apkFullPath)));
-    if (!zipFile) {
+    ZipArchiveHandle zipFileHandle;
+    if (OpenArchive(path::c_str(apkFullPath), &zipFileHandle)) {
         LOG(ERROR) << "Failed to open zip file at " << apkFullPath;
         return false;
     }
+
+    // Need a shared pointer: will be passing it into all unpacking jobs.
+    std::shared_ptr<ZipArchive> zipFile(zipFileHandle, [](ZipArchiveHandle h) { CloseArchive(h); });
     void* cookie = nullptr;
     const auto libFilePrefix = path::join(constants().libDir, abi);
-    if (!zipFile->startIteration(&cookie, libFilePrefix.c_str() /* prefix */,
-                                 constants().libSuffix.data() /* suffix */)) {
+    if (StartIteration(zipFile.get(), &cookie, libFilePrefix, constants().libSuffix)) {
         LOG(ERROR) << "Failed to start zip iteration for " << apkFullPath;
         return false;
     }
-    auto endIteration = [&zipFile](void* cookie) { zipFile->endIteration(cookie); };
+    auto endIteration = [](void* cookie) { EndIteration(cookie); };
     auto iterationCleaner = std::unique_ptr<void, decltype(endIteration)>(cookie, endIteration);
 
     auto openZipTs = Clock::now();
 
-    std::vector<IncFsDataBlock> instructions;
-    ZipEntryRO entry = nullptr;
-    while ((entry = zipFile->nextEntry(cookie)) != nullptr) {
-        auto startFileTs = Clock::now();
-
-        char fileName[PATH_MAX];
-        if (zipFile->getEntryFileName(entry, fileName, sizeof(fileName))) {
+    std::vector<Job> jobQueue;
+    ZipEntry entry;
+    std::string_view fileName;
+    while (!Next(cookie, &entry, &fileName)) {
+        if (fileName.empty()) {
             continue;
         }
+
+        auto startFileTs = Clock::now();
+
         const auto libName = path::basename(fileName);
         const auto targetLibPath = path::join(libDirRelativePath, libName);
         const auto targetLibPathAbsolute = normalizePathToStorage(ifs, storage, targetLibPath);
@@ -1216,16 +1222,9 @@
             continue;
         }
 
-        uint32_t uncompressedLen, compressedLen;
-        if (!zipFile->getEntryInfo(entry, nullptr, &uncompressedLen, &compressedLen, nullptr,
-                                   nullptr, nullptr)) {
-            LOG(ERROR) << "Failed to read native lib entry: " << fileName;
-            return false;
-        }
-
         // Create new lib file without signature info
         incfs::NewFileParams libFileParams = {
-                .size = uncompressedLen,
+                .size = entry.uncompressed_length,
                 .signature = {},
                 // Metadata of the new lib file is its relative path
                 .metadata = {targetLibPath.c_str(), (IncFsSize)targetLibPath.size()},
@@ -1241,81 +1240,152 @@
         auto makeFileTs = Clock::now();
 
         // If it is a zero-byte file, skip data writing
-        if (uncompressedLen == 0) {
+        if (entry.uncompressed_length == 0) {
             if (sEnablePerfLogging) {
-                LOG(INFO) << "incfs: Extracted " << libName << "(" << compressedLen << " -> "
-                          << uncompressedLen << " bytes): " << elapsedMcs(startFileTs, makeFileTs)
-                          << "mcs, make: " << elapsedMcs(startFileTs, makeFileTs);
+                LOG(INFO) << "incfs: Extracted " << libName
+                          << "(0 bytes): " << elapsedMcs(startFileTs, makeFileTs) << "mcs";
             }
             continue;
         }
 
-        // Write extracted data to new file
-        // NOTE: don't zero-initialize memory, it may take a while
-        auto libData = std::unique_ptr<uint8_t[]>(new uint8_t[uncompressedLen]);
-        if (!zipFile->uncompressEntry(entry, libData.get(), uncompressedLen)) {
-            LOG(ERROR) << "Failed to extract native lib zip entry: " << fileName;
-            return false;
-        }
-
-        auto extractFileTs = Clock::now();
-
-        const auto writeFd = mIncFs->openForSpecialOps(ifs->control, libFileId);
-        if (!writeFd.ok()) {
-            LOG(ERROR) << "Failed to open write fd for: " << targetLibPath << " errno: " << writeFd;
-            return false;
-        }
-
-        auto openFileTs = Clock::now();
-
-        const int numBlocks = (uncompressedLen + constants().blockSize - 1) / constants().blockSize;
-        instructions.clear();
-        instructions.reserve(numBlocks);
-        auto remainingData = std::span(libData.get(), uncompressedLen);
-        for (int i = 0; i < numBlocks; i++) {
-            const auto blockSize = std::min<uint16_t>(constants().blockSize, remainingData.size());
-            auto inst = IncFsDataBlock{
-                    .fileFd = writeFd.get(),
-                    .pageIndex = static_cast<IncFsBlockIndex>(i),
-                    .compression = INCFS_COMPRESSION_KIND_NONE,
-                    .kind = INCFS_BLOCK_KIND_DATA,
-                    .dataSize = blockSize,
-                    .data = reinterpret_cast<const char*>(remainingData.data()),
-            };
-            instructions.push_back(inst);
-            remainingData = remainingData.subspan(blockSize);
-        }
-        auto prepareInstsTs = Clock::now();
-
-        size_t res = mIncFs->writeBlocks(instructions);
-        if (res != instructions.size()) {
-            LOG(ERROR) << "Failed to write data into: " << targetLibPath;
-            return false;
-        }
+        jobQueue.emplace_back([this, zipFile, entry, ifs, libFileId,
+                               libPath = std::move(targetLibPath), makeFileTs]() mutable {
+            extractZipFile(ifs, zipFile.get(), entry, libFileId, libPath, makeFileTs);
+        });
 
         if (sEnablePerfLogging) {
-            auto endFileTs = Clock::now();
-            LOG(INFO) << "incfs: Extracted " << libName << "(" << compressedLen << " -> "
-                      << uncompressedLen << " bytes): " << elapsedMcs(startFileTs, endFileTs)
-                      << "mcs, make: " << elapsedMcs(startFileTs, makeFileTs)
-                      << " extract: " << elapsedMcs(makeFileTs, extractFileTs)
-                      << " open: " << elapsedMcs(extractFileTs, openFileTs)
-                      << " prepare: " << elapsedMcs(openFileTs, prepareInstsTs)
-                      << " write:" << elapsedMcs(prepareInstsTs, endFileTs);
+            auto prepareJobTs = Clock::now();
+            LOG(INFO) << "incfs: Processed " << libName << ": "
+                      << elapsedMcs(startFileTs, prepareJobTs)
+                      << "mcs, make file: " << elapsedMcs(startFileTs, makeFileTs)
+                      << " prepare job: " << elapsedMcs(makeFileTs, prepareJobTs);
         }
     }
 
+    auto processedTs = Clock::now();
+
+    if (!jobQueue.empty()) {
+        {
+            std::lock_guard lock(mJobMutex);
+            if (mRunning) {
+                auto& existingJobs = mJobQueue[storage];
+                if (existingJobs.empty()) {
+                    existingJobs = std::move(jobQueue);
+                } else {
+                    existingJobs.insert(existingJobs.end(), std::move_iterator(jobQueue.begin()),
+                                        std::move_iterator(jobQueue.end()));
+                }
+            }
+        }
+        mJobCondition.notify_all();
+    }
+
     if (sEnablePerfLogging) {
         auto end = Clock::now();
         LOG(INFO) << "incfs: configureNativeBinaries complete in " << elapsedMcs(start, end)
                   << "mcs, make dirs: " << elapsedMcs(start, mkDirsTs)
                   << " open zip: " << elapsedMcs(mkDirsTs, openZipTs)
-                  << " extract all: " << elapsedMcs(openZipTs, end);
+                  << " make files: " << elapsedMcs(openZipTs, processedTs)
+                  << " schedule jobs: " << elapsedMcs(processedTs, end);
     }
 
     return true;
 }
 
+void IncrementalService::extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile,
+                                        ZipEntry& entry, const incfs::FileId& libFileId,
+                                        std::string_view targetLibPath,
+                                        Clock::time_point scheduledTs) {
+    auto libName = path::basename(targetLibPath);
+    auto startedTs = Clock::now();
+
+    // Write extracted data to new file
+    // NOTE: don't zero-initialize memory, it may take a while for nothing
+    auto libData = std::unique_ptr<uint8_t[]>(new uint8_t[entry.uncompressed_length]);
+    if (ExtractToMemory(zipFile, &entry, libData.get(), entry.uncompressed_length)) {
+        LOG(ERROR) << "Failed to extract native lib zip entry: " << libName;
+        return;
+    }
+
+    auto extractFileTs = Clock::now();
+
+    const auto writeFd = mIncFs->openForSpecialOps(ifs->control, libFileId);
+    if (!writeFd.ok()) {
+        LOG(ERROR) << "Failed to open write fd for: " << targetLibPath << " errno: " << writeFd;
+        return;
+    }
+
+    auto openFileTs = Clock::now();
+    const int numBlocks =
+            (entry.uncompressed_length + constants().blockSize - 1) / constants().blockSize;
+    std::vector<IncFsDataBlock> instructions(numBlocks);
+    auto remainingData = std::span(libData.get(), entry.uncompressed_length);
+    for (int i = 0; i < numBlocks; i++) {
+        const auto blockSize = std::min<uint16_t>(constants().blockSize, remainingData.size());
+        instructions[i] = IncFsDataBlock{
+                .fileFd = writeFd.get(),
+                .pageIndex = static_cast<IncFsBlockIndex>(i),
+                .compression = INCFS_COMPRESSION_KIND_NONE,
+                .kind = INCFS_BLOCK_KIND_DATA,
+                .dataSize = blockSize,
+                .data = reinterpret_cast<const char*>(remainingData.data()),
+        };
+        remainingData = remainingData.subspan(blockSize);
+    }
+    auto prepareInstsTs = Clock::now();
+
+    size_t res = mIncFs->writeBlocks(instructions);
+    if (res != instructions.size()) {
+        LOG(ERROR) << "Failed to write data into: " << targetLibPath;
+        return;
+    }
+
+    if (sEnablePerfLogging) {
+        auto endFileTs = Clock::now();
+        LOG(INFO) << "incfs: Extracted " << libName << "(" << entry.compressed_length << " -> "
+                  << entry.uncompressed_length << " bytes): " << elapsedMcs(startedTs, endFileTs)
+                  << "mcs, scheduling delay: " << elapsedMcs(scheduledTs, startedTs)
+                  << " extract: " << elapsedMcs(startedTs, extractFileTs)
+                  << " open: " << elapsedMcs(extractFileTs, openFileTs)
+                  << " prepare: " << elapsedMcs(openFileTs, prepareInstsTs)
+                  << " write: " << elapsedMcs(prepareInstsTs, endFileTs);
+    }
+}
+
+bool IncrementalService::waitForNativeBinariesExtraction(StorageId storage) {
+    std::unique_lock lock(mJobMutex);
+    mJobCondition.wait(lock, [this, storage] {
+        return !mRunning ||
+                (mPendingJobsStorage != storage && mJobQueue.find(storage) == mJobQueue.end());
+    });
+    return mPendingJobsStorage != storage && mJobQueue.find(storage) == mJobQueue.end();
+}
+
+void IncrementalService::runJobProcessing() {
+    for (;;) {
+        std::unique_lock lock(mJobMutex);
+        mJobCondition.wait(lock, [this]() { return !mRunning || !mJobQueue.empty(); });
+        if (!mRunning) {
+            return;
+        }
+
+        auto it = mJobQueue.begin();
+        mPendingJobsStorage = it->first;
+        auto queue = std::move(it->second);
+        mJobQueue.erase(it);
+        lock.unlock();
+
+        for (auto&& job : queue) {
+            job();
+        }
+
+        lock.lock();
+        mPendingJobsStorage = kInvalidStorageId;
+        lock.unlock();
+        mJobCondition.notify_all();
+    }
+}
+
 void IncrementalService::registerAppOpsCallback(const std::string& packageName) {
     sp<IAppOpsCallback> listener;
     {
@@ -1328,7 +1398,8 @@
         listener = cb;
     }
 
-    mAppOpsManager->startWatchingMode(AppOpsManager::OP_GET_USAGE_STATS, String16(packageName.c_str()), listener);
+    mAppOpsManager->startWatchingMode(AppOpsManager::OP_GET_USAGE_STATS,
+                                      String16(packageName.c_str()), listener);
 }
 
 bool IncrementalService::unregisterAppOpsCallback(const std::string& packageName) {