Split buckets on boot complete

Also clean up some of the code that splits buckets on app upgrades.

Piggybacked on the app upgrade tests by parameterizing them so that they
also cover the boot complete event.

Refactored some value metric test code to increase code reuse and add
more assertions.

Fixed a broken value metric test that had assertions commented out.

Refactored NamedLatch into MultiConditionTrigger to avoid creating a
thread before necessary.

Test: atest statsd_test
Test: push a simple test config, reboot, wait, get data. Made sure
the bucket was split
Bug: 144099206
Bug: 154511974

Change-Id: I73858b5db08e8cda762bd8091b30da8738d1fd88
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 325cbc7..fc7b950 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -1052,8 +1052,8 @@
 void StatsLogProcessor::notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk,
                                          const int uid, const int64_t version) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    ALOGW("Received app upgrade");
-    for (auto it : mMetricsManagers) {
+    VLOG("Received app upgrade");
+    for (const auto& it : mMetricsManagers) {
         it.second->notifyAppUpgrade(eventTimeNs, apk, uid, version);
     }
 }
@@ -1061,20 +1061,28 @@
 void StatsLogProcessor::notifyAppRemoved(const int64_t& eventTimeNs, const string& apk,
                                          const int uid) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    ALOGW("Received app removed");
-    for (auto it : mMetricsManagers) {
+    VLOG("Received app removed");
+    for (const auto& it : mMetricsManagers) {
         it.second->notifyAppRemoved(eventTimeNs, apk, uid);
     }
 }
 
 void StatsLogProcessor::onUidMapReceived(const int64_t& eventTimeNs) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    ALOGW("Received uid map");
-    for (auto it : mMetricsManagers) {
+    VLOG("Received uid map");
+    for (const auto& it : mMetricsManagers) {
         it.second->onUidMapReceived(eventTimeNs);
     }
 }
 
+void StatsLogProcessor::onStatsdInitCompleted(const int64_t& elapsedTimeNs) {
+    std::lock_guard<std::mutex> lock(mMetricsMutex);
+    VLOG("Received boot completed signal");
+    for (const auto& it : mMetricsManagers) {
+        it.second->onStatsdInitCompleted(elapsedTimeNs);
+    }
+}
+
 void StatsLogProcessor::noteOnDiskData(const ConfigKey& key) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
     mOnDiskDataConfigs.insert(key);
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index 97512ed..ffd83ba 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -120,6 +120,11 @@
     /* Notify all MetricsManagers of uid map snapshots received */
     void onUidMapReceived(const int64_t& eventTimeNs) override;
 
+    /* Notify all MetricsManagers that boot has completed.
+     * This forces a bucket split once boot is finished.
+     */
+    void onStatsdInitCompleted(const int64_t& elapsedTimeNs);
+
     // Reset all configs.
     void resetConfigs();
 
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index ae7a8d0..f8cdcff 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -118,7 +118,8 @@
                   }
               })),
       mEventQueue(queue),
-      mBootCompleteLatch({kBootCompleteTag, kUidMapReceivedTag, kAllPullersRegisteredTag}),
+      mBootCompleteTrigger({kBootCompleteTag, kUidMapReceivedTag, kAllPullersRegisteredTag},
+                           [this]() { mProcessor->onStatsdInitCompleted(getElapsedRealtimeNs()); }),
       mStatsCompanionServiceDeathRecipient(
               AIBinder_DeathRecipient_new(StatsService::statsCompanionServiceDied)) {
     mUidMap = UidMap::getInstance();
@@ -165,12 +166,6 @@
         std::thread pushedEventThread([this] { readLogs(); });
         pushedEventThread.detach();
     }
-
-    std::thread bootCompletedThread([this] {
-        mBootCompleteLatch.wait();
-        VLOG("In the boot completed thread");
-    });
-    bootCompletedThread.detach();
 }
 
 StatsService::~StatsService() {
@@ -946,7 +941,7 @@
                        packageNames,
                        installers);
 
-    mBootCompleteLatch.countDown(kUidMapReceivedTag);
+    mBootCompleteTrigger.markComplete(kUidMapReceivedTag);
     VLOG("StatsService::informAllUidData UidData proto parsed successfully.");
     return Status::ok();
 }
@@ -1066,7 +1061,7 @@
     ENFORCE_UID(AID_SYSTEM);
 
     VLOG("StatsService::bootCompleted was called");
-    mBootCompleteLatch.countDown(kBootCompleteTag);
+    mBootCompleteTrigger.markComplete(kBootCompleteTag);
     return Status::ok();
 }
 
@@ -1235,7 +1230,7 @@
     ENFORCE_UID(AID_SYSTEM);
 
     VLOG("StatsService::allPullersFromBootRegistered was called");
-    mBootCompleteLatch.countDown(kAllPullersRegisteredTag);
+    mBootCompleteTrigger.markComplete(kAllPullersRegisteredTag);
     return Status::ok();
 }
 
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 79324d8..e27966b 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -33,7 +33,7 @@
 #include "packages/UidMap.h"
 #include "shell/ShellSubscriber.h"
 #include "statscompanion_util.h"
-#include "utils/NamedLatch.h"
+#include "utils/MultiConditionTrigger.h"
 
 using namespace android;
 using namespace android::os;
@@ -386,7 +386,7 @@
     mutable mutex mShellSubscriberMutex;
     std::shared_ptr<LogEventQueue> mEventQueue;
 
-    NamedLatch mBootCompleteLatch;
+    MultiConditionTrigger mBootCompleteTrigger;
     static const inline string kBootCompleteTag = "BOOT_COMPLETE";
     static const inline string kUidMapReceivedTag = "UID_MAP";
     static const inline string kAllPullersRegisteredTag = "PULLERS_REGISTERED";
@@ -399,11 +399,14 @@
     FRIEND_TEST(StatsServiceTest, TestAddConfig_invalid);
     FRIEND_TEST(StatsServiceTest, TestGetUidFromArgs);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricNoSplitOnNewApp);
+    FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnBoot);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit);
+    FRIEND_TEST(PartialBucketE2eTest, TestValueMetricOnBootWithoutMinPartialBucket);
     FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket);
     FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket);
+    FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricOnBootWithoutMinPartialBucket);
     FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket);
     FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket);
 };
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index a4711e8..f9a8842 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -109,10 +109,11 @@
     FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition);
     FRIEND_TEST(CountMetricProducerTest, TestEventsWithSlicedCondition);
     FRIEND_TEST(CountMetricProducerTest, TestAnomalyDetectionUnSliced);
-    FRIEND_TEST(CountMetricProducerTest, TestEventWithAppUpgrade);
-    FRIEND_TEST(CountMetricProducerTest, TestEventWithAppUpgradeInNextBucket);
     FRIEND_TEST(CountMetricProducerTest, TestFirstBucket);
     FRIEND_TEST(CountMetricProducerTest, TestOneWeekTimeUnit);
+
+    FRIEND_TEST(CountMetricProducerTest_PartialBucket, TestSplitInCurrentBucket);
+    FRIEND_TEST(CountMetricProducerTest_PartialBucket, TestSplitInNextBucket);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index cc48f99..6f84076 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -154,12 +154,14 @@
     FRIEND_TEST(DurationMetricTrackerTest, TestNoCondition);
     FRIEND_TEST(DurationMetricTrackerTest, TestNonSlicedCondition);
     FRIEND_TEST(DurationMetricTrackerTest, TestNonSlicedConditionUnknownState);
-    FRIEND_TEST(DurationMetricTrackerTest, TestSumDurationWithUpgrade);
-    FRIEND_TEST(DurationMetricTrackerTest, TestSumDurationWithUpgradeInFollowingBucket);
-    FRIEND_TEST(DurationMetricTrackerTest, TestMaxDurationWithUpgrade);
-    FRIEND_TEST(DurationMetricTrackerTest, TestMaxDurationWithUpgradeInNextBucket);
     FRIEND_TEST(WakelockDurationE2eTest, TestAggregatedPredicates);
     FRIEND_TEST(DurationMetricTrackerTest, TestFirstBucket);
+
+    FRIEND_TEST(DurationMetricProducerTest_PartialBucket, TestSumDuration);
+    FRIEND_TEST(DurationMetricProducerTest_PartialBucket,
+                TestSumDurationWithSplitInFollowingBucket);
+    FRIEND_TEST(DurationMetricProducerTest_PartialBucket, TestMaxDuration);
+    FRIEND_TEST(DurationMetricProducerTest_PartialBucket, TestMaxDurationWithSplitInNextBucket);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 79ec711..e2bff21 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -73,18 +73,23 @@
                       bool pullSuccess, int64_t originalPullTimeNs) override;
 
     // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
-    void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
-                          const int64_t version) override {
+    void notifyAppUpgrade(const int64_t& eventTimeNs) override {
         std::lock_guard<std::mutex> lock(mMutex);
 
         if (!mSplitBucketForAppUpgrade) {
             return;
         }
-        if (eventTimeNs > getCurrentBucketEndTimeNs()) {
-            // Flush full buckets on the normal path up to the latest bucket boundary.
-            flushIfNeededLocked(eventTimeNs);
+        flushLocked(eventTimeNs);
+        if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
+            pullAndMatchEventsLocked(eventTimeNs);
         }
-        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
+    };
+
+    // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
+    void onStatsdInitCompleted(const int64_t& eventTimeNs) override {
+        std::lock_guard<std::mutex> lock(mMutex);
+
+        flushLocked(eventTimeNs);
         if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
             pullAndMatchEventsLocked(eventTimeNs);
         }
@@ -190,13 +195,14 @@
     FRIEND_TEST(GaugeMetricProducerTest, TestPulledEventsWithCondition);
     FRIEND_TEST(GaugeMetricProducerTest, TestPulledEventsWithSlicedCondition);
     FRIEND_TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition);
-    FRIEND_TEST(GaugeMetricProducerTest, TestPushedEventsWithUpgrade);
-    FRIEND_TEST(GaugeMetricProducerTest, TestPulledWithUpgrade);
     FRIEND_TEST(GaugeMetricProducerTest, TestPulledWithAppUpgradeDisabled);
     FRIEND_TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection);
     FRIEND_TEST(GaugeMetricProducerTest, TestFirstBucket);
     FRIEND_TEST(GaugeMetricProducerTest, TestPullOnTrigger);
     FRIEND_TEST(GaugeMetricProducerTest, TestRemoveDimensionInOutput);
+
+    FRIEND_TEST(GaugeMetricProducerTest_PartialBucket, TestPushedEvents);
+    FRIEND_TEST(GaugeMetricProducerTest_PartialBucket, TestPulled);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 4550e65..7d33d5a 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -141,30 +141,25 @@
     }
 
     /**
-     * Forces this metric to split into a partial bucket right now. If we're past a full bucket, we
-     * first call the standard flushing code to flush up to the latest full bucket. Then we call
-     * the flush again when the end timestamp is forced to be now, and then after flushing, update
-     * the start timestamp to be now.
+     * Force a partial bucket split on app upgrade.
      */
-    virtual void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
-                          const int64_t version) {
+    virtual void notifyAppUpgrade(const int64_t& eventTimeNs) {
         std::lock_guard<std::mutex> lock(mMutex);
-
-        if (eventTimeNs > getCurrentBucketEndTimeNs()) {
-            // Flush full buckets on the normal path up to the latest bucket boundary.
-            flushIfNeededLocked(eventTimeNs);
-        }
-        // Now flush a partial bucket.
-        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
-        // Don't update the current bucket number so that the anomaly tracker knows this bucket
-        // is a partial bucket and can merge it with the previous bucket.
+        flushLocked(eventTimeNs);
     };
 
-    void notifyAppRemoved(const int64_t& eventTimeNs, const string& apk, const int uid) {
+    void notifyAppRemoved(const int64_t& eventTimeNs) {
         // Force buckets to split on removal also.
-        notifyAppUpgrade(eventTimeNs, apk, uid, 0);
+        notifyAppUpgrade(eventTimeNs);
     };
 
+    /**
+     * Force a partial bucket split on boot complete.
+     */
+    virtual void onStatsdInitCompleted(const int64_t& eventTimeNs) {
+        std::lock_guard<std::mutex> lock(mMutex);
+        flushLocked(eventTimeNs);
+    }
     // Consume the parsed stats log entry that already matched the "what" of the metric.
     void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event) {
         std::lock_guard<std::mutex> lock(mMutex);
@@ -292,8 +287,7 @@
     // End: getters/setters
 protected:
     /**
-     * Flushes the current bucket if the eventTime is after the current bucket's end time. This will
-       also flush the current partial bucket in memory.
+     * Flushes the current bucket if the eventTime is after the current bucket's end time.
      */
     virtual void flushIfNeededLocked(const int64_t& eventTime){};
 
diff --git a/cmds/statsd/src/metrics/MetricsManager.cpp b/cmds/statsd/src/metrics/MetricsManager.cpp
index d832ed8..bcca1fd9 100644
--- a/cmds/statsd/src/metrics/MetricsManager.cpp
+++ b/cmds/statsd/src/metrics/MetricsManager.cpp
@@ -231,8 +231,8 @@
 void MetricsManager::notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
                                       const int64_t version) {
     // Inform all metric producers.
-    for (auto it : mAllMetricProducers) {
-        it->notifyAppUpgrade(eventTimeNs, apk, uid, version);
+    for (const auto& it : mAllMetricProducers) {
+        it->notifyAppUpgrade(eventTimeNs);
     }
     // check if we care this package
     if (std::find(mAllowedPkg.begin(), mAllowedPkg.end(), apk) != mAllowedPkg.end()) {
@@ -252,8 +252,8 @@
 void MetricsManager::notifyAppRemoved(const int64_t& eventTimeNs, const string& apk,
                                       const int uid) {
     // Inform all metric producers.
-    for (auto it : mAllMetricProducers) {
-        it->notifyAppRemoved(eventTimeNs, apk, uid);
+    for (const auto& it : mAllMetricProducers) {
+        it->notifyAppRemoved(eventTimeNs);
     }
     // check if we care this package
     if (std::find(mAllowedPkg.begin(), mAllowedPkg.end(), apk) != mAllowedPkg.end()) {
@@ -282,6 +282,13 @@
     initLogSourceWhiteList();
 }
 
+void MetricsManager::onStatsdInitCompleted(const int64_t& eventTimeNs) {
+    // Inform all metric producers.
+    for (const auto& it : mAllMetricProducers) {
+        it->onStatsdInitCompleted(eventTimeNs);
+    }
+}
+
 void MetricsManager::init() {
     for (const auto& producer : mAllMetricProducers) {
         producer->prepareFirstBucket();
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 1fd6572..ef03d20 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -70,6 +70,8 @@
 
     void onUidMapReceived(const int64_t& eventTimeNs);
 
+    void onStatsdInitCompleted(const int64_t& elapsedTimeNs);
+
     void init();
 
     vector<int32_t> getPullAtomUids(int32_t atomId) override;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index e9273dc..c8dc8cc 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -69,8 +69,7 @@
                       bool pullSuccess, int64_t originalPullTimeNs) override;
 
     // ValueMetric needs special logic if it's a pulled atom.
-    void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
-                          const int64_t version) override {
+    void notifyAppUpgrade(const int64_t& eventTimeNs) override {
         std::lock_guard<std::mutex> lock(mMutex);
         if (!mSplitBucketForAppUpgrade) {
             return;
@@ -81,6 +80,15 @@
         flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     };
 
+    // For a pulled atom whose condition is true, pull before flushing the partial bucket.
+    void onStatsdInitCompleted(const int64_t& eventTimeNs) override {
+        std::lock_guard<std::mutex> lock(mMutex);
+        if (mIsPulled && mCondition) {
+            pullAndMatchEventsLocked(eventTimeNs);
+        }
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
+    };
+
     void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
                         int oldState, int newState) override;
 
@@ -256,7 +264,6 @@
 
     FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
     FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
-    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
@@ -269,10 +276,8 @@
     FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
     FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
-    FRIEND_TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid);
     FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
     FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
-    FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated);
     FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledData_noDiff_bucketBoundaryFalse);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledData_noDiff_bucketBoundaryTrue);
@@ -283,15 +288,12 @@
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
-    FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade);
-    FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition);
-    FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
@@ -313,6 +315,14 @@
     FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenGuardRailHit);
     FRIEND_TEST(ValueMetricProducerTest_BucketDrop,
                 TestInvalidBucketWhenAccumulateEventWrongBucket);
+
+    FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestBucketBoundariesOnPartialBucket);
+    FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestFullBucketResetWhenLastBucketInvalid);
+    FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPartialBucketCreated);
+    FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPushedEvents);
+    FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPulledValue);
+    FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPulledValueWhileConditionFalse);
+
     friend class ValueMetricProducerTestHelper;
 };
 
diff --git a/cmds/statsd/src/utils/MultiConditionTrigger.cpp b/cmds/statsd/src/utils/MultiConditionTrigger.cpp
new file mode 100644
index 0000000..43a6933
--- /dev/null
+++ b/cmds/statsd/src/utils/MultiConditionTrigger.cpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define DEBUG false  // STOPSHIP if true
+
+#include "MultiConditionTrigger.h"
+
+#include <thread>
+
+using namespace std;
+
+namespace android {
+namespace os {
+namespace statsd {
+
+MultiConditionTrigger::MultiConditionTrigger(const set<string>& conditionNames,
+                                             function<void()> trigger)
+    : mRemainingConditionNames(conditionNames),
+      mTrigger(trigger),
+      mCompleted(mRemainingConditionNames.empty()) {
+    if (mCompleted) {
+        thread executorThread([this] { mTrigger(); });
+        executorThread.detach();
+    }
+}
+
+void MultiConditionTrigger::markComplete(const string& conditionName) {
+    bool doTrigger = false;
+    {
+        lock_guard<mutex> lg(mMutex);
+        if (mCompleted) {
+            return;
+        }
+        mRemainingConditionNames.erase(conditionName);
+        mCompleted = mRemainingConditionNames.empty();
+        doTrigger = mCompleted;
+    }
+    if (doTrigger) {
+        std::thread executorThread([this] { mTrigger(); });
+        executorThread.detach();
+    }
+}
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
diff --git a/cmds/statsd/src/utils/MultiConditionTrigger.h b/cmds/statsd/src/utils/MultiConditionTrigger.h
new file mode 100644
index 0000000..51f6029
--- /dev/null
+++ b/cmds/statsd/src/utils/MultiConditionTrigger.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <gtest/gtest_prod.h>
+
+#include <mutex>
+#include <set>
+
+namespace android {
+namespace os {
+namespace statsd {
+
+/**
+ * This class provides a utility to wait for a set of named conditions to occur.
+ *
+ * It will execute the trigger runnable in a detached thread once all conditions have been marked
+ * true.
+ */
+class MultiConditionTrigger {
+public:
+    explicit MultiConditionTrigger(const std::set<std::string>& conditionNames,
+                                   std::function<void()> trigger);
+
+    MultiConditionTrigger(const MultiConditionTrigger&) = delete;
+    MultiConditionTrigger& operator=(const MultiConditionTrigger&) = delete;
+
+    // Mark a specific condition as complete. If markComplete was already called for this
+    // condition, or the condition was not specified in the constructor, this is a no-op.
+    void markComplete(const std::string& eventName);
+
+private:
+    mutable std::mutex mMutex;
+    std::set<std::string> mRemainingConditionNames;
+    std::function<void()> mTrigger;
+    bool mCompleted;
+
+    FRIEND_TEST(MultiConditionTriggerTest, TestCountDownCalledBySameEventName);
+};
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
diff --git a/cmds/statsd/src/utils/NamedLatch.cpp b/cmds/statsd/src/utils/NamedLatch.cpp
deleted file mode 100644
index 6e77977..0000000
--- a/cmds/statsd/src/utils/NamedLatch.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2020 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#define DEBUG false  // STOPSHIP if true
-
-#include "NamedLatch.h"
-
-using namespace std;
-
-namespace android {
-namespace os {
-namespace statsd {
-
-NamedLatch::NamedLatch(const set<string>& eventNames) : mRemainingEventNames(eventNames) {
-}
-
-void NamedLatch::countDown(const string& eventName) {
-    bool notify = false;
-    {
-        lock_guard<mutex> lg(mMutex);
-        mRemainingEventNames.erase(eventName);
-        notify = mRemainingEventNames.empty();
-    }
-    if (notify) {
-        mConditionVariable.notify_all();
-    }
-}
-
-void NamedLatch::wait() const {
-    unique_lock<mutex> unique_lk(mMutex);
-    mConditionVariable.wait(unique_lk, [this] { return mRemainingEventNames.empty(); });
-}
-
-}  // namespace statsd
-}  // namespace os
-}  // namespace android
diff --git a/cmds/statsd/src/utils/NamedLatch.h b/cmds/statsd/src/utils/NamedLatch.h
deleted file mode 100644
index 70238370..0000000
--- a/cmds/statsd/src/utils/NamedLatch.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (C) 2020 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <gtest/gtest_prod.h>
-
-#include <condition_variable>
-#include <mutex>
-#include <set>
-
-namespace android {
-namespace os {
-namespace statsd {
-
-/**
- * This class provides a threading primitive similar to a latch.
- * The primary difference is that it waits for named events to occur instead of waiting for
- * N threads to reach a certain point.
- *
- * It uses a condition variable under the hood.
- */
-class NamedLatch {
-public:
-    explicit NamedLatch(const std::set<std::string>& eventNames);
-
-    NamedLatch(const NamedLatch&) = delete;
-    NamedLatch& operator=(const NamedLatch&) = delete;
-
-    // Mark a specific event as completed. If this event has called countDown already or if the
-    // event was not specified in the constructor, the function is a no-op.
-    void countDown(const std::string& eventName);
-
-    // Blocks the calling thread until all events in eventNames have called countDown.
-    void wait() const;
-
-private:
-    mutable std::mutex mMutex;
-    mutable std::condition_variable mConditionVariable;
-    std::set<std::string> mRemainingEventNames;
-
-    FRIEND_TEST(NamedLatchTest, TestCountDownCalledBySameEventName);
-};
-}  // namespace statsd
-}  // namespace os
-}  // namespace android