[incremental/pm] register progress listener

Incremental Serivce periodically polls loading progress and sends to
Package Manager Service. Package Manager provides APIs for other
interested parties to listen to the loading progress.

BUG: 165841827
Test: unit test
Change-Id: I44b9e17c2240b9efe53bc09fc728b6671f1f7dfe
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 447ee55..10a508b 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -275,6 +275,7 @@
         mJni(sm.getJni()),
         mLooper(sm.getLooper()),
         mTimedQueue(sm.getTimedQueue()),
+        mProgressUpdateJobQueue(sm.getProgressUpdateJobQueue()),
         mFs(sm.getFs()),
         mIncrementalDir(rootDir) {
     CHECK(mVold) << "Vold service is unavailable";
@@ -283,6 +284,7 @@
     CHECK(mJni) << "JNI is unavailable";
     CHECK(mLooper) << "Looper is unavailable";
     CHECK(mTimedQueue) << "TimedQueue is unavailable";
+    CHECK(mProgressUpdateJobQueue) << "mProgressUpdateJobQueue is unavailable";
     CHECK(mFs) << "Fs is unavailable";
 
     mJobQueue.reserve(16);
@@ -308,6 +310,7 @@
     mJobProcessor.join();
     mCmdLooperThread.join();
     mTimedQueue->stop();
+    mProgressUpdateJobQueue->stop();
     // Ensure that mounts are destroyed while the service is still valid.
     mBindsByPath.clear();
     mMounts.clear();
@@ -1744,6 +1747,35 @@
     return (float)filledBlocks / (float)totalBlocks;
 }
 
+bool IncrementalService::updateLoadingProgress(
+        StorageId storage, const StorageLoadingProgressListener& progressListener) {
+    const auto progress = getLoadingProgress(storage);
+    if (progress < 0) {
+        // Failed to get progress from incfs, abort.
+        return false;
+    }
+    progressListener->onStorageLoadingProgressChanged(storage, progress);
+    if (progress > 1 - 0.001f) {
+        // Stop updating progress once it is fully loaded
+        return true;
+    }
+    static constexpr auto kProgressUpdateInterval = 1000ms;
+    addTimedJob(*mProgressUpdateJobQueue, storage, kProgressUpdateInterval /* repeat after 1s */,
+                [storage, progressListener, this]() {
+                    updateLoadingProgress(storage, progressListener);
+                });
+    return true;
+}
+
+bool IncrementalService::registerLoadingProgressListener(
+        StorageId storage, const StorageLoadingProgressListener& progressListener) {
+    return updateLoadingProgress(storage, progressListener);
+}
+
+bool IncrementalService::unregisterLoadingProgressListener(StorageId storage) {
+    return removeTimedJobs(*mProgressUpdateJobQueue, storage);
+}
+
 bool IncrementalService::perfLoggingEnabled() {
     static const bool enabled = base::GetBoolProperty("incremental.perflogging", false);
     return enabled;
@@ -1826,18 +1858,21 @@
     }
 }
 
-void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) {
+bool IncrementalService::addTimedJob(TimedQueueWrapper& timedQueue, MountId id, Milliseconds after,
+                                     Job what) {
     if (id == kInvalidStorageId) {
-        return;
+        return false;
     }
-    mTimedQueue->addJob(id, after, std::move(what));
+    timedQueue.addJob(id, after, std::move(what));
+    return true;
 }
 
-void IncrementalService::removeTimedJobs(MountId id) {
+bool IncrementalService::removeTimedJobs(TimedQueueWrapper& timedQueue, MountId id) {
     if (id == kInvalidStorageId) {
-        return;
+        return false;
     }
-    mTimedQueue->removeJobs(id);
+    timedQueue.removeJobs(id);
+    return true;
 }
 
 IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
@@ -1879,7 +1914,7 @@
         mHealthPath.clear();
         unregisterFromPendingReads();
         resetHealthControl();
-        mService.removeTimedJobs(mId);
+        mService.removeTimedJobs(*mService.mTimedQueue, mId);
     }
 
     requestDestroy();
@@ -2169,7 +2204,8 @@
         }
         LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
                    << "secs";
-        mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); });
+        mService.addTimedJob(*mService.mTimedQueue, id(), checkBackAfter,
+                             [this]() { updateHealthStatus(); });
     }
 
     // With kTolerance we are expecting these to execute before the next update.