diff options
Diffstat (limited to 'cmds/statsd/src/metrics/ValueMetricProducer.cpp')
-rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 436 |
1 files changed, 295 insertions, 141 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 41e55cb27f5e..a34df8aabea2 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -27,7 +27,7 @@ using android::util::FIELD_COUNT_REPEATED; using android::util::FIELD_TYPE_BOOL; -using android::util::FIELD_TYPE_FLOAT; +using android::util::FIELD_TYPE_DOUBLE; using android::util::FIELD_TYPE_INT32; using android::util::FIELD_TYPE_INT64; using android::util::FIELD_TYPE_MESSAGE; @@ -64,21 +64,31 @@ const int FIELD_ID_BUCKET_INFO = 3; const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; // for ValueBucketInfo -const int FIELD_ID_VALUE = 3; +const int FIELD_ID_VALUE_INDEX = 1; +const int FIELD_ID_VALUE_LONG = 2; +const int FIELD_ID_VALUE_DOUBLE = 3; +const int FIELD_ID_VALUES = 9; const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently -ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric, +ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, + const ValueMetric& metric, const int conditionIndex, - const sp<ConditionWizard>& wizard, const int pullTagId, - const int64_t timeBaseNs, const int64_t startTimestampNs, - shared_ptr<StatsPullerManager> statsPullerManager) - : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), - mValueField(metric.value_field()), - mStatsPullerManager(statsPullerManager), + const sp<ConditionWizard>& conditionWizard, + const int whatMatcherIndex, + const sp<EventMatcherWizard>& matcherWizard, + const int pullTagId, + const int64_t timeBaseNs, + const int64_t startTimeNs, + const sp<StatsPullerManager>& pullerManager) + : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard), + mWhatMatcherIndex(whatMatcherIndex), + mEventMatcherWizard(matcherWizard), + mPullerManager(pullerManager), mPullTagId(pullTagId), + mIsPulled(pullTagId != -1), mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() @@ -88,8 +98,11 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second : StatsdStats::kDimensionKeySizeHardLimit), - mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()) { - // TODO: valuemetric for pushed events may need unlimited bucket length + mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), + mAggregationType(metric.aggregation_type()), + mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), + mValueDirection(metric.value_direction()), + mSkipZeroDiffOutput(metric.skip_zero_diff_output()) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); @@ -98,6 +111,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric } mBucketSizeNs = bucketSizeMills * 1000000; + + translateFieldMatcher(metric.value_field(), &mFieldMatchers); + if (metric.has_dimensions_in_what()) { translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat); mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what()); @@ -117,37 +133,33 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric } } - if (mValueField.child_size() > 0) { - mField = mValueField.child(0).field(); - } mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0); mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || - HasPositionALL(metric.dimensions_in_condition()); + HasPositionALL(metric.dimensions_in_condition()); - // Kicks off the puller immediately. - flushIfNeededLocked(startTimestampNs); - if (mPullTagId != -1) { - mStatsPullerManager->RegisterReceiver( - mPullTagId, this, mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs); - } + flushIfNeededLocked(startTimeNs); - VLOG("value metric %lld created. bucket size %lld start_time: %lld", - (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); -} + if (mIsPulled) { + mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), + mBucketSizeNs); + } -// for testing -ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric, - const int conditionIndex, - const sp<ConditionWizard>& wizard, const int pullTagId, - const int64_t timeBaseNs, const int64_t startTimeNs) - : ValueMetricProducer(key, metric, conditionIndex, wizard, pullTagId, timeBaseNs, startTimeNs, - make_shared<StatsPullerManager>()) { + // Only do this for partial buckets like first bucket. All other buckets should use + // flushIfNeeded to adjust start and end to bucket boundaries. + // Adjust start for partial bucket + mCurrentBucketStartTimeNs = startTimeNs; + // Kicks off the puller immediately if condition is true and diff based. + if (mIsPulled && mCondition && mUseDiff) { + pullAndMatchEventsLocked(startTimeNs); + } + VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), + (long long)mBucketSizeNs, (long long)mTimeBaseNs); } ValueMetricProducer::~ValueMetricProducer() { VLOG("~ValueMetricProducer() called"); - if (mPullTagId != -1) { - mStatsPullerManager->UnRegisterReceiver(mPullTagId, this); + if (mIsPulled) { + mPullerManager->UnRegisterReceiver(mPullTagId, this); } } @@ -169,6 +181,7 @@ void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, + const bool erase_data, std::set<string> *str_set, ProtoOutputStream* protoOutput) { VLOG("metric %lld dump report now...", (long long)mMetricId); @@ -186,14 +199,14 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // Fills the dimension path if not slicing by ALL. if (!mSliceByPositionALL) { if (!mDimensionsInWhat.empty()) { - uint64_t dimenPathToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); + uint64_t dimenPathToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); writeDimensionPathToProto(mDimensionsInWhat, protoOutput); protoOutput->end(dimenPathToken); } if (!mDimensionsInCondition.empty()) { - uint64_t dimenPathToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); + uint64_t dimenPathToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); writeDimensionPathToProto(mDimensionsInCondition, protoOutput); protoOutput->end(dimenPathToken); } @@ -210,7 +223,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, (long long)(NanoToMillis(pair.second))); protoOutput->end(wrapperToken); } - mSkippedBuckets.clear(); for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; @@ -220,15 +232,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // First fill dimension. if (mSliceByPositionALL) { - uint64_t dimensionToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); + uint64_t dimensionToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); protoOutput->end(dimensionToken); if (dimensionKey.hasDimensionKeyInCondition()) { - uint64_t dimensionInConditionToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); - writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), - str_set, protoOutput); + uint64_t dimensionInConditionToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); + writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, + protoOutput); protoOutput->end(dimensionInConditionToken); } } else { @@ -236,8 +248,8 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); if (dimensionKey.hasDimensionKeyInCondition()) { writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), - FIELD_ID_DIMENSION_LEAF_IN_CONDITION, - str_set, protoOutput); + FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, + protoOutput); } } @@ -255,24 +267,43 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } - - protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue); + for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) { + int index = bucket.valueIndex[i]; + const Value& value = bucket.values[i]; + uint64_t valueToken = protoOutput->start( + FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES); + protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, + index); + if (value.getType() == LONG) { + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, + (long long)value.long_value); + VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs, + (long long)bucket.mBucketEndNs, index, (long long)value.long_value); + } else if (value.getType() == DOUBLE) { + protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, + value.double_value); + VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs, + (long long)bucket.mBucketEndNs, index, value.double_value); + } else { + VLOG("Wrong value type for ValueMetric output: %d", value.getType()); + } + protoOutput->end(valueToken); + } protoOutput->end(bucketInfoToken); - VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, - (long long)bucket.mBucketEndNs, (long long)bucket.mValue); } protoOutput->end(wrapperToken); } protoOutput->end(protoToken); VLOG("metric %lld dump report now...", (long long)mMetricId); - mPastBuckets.clear(); + if (erase_data) { + mPastBuckets.clear(); + mSkippedBuckets.clear(); + } } void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { - mCondition = condition; - if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, (long long)mCurrentBucketStartTimeNs); @@ -281,44 +312,72 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); - if (mPullTagId != -1) { - vector<shared_ptr<LogEvent>> allData; - if (mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) { - if (allData.size() == 0) { - return; + // Pull on condition changes. + if (mIsPulled && (mCondition != condition)) { + pullAndMatchEventsLocked(eventTimeNs); + } + + // when condition change from true to false, clear diff base + if (mUseDiff && mCondition && !condition) { + for (auto& slice : mCurrentSlicedBucket) { + for (auto& interval : slice.second) { + interval.hasBase = false; } - for (const auto& data : allData) { - onMatchedLogEventLocked(0, *data); + } + } + + mCondition = condition; +} + +void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { + vector<std::shared_ptr<LogEvent>> allData; + if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) { + if (allData.size() == 0) { + return; + } + for (const auto& data : allData) { + if (mEventMatcherWizard->matchLogEvent( + *data, mWhatMatcherIndex) == MatchingState::kMatched) { + onMatchedLogEventLocked(mWhatMatcherIndex, *data); } } - return; } } +int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { + return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; +} + void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { std::lock_guard<std::mutex> lock(mMutex); - - if (mCondition == true || mConditionTrackerIndex < 0) { + if (mCondition) { if (allData.size() == 0) { + VLOG("Data pulled is empty"); return; } // For scheduled pulled data, the effective event time is snap to the nearest - // bucket boundary to make bucket finalize. + // bucket end. In the case of waking up from a deep sleep state, we will + // attribute to the previous bucket end. If the sleep was long but not very long, we + // will be in the immediate next bucket. Previous bucket may get a larger number as + // we pull at a later time than real bucket end. + // If the sleep was very long, we skip more than one bucket before sleep. In this case, + // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); - int64_t eventTime = mTimeBaseNs + - ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; - - mCondition = false; - for (const auto& data : allData) { - data->setElapsedTimestampNs(eventTime - 1); - onMatchedLogEventLocked(0, *data); + int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; + if (bucketEndTime < mCurrentBucketStartTimeNs) { + VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, + (long long)mCurrentBucketStartTimeNs); + return; } - - mCondition = true; for (const auto& data : allData) { - data->setElapsedTimestampNs(eventTime); - onMatchedLogEventLocked(0, *data); + if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) == + MatchingState::kMatched) { + data->setElapsedTimestampNs(bucketEndTime); + onMatchedLogEventLocked(mWhatMatcherIndex, *data); + } } + } else { + VLOG("No need to commit data on condition false."); } } @@ -331,10 +390,12 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { (unsigned long)mCurrentSlicedBucket.size()); if (verbose) { for (const auto& it : mCurrentSlicedBucket) { - fprintf(out, "\t(what)%s\t(condition)%s (value)%lld\n", - it.first.getDimensionKeyInWhat().toString().c_str(), - it.first.getDimensionKeyInCondition().toString().c_str(), - (unsigned long long)it.second.sum); + for (const auto& interval : it.second) { + fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n", + it.first.getDimensionKeyInWhat().toString().c_str(), + it.first.getDimensionKeyInCondition().toString().c_str(), + interval.value.toString().c_str()); + } } } } @@ -350,8 +411,8 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. if (newTupleCount > mDimensionHardLimit) { - ALOGE("ValueMetric %lld dropping data for dimension key %s", - (long long)mMetricId, newKey.toString().c_str()); + ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId, + newKey.toString().c_str()); return true; } } @@ -359,10 +420,35 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { return false; } -void ValueMetricProducer::onMatchedLogEventInternalLocked( - const size_t matcherIndex, const MetricDimensionKey& eventKey, - const ConditionKey& conditionKey, bool condition, - const LogEvent& event) { +bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) { + for (const FieldValue& value : event.getValues()) { + if (value.mField.matches(matcher)) { + switch (value.mValue.type) { + case INT: + ret.setLong(value.mValue.int_value); + break; + case LONG: + ret.setLong(value.mValue.long_value); + break; + case FLOAT: + ret.setDouble(value.mValue.float_value); + break; + case DOUBLE: + ret.setDouble(value.mValue.double_value); + break; + default: + break; + } + return true; + } + } + return false; +} + +void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, + const MetricDimensionKey& eventKey, + const ConditionKey& conditionKey, + bool condition, const LogEvent& event) { int64_t eventTimeNs = event.GetElapsedTimestampNs(); if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, @@ -372,56 +458,101 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( flushIfNeededLocked(eventTimeNs); - if (hitGuardRailLocked(eventKey)) { + // For pulled data, we already check condition when we decide to pull or + // in onDataPulled. So take all of them. + // For pushed data, just check condition. + if (!(mIsPulled || condition)) { + VLOG("ValueMetric skip event because condition is false"); return; } - Interval& interval = mCurrentSlicedBucket[eventKey]; - int error = 0; - const int64_t value = event.GetLong(mField, &error); - if (error < 0) { + if (hitGuardRailLocked(eventKey)) { return; } + vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey]; + if (multiIntervals.size() < mFieldMatchers.size()) { + VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); + multiIntervals.resize(mFieldMatchers.size()); + } - if (mPullTagId != -1) { // for pulled events - if (mCondition == true) { - if (!interval.startUpdated) { - interval.start = value; - interval.startUpdated = true; - } else { - // skip it if there is already value recorded for the start - VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); + for (int i = 0; i < (int)mFieldMatchers.size(); i++) { + const Matcher& matcher = mFieldMatchers[i]; + Interval& interval = multiIntervals[i]; + interval.valueIndex = i; + Value value; + if (!getDoubleOrLong(event, matcher, value)) { + VLOG("Failed to get value %d from event %s", i, event.ToString().c_str()); + return; + } + + if (mUseDiff) { + // no base. just update base and return. + if (!interval.hasBase) { + interval.base = value; + interval.hasBase = true; + return; } - } else { - // Generally we expect value to be monotonically increasing. - // If not, take absolute value or drop it, based on config. - if (interval.startUpdated) { - if (value >= interval.start) { - interval.sum += (value - interval.start); - interval.hasValue = true; - } else { - if (mUseAbsoluteValueOnReset) { - interval.sum += value; - interval.hasValue = true; + Value diff; + switch (mValueDirection) { + case ValueMetric::INCREASING: + if (value >= interval.base) { + diff = value - interval.base; + } else if (mUseAbsoluteValueOnReset) { + diff = value; } else { - VLOG("Dropping data for atom %d, prev: %lld, now: %lld", mPullTagId, - (long long)interval.start, (long long)value); + VLOG("Unexpected decreasing value"); + StatsdStats::getInstance().notePullDataError(mPullTagId); + interval.base = value; + return; } - } - interval.startUpdated = false; - } else { - VLOG("No start for matching end %lld", (long long)value); - interval.tainted += 1; + break; + case ValueMetric::DECREASING: + if (interval.base >= value) { + diff = interval.base - value; + } else if (mUseAbsoluteValueOnReset) { + diff = value; + } else { + VLOG("Unexpected increasing value"); + StatsdStats::getInstance().notePullDataError(mPullTagId); + interval.base = value; + return; + } + break; + case ValueMetric::ANY: + diff = value - interval.base; + break; + default: + break; } + interval.base = value; + value = diff; } - } else { // for pushed events, only accumulate when condition is true - if (mCondition == true || mConditionTrackerIndex < 0) { - interval.sum += value; + + if (interval.hasValue) { + switch (mAggregationType) { + case ValueMetric::SUM: + // for AVG, we add up and take average when flushing the bucket + case ValueMetric::AVG: + interval.value += value; + break; + case ValueMetric::MIN: + interval.value = std::min(value, interval.value); + break; + case ValueMetric::MAX: + interval.value = std::max(value, interval.value); + break; + default: + break; + } + } else { + interval.value = value; interval.hasValue = true; } + interval.sampleSize += 1; } - long wholeBucketVal = interval.sum; + // TODO: propgate proper values down stream when anomaly support doubles + long wholeBucketVal = multiIntervals[0].value.long_value; auto prev = mCurrentFullBucket.find(eventKey); if (prev != mCurrentFullBucket.end()) { wholeBucketVal += prev->second; @@ -448,6 +579,12 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); + // take base again in future good bucket. + for (auto& slice : mCurrentSlicedBucket) { + for (auto& interval : slice.second) { + interval.hasBase = false; + } + } } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); @@ -458,37 +595,46 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); - ValueBucket info; - info.mBucketStartNs = mCurrentBucketStartTimeNs; - if (eventTimeNs < fullBucketEndTimeNs) { - info.mBucketEndNs = eventTimeNs; - } else { - info.mBucketEndNs = fullBucketEndTimeNs; - } + int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; - if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { + if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { // The current bucket is large enough to keep. - int tainted = 0; for (const auto& slice : mCurrentSlicedBucket) { - tainted += slice.second.tainted; - tainted += slice.second.startUpdated; - if (slice.second.hasValue) { - info.mValue = slice.second.sum; - // it will auto create new vector of ValuebucketInfo if the key is not found. + ValueBucket bucket; + bucket.mBucketStartNs = mCurrentBucketStartTimeNs; + bucket.mBucketEndNs = bucketEndTime; + for (const auto& interval : slice.second) { + if (interval.hasValue) { + // skip the output if the diff is zero + if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) { + continue; + } + bucket.valueIndex.push_back(interval.valueIndex); + if (mAggregationType != ValueMetric::AVG) { + bucket.values.push_back(interval.value); + } else { + double sum = interval.value.type == LONG ? (double)interval.value.long_value + : interval.value.double_value; + bucket.values.push_back(Value((double)sum / interval.sampleSize)); + } + } + } + // it will auto create new vector of ValuebucketInfo if the key is not found. + if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; - bucketList.push_back(info); + bucketList.push_back(bucket); } } - VLOG("%d tainted pairs in the bucket", tainted); } else { - mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); + mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime); } if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { - mCurrentFullBucket[slice.first] += slice.second.sum; + // TODO: fix this when anomaly can accept double values + mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } for (const auto& slice : mCurrentFullBucket) { for (auto& tracker : mAnomalyTrackers) { @@ -503,7 +649,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { for (const auto& slice : mCurrentSlicedBucket) { for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { - tracker->addPastBucket(slice.first, slice.second.sum, mCurrentBucketNum); + // TODO: fix this when anomaly can accept double values + tracker->addPastBucket(slice.first, slice.second[0].value.long_value, + mCurrentBucketNum); } } } @@ -511,12 +659,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } else { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { - mCurrentFullBucket[slice.first] += slice.second.sum; + // TODO: fix this when anomaly can accept double values + mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } } // Reset counters - mCurrentSlicedBucket.clear(); + for (auto& slice : mCurrentSlicedBucket) { + for (auto& interval : slice.second) { + interval.hasValue = false; + interval.sampleSize = 0; + } + } } size_t ValueMetricProducer::byteSizeLocked() const { |