Make StatsLog drop less.

+ Create a thread-safe LogEventQueue to buffer log events.

+ The socket listener thread reads from the socket and writes to the buffer as quickly as
  possible to minimize data loss in the socket.

+ All pushed data is fetched from the buffer and processed in a dedicated thread. After an
  event is fetched from the queue, we no longer block the socket listener thread.

+ Report event queue stats via statsdstats, including the min and max queue event history span in
  the queue (to understand how slow statsd can be and how fast the events can be)

Bug: 119031518
Test: unit tests added in statsd_test

Change-Id: I6b65ed9a678935b2e24302ba4b36e69c157adde4
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 52ecdc8..bfe2da9 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -132,34 +132,36 @@
     }                                                             \
 }
 
-StatsService::StatsService(const sp<Looper>& handlerLooper)
-    : mAnomalyAlarmMonitor(new AlarmMonitor(MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
-       [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
-           if (sc != nullptr) {
-               sc->setAnomalyAlarm(timeMillis);
-               StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
-           }
-       },
-       [](const sp<IStatsCompanionService>& sc) {
-           if (sc != nullptr) {
-               sc->cancelAnomalyAlarm();
-               StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
-           }
-       })),
-   mPeriodicAlarmMonitor(new AlarmMonitor(MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
-      [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
-           if (sc != nullptr) {
-               sc->setAlarmForSubscriberTriggering(timeMillis);
-               StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
-           }
-      },
-      [](const sp<IStatsCompanionService>& sc) {
-           if (sc != nullptr) {
-               sc->cancelAlarmForSubscriberTriggering();
-               StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
-           }
-
-      }))  {
+StatsService::StatsService(const sp<Looper>& handlerLooper, shared_ptr<LogEventQueue> queue)
+    : mAnomalyAlarmMonitor(new AlarmMonitor(
+              MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
+              [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
+                  if (sc != nullptr) {
+                      sc->setAnomalyAlarm(timeMillis);
+                      StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
+                  }
+              },
+              [](const sp<IStatsCompanionService>& sc) {
+                  if (sc != nullptr) {
+                      sc->cancelAnomalyAlarm();
+                      StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
+                  }
+              })),
+      mPeriodicAlarmMonitor(new AlarmMonitor(
+              MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
+              [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
+                  if (sc != nullptr) {
+                      sc->setAlarmForSubscriberTriggering(timeMillis);
+                      StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
+                  }
+              },
+              [](const sp<IStatsCompanionService>& sc) {
+                  if (sc != nullptr) {
+                      sc->cancelAlarmForSubscriberTriggering();
+                      StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
+                  }
+              })),
+      mEventQueue(queue) {
     mUidMap = UidMap::getInstance();
     mPullerManager = new StatsPullerManager();
     StatsPuller::SetUidMap(mUidMap);
@@ -201,11 +203,33 @@
     mConfigManager->AddListener(mProcessor);
 
     init_system_properties();
+
+    if (mEventQueue != nullptr) {
+        std::thread pushedEventThread([this] { readLogs(); });
+        pushedEventThread.detach();
+    }
 }
 
 StatsService::~StatsService() {
 }
 
+/* Runs on a dedicated thread; pops pushed events from mEventQueue and processes them. */
+void StatsService::readLogs() {
+    // Loop forever: this function has no exit condition and never returns.
+    while (1) {
+        // Block until an event is available.
+        auto event = mEventQueue->waitPop();
+        // Pass the event to mProcessor so that all configs/metrics get to see it.
+        // At this point the LogEventQueue is no longer blocked, so the socket listener can
+        // keep reading events from the socket and writing them to the buffer to avoid drops.
+        mProcessor->OnLogEvent(event.get());
+        // The ShellSubscriber is only used by the shell for local debugging.
+        if (mShellSubscriber != nullptr) {
+            mShellSubscriber->onLogEvent(*event);
+        }
+    }
+}
+
 void StatsService::init_system_properties() {
     mEngBuild = false;
     const prop_info* buildType = __system_property_find("ro.build.type");
@@ -1009,6 +1033,7 @@
     }
 }
 
+// Test only interface!!!
 void StatsService::OnLogEvent(LogEvent* event) {
     mProcessor->OnLogEvent(event);
     if (mShellSubscriber != nullptr) {