Healthcheck: proper job allocation and test.

Bug: 153874006
Test: atest PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest

Change-Id: Iede1f2297cc4f8e3c3f0acd43cee597f75dff179
diff --git a/services/incremental/ServiceWrappers.cpp b/services/incremental/ServiceWrappers.cpp
index a76aa62..99a35ad 100644
--- a/services/incremental/ServiceWrappers.cpp
+++ b/services/incremental/ServiceWrappers.cpp
@@ -25,6 +25,8 @@
 #include <binder/AppOpsManager.h>
 #include <utils/String16.h>
 
+#include <thread>
+
 #include "IncrementalServiceValidation.h"
 
 using namespace std::literals;
@@ -181,6 +183,88 @@
     }
 };
 
+static JNIEnv* getOrAttachJniEnv(JavaVM* jvm);  // forward declaration; used by the timer thread
+
+// Timed job queue served by its own thread; stop() must be called before destruction.
+class RealTimedQueueWrapper : public TimedQueueWrapper {
+public:
+    explicit RealTimedQueueWrapper(JavaVM* jvm) {
+        mThread = std::thread([this, jvm]() {
+            (void)getOrAttachJniEnv(jvm);  // return value intentionally ignored
+            runTimers();
+        });
+    }
+    ~RealTimedQueueWrapper() final {
+        CHECK(!mRunning) << "call stop first";
+        CHECK(!mThread.joinable()) << "call stop first";
+    }
+
+    void addJob(MountId id, Milliseconds after, Job what) final {  // due at now + after
+        const auto now = Clock::now();
+        {
+            std::unique_lock lock(mMutex);
+            mJobs.insert(TimedJob{id, now + after, std::move(what)});
+        }
+        mCondition.notify_all();
+    }
+    void removeJobs(MountId id) final {  // drops every pending job with this id
+        std::unique_lock lock(mMutex);
+        std::erase_if(mJobs, [id](auto&& item) { return item.id == id; });
+    }
+    void stop() final {  // ends the timer thread, joins it, discards pending jobs
+        {
+            std::unique_lock lock(mMutex);
+            mRunning = false;
+        }
+        mCondition.notify_all();
+        mThread.join();
+        mJobs.clear();
+    }
+
+private:
+    void runTimers() {
+        static constexpr TimePoint kInfinityTs{Clock::duration::max()};
+        TimePoint nextJobTs = kInfinityTs;
+        std::unique_lock lock(mMutex);
+        for (;;) {
+            // Wake up on stop(), when the first job is due, or when an earlier job was added.
+            mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() {
+                const auto now = Clock::now();
+                const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
+                return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs;
+            });
+            if (!mRunning) {
+                return;
+            }
+
+            const auto now = Clock::now();
+            auto it = mJobs.begin();
+            // Always acquire begin(). We can't use it after unlock as mJobs can change.
+            for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) {
+                auto jobNode = mJobs.extract(it);  // moves the job out; elements are const
+                lock.unlock();  // run the job without holding mMutex
+                jobNode.value().what();
+                lock.lock();
+            }
+            nextJobTs = it != mJobs.end() ? it->when : kInfinityTs;
+        }
+    }
+
+    struct TimedJob {
+        MountId id;
+        TimePoint when;
+        Job what;
+        friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
+            return lhs.when < rhs.when;
+        }
+    };
+    bool mRunning = true;
+    std::multiset<TimedJob> mJobs;  // multiset: a set would drop jobs with an equal `when`
+    std::condition_variable mCondition;
+    std::mutex mMutex;
+    std::thread mThread;
+};
+
 RealServiceManager::RealServiceManager(sp<IServiceManager> serviceManager, JNIEnv* env)
       : mServiceManager(std::move(serviceManager)), mJvm(RealJniWrapper::getJvm(env)) {}
 
@@ -228,6 +312,10 @@
     return std::make_unique<RealLooperWrapper>();
 }
 
+std::unique_ptr<TimedQueueWrapper> RealServiceManager::getTimedQueue() {
+    return std::make_unique<RealTimedQueueWrapper>(mJvm);  // new queue; ctor starts its thread
+}
+
 static JavaVM* getJavaVm(JNIEnv* env) {
     CHECK(env);
     JavaVM* jvm = nullptr;