Support atoms subscription via shell cmd.

+ This feature is for supporting perfd.
+ Perfd is built outside Android and does not depend on any Android libraries.
  So the communication between perfd and statsd can only be via non-Android IPCs
  (such as sockets, pipes, file descriptors, etc.)
+ Perfd runs as a shell user, so using the existing shell cmd is a natural choice.
+ The input is a simple config, and the output is a stream of atom proto binary data.

+ Also cleaned up the code so that we use file descriptors directly instead of creating
  another FILE*.

TODO: pulled atom subscription.

Bug: 110536553
Test: statsd_test and manually tested
Change-Id: I64b0061cc66b5f7648147885a2ac1af531c19917
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 8e02f9c..f4c70be 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -257,12 +257,18 @@
     return it->second->byteSize();
 }
 
-void StatsLogProcessor::dumpStates(FILE* out, bool verbose) {
+void StatsLogProcessor::dumpStates(int out, bool verbose) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    fprintf(out, "MetricsManager count: %lu\n", (unsigned long)mMetricsManagers.size());
-    for (auto metricsManager : mMetricsManagers) {
-        metricsManager.second->dumpStates(out, verbose);
+    FILE* fout = fdopen(out, "w");
+    if (fout == NULL) {
+        return;
     }
+    fprintf(fout, "MetricsManager count: %lu\n", (unsigned long)mMetricsManagers.size());
+    for (auto metricsManager : mMetricsManagers) {
+        metricsManager.second->dumpStates(fout, verbose);
+    }
+
+    fclose(fout);
 }
 
 /*
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index df80b8e..4091d95 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -83,7 +83,7 @@
         return mUidMap;
     }
 
-    void dumpStates(FILE* out, bool verbose);
+    void dumpStates(int outFd, bool verbose);
 
     void informPullAlarmFired(const int64_t timestampNs);
 
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 9a79345..2ef1169 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -34,13 +34,14 @@
 #include <dirent.h>
 #include <frameworks/base/cmds/statsd/src/statsd_config.pb.h>
 #include <private/android_filesystem_config.h>
-#include <utils/Looper.h>
-#include <utils/String16.h>
 #include <statslog.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/system_properties.h>
 #include <unistd.h>
+#include <utils/Looper.h>
+#include <utils/String16.h>
+#include <chrono>
 
 using namespace android;
 
@@ -214,30 +215,8 @@
             sp<IResultReceiver> resultReceiver =
                     IResultReceiver::asInterface(data.readStrongBinder());
 
-            FILE* fin = fdopen(in, "r");
-            FILE* fout = fdopen(out, "w");
-            FILE* ferr = fdopen(err, "w");
-
-            if (fin == NULL || fout == NULL || ferr == NULL) {
-                resultReceiver->send(NO_MEMORY);
-            } else {
-                err = command(fin, fout, ferr, args);
-                resultReceiver->send(err);
-            }
-
-            if (fin != NULL) {
-                fflush(fin);
-                fclose(fin);
-            }
-            if (fout != NULL) {
-                fflush(fout);
-                fclose(fout);
-            }
-            if (fout != NULL) {
-                fflush(ferr);
-                fclose(ferr);
-            }
-
+            err = command(in, out, err, args, resultReceiver);
+            resultReceiver->send(err);
             return NO_ERROR;
         }
         default: { return BnStatsManager::onTransact(code, data, reply, flags); }
@@ -251,10 +230,6 @@
     if (!checkCallingPermission(String16(kPermissionDump))) {
         return PERMISSION_DENIED;
     }
-    FILE* out = fdopen(fd, "w");
-    if (out == NULL) {
-        return NO_MEMORY;  // the fd is already open
-    }
 
     bool verbose = false;
     bool proto = false;
@@ -265,21 +240,20 @@
         proto = true;
     }
 
-    dump_impl(out, verbose, proto);
+    dump_impl(fd, verbose, proto);
 
-    fclose(out);
     return NO_ERROR;
 }
 
 /**
  * Write debugging data about statsd in text or proto format.
  */
-void StatsService::dump_impl(FILE* out, bool verbose, bool proto) {
+void StatsService::dump_impl(int out, bool verbose, bool proto) {
     if (proto) {
         vector<uint8_t> data;
         StatsdStats::getInstance().dumpStats(&data, false); // does not reset statsdStats.
         for (size_t i = 0; i < data.size(); i ++) {
-            fprintf(out, "%c", data[i]);
+            dprintf(out, "%c", data[i]);
         }
     } else {
         StatsdStats::getInstance().dumpStats(out);
@@ -290,7 +264,8 @@
 /**
  * Implementation of the adb shell cmd stats command.
  */
-status_t StatsService::command(FILE* in, FILE* out, FILE* err, Vector<String8>& args) {
+status_t StatsService::command(int in, int out, int err, Vector<String8>& args,
+                               sp<IResultReceiver> resultReceiver) {
     uid_t uid = IPCThreadState::self()->getCallingUid();
     if (uid != AID_ROOT && uid != AID_SHELL) {
         return PERMISSION_DENIED;
@@ -342,97 +317,106 @@
         if (!args[0].compare(String8("print-logs"))) {
             return cmd_print_logs(out, args);
         }
+        if (!args[0].compare(String8("data-subscribe"))) {
+            if (mShellSubscriber == nullptr) {
+                mShellSubscriber = new ShellSubscriber(mUidMap);
+            }
+            mShellSubscriber->startNewSubscription(in, out, resultReceiver);
+            return NO_ERROR;
+        }
     }
 
     print_cmd_help(out);
     return NO_ERROR;
 }
 
-void StatsService::print_cmd_help(FILE* out) {
-    fprintf(out,
+void StatsService::print_cmd_help(int out) {
+    dprintf(out,
             "usage: adb shell cmd stats print-stats-log [tag_required] "
             "[timestamp_nsec_optional]\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats meminfo\n");
-    fprintf(out, "\n");
-    fprintf(out, "  Prints the malloc debug information. You need to run the following first: \n");
-    fprintf(out, "   # adb shell stop\n");
-    fprintf(out, "   # adb shell setprop libc.debug.malloc.program statsd \n");
-    fprintf(out, "   # adb shell setprop libc.debug.malloc.options backtrace \n");
-    fprintf(out, "   # adb shell start\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats print-uid-map [PKG]\n");
-    fprintf(out, "\n");
-    fprintf(out, "  Prints the UID, app name, version mapping.\n");
-    fprintf(out, "  PKG           Optional package name to print the uids of the package\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats pull-source [int] \n");
-    fprintf(out, "\n");
-    fprintf(out, "  Prints the output of a pulled metrics source (int indicates source)\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats write-to-disk \n");
-    fprintf(out, "\n");
-    fprintf(out, "  Flushes all data on memory to disk.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats log-app-breadcrumb [UID] LABEL STATE\n");
-    fprintf(out, "  Writes an AppBreadcrumbReported event to the statslog buffer.\n");
-    fprintf(out, "  UID           The uid to use. It is only possible to pass a UID\n");
-    fprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
-    fprintf(out, "                uid is used.\n");
-    fprintf(out, "  LABEL         Integer in [0, 15], as per atoms.proto.\n");
-    fprintf(out, "  STATE         Integer in [0, 3], as per atoms.proto.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats config remove [UID] [NAME]\n");
-    fprintf(out, "usage: adb shell cmd stats config update [UID] NAME\n");
-    fprintf(out, "\n");
-    fprintf(out, "  Adds, updates or removes a configuration. The proto should be in\n");
-    fprintf(out, "  wire-encoded protobuf format and passed via stdin. If no UID and name is\n");
-    fprintf(out, "  provided, then all configs will be removed from memory and disk.\n");
-    fprintf(out, "\n");
-    fprintf(out, "  UID           The uid to use. It is only possible to pass the UID\n");
-    fprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
-    fprintf(out, "                uid is used.\n");
-    fprintf(out, "  NAME          The per-uid name to use\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n              *Note: If both UID and NAME are omitted then all configs will\n");
-    fprintf(out, "\n                     be removed from memory and disk!\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats dump-report [UID] NAME [--include_current_bucket] [--proto]\n");
-    fprintf(out, "  Dump all metric data for a configuration.\n");
-    fprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
-    fprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
-    fprintf(out, "                calling uid is used.\n");
-    fprintf(out, "  NAME          The name of the configuration\n");
-    fprintf(out, "  --proto       Print proto binary.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats send-broadcast [UID] NAME\n");
-    fprintf(out, "  Send a broadcast that triggers the subscriber to fetch metrics.\n");
-    fprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
-    fprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
-    fprintf(out, "                calling uid is used.\n");
-    fprintf(out, "  NAME          The name of the configuration\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats print-stats\n");
-    fprintf(out, "  Prints some basic stats.\n");
-    fprintf(out, "  --proto       Print proto binary instead of string format.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats clear-puller-cache\n");
-    fprintf(out, "  Clear cached puller data.\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats print-logs\n");
-    fprintf(out, "      Only works on eng build\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats meminfo\n");
+    dprintf(out, "\n");
+    dprintf(out, "  Prints the malloc debug information. You need to run the following first: \n");
+    dprintf(out, "   # adb shell stop\n");
+    dprintf(out, "   # adb shell setprop libc.debug.malloc.program statsd \n");
+    dprintf(out, "   # adb shell setprop libc.debug.malloc.options backtrace \n");
+    dprintf(out, "   # adb shell start\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats print-uid-map [PKG]\n");
+    dprintf(out, "\n");
+    dprintf(out, "  Prints the UID, app name, version mapping.\n");
+    dprintf(out, "  PKG           Optional package name to print the uids of the package\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats pull-source [int] \n");
+    dprintf(out, "\n");
+    dprintf(out, "  Prints the output of a pulled metrics source (int indicates source)\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats write-to-disk \n");
+    dprintf(out, "\n");
+    dprintf(out, "  Flushes all data on memory to disk.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats log-app-breadcrumb [UID] LABEL STATE\n");
+    dprintf(out, "  Writes an AppBreadcrumbReported event to the statslog buffer.\n");
+    dprintf(out, "  UID           The uid to use. It is only possible to pass a UID\n");
+    dprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
+    dprintf(out, "                uid is used.\n");
+    dprintf(out, "  LABEL         Integer in [0, 15], as per atoms.proto.\n");
+    dprintf(out, "  STATE         Integer in [0, 3], as per atoms.proto.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats config remove [UID] [NAME]\n");
+    dprintf(out, "usage: adb shell cmd stats config update [UID] NAME\n");
+    dprintf(out, "\n");
+    dprintf(out, "  Adds, updates or removes a configuration. The proto should be in\n");
+    dprintf(out, "  wire-encoded protobuf format and passed via stdin. If no UID and name is\n");
+    dprintf(out, "  provided, then all configs will be removed from memory and disk.\n");
+    dprintf(out, "\n");
+    dprintf(out, "  UID           The uid to use. It is only possible to pass the UID\n");
+    dprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
+    dprintf(out, "                uid is used.\n");
+    dprintf(out, "  NAME          The per-uid name to use\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n              *Note: If both UID and NAME are omitted then all configs will\n");
+    dprintf(out, "\n                     be removed from memory and disk!\n");
+    dprintf(out, "\n");
+    dprintf(out,
+            "usage: adb shell cmd stats dump-report [UID] NAME [--include_current_bucket] "
+            "[--proto]\n");
+    dprintf(out, "  Dump all metric data for a configuration.\n");
+    dprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
+    dprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
+    dprintf(out, "                calling uid is used.\n");
+    dprintf(out, "  NAME          The name of the configuration\n");
+    dprintf(out, "  --proto       Print proto binary.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats send-broadcast [UID] NAME\n");
+    dprintf(out, "  Send a broadcast that triggers the subscriber to fetch metrics.\n");
+    dprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
+    dprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
+    dprintf(out, "                calling uid is used.\n");
+    dprintf(out, "  NAME          The name of the configuration\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats print-stats\n");
+    dprintf(out, "  Prints some basic stats.\n");
+    dprintf(out, "  --proto       Print proto binary instead of string format.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats clear-puller-cache\n");
+    dprintf(out, "  Clear cached puller data.\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats print-logs\n");
+    dprintf(out, "      Only works on eng build\n");
 }
 
-status_t StatsService::cmd_trigger_broadcast(FILE* out, Vector<String8>& args) {
+status_t StatsService::cmd_trigger_broadcast(int out, Vector<String8>& args) {
     string name;
     bool good = false;
     int uid;
@@ -456,9 +440,9 @@
                 }
             }
         } else {
-            fprintf(out,
+            dprintf(out,
                     "The metrics can only be dumped for other UIDs on eng or userdebug "
-                            "builds.\n");
+                    "builds.\n");
         }
     }
     if (!good) {
@@ -481,7 +465,7 @@
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_config(FILE* in, FILE* out, FILE* err, Vector<String8>& args) {
+status_t StatsService::cmd_config(int in, int out, int err, Vector<String8>& args) {
     const int argCount = args.size();
     if (argCount >= 2) {
         if (args[1] == "update" || args[1] == "remove") {
@@ -508,7 +492,7 @@
                         }
                     }
                 } else {
-                    fprintf(err,
+                    dprintf(err,
                             "The config can only be set for other UIDs on eng or userdebug "
                             "builds.\n");
                 }
@@ -526,21 +510,21 @@
                 char* endp;
                 int64_t configID = strtoll(name.c_str(), &endp, 10);
                 if (endp == name.c_str() || *endp != '\0') {
-                    fprintf(err, "Error parsing config ID.\n");
+                    dprintf(err, "Error parsing config ID.\n");
                     return UNKNOWN_ERROR;
                 }
 
                 // Read stream into buffer.
                 string buffer;
-                if (!android::base::ReadFdToString(fileno(in), &buffer)) {
-                    fprintf(err, "Error reading stream for StatsConfig.\n");
+                if (!android::base::ReadFdToString(in, &buffer)) {
+                    dprintf(err, "Error reading stream for StatsConfig.\n");
                     return UNKNOWN_ERROR;
                 }
 
                 // Parse buffer.
                 StatsdConfig config;
                 if (!config.ParseFromString(buffer)) {
-                    fprintf(err, "Error parsing proto stream for StatsConfig.\n");
+                    dprintf(err, "Error parsing proto stream for StatsConfig.\n");
                     return UNKNOWN_ERROR;
                 }
 
@@ -562,7 +546,7 @@
     return UNKNOWN_ERROR;
 }
 
-status_t StatsService::cmd_dump_report(FILE* out, FILE* err, const Vector<String8>& args) {
+status_t StatsService::cmd_dump_report(int out, int err, const Vector<String8>& args) {
     if (mProcessor != nullptr) {
         int argCount = args.size();
         bool good = false;
@@ -597,7 +581,7 @@
                     }
                 }
             } else {
-                fprintf(out,
+                dprintf(out,
                         "The metrics can only be dumped for other UIDs on eng or userdebug "
                         "builds.\n");
             }
@@ -608,11 +592,11 @@
                                      includeCurrentBucket, ADB_DUMP, &data);
             if (proto) {
                 for (size_t i = 0; i < data.size(); i ++) {
-                    fprintf(out, "%c", data[i]);
+                    dprintf(out, "%c", data[i]);
                 }
             } else {
-                fprintf(out, "Dump report for Config [%d,%s]\n", uid, name.c_str());
-                fprintf(out, "See the StatsLogReport in logcat...\n");
+                dprintf(out, "Dump report for Config [%d,%s]\n", uid, name.c_str());
+                dprintf(out, "See the StatsLogReport in logcat...\n");
             }
             return android::OK;
         } else {
@@ -621,12 +605,12 @@
             return UNKNOWN_ERROR;
         }
     } else {
-        fprintf(out, "Log processor does not exist...\n");
+        dprintf(out, "Log processor does not exist...\n");
         return UNKNOWN_ERROR;
     }
 }
 
-status_t StatsService::cmd_print_stats(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_stats(int out, const Vector<String8>& args) {
     int argCount = args.size();
     bool proto = false;
     if (!std::strcmp("--proto", args[argCount-1].c_str())) {
@@ -638,13 +622,13 @@
         vector<uint8_t> data;
         statsdStats.dumpStats(&data, false); // does not reset statsdStats.
         for (size_t i = 0; i < data.size(); i ++) {
-            fprintf(out, "%c", data[i]);
+            dprintf(out, "%c", data[i]);
         }
 
     } else {
         vector<ConfigKey> configs = mConfigManager->GetAllConfigKeys();
         for (const ConfigKey& key : configs) {
-            fprintf(out, "Config %s uses %zu bytes\n", key.ToString().c_str(),
+            dprintf(out, "Config %s uses %zu bytes\n", key.ToString().c_str(),
                     mProcessor->GetMetricsSize(key));
         }
         statsdStats.dumpStats(out);
@@ -652,29 +636,29 @@
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_print_uid_map(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_uid_map(int out, const Vector<String8>& args) {
     if (args.size() > 1) {
         string pkg;
         pkg.assign(args[1].c_str(), args[1].size());
         auto uids = mUidMap->getAppUid(pkg);
-        fprintf(out, "%s -> [ ", pkg.c_str());
+        dprintf(out, "%s -> [ ", pkg.c_str());
         for (const auto& uid : uids) {
-            fprintf(out, "%d ", uid);
+            dprintf(out, "%d ", uid);
         }
-        fprintf(out, "]\n");
+        dprintf(out, "]\n");
     } else {
         mUidMap->printUidMap(out);
     }
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_write_data_to_disk(FILE* out) {
-    fprintf(out, "Writing data to disk\n");
+status_t StatsService::cmd_write_data_to_disk(int out) {
+    dprintf(out, "Writing data to disk\n");
     mProcessor->WriteDataToDisk(ADB_DUMP);
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_log_app_breadcrumb(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_log_app_breadcrumb(int out, const Vector<String8>& args) {
     bool good = false;
     int32_t uid;
     int32_t label;
@@ -695,13 +679,13 @@
             state = atoi(args[3].c_str());
             good = true;
         } else {
-            fprintf(out,
+            dprintf(out,
                     "Selecting a UID for writing AppBreadcrumb can only be done for other UIDs "
-                            "on eng or userdebug builds.\n");
+                    "on eng or userdebug builds.\n");
         }
     }
     if (good) {
-        fprintf(out, "Logging AppBreadcrumbReported(%d, %d, %d) to statslog.\n", uid, label, state);
+        dprintf(out, "Logging AppBreadcrumbReported(%d, %d, %d) to statslog.\n", uid, label, state);
         android::util::stats_write(android::util::APP_BREADCRUMB_REPORTED, uid, label, state);
     } else {
         print_cmd_help(out);
@@ -710,46 +694,46 @@
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_print_pulled_metrics(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_pulled_metrics(int out, const Vector<String8>& args) {
     int s = atoi(args[1].c_str());
     vector<shared_ptr<LogEvent> > stats;
     if (mPullerManager->Pull(s, getElapsedRealtimeNs(), &stats)) {
         for (const auto& it : stats) {
-            fprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
+            dprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
         }
-        fprintf(out, "Pull from %d: Received %zu elements\n", s, stats.size());
+        dprintf(out, "Pull from %d: Received %zu elements\n", s, stats.size());
         return NO_ERROR;
     }
     return UNKNOWN_ERROR;
 }
 
-status_t StatsService::cmd_remove_all_configs(FILE* out) {
-    fprintf(out, "Removing all configs...\n");
+status_t StatsService::cmd_remove_all_configs(int out) {
+    dprintf(out, "Removing all configs...\n");
     VLOG("StatsService::cmd_remove_all_configs was called");
     mConfigManager->RemoveAllConfigs();
     StorageManager::deleteAllFiles(STATS_SERVICE_DIR);
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_dump_memory_info(FILE* out) {
-    fprintf(out, "meminfo not available.\n");
+status_t StatsService::cmd_dump_memory_info(int out) {
+    dprintf(out, "meminfo not available.\n");
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_clear_puller_cache(FILE* out) {
+status_t StatsService::cmd_clear_puller_cache(int out) {
     IPCThreadState* ipc = IPCThreadState::self();
     VLOG("StatsService::cmd_clear_puller_cache with Pid %i, Uid %i",
             ipc->getCallingPid(), ipc->getCallingUid());
     if (checkCallingPermission(String16(kPermissionDump))) {
         int cleared = mPullerManager->ForceClearPullerCache();
-        fprintf(out, "Puller removed %d cached data!\n", cleared);
+        dprintf(out, "Puller removed %d cached data!\n", cleared);
         return NO_ERROR;
     } else {
         return PERMISSION_DENIED;
     }
 }
 
-status_t StatsService::cmd_print_logs(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_logs(int out, const Vector<String8>& args) {
     IPCThreadState* ipc = IPCThreadState::self();
     VLOG("StatsService::cmd_print_logs with Pid %i, Uid %i", ipc->getCallingPid(),
          ipc->getCallingUid());
@@ -885,6 +869,9 @@
 
 void StatsService::OnLogEvent(LogEvent* event) {
     mProcessor->OnLogEvent(event);
+    if (mShellSubscriber != nullptr) {
+        mShellSubscriber->onLogEvent(*event);
+    }
 }
 
 Status StatsService::getData(int64_t key, const String16& packageName, vector<uint8_t>* output) {
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 613f509..0618927 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -24,6 +24,7 @@
 #include "external/StatsPullerManager.h"
 #include "logd/LogListener.h"
 #include "packages/UidMap.h"
+#include "shell/ShellSubscriber.h"
 #include "statscompanion_util.h"
 
 #include <android/os/BnStatsManager.h>
@@ -54,7 +55,8 @@
 
     virtual status_t onTransact(uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags);
     virtual status_t dump(int fd, const Vector<String16>& args);
-    virtual status_t command(FILE* in, FILE* out, FILE* err, Vector<String8>& args);
+    virtual status_t command(int inFd, int outFd, int err, Vector<String8>& args,
+                             sp<IResultReceiver> resultReceiver);
 
     virtual Status systemRunning();
     virtual Status statsCompanionReady();
@@ -162,73 +164,73 @@
     /**
      * Text or proto output of dumpsys.
      */
-    void dump_impl(FILE* out, bool verbose, bool proto);
+    void dump_impl(int outFd, bool verbose, bool proto);
 
     /**
      * Print usage information for the commands
      */
-    void print_cmd_help(FILE* out);
+    void print_cmd_help(int out);
 
     /**
      * Trigger a broadcast.
      */
-    status_t cmd_trigger_broadcast(FILE* out, Vector<String8>& args);
+    status_t cmd_trigger_broadcast(int outFd, Vector<String8>& args);
 
     /**
      * Handle the config sub-command.
      */
-    status_t cmd_config(FILE* in, FILE* out, FILE* err, Vector<String8>& args);
+    status_t cmd_config(int inFd, int outFd, int err, Vector<String8>& args);
 
     /**
      * Prints some basic stats to std out.
      */
-    status_t cmd_print_stats(FILE* out, const Vector<String8>& args);
+    status_t cmd_print_stats(int outFd, const Vector<String8>& args);
 
     /**
      * Print the event log.
      */
-    status_t cmd_dump_report(FILE* out, FILE* err, const Vector<String8>& args);
+    status_t cmd_dump_report(int outFd, int err, const Vector<String8>& args);
 
     /**
      * Print the mapping of uids to package names.
      */
-    status_t cmd_print_uid_map(FILE* out, const Vector<String8>& args);
+    status_t cmd_print_uid_map(int outFd, const Vector<String8>& args);
 
     /**
      * Flush the data to disk.
      */
-    status_t cmd_write_data_to_disk(FILE* out);
+    status_t cmd_write_data_to_disk(int outFd);
 
     /**
      * Write an AppBreadcrumbReported event to the StatsLog buffer, as if calling
      * StatsLog.write(APP_BREADCRUMB_REPORTED).
      */
-    status_t cmd_log_app_breadcrumb(FILE* out, const Vector<String8>& args);
+    status_t cmd_log_app_breadcrumb(int outFd, const Vector<String8>& args);
 
     /**
      * Print contents of a pulled metrics source.
      */
-    status_t cmd_print_pulled_metrics(FILE* out, const Vector<String8>& args);
+    status_t cmd_print_pulled_metrics(int outFd, const Vector<String8>& args);
 
     /**
      * Removes all configs stored on disk and on memory.
      */
-    status_t cmd_remove_all_configs(FILE* out);
+    status_t cmd_remove_all_configs(int outFd);
 
     /*
      * Dump memory usage by statsd.
      */
-    status_t cmd_dump_memory_info(FILE* out);
+    status_t cmd_dump_memory_info(int outFd);
 
     /*
      * Clear all puller cached data
      */
-    status_t cmd_clear_puller_cache(FILE* out);
+    status_t cmd_clear_puller_cache(int outFd);
 
     /**
      * Print all stats logs received to logcat.
      */
-    status_t cmd_print_logs(FILE* out, const Vector<String8>& args);
+    status_t cmd_print_logs(int outFd, const Vector<String8>& args);
 
     /**
      * Adds a configuration after checking permissions and obtaining UID from binder call.
@@ -275,6 +277,8 @@
      */
     bool mEngBuild;
 
+    sp<ShellSubscriber> mShellSubscriber;
+
     FRIEND_TEST(StatsServiceTest, TestAddConfig_simple);
     FRIEND_TEST(StatsServiceTest, TestAddConfig_empty);
     FRIEND_TEST(StatsServiceTest, TestAddConfig_invalid);
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index 33f3917..bf0bfec 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -400,36 +400,35 @@
     return string(timeBuffer);
 }
 
-void StatsdStats::dumpStats(FILE* out) const {
+void StatsdStats::dumpStats(int out) const {
     lock_guard<std::mutex> lock(mLock);
     time_t t = mStartTimeSec;
     struct tm* tm = localtime(&t);
     char timeBuffer[80];
     strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %I:%M%p\n", tm);
-    fprintf(out, "Stats collection start second: %s\n", timeBuffer);
-    fprintf(out, "%lu Config in icebox: \n", (unsigned long)mIceBox.size());
+    dprintf(out, "Stats collection start second: %s\n", timeBuffer);
+    dprintf(out, "%lu Config in icebox: \n", (unsigned long)mIceBox.size());
     for (const auto& configStats : mIceBox) {
-        fprintf(out,
+        dprintf(out,
                 "Config {%d_%lld}: creation=%d, deletion=%d, reset=%d, #metric=%d, #condition=%d, "
                 "#matcher=%d, #alert=%d,  valid=%d\n",
                 configStats->uid, (long long)configStats->id, configStats->creation_time_sec,
                 configStats->deletion_time_sec, configStats->reset_time_sec,
-                configStats->metric_count,
-                configStats->condition_count, configStats->matcher_count, configStats->alert_count,
-                configStats->is_valid);
+                configStats->metric_count, configStats->condition_count, configStats->matcher_count,
+                configStats->alert_count, configStats->is_valid);
 
         for (const auto& broadcastTime : configStats->broadcast_sent_time_sec) {
-            fprintf(out, "\tbroadcast time: %d\n", broadcastTime);
+            dprintf(out, "\tbroadcast time: %d\n", broadcastTime);
         }
 
         for (const auto& dataDropTime : configStats->data_drop_time_sec) {
-            fprintf(out, "\tdata drop time: %d\n", dataDropTime);
+            dprintf(out, "\tdata drop time: %d\n", dataDropTime);
         }
     }
-    fprintf(out, "%lu Active Configs\n", (unsigned long)mConfigStats.size());
+    dprintf(out, "%lu Active Configs\n", (unsigned long)mConfigStats.size());
     for (auto& pair : mConfigStats) {
         auto& configStats = pair.second;
-        fprintf(out,
+        dprintf(out,
                 "Config {%d-%lld}: creation=%d, deletion=%d, #metric=%d, #condition=%d, "
                 "#matcher=%d, #alert=%d,  valid=%d\n",
                 configStats->uid, (long long)configStats->id, configStats->creation_time_sec,
@@ -437,81 +436,81 @@
                 configStats->condition_count, configStats->matcher_count, configStats->alert_count,
                 configStats->is_valid);
         for (const auto& annotation : configStats->annotations) {
-            fprintf(out, "\tannotation: %lld, %d\n", (long long)annotation.first,
+            dprintf(out, "\tannotation: %lld, %d\n", (long long)annotation.first,
                     annotation.second);
         }
 
         for (const auto& broadcastTime : configStats->broadcast_sent_time_sec) {
-            fprintf(out, "\tbroadcast time: %s(%lld)\n",
-                    buildTimeString(broadcastTime).c_str(), (long long)broadcastTime);
+            dprintf(out, "\tbroadcast time: %s(%lld)\n", buildTimeString(broadcastTime).c_str(),
+                    (long long)broadcastTime);
         }
 
         for (const auto& dataDropTime : configStats->data_drop_time_sec) {
-            fprintf(out, "\tdata drop time: %s(%lld)\n",
-                    buildTimeString(dataDropTime).c_str(), (long long)dataDropTime);
+            dprintf(out, "\tdata drop time: %s(%lld)\n", buildTimeString(dataDropTime).c_str(),
+                    (long long)dataDropTime);
         }
 
         for (const auto& dump : configStats->dump_report_stats) {
-            fprintf(out, "\tdump report time: %s(%lld) bytes: %lld\n",
+            dprintf(out, "\tdump report time: %s(%lld) bytes: %lld\n",
                     buildTimeString(dump.first).c_str(), (long long)dump.first,
                     (long long)dump.second);
         }
 
         for (const auto& stats : pair.second->matcher_stats) {
-            fprintf(out, "matcher %lld matched %d times\n", (long long)stats.first, stats.second);
+            dprintf(out, "matcher %lld matched %d times\n", (long long)stats.first, stats.second);
         }
 
         for (const auto& stats : pair.second->condition_stats) {
-            fprintf(out, "condition %lld max output tuple size %d\n", (long long)stats.first,
+            dprintf(out, "condition %lld max output tuple size %d\n", (long long)stats.first,
                     stats.second);
         }
 
         for (const auto& stats : pair.second->condition_stats) {
-            fprintf(out, "metrics %lld max output tuple size %d\n", (long long)stats.first,
+            dprintf(out, "metrics %lld max output tuple size %d\n", (long long)stats.first,
                     stats.second);
         }
 
         for (const auto& stats : pair.second->alert_stats) {
-            fprintf(out, "alert %lld declared %d times\n", (long long)stats.first, stats.second);
+            dprintf(out, "alert %lld declared %d times\n", (long long)stats.first, stats.second);
         }
     }
-    fprintf(out, "********Disk Usage stats***********\n");
+    dprintf(out, "********Disk Usage stats***********\n");
     StorageManager::printStats(out);
-    fprintf(out, "********Pushed Atom stats***********\n");
+    dprintf(out, "********Pushed Atom stats***********\n");
     const size_t atomCounts = mPushedAtomStats.size();
     for (size_t i = 2; i < atomCounts; i++) {
         if (mPushedAtomStats[i] > 0) {
-            fprintf(out, "Atom %lu->%d\n", (unsigned long)i, mPushedAtomStats[i]);
+            dprintf(out, "Atom %lu->%d\n", (unsigned long)i, mPushedAtomStats[i]);
         }
     }
 
-    fprintf(out, "********Pulled Atom stats***********\n");
+    dprintf(out, "********Pulled Atom stats***********\n");
     for (const auto& pair : mPulledAtomStats) {
-        fprintf(out, "Atom %d->%ld, %ld, %ld\n", (int)pair.first, (long)pair.second.totalPull,
+        dprintf(out, "Atom %d->%ld, %ld, %ld\n", (int)pair.first, (long)pair.second.totalPull,
                 (long)pair.second.totalPullFromCache, (long)pair.second.minPullIntervalSec);
     }
 
     if (mAnomalyAlarmRegisteredStats > 0) {
-        fprintf(out, "********AnomalyAlarmStats stats***********\n");
-        fprintf(out, "Anomaly alarm registrations: %d\n", mAnomalyAlarmRegisteredStats);
+        dprintf(out, "********AnomalyAlarmStats stats***********\n");
+        dprintf(out, "Anomaly alarm registrations: %d\n", mAnomalyAlarmRegisteredStats);
     }
 
     if (mPeriodicAlarmRegisteredStats > 0) {
-        fprintf(out, "********SubscriberAlarmStats stats***********\n");
-        fprintf(out, "Subscriber alarm registrations: %d\n", mPeriodicAlarmRegisteredStats);
+        dprintf(out, "********SubscriberAlarmStats stats***********\n");
+        dprintf(out, "Subscriber alarm registrations: %d\n", mPeriodicAlarmRegisteredStats);
     }
 
-    fprintf(out, "UID map stats: bytes=%d, changes=%d, deleted=%d, changes lost=%d\n",
+    dprintf(out, "UID map stats: bytes=%d, changes=%d, deleted=%d, changes lost=%d\n",
             mUidMapStats.bytes_used, mUidMapStats.changes, mUidMapStats.deleted_apps,
             mUidMapStats.dropped_changes);
 
     for (const auto& restart : mSystemServerRestartSec) {
-        fprintf(out, "System server restarts at %s(%lld)\n",
-            buildTimeString(restart).c_str(), (long long)restart);
+        dprintf(out, "System server restarts at %s(%lld)\n", buildTimeString(restart).c_str(),
+                (long long)restart);
     }
 
     for (const auto& loss : mLogLossStats) {
-        fprintf(out, "Log loss: %lld (wall clock sec) - %d (count)\n", (long long)loss.first,
+        dprintf(out, "Log loss: %lld (wall clock sec) - %d (count)\n", (long long)loss.first,
                 loss.second);
     }
 }
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index b5156da..a8188c8 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -156,7 +156,7 @@
      * Report a config has been removed.
      */
     void noteConfigRemoved(const ConfigKey& key);
-   /**
+    /**
      * Report a config has been reset when ttl expires.
      */
     void noteConfigReset(const ConfigKey& key);
@@ -202,7 +202,6 @@
      */
     void noteMetricDimensionSize(const ConfigKey& key, const int64_t& id, int size);
 
-
     /**
      * Report the max size of output tuple of dimension in condition across dimensions in what.
      *
@@ -272,8 +271,8 @@
     void notePullFromCache(int pullAtomId);
 
     /*
-    * Records when system server restarts.
-    */
+     * Records when system server restarts.
+     */
     void noteSystemServerRestart(int32_t timeSec);
 
     /**
@@ -296,9 +295,9 @@
     void dumpStats(std::vector<uint8_t>* buffer, bool reset);
 
     /**
-     * Output statsd stats in human readable format to [out] file.
+     * Output statsd stats in human readable format to the [outFd] file descriptor.
      */
-    void dumpStats(FILE* out) const;
+    void dumpStats(int outFd) const;
 
     typedef struct {
         long totalPull;
diff --git a/cmds/statsd/src/packages/UidMap.cpp b/cmds/statsd/src/packages/UidMap.cpp
index be94725..4325f0f 100644
--- a/cmds/statsd/src/packages/UidMap.cpp
+++ b/cmds/statsd/src/packages/UidMap.cpp
@@ -386,12 +386,12 @@
     StatsdStats::getInstance().setUidMapChanges(mChanges.size());
 }
 
-void UidMap::printUidMap(FILE* out) const {
+void UidMap::printUidMap(int out) const {
     lock_guard<mutex> lock(mMutex);
 
     for (const auto& kv : mMap) {
         if (!kv.second.deleted) {
-            fprintf(out, "%s, v%" PRId64 " (%i)\n", kv.first.second.c_str(), kv.second.versionCode,
+            dprintf(out, "%s, v%" PRId64 " (%i)\n", kv.first.second.c_str(), kv.second.versionCode,
                     kv.first.first);
         }
     }
diff --git a/cmds/statsd/src/packages/UidMap.h b/cmds/statsd/src/packages/UidMap.h
index 91f2030..4598369 100644
--- a/cmds/statsd/src/packages/UidMap.h
+++ b/cmds/statsd/src/packages/UidMap.h
@@ -103,7 +103,7 @@
 
     // Helper for debugging contents of this uid map. Can be triggered with:
     // adb shell cmd stats print-uid-map
-    void printUidMap(FILE* out) const;
+    void printUidMap(int outFd) const;
 
     // Commands for indicating to the map that a producer should be notified if an app is updated.
     // This allows the metric producer to distinguish when the same uid or app represents a
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
new file mode 100644
index 0000000..3cd49d7
--- /dev/null
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define DEBUG true  // STOPSHIP if true
+#include "Log.h"
+
+#include "ShellSubscriber.h"
+
+#include "matchers/matcher_util.h"
+
+#include <android-base/file.h>
+
+using android::util::ProtoOutputStream;
+
+namespace android {
+namespace os {
+namespace statsd {
+
+// Registers the (single) shell subscriber and blocks the calling thread until the shell exits.
+void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
+    VLOG("start new shell subscription");
+    {
+        std::lock_guard<std::mutex> lock(mMutex);
+        if (mResultReceiver != nullptr) {
+            VLOG("Only one shell subscriber is allowed.");
+            return;
+        }
+        mInput = in;
+        mOutput = out;
+        mResultReceiver = resultReceiver;
+        IInterface::asBinder(mResultReceiver)->linkToDeath(this);
+    }
+
+    // Read config updates from the input file descriptor on a separate thread. The thread is
+    // detached, so it is never joinable afterwards; it simply ends when readConfig() returns.
+    std::thread reader([in, this] { readConfig(in); });
+    reader.detach();
+
+    // Wait until binderDied() has reset mResultReceiver. The predicate is evaluated with mMutex
+    // held, so spurious wake-ups are harmless.
+    std::unique_lock<std::mutex> lk(mMutex);
+    mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+}
+
+void ShellSubscriber::updateConfig(const ShellSubscription& config) {
+    std::lock_guard<std::mutex> guard(mMutex);
+    // A new subscription fully replaces the previous set of pushed-atom matchers.
+    mPushedMatchers.assign(config.pushed().begin(), config.pushed().end());
+    for (const auto& atomMatcher : mPushedMatchers) {
+        VLOG("adding matcher for atom %d", atomMatcher.atom_id());
+    }
+}
+
+void ShellSubscriber::readConfig(int in) {
+    if (in <= 0) {
+        return;
+    }
+    // The stream is a sequence of |size_t length|serialized ShellSubscription| records.
+    while (1) {
+        size_t bufferSize = 0;
+        int result = 0;
+        if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
+            VLOG("Done reading");
+            break;
+        } else if (result < 0 || result != sizeof(bufferSize)) {
+            ALOGE("Error reading config size");
+            break;
+        }
+
+        // A single read() may return fewer bytes than requested (e.g. on a pipe), so loop with
+        // ReadFully. A zero-length payload is valid: it is an empty subscription.
+        vector<uint8_t> buffer(bufferSize);
+        if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
+            VLOG("Error reading the config, expecting %zu", bufferSize);
+            break;
+        }
+        ShellSubscription config;
+        if (!config.ParseFromArray(buffer.data(), bufferSize)) {
+            ALOGE("error parsing the config");
+            break;
+        }
+        updateConfig(config);
+    }
+}
+
+void ShellSubscriber::cleanUpLocked() {
+    // Drop all per-subscription state; binderDied() calls this with mMutex held.
+    mPushedMatchers.clear();
+    mResultReceiver = nullptr;
+    // Only forget the descriptors here: they will be closed by binder.
+    mInput = mOutput = 0;
+    VLOG("done clean up");
+}
+
+void ShellSubscriber::onLogEvent(const LogEvent& event) {
+    std::lock_guard<std::mutex> lock(mMutex);
+
+    // No active subscription (cleanUpLocked() resets mOutput to 0).
+    if (mOutput <= 0) {
+        return;
+    }
+
+    for (const auto& matcher : mPushedMatchers) {
+        if (matchesSimple(*mUidMap, matcher, event)) {
+            // Serialize the event first: the size prefix must describe this payload, and
+            // mProto holds nothing from this event until ToProto() has run.
+            event.ToProto(mProto);
+            size_t bufferSize = mProto.size();
+            write(mOutput, &bufferSize, sizeof(bufferSize));
+            mProto.flush(mOutput);
+            mProto.clear();
+            break;
+        }
+    }
+}
+
+void ShellSubscriber::binderDied(const wp<IBinder>& who) {
+    VLOG("Shell exits");
+    std::unique_lock<std::mutex> guard(mMutex);
+    cleanUpLocked();
+    // Release the lock before notifying so the thread blocked on mShellDied can take it.
+    guard.unlock();
+    mShellDied.notify_all();
+}
+
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
\ No newline at end of file
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
new file mode 100644
index 0000000..0ace35f
--- /dev/null
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "logd/LogEvent.h"
+
+#include <android/util/ProtoOutputStream.h>
+#include <binder/IResultReceiver.h>
+#include <condition_variable>
+#include <mutex>
+#include <string>
+#include <thread>
+#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
+#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
+#include "packages/UidMap.h"
+
+namespace android {
+namespace os {
+namespace statsd {
+
+/**
+ * Handles atoms subscription via shell cmd.
+ *
+ * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
+ * communicates with statsd via file descriptors. It can subscribe to pushed and pulled atoms.
+ * The atoms are sent back to the client in real time, as opposed to
+ * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are
+ * responsible for doing the aggregation after receiving the atom events.
+ *
+ * A shell client passes a ShellSubscription in the proto binary format. A client can update the
+ * subscription by sending a new subscription. The new subscription would replace the old one.
+ * Input data stream format is:
+ *
+ * |size_t|subscription proto|size_t|subscription proto|....
+ *
+ * statsd sends the events back in Atom proto binary format. Each Atom message is preceded
+ * with sizeof(size_t) bytes indicating the size of the proto message payload.
+ *
+ * The stream would be in the following format:
+ * |size_t|atom1 proto|size_t|atom2 proto|....
+ *
+ * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
+ * until it exits.
+ */
+class ShellSubscriber : public virtual IBinder::DeathRecipient {
+public:
+    ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap) {}
+
+    /**
+     * Start a new subscription. Blocks the calling thread until the subscribing shell exits.
+     */
+    void startNewSubscription(int inFd, int outFd, sp<IResultReceiver> resultReceiver);
+
+    // Death notification for the shell's binder; ends the current subscription.
+    void binderDied(const wp<IBinder>& who) override;
+
+    // Streams |event| to the shell when it matches one of the subscribed pushed-atom matchers.
+    void onLogEvent(const LogEvent& event);
+
+private:
+    void readConfig(int in);
+    void updateConfig(const ShellSubscription& config);
+    void cleanUpLocked();
+
+    sp<UidMap> mUidMap;
+
+    android::util::ProtoOutputStream mProto;
+
+    mutable std::mutex mMutex;
+
+    std::condition_variable mShellDied;  // semaphore for waiting until shell exits.
+
+    // 0 means "no subscription": cleanUpLocked() resets both descriptors to 0 and onLogEvent()
+    // drops events while mOutput <= 0. Initialized so that also holds before the first client.
+    int mInput = 0;  // The input file descriptor
+
+    int mOutput = 0;  // The output file descriptor
+
+    sp<IResultReceiver> mResultReceiver;
+
+    std::vector<SimpleAtomMatcher> mPushedMatchers;
+};
+
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
diff --git a/cmds/statsd/src/shell/shell_config.proto b/cmds/statsd/src/shell/shell_config.proto
new file mode 100644
index 0000000..516693d
--- /dev/null
+++ b/cmds/statsd/src/shell/shell_config.proto
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package android.os.statsd;
+
+option java_package = "com.android.os";
+option java_outer_classname = "ShellConfig";
+
+import "frameworks/base/cmds/statsd/src/statsd_config.proto";
+
+message PulledAtomSubscription {
+    optional int32 atom_id = 1;  // presumably the id of the atom to pull -- TODO confirm
+
+    /* gap between two pulls in milliseconds */
+    optional int32 freq_millis = 2;
+}
+
+message ShellSubscription {
+    repeated SimpleAtomMatcher pushed = 1;  // matchers selecting the pushed atoms to stream
+    repeated PulledAtomSubscription pulled = 2;  // pulled atoms, each with its own pull gap
+}
\ No newline at end of file
diff --git a/cmds/statsd/src/storage/StorageManager.cpp b/cmds/statsd/src/storage/StorageManager.cpp
index 3ebc8a4..2f19a02 100644
--- a/cmds/statsd/src/storage/StorageManager.cpp
+++ b/cmds/statsd/src/storage/StorageManager.cpp
@@ -392,13 +392,13 @@
     }
 }
 
-void StorageManager::printStats(FILE* out) {
-    printDirStats(out, STATS_SERVICE_DIR);
-    printDirStats(out, STATS_DATA_DIR);
+void StorageManager::printStats(int outFd) {
+    printDirStats(outFd, STATS_SERVICE_DIR);
+    printDirStats(outFd, STATS_DATA_DIR);
 }
 
-void StorageManager::printDirStats(FILE* out, const char* path) {
-    fprintf(out, "Printing stats of %s\n", path);
+void StorageManager::printDirStats(int outFd, const char* path) {
+    dprintf(outFd, "Printing stats of %s\n", path);
     unique_ptr<DIR, decltype(&closedir)> dir(opendir(path), closedir);
     if (dir == NULL) {
         VLOG("Path %s does not exist", path);
@@ -418,25 +418,22 @@
         int64_t timestamp = result[0];
         int64_t uid = result[1];
         int64_t configID = result[2];
-        fprintf(out, "\t #%d, Last updated: %lld, UID: %d, Config ID: %lld",
-                fileCount + 1,
-                (long long)timestamp,
-                (int)uid,
-                (long long)configID);
+        dprintf(outFd, "\t #%d, Last updated: %lld, UID: %d, Config ID: %lld", fileCount + 1,
+                (long long)timestamp, (int)uid, (long long)configID);
         string file_name = getFilePath(path, timestamp, uid, configID);
         ifstream file(file_name.c_str(), ifstream::in | ifstream::binary);
         if (file.is_open()) {
             file.seekg(0, ios::end);
             int fileSize = file.tellg();
             file.close();
-            fprintf(out, ", File Size: %d bytes", fileSize);
+            dprintf(outFd, ", File Size: %d bytes", fileSize);
             totalFileSize += fileSize;
         }
-        fprintf(out, "\n");
+        dprintf(outFd, "\n");
         fileCount++;
     }
-    fprintf(out, "\tTotal number of files: %d, Total size of files: %d bytes.\n",
-            fileCount, totalFileSize);
+    dprintf(outFd, "\tTotal number of files: %d, Total size of files: %d bytes.\n", fileCount,
+            totalFileSize);
 }
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/storage/StorageManager.h b/cmds/statsd/src/storage/StorageManager.h
index 4840f3c..8fbc89e 100644
--- a/cmds/statsd/src/storage/StorageManager.h
+++ b/cmds/statsd/src/storage/StorageManager.h
@@ -100,13 +100,13 @@
     /**
      * Prints disk usage statistics related to statsd.
      */
-    static void printStats(FILE* out);
+    static void printStats(int out);
 
 private:
     /**
      * Prints disk usage statistics about a directory related to statsd.
      */
-    static void printDirStats(FILE* out, const char* path);
+    static void printDirStats(int out, const char* path);
 };
 
 }  // namespace statsd