Add NamedLatch to statsd

This is a sychronizing primitive that is similar to a latch, but a
thread must count down with an identifier. It will be used to make sure
the boot complete, uid map, and all pullers signals are received before
triggering the bucket split.

The latch's countDown operation takes in a string identifier, so that if
the same operation happens twice, it is only counted once.

Bug: 144099206
Test: atest statsd_test

Change-Id: I261a3e50eabbc4998ca30ddf2d67a9a1e788911e
diff --git a/cmds/statsd/Android.bp b/cmds/statsd/Android.bp
index b357904..6e8ceb7 100644
--- a/cmds/statsd/Android.bp
+++ b/cmds/statsd/Android.bp
@@ -104,6 +104,7 @@
         "src/subscriber/IncidentdReporter.cpp",
         "src/subscriber/SubscriberReporter.cpp",
         "src/uid_data.proto",
+        "src/utils/NamedLatch.cpp",
     ],
 
     local_include_dirs: [
@@ -361,6 +362,7 @@
         "tests/StatsService_test.cpp",
         "tests/storage/StorageManager_test.cpp",
         "tests/UidMap_test.cpp",
+        "tests/utils/NamedLatch_test.cpp",
     ],
 
     static_libs: [
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index dd1d400..ae7a8d0 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -118,8 +118,9 @@
                   }
               })),
       mEventQueue(queue),
-      mStatsCompanionServiceDeathRecipient(AIBinder_DeathRecipient_new(
-              StatsService::statsCompanionServiceDied)) {
+      mBootCompleteLatch({kBootCompleteTag, kUidMapReceivedTag, kAllPullersRegisteredTag}),
+      mStatsCompanionServiceDeathRecipient(
+              AIBinder_DeathRecipient_new(StatsService::statsCompanionServiceDied)) {
     mUidMap = UidMap::getInstance();
     mPullerManager = new StatsPullerManager();
     StatsPuller::SetUidMap(mUidMap);
@@ -164,6 +165,12 @@
         std::thread pushedEventThread([this] { readLogs(); });
         pushedEventThread.detach();
     }
+
+    std::thread bootCompletedThread([this] {
+        mBootCompleteLatch.wait();
+        VLOG("In the boot completed thread");
+    });
+    bootCompletedThread.detach();
 }
 
 StatsService::~StatsService() {
@@ -939,6 +946,7 @@
                        packageNames,
                        installers);
 
+    mBootCompleteLatch.countDown(kUidMapReceivedTag);
     VLOG("StatsService::informAllUidData UidData proto parsed successfully.");
     return Status::ok();
 }
@@ -1058,7 +1066,7 @@
     ENFORCE_UID(AID_SYSTEM);
 
     VLOG("StatsService::bootCompleted was called");
-
+    mBootCompleteLatch.countDown(kBootCompleteTag);
     return Status::ok();
 }
 
@@ -1227,7 +1235,7 @@
     ENFORCE_UID(AID_SYSTEM);
 
     VLOG("StatsService::allPullersFromBootRegistered was called");
-
+    mBootCompleteLatch.countDown(kAllPullersRegisteredTag);
     return Status::ok();
 }
 
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 23d4c1b..79324d8 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -17,7 +17,14 @@
 #ifndef STATS_SERVICE_H
 #define STATS_SERVICE_H
 
+#include <aidl/android/os/BnStatsd.h>
+#include <aidl/android/os/IPendingIntentRef.h>
+#include <aidl/android/os/IPullAtomCallback.h>
 #include <gtest/gtest_prod.h>
+#include <utils/Looper.h>
+
+#include <mutex>
+
 #include "StatsLogProcessor.h"
 #include "anomaly/AlarmMonitor.h"
 #include "config/ConfigManager.h"
@@ -26,13 +33,7 @@
 #include "packages/UidMap.h"
 #include "shell/ShellSubscriber.h"
 #include "statscompanion_util.h"
-
-#include <aidl/android/os/BnStatsd.h>
-#include <aidl/android/os/IPendingIntentRef.h>
-#include <aidl/android/os/IPullAtomCallback.h>
-#include <utils/Looper.h>
-
-#include <mutex>
+#include "utils/NamedLatch.h"
 
 using namespace android;
 using namespace android::os;
@@ -385,6 +386,11 @@
     mutable mutex mShellSubscriberMutex;
     std::shared_ptr<LogEventQueue> mEventQueue;
 
+    NamedLatch mBootCompleteLatch;
+    static const inline string kBootCompleteTag = "BOOT_COMPLETE";
+    static const inline string kUidMapReceivedTag = "UID_MAP";
+    static const inline string kAllPullersRegisteredTag = "PULLERS_REGISTERED";
+
     ScopedAIBinder_DeathRecipient mStatsCompanionServiceDeathRecipient;
 
     FRIEND_TEST(StatsLogProcessorTest, TestActivationsPersistAcrossSystemServerRestart);
diff --git a/cmds/statsd/src/utils/NamedLatch.cpp b/cmds/statsd/src/utils/NamedLatch.cpp
new file mode 100644
index 0000000..6e77977
--- /dev/null
+++ b/cmds/statsd/src/utils/NamedLatch.cpp
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define DEBUG false  // STOPSHIP if true
+
+#include "NamedLatch.h"
+
+using namespace std;
+
+namespace android {
+namespace os {
+namespace statsd {
+
+NamedLatch::NamedLatch(const set<string>& eventNames) : mRemainingEventNames(eventNames) {
+}
+
+void NamedLatch::countDown(const string& eventName) {
+    bool notify = false;
+    {
+        lock_guard<mutex> lg(mMutex);
+        mRemainingEventNames.erase(eventName);
+        notify = mRemainingEventNames.empty();
+    }
+    if (notify) {
+        mConditionVariable.notify_all();
+    }
+}
+
+void NamedLatch::wait() const {
+    unique_lock<mutex> unique_lk(mMutex);
+    mConditionVariable.wait(unique_lk, [this] { return mRemainingEventNames.empty(); });
+}
+
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
diff --git a/cmds/statsd/src/utils/NamedLatch.h b/cmds/statsd/src/utils/NamedLatch.h
new file mode 100644
index 0000000..70238370
--- /dev/null
+++ b/cmds/statsd/src/utils/NamedLatch.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <gtest/gtest_prod.h>
+
+#include <condition_variable>
+#include <mutex>
+#include <set>
+
+namespace android {
+namespace os {
+namespace statsd {
+
+/**
+ * This class provides a threading primitive similar to a latch.
+ * The primary difference is that it waits for named events to occur instead of waiting for
+ * N threads to reach a certain point.
+ *
+ * It uses a condition variable under the hood.
+ */
+class NamedLatch {
+public:
+    explicit NamedLatch(const std::set<std::string>& eventNames);
+
+    NamedLatch(const NamedLatch&) = delete;
+    NamedLatch& operator=(const NamedLatch&) = delete;
+
+    // Mark a specific event as completed. If this event has called countDown already or if the
+    // event was not specified in the constructor, the function is a no-op.
+    void countDown(const std::string& eventName);
+
+    // Blocks the calling thread until all events in eventNames have called countDown.
+    void wait() const;
+
+private:
+    mutable std::mutex mMutex;
+    mutable std::condition_variable mConditionVariable;
+    std::set<std::string> mRemainingEventNames;
+
+    FRIEND_TEST(NamedLatchTest, TestCountDownCalledBySameEventName);
+};
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
diff --git a/cmds/statsd/tests/utils/NamedLatch_test.cpp b/cmds/statsd/tests/utils/NamedLatch_test.cpp
new file mode 100644
index 0000000..de48a13
--- /dev/null
+++ b/cmds/statsd/tests/utils/NamedLatch_test.cpp
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/NamedLatch.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <set>
+#include <thread>
+#include <vector>
+
+#ifdef __ANDROID__
+
+using namespace std;
+using std::this_thread::sleep_for;
+
+namespace android {
+namespace os {
+namespace statsd {
+
+TEST(NamedLatchTest, TestWait) {
+    int numEvents = 5;
+    string t1 = "t1", t2 = "t2", t3 = "t3", t4 = "t4", t5 = "t5";
+    set<string> eventNames = {t1, t2, t3, t4, t5};
+
+    NamedLatch latch(eventNames);
+    vector<thread> threads;
+    vector<bool> done(numEvents, false);
+
+    int i = 0;
+    for (const string& eventName : eventNames) {
+        threads.emplace_back([&done, &eventName, &latch, i] {
+            sleep_for(chrono::milliseconds(3));
+            done[i] = true;
+            latch.countDown(eventName);
+        });
+        i++;
+    }
+
+    latch.wait();
+
+    for (i = 0; i < numEvents; i++) {
+        EXPECT_EQ(done[i], 1);
+    }
+
+    for (i = 0; i < numEvents; i++) {
+        threads[i].join();
+    }
+}
+
+TEST(NamedLatchTest, TestNoWorkers) {
+    NamedLatch latch({});
+    latch.wait();
+    // Ensure that latch does not wait if no events need to countDown.
+}
+
+TEST(NamedLatchTest, TestCountDownCalledBySameEventName) {
+    string t1 = "t1", t2 = "t2";
+    set<string> eventNames = {t1, t2};
+
+    NamedLatch latch(eventNames);
+
+    thread waiterThread([&latch] { latch.wait(); });
+
+    latch.countDown(t1);
+    latch.countDown(t1);
+
+    // Ensure that the latch's remaining threads still has t2.
+    latch.mMutex.lock();
+    ASSERT_EQ(latch.mRemainingEventNames.size(), 1);
+    EXPECT_NE(latch.mRemainingEventNames.find(t2), latch.mRemainingEventNames.end());
+    latch.mMutex.unlock();
+
+    latch.countDown(t2);
+    waiterThread.join();
+}
+
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
+#else
+GTEST_LOG_(INFO) << "This test does nothing.\n";
+#endif