Slice by state in ValueMetricProducer

- Added #onStateChanged logic to ValueMetricProducer
- #onDumpReport fills in slice_by_state values in ValueMetricProducer
- Base information is tracked in a separate data structure and keyed by
dimensions_in_what HashableDimensionKey. The current state key is also
stored in base info so we can close the interval for this state once we
get a new state key.
- Added unit tests for ValueMetricProducer #onStateChanged

Other changes:
- mCondition parameter was removed from #pullAndMatchEventsLocked and
 #accumulateEvents because it is unused in these functions.
- The event key is initialized in each metric's internal
 onMatchedLogEvent function. This allows ValueMetric to skip events that
 have primary keys that do not match the current state change primary
 key.

Test: bit statsd_test:*
Change-Id: I565c6d251262be57abf10fdef243b8bdc01f3772
diff --git a/cmds/statsd/src/HashableDimensionKey.cpp b/cmds/statsd/src/HashableDimensionKey.cpp
index f9f11b2..109785f 100644
--- a/cmds/statsd/src/HashableDimensionKey.cpp
+++ b/cmds/statsd/src/HashableDimensionKey.cpp
@@ -152,6 +152,10 @@
     return false;
 }
 
+bool HashableDimensionKey::operator!=(const HashableDimensionKey& that) const {
+    return !((*this) == that);
+}
+
 bool HashableDimensionKey::operator==(const HashableDimensionKey& that) const {
     if (mValues.size() != that.getValues().size()) {
         return false;
diff --git a/cmds/statsd/src/HashableDimensionKey.h b/cmds/statsd/src/HashableDimensionKey.h
index b9b86ce..654e135 100644
--- a/cmds/statsd/src/HashableDimensionKey.h
+++ b/cmds/statsd/src/HashableDimensionKey.h
@@ -71,6 +71,8 @@
 
     std::string toString() const;
 
+    bool operator!=(const HashableDimensionKey& that) const;
+
     bool operator==(const HashableDimensionKey& that) const;
 
     bool operator<(const HashableDimensionKey& that) const;
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index 68a51ef..c569bc1 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -284,6 +284,10 @@
     FRIEND_TEST(DurationMetricE2eTest, TestWithCondition);
     FRIEND_TEST(DurationMetricE2eTest, TestWithSlicedCondition);
     FRIEND_TEST(DurationMetricE2eTest, TestWithActivationAndSlicedCondition);
+
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState);
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithDimensions);
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithIncorrectDimensions);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index c1f95ee..21ffff3 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -277,8 +277,8 @@
 
 void CountMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const MetricDimensionKey& eventKey,
-        const ConditionKey& conditionKey, bool condition,
-        const LogEvent& event) {
+        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+        const map<int, HashableDimensionKey>& statePrimaryKeys) {
     int64_t eventTimeNs = event.GetElapsedTimestampNs();
     flushIfNeededLocked(eventTimeNs);
 
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 7b6c7e0..a4711e8 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -59,8 +59,8 @@
 protected:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const MetricDimensionKey& eventKey,
-            const ConditionKey& conditionKey, bool condition,
-            const LogEvent& event) override;
+            const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+            const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;
 
 private:
 
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index fee5e6e..35c6d37 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -541,8 +541,8 @@
 
 void DurationMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const MetricDimensionKey& eventKey,
-        const ConditionKey& conditionKeys, bool condition,
-        const LogEvent& event) {
+        const ConditionKey& conditionKeys, bool condition, const LogEvent& event,
+        const map<int, HashableDimensionKey>& statePrimaryKeys) {
     ALOGW("Not used in duration tracker.");
 }
 
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 7457d7f..45908fb 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -59,8 +59,8 @@
 
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const MetricDimensionKey& eventKey,
-            const ConditionKey& conditionKeys, bool condition,
-            const LogEvent& event) override;
+            const ConditionKey& conditionKeys, bool condition, const LogEvent& event,
+            const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;
 
 private:
     void handleStartEvent(const MetricDimensionKey& eventKey, const ConditionKey& conditionKeys,
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 32eb077..6833f8d 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -143,8 +143,8 @@
 
 void EventMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const MetricDimensionKey& eventKey,
-        const ConditionKey& conditionKey, bool condition,
-        const LogEvent& event) {
+        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+        const map<int, HashableDimensionKey>& statePrimaryKeys) {
     if (!condition) {
         return;
     }
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h
index dca37e8..e8f2119 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.h
+++ b/cmds/statsd/src/metrics/EventMetricProducer.h
@@ -47,8 +47,8 @@
 private:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const MetricDimensionKey& eventKey,
-            const ConditionKey& conditionKey, bool condition,
-            const LogEvent& event) override;
+            const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+            const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;
 
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 4ab6ec3..4ab6fd4 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -449,8 +449,8 @@
 
 void GaugeMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const MetricDimensionKey& eventKey,
-        const ConditionKey& conditionKey, bool condition,
-        const LogEvent& event) {
+        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+        const map<int, HashableDimensionKey>& statePrimaryKeys) {
     if (condition == false) {
         return;
     }
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 12dcaa4..284bcc5 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -95,8 +95,8 @@
 protected:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const MetricDimensionKey& eventKey,
-            const ConditionKey& conditionKey, bool condition,
-            const LogEvent& event) override;
+            const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+            const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;
 
 private:
     void onDumpReportLocked(const int64_t dumpTimeNs,
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index cf1d2f3..5c29cb3 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -133,8 +133,8 @@
     HashableDimensionKey dimensionInWhat;
     filterValues(mDimensionsInWhat, event.getValues(), &dimensionInWhat);
     MetricDimensionKey metricKey(dimensionInWhat, stateValuesKey);
-    onMatchedLogEventInternalLocked(
-            matcherIndex, metricKey, conditionKey, condition, event);
+    onMatchedLogEventInternalLocked(matcherIndex, metricKey, conditionKey, condition, event,
+                                    statePrimaryKeys);
 }
 
 bool MetricProducer::evaluateActiveStateLocked(int64_t elapsedTimestampNs) {
@@ -269,6 +269,7 @@
                                          FieldValue* value) {
     if (!StateManager::getInstance().getStateValue(atomId, queryKey, value)) {
         value->mValue = Value(StateTracker::kStateUnknown);
+        value->mField.setTag(atomId);
         ALOGW("StateTracker not found for state atom %d", atomId);
         return;
     }
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 30675fc..99f0c64 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -330,8 +330,8 @@
      */
     virtual void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const MetricDimensionKey& eventKey,
-            const ConditionKey& conditionKey, bool condition,
-            const LogEvent& event) = 0;
+            const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+            const map<int, HashableDimensionKey>& statePrimaryKeys) = 0;
 
     // Consume the parsed stats log entry that already matched the "what" of the metric.
     virtual void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event);
@@ -475,6 +475,10 @@
     FRIEND_TEST(StatsLogProcessorTest,
             TestActivationOnBootMultipleActivationsDifferentActivationTypes);
     FRIEND_TEST(StatsLogProcessorTest, TestActivationsPersistAcrossSystemServerRestart);
+
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState);
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithDimensions);
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithIncorrectDimensions);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 1fda696..6d20822 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -289,6 +289,10 @@
     FRIEND_TEST(DurationMetricE2eTest, TestWithCondition);
     FRIEND_TEST(DurationMetricE2eTest, TestWithSlicedCondition);
     FRIEND_TEST(DurationMetricE2eTest, TestWithActivationAndSlicedCondition);
+
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState);
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithDimensions);
+    FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithIncorrectDimensions);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index d8f399f..d2db6e9 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -66,6 +66,7 @@
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_BUCKET_INFO = 3;
 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
+const int FIELD_ID_SLICE_BY_STATE = 6;
 // for ValueBucketInfo
 const int FIELD_ID_VALUE_INDEX = 1;
 const int FIELD_ID_VALUE_LONG = 2;
@@ -146,6 +147,14 @@
         mConditionSliced = true;
     }
 
+    for (const auto& stateLink : metric.state_link()) {
+        Metric2State ms;
+        ms.stateAtomId = stateLink.state_atom_id();
+        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
+        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
+        mMetric2StateLinks.push_back(ms);
+    }
+
     int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
     mCurrentBucketNum += numBucketsForward;
 
@@ -181,6 +190,33 @@
     }
 }
 
+void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
+                                         const HashableDimensionKey& primaryKey, int oldState,
+                                         int newState) {
+    VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
+         (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
+         oldState, newState);
+    // If condition is not true, we do not need to pull for this state change.
+    if (mCondition != ConditionState::kTrue) {
+        return;
+    }
+    bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
+    if (isEventLate) {
+        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+             (long long)mCurrentBucketStartTimeNs);
+        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
+        return;
+    }
+    mStateChangePrimaryKey.first = atomId;
+    mStateChangePrimaryKey.second = primaryKey;
+    if (mIsPulled) {
+        pullAndMatchEventsLocked(eventTimeNs);
+    }
+    mStateChangePrimaryKey.first = 0;
+    mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
+    flushIfNeededLocked(eventTimeNs);
+}
+
 void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                            const int64_t eventTime) {
     VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
@@ -281,6 +317,14 @@
                                            FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
         }
 
+        // Then fill slice_by_state.
+        for (auto state : dimensionKey.getStateValuesKey().getValues()) {
+            uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
+                                                     FIELD_ID_SLICE_BY_STATE);
+            writeStateToProto(state, protoOutput);
+            protoOutput->end(stateToken);
+        }
+
         // Then fill bucket_info (ValueBucketInfo).
         for (const auto& bucket : pair.second) {
             uint64_t bucketInfoToken = protoOutput->start(
@@ -300,7 +344,7 @@
                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
                                    (long long)bucket.mConditionTrueNs);
             }
-            for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
+            for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
                 int index = bucket.valueIndex[i];
                 const Value& value = bucket.values[i];
                 uint64_t valueToken = protoOutput->start(
@@ -358,9 +402,10 @@
 }
 
 void ValueMetricProducer::resetBase() {
-    for (auto& slice : mCurrentSlicedBucket) {
-        for (auto& interval : slice.second) {
-            interval.hasBase = false;
+    for (auto& slice : mCurrentBaseInfo) {
+        for (auto& baseInfo : slice.second) {
+            baseInfo.hasBase = false;
+            baseInfo.hasCurrentState = false;
         }
     }
     mHasGlobalBase = false;
@@ -558,14 +603,20 @@
             onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
         }
     }
-    // If the new pulled data does not contains some keys we track in our intervals, we need to
-    // reset the base.
+    // If a key that is:
+    // 1. Tracked in mCurrentSlicedBucket and
+    // 2. A superset of the current mStateChangePrimaryKey
+    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys)
+    // then we need to reset the base.
     for (auto& slice : mCurrentSlicedBucket) {
-        bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
-                != mMatchedMetricDimensionKeys.end();
-        if (!presentInPulledData) {
-            for (auto& interval : slice.second) {
-                interval.hasBase = false;
+        const auto& whatKey = slice.first.getDimensionKeyInWhat();
+        bool presentInPulledData =
+                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
+        if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
+            auto it = mCurrentBaseInfo.find(whatKey);
+            for (auto& baseInfo : it->second) {
+                baseInfo.hasBase = false;
+                baseInfo.hasCurrentState = false;
             }
         }
     }
@@ -674,17 +725,30 @@
     return false;
 }
 
-void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
-                                                          const MetricDimensionKey& eventKey,
-                                                          const ConditionKey& conditionKey,
-                                                          bool condition, const LogEvent& event) {
+void ValueMetricProducer::onMatchedLogEventInternalLocked(
+        const size_t matcherIndex, const MetricDimensionKey& eventKey,
+        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+        const map<int, HashableDimensionKey>& statePrimaryKeys) {
+    auto whatKey = eventKey.getDimensionKeyInWhat();
+    auto stateKey = eventKey.getStateValuesKey();
+
+    // Skip this event if a state change occurred for a different primary key.
+    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
+    // Check that both the atom id and the primary key are equal.
+    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
+        VLOG("ValueMetric skip event with primary key %s because state change primary key "
+             "is %s",
+             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
+        return;
+    }
+
     int64_t eventTimeNs = event.GetElapsedTimestampNs();
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
-    mMatchedMetricDimensionKeys.insert(eventKey);
+    mMatchedMetricDimensionKeys.insert(whatKey);
 
     if (!mIsPulled) {
         // We cannot flush without doing a pull first.
@@ -709,10 +773,26 @@
     if (hitGuardRailLocked(eventKey)) {
         return;
     }
-    vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
-    if (multiIntervals.size() < mFieldMatchers.size()) {
+    vector<BaseInfo>& baseInfos = mCurrentBaseInfo[whatKey];
+    if (baseInfos.size() < mFieldMatchers.size()) {
         VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
-        multiIntervals.resize(mFieldMatchers.size());
+        baseInfos.resize(mFieldMatchers.size());
+    }
+
+    for (auto& baseInfo : baseInfos) {  // By reference: the reset must persist in the map.
+        if (!baseInfo.hasCurrentState) {
+            baseInfo.currentState = DEFAULT_DIMENSION_KEY;
+            baseInfo.hasCurrentState = true;
+        }
+    }
+
+    // We need to get the intervals stored with the previous state key so we can
+    // close these value intervals.
+    const auto oldStateKey = baseInfos[0].currentState;
+    vector<Interval>& intervals = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];
+    if (intervals.size() < mFieldMatchers.size()) {
+        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
+        intervals.resize(mFieldMatchers.size());
     }
 
     // We only use anomaly detection under certain cases.
@@ -725,7 +805,8 @@
 
     for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
         const Matcher& matcher = mFieldMatchers[i];
-        Interval& interval = multiIntervals[i];
+        BaseInfo& baseInfo = baseInfos[i];
+        Interval& interval = intervals[i];
         interval.valueIndex = i;
         Value value;
         if (!getDoubleOrLong(event, matcher, value)) {
@@ -736,60 +817,61 @@
         interval.seenNewData = true;
 
         if (mUseDiff) {
-            if (!interval.hasBase) {
+            if (!baseInfo.hasBase) {
                 if (mHasGlobalBase && mUseZeroDefaultBase) {
                     // The bucket has global base. This key does not.
                     // Optionally use zero as base.
-                    interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
-                    interval.hasBase = true;
+                    baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
+                    baseInfo.hasBase = true;
                 } else {
                     // no base. just update base and return.
-                    interval.base = value;
-                    interval.hasBase = true;
+                    baseInfo.base = value;
+                    baseInfo.hasBase = true;
                     // If we're missing a base, do not use anomaly detection on incomplete data
                     useAnomalyDetection = false;
-                    // Continue (instead of return) here in order to set interval.base and
-                    // interval.hasBase for other intervals
+                    // Continue (instead of return) here in order to set baseInfo.base and
+                    // baseInfo.hasBase for other baseInfos
                     continue;
                 }
             }
+
             Value diff;
             switch (mValueDirection) {
                 case ValueMetric::INCREASING:
-                    if (value >= interval.base) {
-                        diff = value - interval.base;
+                    if (value >= baseInfo.base) {
+                        diff = value - baseInfo.base;
                     } else if (mUseAbsoluteValueOnReset) {
                         diff = value;
                     } else {
                         VLOG("Unexpected decreasing value");
                         StatsdStats::getInstance().notePullDataError(mPullTagId);
-                        interval.base = value;
+                        baseInfo.base = value;
                         // If we've got bad data, do not use anomaly detection
                         useAnomalyDetection = false;
                         continue;
                     }
                     break;
                 case ValueMetric::DECREASING:
-                    if (interval.base >= value) {
-                        diff = interval.base - value;
+                    if (baseInfo.base >= value) {
+                        diff = baseInfo.base - value;
                     } else if (mUseAbsoluteValueOnReset) {
                         diff = value;
                     } else {
                         VLOG("Unexpected increasing value");
                         StatsdStats::getInstance().notePullDataError(mPullTagId);
-                        interval.base = value;
+                        baseInfo.base = value;
                         // If we've got bad data, do not use anomaly detection
                         useAnomalyDetection = false;
                         continue;
                     }
                     break;
                 case ValueMetric::ANY:
-                    diff = value - interval.base;
+                    diff = value - baseInfo.base;
                     break;
                 default:
                     break;
             }
-            interval.base = value;
+            baseInfo.base = value;
             value = diff;
         }
 
@@ -814,12 +896,13 @@
             interval.hasValue = true;
         }
         interval.sampleSize += 1;
+        baseInfo.currentState = stateKey;
     }
 
     // Only trigger the tracker if all intervals are correct
     if (useAnomalyDetection) {
         // TODO: propgate proper values down stream when anomaly support doubles
-        long wholeBucketVal = multiIntervals[0].value.long_value;
+        long wholeBucketVal = intervals[0].value.long_value;
         auto prev = mCurrentFullBucket.find(eventKey);
         if (prev != mCurrentFullBucket.end()) {
             wholeBucketVal += prev->second;
@@ -953,6 +1036,7 @@
         } else {
             it++;
         }
+        // TODO: remove mCurrentBaseInfo entries when obsolete
     }
 
     mCurrentBucketIsInvalid = false;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 4eae99b..19fb694 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -83,11 +83,14 @@
         flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     };
 
+    void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
+                        int oldState, int newState) override;
+
 protected:
     void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const MetricDimensionKey& eventKey,
-            const ConditionKey& conditionKey, bool condition,
-            const LogEvent& event) override;
+            const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+            const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;
 
 private:
     void onDumpReportLocked(const int64_t dumpTimeNs,
@@ -144,7 +147,10 @@
     std::vector<Matcher> mFieldMatchers;
 
     // Value fields for matching.
-    std::set<MetricDimensionKey> mMatchedMetricDimensionKeys;
+    std::set<HashableDimensionKey> mMatchedMetricDimensionKeys;
+
+    // Holds the atom id, primary key pair from a state change.
+    pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey;
 
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
@@ -156,10 +162,6 @@
     typedef struct {
         // Index in multi value aggregation.
         int valueIndex;
-        // Holds current base value of the dimension. Take diff and update if necessary.
-        Value base;
-        // Whether there is a base to diff to.
-        bool hasBase;
         // Current value, depending on the aggregation type.
         Value value;
         // Number of samples collected.
@@ -171,8 +173,21 @@
         bool seenNewData = false;
     } Interval;
 
+    typedef struct {
+        // Holds current base value of the dimension. Take diff and update if necessary.
+        Value base;
+        // Whether there is a base to diff to. Starts false until a first value is seen.
+        bool hasBase = false;
+        // Last seen state value(s).
+        HashableDimensionKey currentState;
+        // Whether currentState is valid for this dimensions_in_what key. Starts false.
+        bool hasCurrentState = false;
+    } BaseInfo;
+
     std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket;
 
+    std::unordered_map<HashableDimensionKey, std::vector<BaseInfo>> mCurrentBaseInfo;
+
     std::unordered_map<MetricDimensionKey, int64_t> mCurrentFullBucket;
 
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
@@ -285,6 +300,9 @@
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
     FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput);
     FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue);
+    FRIEND_TEST(ValueMetricProducerTest, TestSlicedState);
+    FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMap);
+    FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithPrimaryField_WithDimensions);
     FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
diff --git a/cmds/statsd/src/metrics/metrics_manager_util.cpp b/cmds/statsd/src/metrics/metrics_manager_util.cpp
index 2ad8217..73c1212 100644
--- a/cmds/statsd/src/metrics/metrics_manager_util.cpp
+++ b/cmds/statsd/src/metrics/metrics_manager_util.cpp
@@ -18,11 +18,12 @@
 #include "Log.h"
 
 #include "metrics_manager_util.h"
-#include "MetricProducer.h"
 
 #include <inttypes.h>
 
 #include "atoms_info.h"
+#include "FieldValue.h"
+#include "MetricProducer.h"
 #include "condition/CombinationConditionTracker.h"
 #include "condition/SimpleConditionTracker.h"
 #include "condition/StateConditionTracker.h"
@@ -173,6 +174,14 @@
     return true;
 }
 
+bool handleMetricWithStateLink(const FieldMatcher& stateMatcher,
+                               const vector<Matcher>& dimensionsInWhat) {
+    vector<Matcher> stateMatchers;
+    translateFieldMatcher(stateMatcher, &stateMatchers);
+
+    return subsetDimensions(stateMatchers, dimensionsInWhat);
+}
+
 // Validates a metricActivation and populates state.
 // EventActivationMap and EventDeactivationMap are supplied to a MetricProducer
 //      to provide the producer with state about its activators and deactivators.
@@ -669,18 +678,41 @@
             }
         }
 
+        std::vector<int> slicedStateAtoms;
+        unordered_map<int, unordered_map<int, int64_t>> stateGroupMap;
+        if (metric.slice_by_state_size() > 0) {
+            if (!handleMetricWithStates(config, metric.slice_by_state(), stateAtomIdMap,
+                                        allStateGroupMaps, slicedStateAtoms, stateGroupMap)) {
+                return false;
+            }
+        } else {
+            if (metric.state_link_size() > 0) {
+                ALOGW("ValueMetric has a MetricStateLink but doesn't have a sliced state");
+                return false;
+            }
+        }
+
+        // Check that all metric state links are a subset of dimensions_in_what fields.
+        std::vector<Matcher> dimensionsInWhat;
+        translateFieldMatcher(metric.dimensions_in_what(), &dimensionsInWhat);
+        for (const auto& stateLink : metric.state_link()) {
+            if (!handleMetricWithStateLink(stateLink.fields_in_what(), dimensionsInWhat)) {
+                return false;
+            }
+        }
+
         unordered_map<int, shared_ptr<Activation>> eventActivationMap;
         unordered_map<int, vector<shared_ptr<Activation>>> eventDeactivationMap;
-        bool success = handleMetricActivation(config, metric.id(), metricIndex,
-                metricToActivationMap, logTrackerMap, activationAtomTrackerToMetricMap,
-                deactivationAtomTrackerToMetricMap, metricsWithActivation, eventActivationMap,
-                eventDeactivationMap);
+        bool success = handleMetricActivation(
+                config, metric.id(), metricIndex, metricToActivationMap, logTrackerMap,
+                activationAtomTrackerToMetricMap, deactivationAtomTrackerToMetricMap,
+                metricsWithActivation, eventActivationMap, eventDeactivationMap);
         if (!success) return false;
 
         sp<MetricProducer> valueProducer = new ValueMetricProducer(
                 key, metric, conditionIndex, wizard, trackerIndex, matcherWizard, pullTagId,
                 timeBaseTimeNs, currentTimeNs, pullerManager, eventActivationMap,
-                eventDeactivationMap);
+                eventDeactivationMap, slicedStateAtoms, stateGroupMap);
         allMetricProducers.push_back(valueProducer);
     }
 
diff --git a/cmds/statsd/src/state/StateManager.cpp b/cmds/statsd/src/state/StateManager.cpp
index 80d3983..ea776fa 100644
--- a/cmds/statsd/src/state/StateManager.cpp
+++ b/cmds/statsd/src/state/StateManager.cpp
@@ -28,13 +28,17 @@
     return sStateManager;
 }
 
+void StateManager::clear() {
+    mStateTrackers.clear();
+}
+
 void StateManager::onLogEvent(const LogEvent& event) {
     if (mStateTrackers.find(event.GetTagId()) != mStateTrackers.end()) {
         mStateTrackers[event.GetTagId()]->onLogEvent(event);
     }
 }
 
-bool StateManager::registerListener(int32_t atomId, wp<StateListener> listener) {
+bool StateManager::registerListener(const int32_t atomId, wp<StateListener> listener) {
     // Check if state tracker already exists.
     if (mStateTrackers.find(atomId) == mStateTrackers.end()) {
         // Create a new state tracker iff atom is a state atom.
@@ -50,7 +54,7 @@
     return true;
 }
 
-void StateManager::unregisterListener(int32_t atomId, wp<StateListener> listener) {
+void StateManager::unregisterListener(const int32_t atomId, wp<StateListener> listener) {
     std::unique_lock<std::mutex> lock(mMutex);
 
     // Hold the sp<> until the lock is released so that ~StateTracker() is
@@ -74,7 +78,7 @@
     lock.unlock();
 }
 
-bool StateManager::getStateValue(int32_t atomId, const HashableDimensionKey& key,
+bool StateManager::getStateValue(const int32_t atomId, const HashableDimensionKey& key,
                                  FieldValue* output) const {
     auto it = mStateTrackers.find(atomId);
     if (it != mStateTrackers.end()) {
diff --git a/cmds/statsd/src/state/StateManager.h b/cmds/statsd/src/state/StateManager.h
index a6053e6..8bc2461 100644
--- a/cmds/statsd/src/state/StateManager.h
+++ b/cmds/statsd/src/state/StateManager.h
@@ -40,30 +40,33 @@
     // Returns a pointer to the single, shared StateManager object.
     static StateManager& getInstance();
 
+    // Unregisters all listeners and removes all trackers from StateManager.
+    void clear();
+
     // Notifies the correct StateTracker of an event.
     void onLogEvent(const LogEvent& event);
 
     // Returns true if atomId is being tracked and is associated with a state
     // atom. StateManager notifies the correct StateTracker to register listener.
     // If the correct StateTracker does not exist, a new StateTracker is created.
-    bool registerListener(int32_t atomId, wp<StateListener> listener);
+    bool registerListener(const int32_t atomId, wp<StateListener> listener);
 
     // Notifies the correct StateTracker to unregister a listener
     // and removes the tracker if it no longer has any listeners.
-    void unregisterListener(int32_t atomId, wp<StateListener> listener);
+    void unregisterListener(const int32_t atomId, wp<StateListener> listener);
 
     // Returns true if the StateTracker exists and queries for the
     // original state value mapped to the given query key. The state value is
     // stored and output in a FieldValue class.
     // Returns false if the StateTracker doesn't exist.
-    bool getStateValue(int32_t atomId, const HashableDimensionKey& queryKey,
+    bool getStateValue(const int32_t atomId, const HashableDimensionKey& queryKey,
                        FieldValue* output) const;
 
     inline int getStateTrackersCount() const {
         return mStateTrackers.size();
     }
 
-    inline int getListenersCount(int32_t atomId) const {
+    inline int getListenersCount(const int32_t atomId) const {
         auto it = mStateTrackers.find(atomId);
         if (it != mStateTrackers.end()) {
             return it->second->getListenersCount();
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 8b4d781..c45274e 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -147,12 +147,14 @@
 message ValueMetricData {
   optional DimensionsValue dimensions_in_what = 1;
 
-  optional DimensionsValue dimensions_in_condition = 2 [deprecated = true];
+  repeated StateValue slice_by_state = 6;
 
   repeated ValueBucketInfo bucket_info = 3;
 
   repeated DimensionsValue dimension_leaf_values_in_what = 4;
 
+  optional DimensionsValue dimensions_in_condition = 2 [deprecated = true];
+
   repeated DimensionsValue dimension_leaf_values_in_condition = 5 [deprecated = true];
 }
 
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index a22805b..736aa9b 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -290,12 +290,14 @@
 
   optional FieldMatcher dimensions_in_what = 5;
 
-  optional FieldMatcher dimensions_in_condition = 9 [deprecated = true];
+  repeated int64 slice_by_state = 18;
 
   optional TimeUnit bucket = 6;
 
   repeated MetricConditionLink links = 7;
 
+  repeated MetricStateLink state_link = 19;
+
   enum AggregationType {
     SUM = 1;
     MIN = 2;
@@ -325,6 +327,8 @@
   optional int32 max_pull_delay_sec = 16 [default = 10];
 
   optional bool split_bucket_for_app_upgrade = 17 [default = true];
+
+  optional FieldMatcher dimensions_in_condition = 9 [deprecated = true];
 }
 
 message Alert {