diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index b764ce5..abd2a35 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -50,8 +50,8 @@
 const int FIELD_ID_NAME = 2;
 
 StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
-                                     const std::function<void(const vector<uint8_t>&)>& pushLog)
-    : mUidMap(uidMap), mPushLog(pushLog) {
+                                     const std::function<void(const ConfigKey&)>& sendBroadcast)
+    : mUidMap(uidMap), mSendBroadcast(sendBroadcast) {
 }
 
 StatsLogProcessor::~StatsLogProcessor() {
@@ -102,12 +102,27 @@
     }
 }
 
-vector<uint8_t> StatsLogProcessor::onDumpReport(const ConfigKey& key) {
+size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) {
     auto it = mMetricsManagers.find(key);
     if (it == mMetricsManagers.end()) {
         ALOGW("Config source %s does not exist", key.ToString().c_str());
-        return vector<uint8_t>();
+        return 0;
     }
+    return it->second->byteSize();
+}
+
+void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outData) {
+    auto it = mMetricsManagers.find(key);
+    if (it == mMetricsManagers.end()) {
+        ALOGW("Config source %s does not exist", key.ToString().c_str());
+        return;
+    }
+
+    // This allows another broadcast to be sent within the rate-limit period if we get close to
+    // filling the buffer again soon.
+    mBroadcastTimesMutex.lock();
+    mLastBroadcastTimes.erase(key);
+    mBroadcastTimesMutex.unlock();
 
     ProtoOutputStream proto;
 
@@ -131,17 +146,18 @@
     uidMap.SerializeToArray(&uidMapBuffer[0], uidMapSize);
     proto.write(FIELD_TYPE_MESSAGE | FIELD_ID_UID_MAP, uidMapBuffer, uidMapSize);
 
-    vector<uint8_t> buffer(proto.size());
-    size_t pos = 0;
-    auto iter = proto.data();
-    while (iter.readBuffer() != NULL) {
-        size_t toRead = iter.currentToRead();
-        std::memcpy(&buffer[pos], iter.readBuffer(), toRead);
-        pos += toRead;
-        iter.rp()->move(toRead);
+    if (outData != nullptr) {
+        outData->clear();
+        outData->resize(proto.size());
+        size_t pos = 0;
+        auto iter = proto.data();
+        while (iter.readBuffer() != NULL) {
+            size_t toRead = iter.currentToRead();
+            std::memcpy(&((*outData)[pos]), iter.readBuffer(), toRead);
+            pos += toRead;
+            iter.rp()->move(toRead);
+        }
     }
-
-    return buffer;
 }
 
 void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
@@ -151,42 +167,34 @@
         mMetricsManagers.erase(it);
         mUidMap->OnConfigRemoved(key);
     }
-    auto flushTime = mLastFlushTimes.find(key);
-    if (flushTime != mLastFlushTimes.end()) {
-        mLastFlushTimes.erase(flushTime);
-    }
+
+    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
+    mLastBroadcastTimes.erase(key);
 }
 
 void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs,
                                          const ConfigKey& key,
                                          const unique_ptr<MetricsManager>& metricsManager) {
-    auto lastFlushNs = mLastFlushTimes.find(key);
-    if (lastFlushNs != mLastFlushTimes.end()) {
-        if (timestampNs - lastFlushNs->second < kMinFlushPeriod) {
-            return;
-        }
-    }
+    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
 
     size_t totalBytes = metricsManager->byteSize();
-    if (totalBytes > kMaxSerializedBytes) {
-        flush();
-        mLastFlushTimes[key] = std::move(timestampNs);
+    if (totalBytes > .9 * kMaxSerializedBytes) { // Send broadcast so that receivers can pull data.
+        auto lastFlushNs = mLastBroadcastTimes.find(key);
+        if (lastFlushNs != mLastBroadcastTimes.end()) {
+            if (timestampNs - lastFlushNs->second < kMinBroadcastPeriod) {
+                return;
+            }
+        }
+        mLastBroadcastTimes[key] = timestampNs;
+        ALOGD("StatsD requesting broadcast for %s", key.ToString().c_str());
+        mSendBroadcast(key);
+    } else if (totalBytes > kMaxSerializedBytes) { // Too late. We need to start clearing data.
+        // We ignore the return value so we force each metric producer to clear its contents.
+        metricsManager->onDumpReport();
+        ALOGD("StatsD had to toss out metrics for %s", key.ToString().c_str());
     }
 }
 
-void StatsLogProcessor::flush() {
-    // TODO: Take ConfigKey as an argument and flush metrics related to the
-    // ConfigKey. Also, create a wrapper that holds a repeated field of
-    // StatsLogReport's.
-    /*
-    StatsLogReport logReport;
-    const int numBytes = logReport.ByteSize();
-    vector<uint8_t> logReportBuffer(numBytes);
-    logReport.SerializeToArray(&logReportBuffer[0], numBytes);
-    mPushLog(logReportBuffer);
-    */
-}
-
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index f38d715..2091774 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -33,7 +33,7 @@
 class StatsLogProcessor : public ConfigListener {
 public:
     StatsLogProcessor(const sp<UidMap>& uidMap,
-                      const std::function<void(const vector<uint8_t>&)>& pushLog);
+                      const std::function<void(const ConfigKey&)>& sendBroadcast);
     virtual ~StatsLogProcessor();
 
     virtual void OnLogEvent(const LogEvent& event);
@@ -41,15 +41,16 @@
     void OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config);
     void OnConfigRemoved(const ConfigKey& key);
 
-    vector<uint8_t> onDumpReport(const ConfigKey& key);
+    size_t GetMetricsSize(const ConfigKey& key);
 
-    /* Request a flush through a binder call. */
-    void flush();
+    void onDumpReport(const ConfigKey& key, vector<uint8_t>* outData);
 
 private:
+    mutable mutex mBroadcastTimesMutex;
+
     std::unordered_map<ConfigKey, std::unique_ptr<MetricsManager>> mMetricsManagers;
 
-    std::unordered_map<ConfigKey, long> mLastFlushTimes;
+    std::unordered_map<ConfigKey, long> mLastBroadcastTimes;
 
     sp<UidMap> mUidMap;  // Reference to the UidMap to lookup app name and version for each uid.
 
@@ -60,17 +61,18 @@
      */
     static const size_t kMaxSerializedBytes = 16 * 1024;
 
-    /* Check if the buffer size exceeds the max buffer size when the new entry is added, and flush
-       the logs to callback clients if true. */
+    /* Check if we should send a broadcast if approaching memory limits and if we're over, we
+     * actually delete the data. */
     void flushIfNecessary(uint64_t timestampNs,
                           const ConfigKey& key,
                           const unique_ptr<MetricsManager>& metricsManager);
 
-    std::function<void(const vector<uint8_t>&)> mPushLog;
+    // Function used to send a broadcast so that receiver for the config key can call getData
+    // to retrieve the stored data.
+    std::function<void(const ConfigKey& key)> mSendBroadcast;
 
-    /* Minimum period between two flushes in nanoseconds. Currently set to 10
-     * minutes. */
-    static const unsigned long long kMinFlushPeriod = 600 * NS_PER_SEC;
+    /* Minimum period between two broadcasts in nanoseconds. Currently set to 60 seconds. */
+    static const unsigned long long kMinBroadcastPeriod = 60 * NS_PER_SEC;
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 8bc776f..779f60f 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -73,8 +73,18 @@
 {
     mUidMap = new UidMap();
     mConfigManager = new ConfigManager();
-    mProcessor = new StatsLogProcessor(mUidMap, [](const vector<uint8_t>& log) {
-        // TODO: Update how we send data out of StatsD.
+    mProcessor = new StatsLogProcessor(mUidMap, [this](const ConfigKey& key) {
+        auto sc = getStatsCompanionService();
+        auto receiver = mConfigManager->GetConfigReceiver(key);
+        if (sc == nullptr) {
+            ALOGD("Could not find StatsCompanionService");
+        } else if (receiver.first.size() == 0) {
+            ALOGD("Statscompanion could not find a broadcast receiver for %s",
+                  key.ToString().c_str());
+        } else {
+            sc->sendBroadcast(String16(receiver.first.c_str()),
+                              String16(receiver.second.c_str()));
+        }
     });
 
     mConfigManager->AddListener(mProcessor);
@@ -206,7 +216,11 @@
         }
 
         if (!args[0].compare(String8("send-broadcast"))) {
-            return cmd_trigger_broadcast(args);
+            return cmd_trigger_broadcast(out, args);
+        }
+
+        if (!args[0].compare(String8("print-stats"))) {
+            return cmd_print_stats(out);
         }
 
         if (!args[0].compare(String8("clear-config"))) {
@@ -259,16 +273,56 @@
     fprintf(out, "  NAME          The name of the configuration\n");
     fprintf(out, "\n");
     fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats send-broadcast PACKAGE CLASS\n");
-    fprintf(out, "  Send a broadcast that triggers one subscriber to fetch metrics.\n");
-    fprintf(out, "  PACKAGE        The name of the package to receive the broadcast.\n");
-    fprintf(out, "  CLASS          The name of the class to receive the broadcast.\n");
+    fprintf(out, "usage: adb shell cmd stats send-broadcast [UID] NAME\n");
+    fprintf(out, "  Send a broadcast that triggers the subscriber to fetch metrics.\n");
+    fprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
+    fprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
+    fprintf(out, "                calling uid is used.\n");
+    fprintf(out, "  NAME          The name of the configuration\n");
+    fprintf(out, "\n");
+    fprintf(out, "\n");
+    fprintf(out, "usage: adb shell cmd stats print-stats\n");
+    fprintf(out, "  Prints some basic stats.\n");
 }
 
-status_t StatsService::cmd_trigger_broadcast(Vector<String8>& args) {
+status_t StatsService::cmd_trigger_broadcast(FILE* out, Vector<String8>& args) {
+    string name;
+    bool good = false;
+    int uid;
+    const int argCount = args.size();
+    if (argCount == 2) {
+        // Automatically pick the UID
+        uid = IPCThreadState::self()->getCallingUid();
+        // TODO: What if this isn't a binder call? Should we fail?
+        name.assign(args[1].c_str(), args[1].size());
+        good = true;
+    } else if (argCount == 3) {
+        // If it's a userdebug or eng build, then the shell user can
+        // impersonate other uids.
+        if (mEngBuild) {
+            const char* s = args[1].c_str();
+            if (*s != '\0') {
+                char* end = NULL;
+                uid = strtol(s, &end, 0);
+                if (*end == '\0') {
+                    name.assign(args[2].c_str(), args[2].size());
+                    good = true;
+                }
+            }
+        } else {
+            fprintf(out,
+                    "The metrics can only be dumped for other UIDs on eng or userdebug "
+                            "builds.\n");
+        }
+    }
+    if (!good) {
+        print_cmd_help(out);
+        return UNKNOWN_ERROR;
+    }
+    auto receiver = mConfigManager->GetConfigReceiver(ConfigKey(uid, name));
     auto sc = getStatsCompanionService();
-    sc->sendBroadcast(String16(args[1]), String16(args[2]));
-    ALOGD("StatsService::trigger broadcast succeeded");
+    sc->sendBroadcast(String16(receiver.first.c_str()), String16(receiver.second.c_str()));
+    ALOGD("StatsService::trigger broadcast succeeded to %s, %s", args[1].c_str(), args[2].c_str());
     return NO_ERROR;
 }
 
@@ -373,7 +427,8 @@
             }
         }
         if (good) {
-            mProcessor->onDumpReport(ConfigKey(uid, name));
+            vector<uint8_t> data;
+            mProcessor->onDumpReport(ConfigKey(uid, name), &data);
             // TODO: print the returned StatsLogReport to file instead of printing to logcat.
             fprintf(out, "Dump report for Config [%d,%s]\n", uid, name.c_str());
             fprintf(out, "See the StatsLogReport in logcat...\n");
@@ -389,6 +444,15 @@
     }
 }
 
+status_t StatsService::cmd_print_stats(FILE* out) {
+    vector<ConfigKey> configs = mConfigManager->GetAllConfigKeys();
+    for (const ConfigKey& key : configs) {
+        fprintf(out, "Config %s uses %zu bytes\n", key.ToString().c_str(),
+                mProcessor->GetMetricsSize(key));
+    }
+    return NO_ERROR;
+}
+
 status_t StatsService::cmd_print_stats_log(FILE* out, const Vector<String8>& args) {
     long msec = 0;
 
@@ -573,10 +637,14 @@
     mProcessor->OnLogEvent(event);
 }
 
-Status StatsService::getData(const String16& key, vector<uint8_t>* output) {
+Status StatsService::getData(const String16& key, vector <uint8_t>* output) {
     IPCThreadState* ipc = IPCThreadState::self();
+    ALOGD("StatsService::getData with Pid %i, Uid %i", ipc->getCallingPid(),
+          ipc->getCallingUid());
     if (checkCallingPermission(String16(kPermissionDump))) {
-        // TODO: Implement this.
+        string keyStr = string(String8(key).string());
+        ConfigKey configKey(ipc->getCallingUid(), keyStr);
+        mProcessor->onDumpReport(configKey, output);
         return Status::ok();
     } else {
         return Status::fromExceptionCode(binder::Status::EX_SECURITY);
@@ -588,10 +656,9 @@
                                       const String16& package, const String16& cls,
                                       bool* success) {
     IPCThreadState* ipc = IPCThreadState::self();
-    int32_t* uid = reinterpret_cast<int32_t*>(ipc->getCallingUid());
     if (checkCallingPermission(String16(kPermissionDump))) {
         string keyString = string(String8(key).string());
-        ConfigKey configKey(*uid, keyString);
+        ConfigKey configKey(ipc->getCallingUid(), keyString);
         StatsdConfig cfg;
         cfg.ParseFromArray(&config[0], config.size());
         mConfigManager->UpdateConfig(configKey, cfg);
@@ -607,7 +674,8 @@
 Status StatsService::removeConfiguration(const String16& key, bool* success) {
     IPCThreadState* ipc = IPCThreadState::self();
     if (checkCallingPermission(String16(kPermissionDump))) {
-        // TODO: Implement this.
+        string keyStr = string(String8(key).string());
+        mConfigManager->RemoveConfig(ConfigKey(ipc->getCallingUid(), keyStr));
         return Status::ok();
     } else {
         *success = false;
@@ -615,7 +683,7 @@
     }
 }
 
-void StatsService::binderDied(const wp<IBinder>& who) {
+void StatsService::binderDied(const wp <IBinder>& who) {
     for (size_t i = 0; i < mCallbacks.size(); i++) {
         if (IInterface::asBinder(mCallbacks[i]) == who) {
             mCallbacks.removeAt(i);
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 0163f94..888f97b 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -124,7 +124,7 @@
     /**
      * Trigger a broadcast.
      */
-    status_t cmd_trigger_broadcast(Vector<String8>& args);
+    status_t cmd_trigger_broadcast(FILE* out, Vector<String8>& args);
 
     /**
      * Handle the config sub-command.
@@ -132,6 +132,11 @@
     status_t cmd_config(FILE* in, FILE* out, FILE* err, Vector<String8>& args);
 
     /**
+     * Prints some basic stats to std out.
+     */
+    status_t cmd_print_stats(FILE* out);
+
+    /**
      * Print the event log.
      */
     status_t cmd_print_stats_log(FILE* out, const Vector<String8>& args);
diff --git a/cmds/statsd/src/config/ConfigManager.cpp b/cmds/statsd/src/config/ConfigManager.cpp
index 3319f6d..9680e4a 100644
--- a/cmds/statsd/src/config/ConfigManager.cpp
+++ b/cmds/statsd/src/config/ConfigManager.cpp
@@ -134,12 +134,34 @@
     }
 }
 
+vector<ConfigKey> ConfigManager::GetAllConfigKeys() {
+    vector<ConfigKey> ret;
+    for (auto it = mConfigs.cbegin(); it != mConfigs.cend(); ++it) {
+        ret.push_back(it->first);
+    }
+    return ret;
+}
+
+const pair<string, string> ConfigManager::GetConfigReceiver(const ConfigKey& key) {
+    auto it = mConfigReceivers.find(key);
+    if (it == mConfigReceivers.end()) {
+        return pair<string,string>();
+    } else {
+        return it->second;
+    }
+}
+
 void ConfigManager::Dump(FILE* out) {
     fprintf(out, "CONFIGURATIONS (%d)\n", (int)mConfigs.size());
     fprintf(out, "     uid name\n");
     for (unordered_map<ConfigKey, StatsdConfig>::const_iterator it = mConfigs.begin();
          it != mConfigs.end(); it++) {
         fprintf(out, "  %6d %s\n", it->first.GetUid(), it->first.GetName().c_str());
+        auto receiverIt = mConfigReceivers.find(it->first);
+        if (receiverIt != mConfigReceivers.end()) {
+            fprintf(out, "    -> received by %s, %s\n", receiverIt->second.first.c_str(),
+                    receiverIt->second.second.c_str());
+        }
         // TODO: Print the contents of the config too.
     }
 }
diff --git a/cmds/statsd/src/config/ConfigManager.h b/cmds/statsd/src/config/ConfigManager.h
index 5b612cc..01d7fb9 100644
--- a/cmds/statsd/src/config/ConfigManager.h
+++ b/cmds/statsd/src/config/ConfigManager.h
@@ -68,6 +68,16 @@
     void SetConfigReceiver(const ConfigKey& key, const string& pkg, const string& cls);
 
     /**
+     * Returns the package name and class name representing the broadcast receiver for this config.
+     */
+    const pair<string, string> GetConfigReceiver(const ConfigKey& key);
+
+    /**
+     * Returns all config keys registered.
+     */
+    vector<ConfigKey> GetAllConfigKeys();
+
+    /**
      * Erase any broadcast receiver associated with this config key.
      */
     void RemoveConfigReceiver(const ConfigKey& key);
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 74ba40b..bd288a1 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -121,7 +121,10 @@
     event.ToProto(*mProto);
     mProto->end(eventToken);
     mProto->end(wrapperToken);
-    // TODO: Find a proper way to derive the size of incoming LogEvent.
+
+    // TODO: Increment mByteSize with a real value. Until this feature is working, we assume 50
+    // bytes.
+    mByteSize += 50;
 }
 
 size_t EventMetricProducer::byteSize() {
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index c0930e3..c7982a8 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -61,12 +61,15 @@
 
     // TODO: Pass a timestamp as a parameter in onDumpReport and update all its
     // implementations.
+    // onDumpReport returns the proto-serialized output and clears the previously stored contents.
     virtual std::unique_ptr<std::vector<uint8_t>> onDumpReport() = 0;
 
     virtual bool isConditionSliced() const {
         return mConditionSliced;
     };
 
+    // Returns the memory in bytes currently used to store this metric's data. Does not change
+    // state.
     virtual size_t byteSize() = 0;
 
 protected:
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 39c79f9..fb16779 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -46,6 +46,8 @@
     // Config source owner can call onDumpReport() to get all the metrics collected.
     std::vector<std::unique_ptr<std::vector<uint8_t>>> onDumpReport();
 
+    // Computes the total byte size of all metrics managed by a single config source.
+    // Does not change the state.
     size_t byteSize();
 
 private:
