summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/metrics/ValueMetricProducer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cmds/statsd/src/metrics/ValueMetricProducer.cpp')
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.cpp436
1 files changed, 295 insertions, 141 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 41e55cb27f5e..a34df8aabea2 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -27,7 +27,7 @@
using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
-using android::util::FIELD_TYPE_FLOAT;
+using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
@@ -64,21 +64,31 @@ const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
// for ValueBucketInfo
-const int FIELD_ID_VALUE = 3;
+const int FIELD_ID_VALUE_INDEX = 1;
+const int FIELD_ID_VALUE_LONG = 2;
+const int FIELD_ID_VALUE_DOUBLE = 3;
+const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
-ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric,
+ValueMetricProducer::ValueMetricProducer(const ConfigKey& key,
+ const ValueMetric& metric,
const int conditionIndex,
- const sp<ConditionWizard>& wizard, const int pullTagId,
- const int64_t timeBaseNs, const int64_t startTimestampNs,
- shared_ptr<StatsPullerManager> statsPullerManager)
- : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
- mValueField(metric.value_field()),
- mStatsPullerManager(statsPullerManager),
+ const sp<ConditionWizard>& conditionWizard,
+ const int whatMatcherIndex,
+ const sp<EventMatcherWizard>& matcherWizard,
+ const int pullTagId,
+ const int64_t timeBaseNs,
+ const int64_t startTimeNs,
+ const sp<StatsPullerManager>& pullerManager)
+ : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
+ mWhatMatcherIndex(whatMatcherIndex),
+ mEventMatcherWizard(matcherWizard),
+ mPullerManager(pullerManager),
mPullTagId(pullTagId),
+ mIsPulled(pullTagId != -1),
mMinBucketSizeNs(metric.min_bucket_size_nanos()),
mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
StatsdStats::kAtomDimensionKeySizeLimitMap.end()
@@ -88,8 +98,11 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
StatsdStats::kAtomDimensionKeySizeLimitMap.end()
? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
: StatsdStats::kDimensionKeySizeHardLimit),
- mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()) {
- // TODO: valuemetric for pushed events may need unlimited bucket length
+ mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
+ mAggregationType(metric.aggregation_type()),
+ mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
+ mValueDirection(metric.value_direction()),
+ mSkipZeroDiffOutput(metric.skip_zero_diff_output()) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -98,6 +111,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
}
mBucketSizeNs = bucketSizeMills * 1000000;
+
+ translateFieldMatcher(metric.value_field(), &mFieldMatchers);
+
if (metric.has_dimensions_in_what()) {
translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
@@ -117,37 +133,33 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
}
}
- if (mValueField.child_size() > 0) {
- mField = mValueField.child(0).field();
- }
mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
- HasPositionALL(metric.dimensions_in_condition());
+ HasPositionALL(metric.dimensions_in_condition());
- // Kicks off the puller immediately.
- flushIfNeededLocked(startTimestampNs);
- if (mPullTagId != -1) {
- mStatsPullerManager->RegisterReceiver(
- mPullTagId, this, mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs);
- }
+ flushIfNeededLocked(startTimeNs);
- VLOG("value metric %lld created. bucket size %lld start_time: %lld",
- (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs);
-}
+ if (mIsPulled) {
+ mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
+ mBucketSizeNs);
+ }
-// for testing
-ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric,
- const int conditionIndex,
- const sp<ConditionWizard>& wizard, const int pullTagId,
- const int64_t timeBaseNs, const int64_t startTimeNs)
- : ValueMetricProducer(key, metric, conditionIndex, wizard, pullTagId, timeBaseNs, startTimeNs,
- make_shared<StatsPullerManager>()) {
+ // Only do this for partial buckets like first bucket. All other buckets should use
+ // flushIfNeeded to adjust start and end to bucket boundaries.
+ // Adjust start for partial bucket
+ mCurrentBucketStartTimeNs = startTimeNs;
+ // Kicks off the puller immediately if condition is true and diff based.
+ if (mIsPulled && mCondition && mUseDiff) {
+ pullAndMatchEventsLocked(startTimeNs);
+ }
+ VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
+ (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
ValueMetricProducer::~ValueMetricProducer() {
VLOG("~ValueMetricProducer() called");
- if (mPullTagId != -1) {
- mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
+ if (mIsPulled) {
+ mPullerManager->UnRegisterReceiver(mPullTagId, this);
}
}
@@ -169,6 +181,7 @@ void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
const bool include_current_partial_bucket,
+ const bool erase_data,
std::set<string> *str_set,
ProtoOutputStream* protoOutput) {
VLOG("metric %lld dump report now...", (long long)mMetricId);
@@ -186,14 +199,14 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
// Fills the dimension path if not slicing by ALL.
if (!mSliceByPositionALL) {
if (!mDimensionsInWhat.empty()) {
- uint64_t dimenPathToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
+ uint64_t dimenPathToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
protoOutput->end(dimenPathToken);
}
if (!mDimensionsInCondition.empty()) {
- uint64_t dimenPathToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
+ uint64_t dimenPathToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
protoOutput->end(dimenPathToken);
}
@@ -210,7 +223,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
(long long)(NanoToMillis(pair.second)));
protoOutput->end(wrapperToken);
}
- mSkippedBuckets.clear();
for (const auto& pair : mPastBuckets) {
const MetricDimensionKey& dimensionKey = pair.first;
@@ -220,15 +232,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
// First fill dimension.
if (mSliceByPositionALL) {
- uint64_t dimensionToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
+ uint64_t dimensionToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
protoOutput->end(dimensionToken);
if (dimensionKey.hasDimensionKeyInCondition()) {
- uint64_t dimensionInConditionToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
- writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(),
- str_set, protoOutput);
+ uint64_t dimensionInConditionToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+ writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set,
+ protoOutput);
protoOutput->end(dimensionInConditionToken);
}
} else {
@@ -236,8 +248,8 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
if (dimensionKey.hasDimensionKeyInCondition()) {
writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
- FIELD_ID_DIMENSION_LEAF_IN_CONDITION,
- str_set, protoOutput);
+ FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set,
+ protoOutput);
}
}
@@ -255,24 +267,43 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
(long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
}
-
- protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
+ for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
+ int index = bucket.valueIndex[i];
+ const Value& value = bucket.values[i];
+ uint64_t valueToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
+ index);
+ if (value.getType() == LONG) {
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
+ (long long)value.long_value);
+ VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
+ } else if (value.getType() == DOUBLE) {
+ protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
+ value.double_value);
+ VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, index, value.double_value);
+ } else {
+ VLOG("Wrong value type for ValueMetric output: %d", value.getType());
+ }
+ protoOutput->end(valueToken);
+ }
protoOutput->end(bucketInfoToken);
- VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
- (long long)bucket.mBucketEndNs, (long long)bucket.mValue);
}
protoOutput->end(wrapperToken);
}
protoOutput->end(protoToken);
VLOG("metric %lld dump report now...", (long long)mMetricId);
- mPastBuckets.clear();
+ if (erase_data) {
+ mPastBuckets.clear();
+ mSkippedBuckets.clear();
+ }
}
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
const int64_t eventTimeNs) {
- mCondition = condition;
-
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
@@ -281,44 +312,72 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
flushIfNeededLocked(eventTimeNs);
- if (mPullTagId != -1) {
- vector<shared_ptr<LogEvent>> allData;
- if (mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) {
- if (allData.size() == 0) {
- return;
+ // Pull on condition changes.
+ if (mIsPulled && (mCondition != condition)) {
+ pullAndMatchEventsLocked(eventTimeNs);
+ }
+
+ // when condition change from true to false, clear diff base
+ if (mUseDiff && mCondition && !condition) {
+ for (auto& slice : mCurrentSlicedBucket) {
+ for (auto& interval : slice.second) {
+ interval.hasBase = false;
}
- for (const auto& data : allData) {
- onMatchedLogEventLocked(0, *data);
+ }
+ }
+
+ mCondition = condition;
+}
+
+void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
+ vector<std::shared_ptr<LogEvent>> allData;
+ if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
+ if (allData.size() == 0) {
+ return;
+ }
+ for (const auto& data : allData) {
+ if (mEventMatcherWizard->matchLogEvent(
+ *data, mWhatMatcherIndex) == MatchingState::kMatched) {
+ onMatchedLogEventLocked(mWhatMatcherIndex, *data);
}
}
- return;
}
}
+int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
+ return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
+}
+
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
std::lock_guard<std::mutex> lock(mMutex);
-
- if (mCondition == true || mConditionTrackerIndex < 0) {
+ if (mCondition) {
if (allData.size() == 0) {
+ VLOG("Data pulled is empty");
return;
}
// For scheduled pulled data, the effective event time is snap to the nearest
- // bucket boundary to make bucket finalize.
+ // bucket end. In the case of waking up from a deep sleep state, we will
+ // attribute to the previous bucket end. If the sleep was long but not very long, we
+ // will be in the immediate next bucket. Previous bucket may get a larger number as
+ // we pull at a later time than real bucket end.
+ // If the sleep was very long, we skip more than one bucket before sleep. In this case,
+ // if the diff base will be cleared and this new data will serve as new diff base.
int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
- int64_t eventTime = mTimeBaseNs +
- ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
-
- mCondition = false;
- for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTime - 1);
- onMatchedLogEventLocked(0, *data);
+ int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
+ if (bucketEndTime < mCurrentBucketStartTimeNs) {
+ VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
+ (long long)mCurrentBucketStartTimeNs);
+ return;
}
-
- mCondition = true;
for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTime);
- onMatchedLogEventLocked(0, *data);
+ if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) ==
+ MatchingState::kMatched) {
+ data->setElapsedTimestampNs(bucketEndTime);
+ onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+ }
}
+ } else {
+ VLOG("No need to commit data on condition false.");
}
}
@@ -331,10 +390,12 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
(unsigned long)mCurrentSlicedBucket.size());
if (verbose) {
for (const auto& it : mCurrentSlicedBucket) {
- fprintf(out, "\t(what)%s\t(condition)%s (value)%lld\n",
- it.first.getDimensionKeyInWhat().toString().c_str(),
- it.first.getDimensionKeyInCondition().toString().c_str(),
- (unsigned long long)it.second.sum);
+ for (const auto& interval : it.second) {
+ fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n",
+ it.first.getDimensionKeyInWhat().toString().c_str(),
+ it.first.getDimensionKeyInCondition().toString().c_str(),
+ interval.value.toString().c_str());
+ }
}
}
}
@@ -350,8 +411,8 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
// 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
if (newTupleCount > mDimensionHardLimit) {
- ALOGE("ValueMetric %lld dropping data for dimension key %s",
- (long long)mMetricId, newKey.toString().c_str());
+ ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
+ newKey.toString().c_str());
return true;
}
}
@@ -359,10 +420,35 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
return false;
}
-void ValueMetricProducer::onMatchedLogEventInternalLocked(
- const size_t matcherIndex, const MetricDimensionKey& eventKey,
- const ConditionKey& conditionKey, bool condition,
- const LogEvent& event) {
+bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
+ for (const FieldValue& value : event.getValues()) {
+ if (value.mField.matches(matcher)) {
+ switch (value.mValue.type) {
+ case INT:
+ ret.setLong(value.mValue.int_value);
+ break;
+ case LONG:
+ ret.setLong(value.mValue.long_value);
+ break;
+ case FLOAT:
+ ret.setDouble(value.mValue.float_value);
+ break;
+ case DOUBLE:
+ ret.setDouble(value.mValue.double_value);
+ break;
+ default:
+ break;
+ }
+ return true;
+ }
+ }
+ return false;
+}
+
+void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
+ const MetricDimensionKey& eventKey,
+ const ConditionKey& conditionKey,
+ bool condition, const LogEvent& event) {
int64_t eventTimeNs = event.GetElapsedTimestampNs();
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
@@ -372,56 +458,101 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(
flushIfNeededLocked(eventTimeNs);
- if (hitGuardRailLocked(eventKey)) {
+ // For pulled data, we already check condition when we decide to pull or
+ // in onDataPulled. So take all of them.
+ // For pushed data, just check condition.
+ if (!(mIsPulled || condition)) {
+ VLOG("ValueMetric skip event because condition is false");
return;
}
- Interval& interval = mCurrentSlicedBucket[eventKey];
- int error = 0;
- const int64_t value = event.GetLong(mField, &error);
- if (error < 0) {
+ if (hitGuardRailLocked(eventKey)) {
return;
}
+ vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
+ if (multiIntervals.size() < mFieldMatchers.size()) {
+ VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
+ multiIntervals.resize(mFieldMatchers.size());
+ }
- if (mPullTagId != -1) { // for pulled events
- if (mCondition == true) {
- if (!interval.startUpdated) {
- interval.start = value;
- interval.startUpdated = true;
- } else {
- // skip it if there is already value recorded for the start
- VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str());
+ for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
+ const Matcher& matcher = mFieldMatchers[i];
+ Interval& interval = multiIntervals[i];
+ interval.valueIndex = i;
+ Value value;
+ if (!getDoubleOrLong(event, matcher, value)) {
+ VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
+ return;
+ }
+
+ if (mUseDiff) {
+ // no base. just update base and return.
+ if (!interval.hasBase) {
+ interval.base = value;
+ interval.hasBase = true;
+ return;
}
- } else {
- // Generally we expect value to be monotonically increasing.
- // If not, take absolute value or drop it, based on config.
- if (interval.startUpdated) {
- if (value >= interval.start) {
- interval.sum += (value - interval.start);
- interval.hasValue = true;
- } else {
- if (mUseAbsoluteValueOnReset) {
- interval.sum += value;
- interval.hasValue = true;
+ Value diff;
+ switch (mValueDirection) {
+ case ValueMetric::INCREASING:
+ if (value >= interval.base) {
+ diff = value - interval.base;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
} else {
- VLOG("Dropping data for atom %d, prev: %lld, now: %lld", mPullTagId,
- (long long)interval.start, (long long)value);
+ VLOG("Unexpected decreasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
}
- }
- interval.startUpdated = false;
- } else {
- VLOG("No start for matching end %lld", (long long)value);
- interval.tainted += 1;
+ break;
+ case ValueMetric::DECREASING:
+ if (interval.base >= value) {
+ diff = interval.base - value;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
+ } else {
+ VLOG("Unexpected increasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
+ }
+ break;
+ case ValueMetric::ANY:
+ diff = value - interval.base;
+ break;
+ default:
+ break;
}
+ interval.base = value;
+ value = diff;
}
- } else { // for pushed events, only accumulate when condition is true
- if (mCondition == true || mConditionTrackerIndex < 0) {
- interval.sum += value;
+
+ if (interval.hasValue) {
+ switch (mAggregationType) {
+ case ValueMetric::SUM:
+ // for AVG, we add up and take average when flushing the bucket
+ case ValueMetric::AVG:
+ interval.value += value;
+ break;
+ case ValueMetric::MIN:
+ interval.value = std::min(value, interval.value);
+ break;
+ case ValueMetric::MAX:
+ interval.value = std::max(value, interval.value);
+ break;
+ default:
+ break;
+ }
+ } else {
+ interval.value = value;
interval.hasValue = true;
}
+ interval.sampleSize += 1;
}
- long wholeBucketVal = interval.sum;
+ // TODO: propgate proper values down stream when anomaly support doubles
+ long wholeBucketVal = multiIntervals[0].value.long_value;
auto prev = mCurrentFullBucket.find(eventKey);
if (prev != mCurrentFullBucket.end()) {
wholeBucketVal += prev->second;
@@ -448,6 +579,12 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
if (numBucketsForward > 1) {
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
+ // take base again in future good bucket.
+ for (auto& slice : mCurrentSlicedBucket) {
+ for (auto& interval : slice.second) {
+ interval.hasBase = false;
+ }
+ }
}
VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
(long long)mCurrentBucketStartTimeNs);
@@ -458,37 +595,46 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
(int)mCurrentSlicedBucket.size());
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
- ValueBucket info;
- info.mBucketStartNs = mCurrentBucketStartTimeNs;
- if (eventTimeNs < fullBucketEndTimeNs) {
- info.mBucketEndNs = eventTimeNs;
- } else {
- info.mBucketEndNs = fullBucketEndTimeNs;
- }
+ int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
- if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+ if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
// The current bucket is large enough to keep.
- int tainted = 0;
for (const auto& slice : mCurrentSlicedBucket) {
- tainted += slice.second.tainted;
- tainted += slice.second.startUpdated;
- if (slice.second.hasValue) {
- info.mValue = slice.second.sum;
- // it will auto create new vector of ValuebucketInfo if the key is not found.
+ ValueBucket bucket;
+ bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
+ bucket.mBucketEndNs = bucketEndTime;
+ for (const auto& interval : slice.second) {
+ if (interval.hasValue) {
+ // skip the output if the diff is zero
+ if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
+ continue;
+ }
+ bucket.valueIndex.push_back(interval.valueIndex);
+ if (mAggregationType != ValueMetric::AVG) {
+ bucket.values.push_back(interval.value);
+ } else {
+ double sum = interval.value.type == LONG ? (double)interval.value.long_value
+ : interval.value.double_value;
+ bucket.values.push_back(Value((double)sum / interval.sampleSize));
+ }
+ }
+ }
+ // it will auto create new vector of ValuebucketInfo if the key is not found.
+ if (bucket.valueIndex.size() > 0) {
auto& bucketList = mPastBuckets[slice.first];
- bucketList.push_back(info);
+ bucketList.push_back(bucket);
}
}
- VLOG("%d tainted pairs in the bucket", tainted);
} else {
- mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
+ mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
}
if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker.
// Accumulate partial buckets with current value and then send to anomaly tracker.
if (mCurrentFullBucket.size() > 0) {
for (const auto& slice : mCurrentSlicedBucket) {
- mCurrentFullBucket[slice.first] += slice.second.sum;
+ // TODO: fix this when anomaly can accept double values
+ mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
}
for (const auto& slice : mCurrentFullBucket) {
for (auto& tracker : mAnomalyTrackers) {
@@ -503,7 +649,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
for (const auto& slice : mCurrentSlicedBucket) {
for (auto& tracker : mAnomalyTrackers) {
if (tracker != nullptr) {
- tracker->addPastBucket(slice.first, slice.second.sum, mCurrentBucketNum);
+ // TODO: fix this when anomaly can accept double values
+ tracker->addPastBucket(slice.first, slice.second[0].value.long_value,
+ mCurrentBucketNum);
}
}
}
@@ -511,12 +659,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
} else {
// Accumulate partial bucket.
for (const auto& slice : mCurrentSlicedBucket) {
- mCurrentFullBucket[slice.first] += slice.second.sum;
+ // TODO: fix this when anomaly can accept double values
+ mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
}
}
// Reset counters
- mCurrentSlicedBucket.clear();
+ for (auto& slice : mCurrentSlicedBucket) {
+ for (auto& interval : slice.second) {
+ interval.hasValue = false;
+ interval.sampleSize = 0;
+ }
+ }
}
size_t ValueMetricProducer::byteSizeLocked() const {