diff options
Diffstat (limited to 'cmds/statsd/src/metrics/GaugeMetricProducer.cpp')
-rw-r--r-- | cmds/statsd/src/metrics/GaugeMetricProducer.cpp | 129 |
1 files changed, 66 insertions, 63 deletions
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index 4f437d1af51a..020f4b638f4d 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -46,19 +46,21 @@ const int FIELD_ID_GAUGE_METRICS = 8; const int FIELD_ID_TIME_BASE = 9; const int FIELD_ID_BUCKET_SIZE = 10; const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11; -const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12; const int FIELD_ID_IS_ACTIVE = 14; // for GaugeMetricDataWrapper const int FIELD_ID_DATA = 1; const int FIELD_ID_SKIPPED = 2; +// for SkippedBuckets const int FIELD_ID_SKIPPED_START_MILLIS = 3; const int FIELD_ID_SKIPPED_END_MILLIS = 4; +const int FIELD_ID_SKIPPED_DROP_EVENT = 5; +// for DumpEvent Proto +const int FIELD_ID_BUCKET_DROP_REASON = 1; +const int FIELD_ID_DROP_TIME = 2; // for GaugeMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; -const int FIELD_ID_DIMENSION_IN_CONDITION = 2; const int FIELD_ID_BUCKET_INFO = 3; const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; -const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; // for GaugeBucketInfo const int FIELD_ID_ATOM = 3; const int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4; @@ -68,11 +70,15 @@ const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8; GaugeMetricProducer::GaugeMetricProducer( const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex, - const sp<ConditionWizard>& wizard, const int whatMatcherIndex, - const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId, - const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs, - const sp<StatsPullerManager>& pullerManager) - : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), + const vector<ConditionState>& initialConditionCache, const sp<ConditionWizard>& wizard, + const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard, + const int pullTagId, const int triggerAtomId, const int atomId, const int64_t timeBaseNs, + const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager, + const unordered_map<int, shared_ptr<Activation>>& eventActivationMap, + const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap) + : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache, wizard, + eventActivationMap, eventDeactivationMap, /*slicedStateAtoms=*/{}, + /*stateGroupMap=*/{}), mWhatMatcherIndex(whatMatcherIndex), mEventMatcherWizard(matcherWizard), mPullerManager(pullerManager), @@ -113,10 +119,6 @@ GaugeMetricProducer::GaugeMetricProducer( mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what()); } - if (metric.has_dimensions_in_condition()) { - translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition); - } - if (metric.links().size() > 0) { for (const auto& link : metric.links()) { Metric2Condition mc; @@ -125,19 +127,18 @@ GaugeMetricProducer::GaugeMetricProducer( translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields); mMetric2ConditionLinks.push_back(mc); } + mConditionSliced = true; } - mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0); - mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || - HasPositionALL(metric.dimensions_in_condition()); + mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()); flushIfNeededLocked(startTimeNs); // Kicks off the puller immediately. if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { - mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), + mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(), mBucketSizeNs); } - // Adjust start for partial bucket + // Adjust start for partial first bucket and then pull if needed mCurrentBucketStartTimeNs = startTimeNs; VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d", @@ -148,7 +149,7 @@ GaugeMetricProducer::GaugeMetricProducer( GaugeMetricProducer::~GaugeMetricProducer() { VLOG("~GaugeMetricProducer() called"); if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { - mPullerManager->UnRegisterReceiver(mPullTagId, this); + mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this); } } @@ -162,10 +163,9 @@ void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { (unsigned long)mCurrentSlicedBucket->size()); if (verbose) { for (const auto& it : *mCurrentSlicedBucket) { - fprintf(out, "\t(what)%s\t(condition)%s %d atoms\n", - it.first.getDimensionKeyInWhat().toString().c_str(), - it.first.getDimensionKeyInCondition().toString().c_str(), - (int)it.second.size()); + fprintf(out, "\t(what)%s\t(states)%s %d atoms\n", + it.first.getDimensionKeyInWhat().toString().c_str(), + it.first.getStateValuesKey().toString().c_str(), (int)it.second.size()); } } } @@ -192,7 +192,7 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked()); - if (mPastBuckets.empty()) { + if (mPastBuckets.empty() && mSkippedBuckets.empty()) { return; } @@ -207,23 +207,25 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, writeDimensionPathToProto(mDimensionsInWhat, protoOutput); protoOutput->end(dimenPathToken); } - if (!mDimensionsInCondition.empty()) { - uint64_t dimenPathToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); - writeDimensionPathToProto(mDimensionsInCondition, protoOutput); - protoOutput->end(dimenPathToken); - } } uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); - for (const auto& pair : mSkippedBuckets) { + for (const auto& skippedBucket : mSkippedBuckets) { uint64_t wrapperToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS, - (long long)(NanoToMillis(pair.first))); + (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs))); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS, - (long long)(NanoToMillis(pair.second))); + (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs))); + + for (const auto& dropEvent : skippedBucket.dropEvents) { + uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | + FIELD_ID_SKIPPED_DROP_EVENT); + protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME, (long long) (NanoToMillis(dropEvent.dropTimeNs))); + protoOutput->end(dropEventToken); + } protoOutput->end(wrapperToken); } @@ -240,22 +242,9 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); protoOutput->end(dimensionToken); - - if (dimensionKey.hasDimensionKeyInCondition()) { - uint64_t dimensionInConditionToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); - writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), - str_set, protoOutput); - protoOutput->end(dimensionInConditionToken); - } } else { writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(), FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); - if (dimensionKey.hasDimensionKeyInCondition()) { - writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), - FIELD_ID_DIMENSION_LEAF_IN_CONDITION, - str_set, protoOutput); - } } // Then fill bucket_info (GaugeBucketInfo). @@ -282,11 +271,9 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->end(atomsToken); } for (const auto& atom : bucket.mGaugeAtoms) { - const int64_t elapsedTimestampNs = - truncateTimestampIfNecessary(mAtomId, atom.mElapsedTimestamps); - protoOutput->write( - FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP, - (long long)elapsedTimestampNs); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | + FIELD_ID_ELAPSED_ATOM_TIMESTAMP, + (long long)atom.mElapsedTimestampNs); } } protoOutput->end(bucketInfoToken); @@ -335,18 +322,17 @@ void GaugeMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { return; } vector<std::shared_ptr<LogEvent>> allData; - if (!mPullerManager->Pull(mPullTagId, &allData)) { + if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) { ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); return; } const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs; + StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (pullDelayNs > mMaxPullDelayNs) { ALOGE("Pull finish too late for atom %d", mPullTagId); StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); - StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); return; } - StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); for (const auto& data : allData) { LogEvent localCopy = data->makeCopy(); localCopy.setElapsedTimestampNs(timestampNs); @@ -429,6 +415,13 @@ void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven if (!pullSuccess || allData.size() == 0) { return; } + const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs; + StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); + if (pullDelayNs > mMaxPullDelayNs) { + ALOGE("Pull finish too late for atom %d", mPullTagId); + StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); + return; + } for (const auto& data : allData) { if (mEventMatcherWizard->matchLogEvent( *data, mWhatMatcherIndex) == MatchingState::kMatched) { @@ -449,6 +442,7 @@ bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { if (newTupleCount > mDimensionHardLimit) { ALOGE("GaugeMetric %lld dropping data for dimension key %s", (long long)mMetricId, newKey.toString().c_str()); + StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId); return true; } } @@ -458,8 +452,8 @@ bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { void GaugeMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, - const ConditionKey& conditionKey, bool condition, - const LogEvent& event) { + const ConditionKey& conditionKey, bool condition, const LogEvent& event, + const map<int, HashableDimensionKey>& statePrimaryKeys) { if (condition == false) { return; } @@ -488,7 +482,9 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked( if ((*mCurrentSlicedBucket)[eventKey].size() >= mGaugeAtomsPerDimensionLimit) { return; } - GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs); + + const int64_t truncatedElapsedTimestampNs = truncateTimestampIfNecessary(event); + GaugeAtom gaugeAtom(getGaugeFields(event), truncatedElapsedTimestampNs); (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom); // Anomaly detection on gauge metric only works when there is one numeric // field specified. @@ -558,16 +554,16 @@ void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, const int64_t& nextBucketStartTimeNs) { int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); + int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; GaugeBucket info; info.mBucketStartNs = mCurrentBucketStartTimeNs; - if (eventTimeNs < fullBucketEndTimeNs) { - info.mBucketEndNs = eventTimeNs; - } else { - info.mBucketEndNs = fullBucketEndTimeNs; - } + info.mBucketEndNs = bucketEndTime; - if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { + // Add bucket to mPastBuckets if bucket is large enough. + // Otherwise, drop the bucket data and add bucket metadata to mSkippedBuckets. + bool isBucketLargeEnough = info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; + if (isBucketLargeEnough) { for (const auto& slice : *mCurrentSlicedBucket) { info.mGaugeAtoms = slice.second; auto& bucketList = mPastBuckets[slice.first]; @@ -576,7 +572,13 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, slice.first.toString().c_str()); } } else { - mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); + mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; + mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; + if (!maxDropEventsReached()) { + mCurrentSkippedBucket.dropEvents.emplace_back( + buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL)); + } + mSkippedBuckets.emplace_back(mCurrentSkippedBucket); } // If we have anomaly trackers, we need to update the partial bucket values. @@ -595,6 +597,7 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, StatsdStats::getInstance().noteBucketCount(mMetricId); mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>(); mCurrentBucketStartTimeNs = nextBucketStartTimeNs; + mCurrentSkippedBucket.reset(); } size_t GaugeMetricProducer::byteSizeLocked() const { |