ValueMetric overhaul
This is salvaged from the large CL.
1. Simplify the logic in ValueMetricProducer.
1.1 for pull data on bucket boundary, we pull on bucket end, instead of
playing with timestamp twice.
1.2 for data that require diffing, we keep a rolling diff base that gets
updated.
1.3 Now we check condition in onMatchedLogEventInternalLocked for pushed atoms. For pulled atoms, check before commit time. This was very error prone in P and caused multiple bugs. It is much simpler now.
2. Treat pushed and pulled atoms the same way and share the same
aggregation types.
4. Allow decreasing values for diffing.
5. Allow diffing for pushed atoms.
6. For diff based aggregation, if the diff value is zero, we skip
output.
Bug: 117224984
Bug: 115683963
Bug: 117975256
Bug: 113268259
Test: unit test
Change-Id: I6ee306e9f6e5a166b392c443594704e7d2792ef5
diff --git a/cmds/statsd/src/FieldValue.cpp b/cmds/statsd/src/FieldValue.cpp
index fc1a61c..80ed807 100644
--- a/cmds/statsd/src/FieldValue.cpp
+++ b/cmds/statsd/src/FieldValue.cpp
@@ -18,6 +18,7 @@
#include "Log.h"
#include "FieldValue.h"
#include "HashableDimensionKey.h"
+#include "math.h"
namespace android {
namespace os {
@@ -174,6 +175,25 @@
}
}
+bool Value::isZero() const {
+ switch (type) {
+ case INT:
+ return int_value == 0;
+ case LONG:
+ return long_value == 0;
+ case FLOAT:
+ return fabs(float_value) <= std::numeric_limits<float>::epsilon();
+ case DOUBLE:
+ return fabs(double_value) <= std::numeric_limits<double>::epsilon();
+ case STRING:
+ return str_value.size() == 0;
+ case STORAGE:
+ return storage_value.size() == 0;
+ default:
+ return false;
+ }
+}
+
bool Value::operator==(const Value& that) const {
if (type != that.getType()) return false;
diff --git a/cmds/statsd/src/FieldValue.h b/cmds/statsd/src/FieldValue.h
index 77163f9..a5d00ac 100644
--- a/cmds/statsd/src/FieldValue.h
+++ b/cmds/statsd/src/FieldValue.h
@@ -331,6 +331,8 @@
std::string toString() const;
+ bool isZero() const;
+
Type getType() const {
return type;
}
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index d2919c5..a0d77d6 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -362,11 +362,17 @@
lock_guard<std::mutex> lock(mLock);
auto& pullStats = mPulledAtomStats[pullAtomId];
pullStats.maxPullDelayNs = std::max(pullStats.maxPullDelayNs, pullDelayNs);
- pullStats.avgPullDelayNs = (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) /
- (pullStats.numPullDelay + 1);
+ pullStats.avgPullDelayNs =
+ (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) /
+ (pullStats.numPullDelay + 1);
pullStats.numPullDelay += 1;
}
+void StatsdStats::notePullDataError(int pullAtomId) {
+ lock_guard<std::mutex> lock(mLock);
+ mPulledAtomStats[pullAtomId].dataError++;
+}
+
void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
lock_guard<std::mutex> lock(mLock);
@@ -422,6 +428,7 @@
pullStats.second.avgPullDelayNs = 0;
pullStats.second.maxPullDelayNs = 0;
pullStats.second.numPullDelay = 0;
+ pullStats.second.dataError = 0;
}
}
@@ -530,11 +537,11 @@
dprintf(out,
"Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld, (average "
"pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay nanos)%lld, "
- "(max pull delay nanos)%lld\n",
+ "(max pull delay nanos)%lld, (data error)%ld\n",
(int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache,
(long)pair.second.minPullIntervalSec, (long long)pair.second.avgPullTimeNs,
(long long)pair.second.maxPullTimeNs, (long long)pair.second.avgPullDelayNs,
- (long long)pair.second.maxPullDelayNs);
+ (long long)pair.second.maxPullDelayNs, pair.second.dataError);
}
if (mAnomalyAlarmRegisteredStats > 0) {
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 777d865..2008abd 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -279,6 +279,11 @@
void notePullFromCache(int pullAtomId);
/*
+ * Notify data error for pulled atom.
+ */
+ void notePullDataError(int pullAtomId);
+
+ /*
* Records time for actual pulling, not including those served from cache and not including
* statsd processing delays.
*/
@@ -329,6 +334,7 @@
int64_t avgPullDelayNs = 0;
int64_t maxPullDelayNs = 0;
long numPullDelay = 0;
+ long dataError = 0;
} PulledAtomStats;
private:
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 6367479..c8b1cf0 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -92,7 +92,9 @@
: StatsdStats::kDimensionKeySizeHardLimit),
mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
mAggregationType(metric.aggregation_type()),
- mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) {
+ mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
+ mValueDirection(metric.value_direction()),
+ mSkipZeroDiffOutput(metric.skip_zero_diff_output()) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -125,24 +127,25 @@
}
mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
- HasPositionALL(metric.dimensions_in_condition());
+ HasPositionALL(metric.dimensions_in_condition());
flushIfNeededLocked(startTimeNs);
- // Kicks off the puller immediately.
+
if (mIsPulled) {
mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
mBucketSizeNs);
}
- // TODO: Only do this for partial buckets like first bucket. All other buckets should use
+ // Only do this for partial buckets like first bucket. All other buckets should use
// flushIfNeeded to adjust start and end to bucket boundaries.
// Adjust start for partial bucket
mCurrentBucketStartTimeNs = startTimeNs;
- if (mIsPulled) {
+ // Kicks off the puller immediately if condition is true and diff based.
+ if (mIsPulled && mCondition && mUseDiff) {
pullLocked(startTimeNs);
}
- VLOG("value metric %lld created. bucket size %lld start_time: %lld",
- (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs);
+ VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
+ (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
ValueMetricProducer::~ValueMetricProducer() {
@@ -188,14 +191,14 @@
// Fills the dimension path if not slicing by ALL.
if (!mSliceByPositionALL) {
if (!mDimensionsInWhat.empty()) {
- uint64_t dimenPathToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
+ uint64_t dimenPathToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
protoOutput->end(dimenPathToken);
}
if (!mDimensionsInCondition.empty()) {
- uint64_t dimenPathToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
+ uint64_t dimenPathToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
protoOutput->end(dimenPathToken);
}
@@ -221,15 +224,15 @@
// First fill dimension.
if (mSliceByPositionALL) {
- uint64_t dimensionToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
+ uint64_t dimensionToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
protoOutput->end(dimensionToken);
if (dimensionKey.hasDimensionKeyInCondition()) {
- uint64_t dimensionInConditionToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
- writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(),
- str_set, protoOutput);
+ uint64_t dimensionInConditionToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+ writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set,
+ protoOutput);
protoOutput->end(dimensionInConditionToken);
}
} else {
@@ -237,8 +240,8 @@
FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
if (dimensionKey.hasDimensionKeyInCondition()) {
writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
- FIELD_ID_DIMENSION_LEAF_IN_CONDITION,
- str_set, protoOutput);
+ FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set,
+ protoOutput);
}
}
@@ -256,15 +259,20 @@
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
(long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
}
- if (mValueType == LONG) {
+ if (bucket.value.getType() == LONG) {
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
- (long long)bucket.mValueLong);
+ (long long)bucket.value.long_value);
+ VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value);
+ } else if (bucket.value.getType() == DOUBLE) {
+ protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
+ bucket.value.double_value);
+ VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, bucket.value.double_value);
} else {
- protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble);
+ VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType());
}
protoOutput->end(bucketInfoToken);
- VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs,
- (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble);
}
protoOutput->end(wrapperToken);
}
@@ -279,8 +287,6 @@
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
const int64_t eventTimeNs) {
- mCondition = condition;
-
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
@@ -289,9 +295,19 @@
flushIfNeededLocked(eventTimeNs);
- if (mIsPulled) {
+ // Pull on condition changes.
+ if (mIsPulled && (mCondition != condition)) {
pullLocked(eventTimeNs);
}
+
+ // When the condition changes from true to false, clear the diff base.
+ if (mUseDiff && mCondition && !condition) {
+ for (auto& slice : mCurrentSlicedBucket) {
+ slice.second.hasBase = false;
+ }
+ }
+
+ mCondition = condition;
}
void ValueMetricProducer::pullLocked(const int64_t timestampNs) {
@@ -306,30 +322,33 @@
}
}
+int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
+ return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
+}
+
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
std::lock_guard<std::mutex> lock(mMutex);
- if (mCondition == true || mConditionTrackerIndex < 0) {
+ if (mCondition) {
if (allData.size() == 0) {
return;
}
// For scheduled pulled data, the effective event time is snap to the nearest
- // bucket boundary to make bucket finalize.
+ // bucket end. In the case of waking up from a deep sleep state, we will
+ // attribute to the previous bucket end. If the sleep was long but not very long, we
+ // will be in the immediate next bucket. Previous bucket may get a larger number as
+ // we pull at a later time than real bucket end.
+ // If the sleep was very long, we skip more than one bucket before sleep. In this case,
+ // the diff base will be cleared and this new data will serve as the new diff base.
int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
- int64_t eventTime = mTimeBaseNs +
- ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
-
- // close the end of the bucket
- mCondition = false;
- for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTime - 1);
- onMatchedLogEventLocked(0, *data);
+ int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
+ if (bucketEndTime < mCurrentBucketStartTimeNs) {
+ VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
+ (long long)mCurrentBucketStartTimeNs);
+ return;
}
-
- // start a new bucket
- mCondition = true;
for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTime);
+ data->setElapsedTimestampNs(bucketEndTime);
onMatchedLogEventLocked(0, *data);
}
}
@@ -363,8 +382,8 @@
StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
// 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
if (newTupleCount > mDimensionHardLimit) {
- ALOGE("ValueMetric %lld dropping data for dimension key %s",
- (long long)mMetricId, newKey.toString().c_str());
+ ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
+ newKey.toString().c_str());
return true;
}
}
@@ -393,10 +412,10 @@
return v;
}
-void ValueMetricProducer::onMatchedLogEventInternalLocked(
- const size_t matcherIndex, const MetricDimensionKey& eventKey,
- const ConditionKey& conditionKey, bool condition,
- const LogEvent& event) {
+void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
+ const MetricDimensionKey& eventKey,
+ const ConditionKey& conditionKey,
+ bool condition, const LogEvent& event) {
int64_t eventTimeNs = event.GetElapsedTimestampNs();
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
@@ -406,6 +425,14 @@
flushIfNeededLocked(eventTimeNs);
+ // For pulled data, we already check condition when we decide to pull or
+ // in onDataPulled. So take all of them.
+ // For pushed data, just check condition.
+ if (!(mIsPulled || condition)) {
+ VLOG("ValueMetric skip event because condition is false");
+ return;
+ }
+
if (hitGuardRailLocked(eventKey)) {
return;
}
@@ -418,71 +445,70 @@
}
Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue);
- Value diff;
- bool hasDiff = false;
- if (mIsPulled) {
- // Always require condition for pulled events. In the case of no condition, only pull
- // on bucket boundaries, in which we fake condition changes.
- if (mCondition == true) {
- if (!interval.startUpdated) {
- interval.start = value;
- interval.startUpdated = true;
- } else {
- // Skip it if there is already value recorded for the start. Happens when puller
- // takes too long to finish. In this case we take the previous value.
- VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str());
- }
- } else {
- // Generally we expect value to be monotonically increasing.
- // If not, take absolute value or drop it, based on config.
- if (interval.startUpdated) {
- if (value >= interval.start) {
- diff = (value - interval.start);
- hasDiff = true;
+ if (mUseDiff) {
+ // no base. just update base and return.
+ if (!interval.hasBase) {
+ interval.base = value;
+ interval.hasBase = true;
+ return;
+ }
+ Value diff;
+ switch (mValueDirection) {
+ case ValueMetric::INCREASING:
+ if (value >= interval.base) {
+ diff = value - interval.base;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
} else {
- if (mUseAbsoluteValueOnReset) {
- diff = value;
- hasDiff = true;
- } else {
- VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId,
- interval.start.toString().c_str(), value.toString().c_str());
- }
+ VLOG("Unexpected decreasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
}
- interval.startUpdated = false;
- } else {
- VLOG("No start for matching end %s", value.toString().c_str());
- }
+ break;
+ case ValueMetric::DECREASING:
+ if (interval.base >= value) {
+ diff = interval.base - value;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
+ } else {
+ VLOG("Unexpected increasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
+ }
+ break;
+ case ValueMetric::ANY:
+ diff = value - interval.base;
+ break;
+ default:
+ break;
+ }
+ interval.base = value;
+ value = diff;
+ }
+
+ if (interval.hasValue) {
+ switch (mAggregationType) {
+ case ValueMetric::SUM:
+ // for AVG, we add up and take average when flushing the bucket
+ case ValueMetric::AVG:
+ interval.value += value;
+ break;
+ case ValueMetric::MIN:
+ interval.value = std::min(value, interval.value);
+ break;
+ case ValueMetric::MAX:
+ interval.value = std::max(value, interval.value);
+ break;
+ default:
+ break;
}
} else {
- // for pushed events, only aggregate when sliced condition is true
- if (condition == true || mConditionTrackerIndex < 0) {
- diff = value;
- hasDiff = true;
- }
+ interval.value = value;
+ interval.hasValue = true;
}
- if (hasDiff) {
- if (interval.hasValue) {
- switch (mAggregationType) {
- case ValueMetric::SUM:
- // for AVG, we add up and take average when flushing the bucket
- case ValueMetric::AVG:
- interval.value += diff;
- break;
- case ValueMetric::MIN:
- interval.value = diff < interval.value ? diff : interval.value;
- break;
- case ValueMetric::MAX:
- interval.value = diff > interval.value ? diff : interval.value;
- break;
- default:
- break;
- }
- } else {
- interval.value = diff;
- interval.hasValue = true;
- }
- interval.sampleSize += 1;
- }
+ interval.sampleSize += 1;
// TODO: propgate proper values down stream when anomaly support doubles
long wholeBucketVal = interval.value.long_value;
@@ -512,6 +538,10 @@
if (numBucketsForward > 1) {
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
+ // take base again in future good bucket.
+ for (auto& slice : mCurrentSlicedBucket) {
+ slice.second.hasBase = false;
+ }
}
VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
(long long)mCurrentBucketStartTimeNs);
@@ -534,8 +564,18 @@
// The current bucket is large enough to keep.
for (const auto& slice : mCurrentSlicedBucket) {
if (slice.second.hasValue) {
- info.mValueLong = slice.second.value.long_value;
- info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize;
+ // skip the output if the diff is zero
+ if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) {
+ continue;
+ }
+ if (mAggregationType != ValueMetric::AVG) {
+ info.value = slice.second.value;
+ } else {
+ double sum = slice.second.value.type == LONG
+ ? (double)slice.second.value.long_value
+ : slice.second.value.double_value;
+ info.value.setDouble(sum / slice.second.sampleSize);
+ }
// it will auto create new vector of ValuebucketInfo if the key is not found.
auto& bucketList = mPastBuckets[slice.first];
bucketList.push_back(info);
@@ -581,7 +621,10 @@
}
// Reset counters
- mCurrentSlicedBucket.clear();
+ for (auto& slice : mCurrentSlicedBucket) {
+ slice.second.hasValue = false;
+ slice.second.sampleSize = 0;
+ }
}
size_t ValueMetricProducer::byteSizeLocked() const {
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 8db2d95..3416afe 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -34,8 +34,7 @@
struct ValueBucket {
int64_t mBucketStartNs;
int64_t mBucketEndNs;
- int64_t mValueLong;
- double mValueDouble;
+ Value value;
};
class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver {
@@ -54,35 +53,11 @@
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
const int64_t version) override {
std::lock_guard<std::mutex> lock(mMutex);
-
- if (mIsPulled && (mCondition == true || mConditionTrackerIndex < 0)) {
- vector<shared_ptr<LogEvent>> allData;
- mPullerManager->Pull(mPullTagId, eventTimeNs, &allData);
- if (allData.size() == 0) {
- // This shouldn't happen since this valuemetric is not useful now.
- }
-
- // Pretend the pulled data occurs right before the app upgrade event.
- mCondition = false;
- for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTimeNs - 1);
- onMatchedLogEventLocked(0, *data);
- }
-
- flushCurrentBucketLocked(eventTimeNs);
- mCurrentBucketStartTimeNs = eventTimeNs;
-
- mCondition = true;
- for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTimeNs);
- onMatchedLogEventLocked(0, *data);
- }
- } else {
- // For pushed value metric or pulled metric where condition is not true,
- // we simply flush and reset the current bucket start.
- flushCurrentBucketLocked(eventTimeNs);
- mCurrentBucketStartTimeNs = eventTimeNs;
+ if (mIsPulled && mCondition) {
+ pullLocked(eventTimeNs - 1);
}
+ flushCurrentBucketLocked(eventTimeNs);
+ mCurrentBucketStartTimeNs = eventTimeNs;
};
protected:
@@ -117,6 +92,9 @@
void dropDataLocked(const int64_t dropTimeNs) override;
+ // Calculate previous bucket end time based on current time.
+ int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs);
+
sp<StatsPullerManager> mPullerManager;
const FieldMatcher mValueField;
@@ -131,11 +109,10 @@
// internal state of a bucket.
typedef struct {
- // Pulled data always come in pair of <start, end>. This holds the value
- // for start. The diff (end - start) is taken as the real value.
- Value start;
- // Whether the start data point is updated
- bool startUpdated;
+ // Holds current base value of the dimension. Take diff and update if necessary.
+ Value base;
+ // Whether there is a base to diff to.
+ bool hasBase;
// Current value, depending on the aggregation type.
Value value;
// Number of samples collected.
@@ -172,7 +149,11 @@
const ValueMetric::AggregationType mAggregationType;
- const Type mValueType;
+ const bool mUseDiff;
+
+ const ValueMetric::ValueDirection mValueDirection;
+
+ const bool mSkipZeroDiffOutput;
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -187,13 +168,13 @@
FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum);
- FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced);
FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
+ FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
+ FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput);
};
} // namespace statsd
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 8bfa360..4da3828 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -374,6 +374,7 @@
optional int64 max_pull_time_nanos = 6;
optional int64 average_pull_delay_nanos = 7;
optional int64 max_pull_delay_nanos = 8;
+ optional int64 data_error = 9;
}
repeated PulledAtomStats pulled_atom_stats = 10;
diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp
index 44fa72e..504c586 100644
--- a/cmds/statsd/src/stats_log_util.cpp
+++ b/cmds/statsd/src/stats_log_util.cpp
@@ -63,6 +63,7 @@
const int FIELD_ID_MAX_PULL_TIME_NANOS = 6;
const int FIELD_ID_AVERAGE_PULL_DELAY_NANOS = 7;
const int FIELD_ID_MAX_PULL_DELAY_NANOS = 8;
+const int FIELD_ID_DATA_ERROR = 9;
namespace {
@@ -446,6 +447,7 @@
(long long)pair.second.avgPullDelayNs);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_MAX_PULL_DELAY_NANOS,
(long long)pair.second.maxPullDelayNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DATA_ERROR, (long long)pair.second.dataError);
protoOutput->end(token);
}
diff --git a/cmds/statsd/src/stats_log_util.h b/cmds/statsd/src/stats_log_util.h
index b8f6850..61f31eb 100644
--- a/cmds/statsd/src/stats_log_util.h
+++ b/cmds/statsd/src/stats_log_util.h
@@ -21,6 +21,7 @@
#include "HashableDimensionKey.h"
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
#include "guardrail/StatsdStats.h"
+#include "statslog.h"
namespace android {
namespace os {
@@ -87,6 +88,10 @@
// Returns the truncated timestamp.
int64_t truncateTimestampNsToFiveMinutes(int64_t timestampNs);
+inline bool isPushedAtom(int atomId) {
+ return atomId <= util::kMaxPushedAtomId && atomId > 1;
+}
+
} // namespace statsd
} // namespace os
} // namespace android
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index d5f81a59..5c46a29 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -270,6 +270,17 @@
optional int64 min_bucket_size_nanos = 10;
optional bool use_absolute_value_on_reset = 11 [default = false];
+
+ optional bool use_diff = 12;
+
+ enum ValueDirection {
+ INCREASING = 1;
+ DECREASING = 2;
+ ANY = 3;
+ }
+ optional ValueDirection value_direction = 13 [default = INCREASING];
+
+ optional bool skip_zero_diff_output = 14 [default = true];
}
message Alert {
diff --git a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
index fed5a3f..095b401 100644
--- a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
+++ b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
@@ -49,6 +49,7 @@
CreateDimensions(android::util::TEMPERATURE, {2/* sensor name field */ });
valueMetric->set_bucket(FIVE_MINUTES);
valueMetric->set_use_absolute_value_on_reset(true);
+ valueMetric->set_skip_zero_diff_output(false);
return config;
}
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 57aab97..ffa07081 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -47,7 +47,35 @@
const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs;
const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs;
-const int64_t eventUpgradeTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC;
+double epsilon = 0.001;
+
+/*
+ * Tests that calcPreviousBucketEndTime rounds a timestamp down to a bucket boundary
+ */
+TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+
+ int64_t startTimeBase = 11;
+
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+ // statsd started long ago.
+ // The metric starts in the middle of the bucket
+ ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+ -1, startTimeBase, 22, pullerManager);
+
+ EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10));
+ EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10));
+ EXPECT_EQ(60 * NS_PER_SEC + startTimeBase,
+ valueProducer.calcPreviousBucketEndTime(2 * 60 * NS_PER_SEC));
+ EXPECT_EQ(2 * 60 * NS_PER_SEC + startTimeBase,
+ valueProducer.calcPreviousBucketEndTime(3 * 60 * NS_PER_SEC));
+}
/*
* Tests that the first bucket works correctly
@@ -90,7 +118,7 @@
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
event->write(tagId);
event->write(3);
event->init();
@@ -114,12 +142,11 @@
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:true sum:0 start:11
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(8, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(8, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
@@ -131,12 +158,14 @@
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // tartUpdated:false sum:12
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(23, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(12, curInterval.value.long_value);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+ EXPECT_EQ(8, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
allData.clear();
event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -147,12 +176,14 @@
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:12
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(13, curInterval.value.long_value);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(3UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
+ EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
/*
@@ -170,7 +201,7 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true));
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
@@ -188,9 +219,9 @@
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
@@ -203,11 +234,11 @@
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(10, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(10, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -218,11 +249,13 @@
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(26, curInterval.value.long_value);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+ EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
/*
@@ -257,9 +290,9 @@
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
@@ -272,7 +305,8 @@
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(10, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
@@ -285,11 +319,11 @@
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(26, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
}
/*
@@ -309,21 +343,10 @@
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- // should not take effect
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
- event->write(tagId);
- event->write(3);
- event->init();
- data->push_back(event);
- return true;
- }))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
- data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
event->write(100);
event->init();
@@ -333,7 +356,7 @@
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(120);
event->init();
@@ -349,8 +372,8 @@
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:100
+ // hasBase:true base:100 hasValue:false
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
@@ -366,20 +389,20 @@
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:110
- EXPECT_EQ(110, curInterval.start.long_value);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(110, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(10, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:110
+ EXPECT_EQ(true, curInterval.hasValue);
EXPECT_EQ(10, curInterval.value.long_value);
- EXPECT_EQ(false, curInterval.startUpdated);
+ EXPECT_EQ(false, curInterval.hasBase);
}
TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade) {
@@ -401,9 +424,9 @@
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1);
+ valueProducer.notifyAppUpgrade(bucketStartTimeNs + 150, "ANY.APP", 1, 1);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(bucketStartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 59 * NS_PER_SEC);
event2->write(1);
@@ -411,7 +434,7 @@
event2->init();
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(bucketStartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
// Next value should create a new bucket.
shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 65 * NS_PER_SEC);
@@ -435,11 +458,11 @@
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
+ .WillOnce(Return(true))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 149);
event->write(tagId);
event->write(120);
event->init();
@@ -451,7 +474,7 @@
vector<shared_ptr<LogEvent>> allData;
allData.clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(100);
event->init();
@@ -460,21 +483,21 @@
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1);
+ valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
- EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
+ EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value);
allData.clear();
- event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
event->write(tagId);
event->write(150);
event->init();
allData.push_back(event);
valueProducer.onDataPulled(allData);
- EXPECT_EQ(2UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(bucket2StartTimeNs, valueProducer.mCurrentBucketStartTimeNs);
- EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValueLong);
+ EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
+ EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value);
}
TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
@@ -490,11 +513,10 @@
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
event->write(tagId);
event->write(100);
event->init();
@@ -504,7 +526,7 @@
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs - 100);
event->write(tagId);
event->write(120);
event->init();
@@ -523,7 +545,7 @@
EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
- EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
+ EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value);
EXPECT_FALSE(valueProducer.mCondition);
}
@@ -565,7 +587,7 @@
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
@@ -587,9 +609,7 @@
event1->init();
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
- // has 1 slice
+ // no slice yet: the event arrives before the condition turns true
- EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
valueProducer.onConditionChangedLocked(true, bucketStartTimeNs + 15);
shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
@@ -600,6 +620,7 @@
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
EXPECT_EQ(20, curInterval.value.long_value);
@@ -629,7 +650,7 @@
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestAnomalyDetection) {
@@ -727,7 +748,7 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true));
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
@@ -747,9 +768,9 @@
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:true sum:0 start:11
+ // hasBase:true base:11 hasValue:false
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull 2 at correct time
@@ -764,11 +785,11 @@
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // tartUpdated:false sum:12
+ // hasBase:true base:23 hasValue:true value:12
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(23, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(12, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull 3 come late.
// The previous bucket gets closed with error. (Has start value 23, no ending)
@@ -784,12 +805,12 @@
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:12
+ // hasBase:true base:36 hasValue:false
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(36, curInterval.start.long_value);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
/*
@@ -810,12 +831,11 @@
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
// condition becomes true
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
event->write(100);
event->init();
@@ -826,7 +846,7 @@
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(120);
event->init();
@@ -841,17 +861,17 @@
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:100
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull on bucket boundary come late, condition change happens before it
valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// Now the alarm is delivered.
@@ -866,8 +886,9 @@
valueProducer.onDataPulled(allData);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
}
@@ -889,12 +910,11 @@
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
// condition becomes true
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
event->write(100);
event->init();
@@ -905,7 +925,7 @@
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(120);
event->init();
@@ -916,7 +936,7 @@
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 25);
event->write(tagId);
event->write(130);
event->init();
@@ -932,24 +952,26 @@
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:100
+ // hasBase:true base:100 hasValue:false
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull on bucket boundary come late, condition change happens before it
valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// condition changed to true again, before the pull alarm is delivered
valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(130, curInterval.start.long_value);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(130, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// Now the alarm is delivered, but it is considered late, it has no effect
@@ -963,89 +985,10 @@
valueProducer.onDataPulled(allData);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(130, curInterval.start.long_value);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
-}
-
-/*
- * Test pulled event with non sliced condition. The pull on boundary come late because the puller is
- * very slow.
- */
-TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3) {
- ValueMetric metric;
- metric.set_id(metricId);
- metric.set_bucket(ONE_MINUTE);
- metric.mutable_value_field()->set_field(tagId);
- metric.mutable_value_field()->add_child()->set_field(2);
- metric.set_condition(StringToId("SCREEN_ON"));
-
- sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
- sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
- EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
-
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
- // condition becomes true
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
- data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
- event->write(tagId);
- event->write(100);
- event->init();
- data->push_back(event);
- return true;
- }))
- // condition becomes false
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
- data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 20);
- event->write(tagId);
- event->write(120);
- event->init();
- data->push_back(event);
- return true;
- }));
-
- ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
- bucketStartTimeNs, pullerManager);
- valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
-
- // has one slice
- EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:100
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
-
- // pull on bucket boundary come late, condition change happens before it.
- // But puller is very slow in this one, so the data come after bucket finish
- valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
- curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
-
- // Alarm is delivered in time, but the pull is very slow, and pullers are called in order,
- // so this one comes even later
- vector<shared_ptr<LogEvent>> allData;
- allData.clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 30);
- event->write(1);
- event->write(110);
- event->init();
- allData.push_back(event);
- valueProducer.onDataPulled(allData);
-
- curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(130, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
}
@@ -1088,7 +1031,7 @@
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestPushedAggregateMax) {
@@ -1130,7 +1073,7 @@
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestPushedAggregateAvg) {
@@ -1175,7 +1118,7 @@
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12.5, valueProducer.mPastBuckets.begin()->second.back().mValueDouble);
+ EXPECT_TRUE(std::abs(valueProducer.mPastBuckets.begin()->second.back().value.double_value - 12.5) < epsilon);
}
TEST(ValueMetricProducerTest, TestPushedAggregateSum) {
@@ -1217,67 +1160,75 @@
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
-TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced) {
- string slicedConditionName = "UID";
- const int conditionTagId = 2;
+TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) {
ValueMetric metric;
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
metric.mutable_value_field()->set_field(tagId);
- metric.mutable_value_field()->add_child()->set_field(1);
- metric.set_aggregation_type(ValueMetric::SUM);
-
- metric.set_condition(StringToId(slicedConditionName));
- MetricConditionLink* link = metric.add_links();
- link->set_condition(StringToId(slicedConditionName));
- buildSimpleAtomFieldMatcher(tagId, 2, link->mutable_fields_in_what());
- buildSimpleAtomFieldMatcher(conditionTagId, 2, link->mutable_fields_in_condition());
-
- LogEvent event1(tagId, bucketStartTimeNs + 10);
- event1.write(10); // value
- event1.write("111"); // uid
- event1.init();
- ConditionKey key1;
- key1[StringToId(slicedConditionName)] =
- {getMockedDimensionKey(conditionTagId, 2, "111")};
-
- LogEvent event2(tagId, bucketStartTimeNs + 20);
- event2.write(15);
- event2.write("222");
- event2.init();
- ConditionKey key2;
- key2[StringToId(slicedConditionName)] =
- {getMockedDimensionKey(conditionTagId, 2, "222")};
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_aggregation_type(ValueMetric::MIN);
+ metric.set_use_diff(true);
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
- EXPECT_CALL(*wizard, query(_, key1, _, _, _, _)).WillOnce(Return(ConditionState::kFalse));
- EXPECT_CALL(*wizard, query(_, key2, _, _, _, _)).WillOnce(Return(ConditionState::kTrue));
-
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
- ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, -1, bucketStartTimeNs,
+ ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
bucketStartTimeNs, pullerManager);
- valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
-
+ shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ event1->write(1);
+ event1->write(10);
+ event1->init();
+ shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 15);
+ event2->write(1);
+ event2->write(15);
+ event2->init();
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+ // has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(10, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(15, curInterval.value.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(5, curInterval.value.long_value);
+
+ // no change in data.
+ shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10);
+ event3->write(1);
+ event3->write(15);
+ event3->init();
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event3);
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(15, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+
+ shared_ptr<LogEvent> event4 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 15);
+ event4->write(1);
+ event4->write(15);
+ event4->init();
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event4);
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(15, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(15, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
} // namespace statsd