diff options
Diffstat (limited to 'cmds/statsd/src/metrics/ValueMetricProducer.cpp')
-rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 540 |
1 files changed, 342 insertions, 198 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index fa310dc707ec..5987a723a421 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -46,19 +46,22 @@ const int FIELD_ID_VALUE_METRICS = 7; const int FIELD_ID_TIME_BASE = 9; const int FIELD_ID_BUCKET_SIZE = 10; const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11; -const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12; const int FIELD_ID_IS_ACTIVE = 14; // for ValueMetricDataWrapper const int FIELD_ID_DATA = 1; const int FIELD_ID_SKIPPED = 2; +// for SkippedBuckets const int FIELD_ID_SKIPPED_START_MILLIS = 3; const int FIELD_ID_SKIPPED_END_MILLIS = 4; +const int FIELD_ID_SKIPPED_DROP_EVENT = 5; +// for DumpEvent Proto +const int FIELD_ID_BUCKET_DROP_REASON = 1; +const int FIELD_ID_DROP_TIME = 2; // for ValueMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; -const int FIELD_ID_DIMENSION_IN_CONDITION = 2; const int FIELD_ID_BUCKET_INFO = 3; const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; -const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; +const int FIELD_ID_SLICE_BY_STATE = 6; // for ValueBucketInfo const int FIELD_ID_VALUE_INDEX = 1; const int FIELD_ID_VALUE_LONG = 2; @@ -75,10 +78,17 @@ const Value ZERO_DOUBLE((int64_t)0); // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently ValueMetricProducer::ValueMetricProducer( const ConfigKey& key, const ValueMetric& metric, const int conditionIndex, + const vector<ConditionState>& initialConditionCache, const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs, - const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager) - : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard), + const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager, + const unordered_map<int, 
shared_ptr<Activation>>& eventActivationMap, + const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap, + const vector<int>& slicedStateAtoms, + const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap) + : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache, + conditionWizard, eventActivationMap, eventDeactivationMap, slicedStateAtoms, + stateGroupMap), mWhatMatcherIndex(whatMatcherIndex), mEventMatcherWizard(matcherWizard), mPullerManager(pullerManager), @@ -100,11 +110,11 @@ ValueMetricProducer::ValueMetricProducer( mSkipZeroDiffOutput(metric.skip_zero_diff_output()), mUseZeroDefaultBase(metric.use_zero_default_base()), mHasGlobalBase(false), - mCurrentBucketIsInvalid(false), + mCurrentBucketIsSkipped(false), mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC : StatsdStats::kPullMaxDelayNs), mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()), - // Condition timer will be set in prepareFirstBucketLocked. 
+ // Condition timer will be set later within the constructor after pulling events mConditionTimer(false, timeBaseNs) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { @@ -120,10 +130,7 @@ ValueMetricProducer::ValueMetricProducer( if (metric.has_dimensions_in_what()) { translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat); mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what()); - } - - if (metric.has_dimensions_in_condition()) { - translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition); + mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()); } if (metric.links().size() > 0) { @@ -134,11 +141,16 @@ ValueMetricProducer::ValueMetricProducer( translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields); mMetric2ConditionLinks.push_back(mc); } + mConditionSliced = true; } - mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0); - mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || - HasPositionALL(metric.dimensions_in_condition()); + for (const auto& stateLink : metric.state_link()) { + Metric2State ms; + ms.stateAtomId = stateLink.state_atom_id(); + translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields); + translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields); + mMetric2StateLinks.push_back(ms); + } int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs); mCurrentBucketNum += numBucketsForward; @@ -146,7 +158,7 @@ ValueMetricProducer::ValueMetricProducer( flushIfNeededLocked(startTimeNs); if (mIsPulled) { - mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), + mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(), mBucketSizeNs); } @@ -155,6 +167,11 @@ ValueMetricProducer::ValueMetricProducer( // Adjust start for partial bucket mCurrentBucketStartTimeNs = startTimeNs; 
mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs); + + // Now that activations are processed, start the condition timer if needed. + mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue, + mCurrentBucketStartTimeNs); + VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); } @@ -162,18 +179,48 @@ ValueMetricProducer::ValueMetricProducer( ValueMetricProducer::~ValueMetricProducer() { VLOG("~ValueMetricProducer() called"); if (mIsPulled) { - mPullerManager->UnRegisterReceiver(mPullTagId, this); + mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this); } } -void ValueMetricProducer::prepareFirstBucketLocked() { - // Kicks off the puller immediately if condition is true and diff based. - if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) { - pullAndMatchEventsLocked(mCurrentBucketStartTimeNs, mCondition); +void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId, + const HashableDimensionKey& primaryKey, + const FieldValue& oldState, const FieldValue& newState) { + VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d", + (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(), + oldState.mValue.int_value, newState.mValue.int_value); + + // If old and new states are in the same StateGroup, then we do not need to + // pull for this state change. + FieldValue oldStateCopy = oldState; + FieldValue newStateCopy = newState; + mapStateValue(atomId, &oldStateCopy); + mapStateValue(atomId, &newStateCopy); + if (oldStateCopy == newStateCopy) { + return; } - // Now that activations are processed, start the condition timer if needed. 
- mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue, - mCurrentBucketStartTimeNs); + + // If condition is not true or metric is not active, we do not need to pull + // for this state change. + if (mCondition != ConditionState::kTrue || !mIsActive) { + return; + } + + bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs; + if (isEventLate) { + VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, + (long long)mCurrentBucketStartTimeNs); + invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET); + return; + } + mStateChangePrimaryKey.first = atomId; + mStateChangePrimaryKey.second = primaryKey; + if (mIsPulled) { + pullAndMatchEventsLocked(eventTimeNs); + } + mStateChangePrimaryKey.first = 0; + mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY; + flushIfNeededLocked(eventTimeNs); } void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition, @@ -183,11 +230,9 @@ void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) { StatsdStats::getInstance().noteBucketDropped(mMetricId); - // We are going to flush the data without doing a pull first so we need to invalidte the data. - bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue; - if (pullNeeded) { - invalidateCurrentBucket(); - } + + // The current partial bucket is not flushed and does not require a pull, + // so the data is still valid. 
flushIfNeededLocked(dropTimeNs); clearPastBucketsLocked(dropTimeNs); } @@ -213,10 +258,10 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, if (pullNeeded) { switch (dumpLatency) { case FAST: - invalidateCurrentBucket(); + invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED); break; case NO_TIME_CONSTRAINTS: - pullAndMatchEventsLocked(dumpTimeNs, mCondition); + pullAndMatchEventsLocked(dumpTimeNs); break; } } @@ -238,23 +283,26 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, writeDimensionPathToProto(mDimensionsInWhat, protoOutput); protoOutput->end(dimenPathToken); } - if (!mDimensionsInCondition.empty()) { - uint64_t dimenPathToken = - protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); - writeDimensionPathToProto(mDimensionsInCondition, protoOutput); - protoOutput->end(dimenPathToken); - } } uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS); - for (const auto& pair : mSkippedBuckets) { + for (const auto& skippedBucket : mSkippedBuckets) { uint64_t wrapperToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS, - (long long)(NanoToMillis(pair.first))); + (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs))); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS, - (long long)(NanoToMillis(pair.second))); + (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs))); + for (const auto& dropEvent : skippedBucket.dropEvents) { + uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | + FIELD_ID_SKIPPED_DROP_EVENT); + protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME, + (long long)(NanoToMillis(dropEvent.dropTimeNs))); + ; + protoOutput->end(dropEventToken); + } 
protoOutput->end(wrapperToken); } @@ -270,21 +318,17 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); protoOutput->end(dimensionToken); - if (dimensionKey.hasDimensionKeyInCondition()) { - uint64_t dimensionInConditionToken = - protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); - writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, - protoOutput); - protoOutput->end(dimensionInConditionToken); - } } else { writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(), FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); - if (dimensionKey.hasDimensionKeyInCondition()) { - writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), - FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, - protoOutput); - } + } + + // Then fill slice_by_state. + for (auto state : dimensionKey.getStateValuesKey().getValues()) { + uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | + FIELD_ID_SLICE_BY_STATE); + writeStateToProto(state, protoOutput); + protoOutput->end(stateToken); } // Then fill bucket_info (ValueBucketInfo). @@ -306,7 +350,7 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS, (long long)bucket.mConditionTrueNs); } - for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) { + for (int i = 0; i < (int)bucket.valueIndex.size(); i++) { int index = bucket.valueIndex[i]; const Value& value = bucket.values[i]; uint64_t valueToken = protoOutput->start( @@ -341,23 +385,34 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } } -void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() { - if (!mCurrentBucketIsInvalid) { - // Only report once per invalid bucket. 
+void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, + const BucketDropReason reason) { + if (!mCurrentBucketIsSkipped) { + // Only report to StatsdStats once per invalid bucket. StatsdStats::getInstance().noteInvalidatedBucket(mMetricId); } - mCurrentBucketIsInvalid = true; + + skipCurrentBucket(dropTimeNs, reason); } -void ValueMetricProducer::invalidateCurrentBucket() { - invalidateCurrentBucketWithoutResetBase(); +void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs, + const BucketDropReason reason) { + invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason); resetBase(); } +void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs, + const BucketDropReason reason) { + if (!maxDropEventsReached()) { + mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason)); + } + mCurrentBucketIsSkipped = true; +} + void ValueMetricProducer::resetBase() { - for (auto& slice : mCurrentSlicedBucket) { - for (auto& interval : slice.second) { - interval.hasBase = false; + for (auto& slice : mCurrentBaseInfo) { + for (auto& baseInfo : slice.second) { + baseInfo.hasBase = false; } } mHasGlobalBase = false; @@ -369,9 +424,10 @@ void ValueMetricProducer::resetBase() { // - ConditionTimer tracks changes based on AND of condition and active state. void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) { bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs; - if (ConditionState::kTrue == mCondition && isEventTooLate) { + if (isEventTooLate) { // Drop bucket because event arrived too late, ie. we are missing data for this bucket. - invalidateCurrentBucket(); + StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); + invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET); } // Call parent method once we've verified the validity of current bucket. 
@@ -384,7 +440,7 @@ void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) // Pull on active state changes. if (!isEventTooLate) { if (mIsPulled) { - pullAndMatchEventsLocked(eventTimeNs, mCondition); + pullAndMatchEventsLocked(eventTimeNs); } // When active state changes from true to false, clear diff base but don't // reset other counters as we may accumulate more value in the bucket. @@ -404,65 +460,80 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse; bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs; - if (mIsActive) { - if (isEventTooLate) { - VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, - (long long)mCurrentBucketStartTimeNs); - StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId); - invalidateCurrentBucket(); - } else { - if (mCondition == ConditionState::kUnknown) { - // If the condition was unknown, we mark the bucket as invalid since the bucket will - // contain partial data. For instance, the condition change might happen close to - // the end of the bucket and we might miss lots of data. - // - // We still want to pull to set the base. - invalidateCurrentBucket(); - } + // If the config is not active, skip the event. + if (!mIsActive) { + mCondition = isEventTooLate ? ConditionState::kUnknown : newCondition; + return; + } - // Pull on condition changes. - bool conditionChanged = - (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse) - || (mCondition == ConditionState::kFalse && - newCondition == ConditionState::kTrue); - // We do not need to pull when we go from unknown to false. - // - // We also pull if the condition was already true in order to be able to flush the - // bucket at the end if needed. - // - // onConditionChangedLocked might happen on bucket boundaries if this is called before - // #onDataPulled. 
- if (mIsPulled && (conditionChanged || condition)) { - pullAndMatchEventsLocked(eventTimeNs, newCondition); - } + // If the event arrived late, mark the bucket as invalid and skip the event. + if (isEventTooLate) { + VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, + (long long)mCurrentBucketStartTimeNs); + StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); + StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId); + invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET); + mCondition = ConditionState::kUnknown; + mConditionTimer.onConditionChanged(mCondition, eventTimeNs); + return; + } - // When condition change from true to false, clear diff base but don't - // reset other counters as we may accumulate more value in the bucket. - if (mUseDiff && mCondition == ConditionState::kTrue - && newCondition == ConditionState::kFalse) { - resetBase(); - } - } + // If the previous condition was unknown, mark the bucket as invalid + // because the bucket will contain partial data. For example, the condition + // change might happen close to the end of the bucket and we might miss a + // lot of data. + // + // We still want to pull to set the base. + if (mCondition == ConditionState::kUnknown) { + invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN); } - mCondition = isEventTooLate ? initialCondition(mConditionTrackerIndex) : newCondition; + // Pull and match for the following condition change cases: + // unknown/false -> true - condition changed + // true -> false - condition changed + // true -> true - old condition was true so we can flush the bucket at the + // end if needed. + // + // We don’t need to pull for unknown -> false or false -> false. + // + // onConditionChangedLocked might happen on bucket boundaries if this is + // called before #onDataPulled. 
+ if (mIsPulled && + (newCondition == ConditionState::kTrue || mCondition == ConditionState::kTrue)) { + pullAndMatchEventsLocked(eventTimeNs); + } - if (mIsActive) { - flushIfNeededLocked(eventTimeNs); - mConditionTimer.onConditionChanged(mCondition, eventTimeNs); + // For metrics that use diff, when condition changes from true to false, + // clear diff base but don't reset other counts because we may accumulate + // more value in the bucket. + if (mUseDiff && + (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) { + resetBase(); } + + // Update condition state after pulling. + mCondition = newCondition; + + flushIfNeededLocked(eventTimeNs); + mConditionTimer.onConditionChanged(mCondition, eventTimeNs); } -void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs, - ConditionState condition) { +void ValueMetricProducer::prepareFirstBucketLocked() { + // Kicks off the puller immediately if condition is true and diff based. + if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) { + pullAndMatchEventsLocked(mCurrentBucketStartTimeNs); + } +} + +void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { vector<std::shared_ptr<LogEvent>> allData; - if (!mPullerManager->Pull(mPullTagId, &allData)) { + if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) { ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); - invalidateCurrentBucket(); + invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED); return; } - accumulateEvents(allData, timestampNs, timestampNs, condition); + accumulateEvents(allData, timestampNs, timestampNs); } int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { @@ -475,33 +546,33 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, bool 
pullSuccess, int64_t originalPullTimeNs) { std::lock_guard<std::mutex> lock(mMutex); - if (mCondition == ConditionState::kTrue) { - // If the pull failed, we won't be able to compute a diff. - if (!pullSuccess) { - invalidateCurrentBucket(); + if (mCondition == ConditionState::kTrue) { + // If the pull failed, we won't be able to compute a diff. + if (!pullSuccess) { + invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED); + } else { + bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs(); + if (isEventLate) { + // If the event is late, we are in the middle of a bucket. Just + // process the data without trying to snap the data to the nearest bucket. + accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs); } else { - bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs(); - if (isEventLate) { - // If the event is late, we are in the middle of a bucket. Just - // process the data without trying to snap the data to the nearest bucket. - accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition); - } else { - // For scheduled pulled data, the effective event time is snap to the nearest - // bucket end. In the case of waking up from a deep sleep state, we will - // attribute to the previous bucket end. If the sleep was long but not very - // long, we will be in the immediate next bucket. Previous bucket may get a - // larger number as we pull at a later time than real bucket end. - // - // If the sleep was very long, we skip more than one bucket before sleep. In - // this case, if the diff base will be cleared and this new data will serve as - // new diff base. 
- int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; - StatsdStats::getInstance().noteBucketBoundaryDelayNs( - mMetricId, originalPullTimeNs - bucketEndTime); - accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition); - } + // For scheduled pulled data, the effective event time is snap to the nearest + // bucket end. In the case of waking up from a deep sleep state, we will + // attribute to the previous bucket end. If the sleep was long but not very + // long, we will be in the immediate next bucket. Previous bucket may get a + // larger number as we pull at a later time than real bucket end. + // + // If the sleep was very long, we skip more than one bucket before sleep. In + // this case, if the diff base will be cleared and this new data will serve as + // new diff base. + int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; + StatsdStats::getInstance().noteBucketBoundaryDelayNs( + mMetricId, originalPullTimeNs - bucketEndTime); + accumulateEvents(allData, originalPullTimeNs, bucketEndTime); } } + } // We can probably flush the bucket. Since we used bucketEndTime when calling // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. 
@@ -509,18 +580,18 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven } void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, - int64_t originalPullTimeNs, int64_t eventElapsedTimeNs, - ConditionState condition) { + int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) { bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs; if (isEventLate) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs); StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); - invalidateCurrentBucket(); + invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET); return; } - const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs; + const int64_t elapsedRealtimeNs = getElapsedRealtimeNs(); + const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs; StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (pullDelayNs > mMaxPullDelayNs) { ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId, @@ -528,15 +599,10 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); // We are missing one pull from the bucket which means we will not have a complete view of // what's going on. 
- invalidateCurrentBucket(); + invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED); return; } - if (allData.size() == 0) { - VLOG("Data pulled is empty"); - StatsdStats::getInstance().noteEmptyData(mPullTagId); - } - mMatchedMetricDimensionKeys.clear(); for (const auto& data : allData) { LogEvent localCopy = data->makeCopy(); @@ -546,14 +612,19 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); } } - // If the new pulled data does not contains some keys we track in our intervals, we need to - // reset the base. + // If a key that is: + // 1. Tracked in mCurrentSlicedBucket and + // 2. A superset of the current mStateChangePrimaryKey + // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys) + // then we need to reset the base. for (auto& slice : mCurrentSlicedBucket) { - bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) - != mMatchedMetricDimensionKeys.end(); - if (!presentInPulledData) { - for (auto& interval : slice.second) { - interval.hasBase = false; + const auto& whatKey = slice.first.getDimensionKeyInWhat(); + bool presentInPulledData = + mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end(); + if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) { + auto it = mCurrentBaseInfo.find(whatKey); + for (auto& baseInfo : it->second) { + baseInfo.hasBase = false; } } } @@ -567,7 +638,7 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key // might be missing from mCurrentSlicedBucket. 
if (hasReachedGuardRailLimit()) { - invalidateCurrentBucket(); + invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED); mCurrentSlicedBucket.clear(); } } @@ -582,10 +653,10 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { if (verbose) { for (const auto& it : mCurrentSlicedBucket) { for (const auto& interval : it.second) { - fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n", - it.first.getDimensionKeyInWhat().toString().c_str(), - it.first.getDimensionKeyInCondition().toString().c_str(), - interval.value.toString().c_str()); + fprintf(out, "\t(what)%s\t(states)%s (value)%s\n", + it.first.getDimensionKeyInWhat().toString().c_str(), + it.first.getStateValuesKey().toString().c_str(), + interval.value.toString().c_str()); } } } @@ -653,6 +724,7 @@ bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) ret.setDouble(value.mValue.double_value); break; default: + return false; break; } return true; @@ -661,17 +733,30 @@ bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) return false; } -void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, - const MetricDimensionKey& eventKey, - const ConditionKey& conditionKey, - bool condition, const LogEvent& event) { +void ValueMetricProducer::onMatchedLogEventInternalLocked( + const size_t matcherIndex, const MetricDimensionKey& eventKey, + const ConditionKey& conditionKey, bool condition, const LogEvent& event, + const map<int, HashableDimensionKey>& statePrimaryKeys) { + auto whatKey = eventKey.getDimensionKeyInWhat(); + auto stateKey = eventKey.getStateValuesKey(); + + // Skip this event if a state changed occurred for a different primary key. + auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first); + // Check that both the atom id and the primary key are equal. 
+ if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) { + VLOG("ValueMetric skip event with primary key %s because state change primary key " + "is %s", + it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str()); + return; + } + int64_t eventTimeNs = event.GetElapsedTimestampNs(); if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, (long long)mCurrentBucketStartTimeNs); return; } - mMatchedMetricDimensionKeys.insert(eventKey); + mMatchedMetricDimensionKeys.insert(whatKey); if (!mIsPulled) { // We cannot flush without doing a pull first. @@ -689,17 +774,35 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff && mCondition != ConditionState::kTrue; if (shouldSkipForPushMetric || shouldSkipForPulledMetric) { - VLOG("ValueMetric skip event because condition is false"); + VLOG("ValueMetric skip event because condition is false and we are not using diff (for " + "pulled metric)"); return; } if (hitGuardRailLocked(eventKey)) { return; } - vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey]; - if (multiIntervals.size() < mFieldMatchers.size()) { + + vector<BaseInfo>& baseInfos = mCurrentBaseInfo[whatKey]; + if (baseInfos.size() < mFieldMatchers.size()) { + VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); + baseInfos.resize(mFieldMatchers.size()); + } + + for (BaseInfo& baseInfo : baseInfos) { + if (!baseInfo.hasCurrentState) { + baseInfo.currentState = getUnknownStateKey(); + baseInfo.hasCurrentState = true; + } + } + + // We need to get the intervals stored with the previous state key so we can + // close these value intervals. 
+ const auto oldStateKey = baseInfos[0].currentState; + vector<Interval>& intervals = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)]; + if (intervals.size() < mFieldMatchers.size()) { VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); - multiIntervals.resize(mFieldMatchers.size()); + intervals.resize(mFieldMatchers.size()); } // We only use anomaly detection under certain cases. @@ -712,9 +815,12 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn for (int i = 0; i < (int)mFieldMatchers.size(); i++) { const Matcher& matcher = mFieldMatchers[i]; - Interval& interval = multiIntervals[i]; + BaseInfo& baseInfo = baseInfos[i]; + Interval& interval = intervals[i]; interval.valueIndex = i; Value value; + baseInfo.hasCurrentState = true; + baseInfo.currentState = stateKey; if (!getDoubleOrLong(event, matcher, value)) { VLOG("Failed to get value %d from event %s", i, event.ToString().c_str()); StatsdStats::getInstance().noteBadValueType(mMetricId); @@ -723,60 +829,61 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn interval.seenNewData = true; if (mUseDiff) { - if (!interval.hasBase) { + if (!baseInfo.hasBase) { if (mHasGlobalBase && mUseZeroDefaultBase) { // The bucket has global base. This key does not. // Optionally use zero as base. - interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE); - interval.hasBase = true; + baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE); + baseInfo.hasBase = true; } else { // no base. just update base and return. 
- interval.base = value; - interval.hasBase = true; + baseInfo.base = value; + baseInfo.hasBase = true; // If we're missing a base, do not use anomaly detection on incomplete data useAnomalyDetection = false; - // Continue (instead of return) here in order to set interval.base and - // interval.hasBase for other intervals + // Continue (instead of return) here in order to set baseInfo.base and + // baseInfo.hasBase for other baseInfos continue; } } + Value diff; switch (mValueDirection) { case ValueMetric::INCREASING: - if (value >= interval.base) { - diff = value - interval.base; + if (value >= baseInfo.base) { + diff = value - baseInfo.base; } else if (mUseAbsoluteValueOnReset) { diff = value; } else { VLOG("Unexpected decreasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); - interval.base = value; + baseInfo.base = value; // If we've got bad data, do not use anomaly detection useAnomalyDetection = false; continue; } break; case ValueMetric::DECREASING: - if (interval.base >= value) { - diff = interval.base - value; + if (baseInfo.base >= value) { + diff = baseInfo.base - value; } else if (mUseAbsoluteValueOnReset) { diff = value; } else { VLOG("Unexpected increasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); - interval.base = value; + baseInfo.base = value; // If we've got bad data, do not use anomaly detection useAnomalyDetection = false; continue; } break; case ValueMetric::ANY: - diff = value - interval.base; + diff = value - baseInfo.base; break; default: break; } - interval.base = value; + baseInfo.base = value; value = diff; } @@ -806,7 +913,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn // Only trigger the tracker if all intervals are correct if (useAnomalyDetection) { // TODO: propgate proper values down stream when anomaly support doubles - long wholeBucketVal = multiIntervals[0].value.long_value; + long wholeBucketVal = intervals[0].value.long_value; auto prev = 
mCurrentFullBucket.find(eventKey);
 if (prev != mCurrentFullBucket.end()) {
 wholeBucketVal += prev->second;
 @@ -823,7 +930,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
 void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
 if (eventTimeNs < currentBucketEndTimeNs) {
- VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
+ VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
 (long long)(currentBucketEndTimeNs));
 return;
 }
 @@ -844,25 +951,39 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
 const int64_t& nextBucketStartTimeNs) {
 if (mCondition == ConditionState::kUnknown) {
 StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
+ invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
 }
+ VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
+ (int)mCurrentSlicedBucket.size());
+
+ int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
+ int64_t bucketEndTime = fullBucketEndTimeNs;
 int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
- if (numBucketsForward > 1) {
+
+ // Skip buckets if this is a pulled metric or a pushed metric that is diffed.
+ if (numBucketsForward > 1 && (mIsPulled || mUseDiff)) {
+ VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
 StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
 // Something went wrong. Maybe the device was sleeping for a long time. It is better
 // to mark the current bucket as invalid. The last pull might have been successful though.
- invalidateCurrentBucketWithoutResetBase();
+ invalidateCurrentBucketWithoutResetBase(eventTimeNs,
+ BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
+ // End the bucket at the next bucket start time so the entire interval is skipped. 
+ bucketEndTime = nextBucketStartTimeNs; + } else if (eventTimeNs < fullBucketEndTimeNs) { + bucketEndTime = eventTimeNs; } - VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, - (int)mCurrentSlicedBucket.size()); - int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); - int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; // Close the current bucket. int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime); bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; - if (isBucketLargeEnough && !mCurrentBucketIsInvalid) { + if (!isBucketLargeEnough) { + skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL); + } + if (!mCurrentBucketIsSkipped) { + bool bucketHasData = false; // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); @@ -871,13 +992,34 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(bucket); + bucketHasData = true; } } - } else { - mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime); + if (!bucketHasData) { + skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); + } + } + + if (mCurrentBucketIsSkipped) { + mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; + mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; + mSkippedBuckets.emplace_back(mCurrentSkippedBucket); } - appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); + // This means that the current bucket was not flushed before a forced bucket split. 
+ // This can happen if an app update or a dump report with include_current_partial_bucket is + // requested before we get a chance to flush the bucket due to receiving new data, either from + // the statsd socket or the StatsPullerManager. + if (bucketEndTime < nextBucketStartTimeNs) { + SkippedBucket bucketInGap; + bucketInGap.bucketStartTimeNs = bucketEndTime; + bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs; + bucketInGap.dropEvents.emplace_back( + buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA)); + mSkippedBuckets.emplace_back(bucketInGap); + } + + appendToFullBucket(eventTimeNs > fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. mConditionTimer.newBucketStart(nextBucketStartTimeNs); @@ -927,22 +1069,24 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) } else { it++; } + // TODO(b/157655103): remove mCurrentBaseInfo entries when obsolete } - mCurrentBucketIsInvalid = false; + mCurrentBucketIsSkipped = false; + mCurrentSkippedBucket.reset(); + // If we do not have a global base when the condition is true, // we will have incomplete bucket for the next bucket. if (mUseDiff && !mHasGlobalBase && mCondition) { - mCurrentBucketIsInvalid = false; + mCurrentBucketIsSkipped = false; } mCurrentBucketStartTimeNs = nextBucketStartTimeNs; VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); } -void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { - bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; - if (mCurrentBucketIsInvalid) { +void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { + if (mCurrentBucketIsSkipped) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. mCurrentFullBucket.clear(); |