summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cmds/statsd/src/metrics/GaugeMetricProducer.cpp')
-rw-r--r--cmds/statsd/src/metrics/GaugeMetricProducer.cpp129
1 files changed, 66 insertions, 63 deletions
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 4f437d1af51a..020f4b638f4d 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -46,19 +46,21 @@ const int FIELD_ID_GAUGE_METRICS = 8;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
-const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
const int FIELD_ID_IS_ACTIVE = 14;
// for GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
+// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
+const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
+// for DumpEvent Proto
+const int FIELD_ID_BUCKET_DROP_REASON = 1;
+const int FIELD_ID_DROP_TIME = 2;
// for GaugeMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
-const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
-const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
// for GaugeBucketInfo
const int FIELD_ID_ATOM = 3;
const int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
@@ -68,11 +70,15 @@ const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
GaugeMetricProducer::GaugeMetricProducer(
const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex,
- const sp<ConditionWizard>& wizard, const int whatMatcherIndex,
- const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId,
- const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs,
- const sp<StatsPullerManager>& pullerManager)
- : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
+ const vector<ConditionState>& initialConditionCache, const sp<ConditionWizard>& wizard,
+ const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard,
+ const int pullTagId, const int triggerAtomId, const int atomId, const int64_t timeBaseNs,
+ const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager,
+ const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
+ const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap)
+ : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache, wizard,
+ eventActivationMap, eventDeactivationMap, /*slicedStateAtoms=*/{},
+ /*stateGroupMap=*/{}),
mWhatMatcherIndex(whatMatcherIndex),
mEventMatcherWizard(matcherWizard),
mPullerManager(pullerManager),
@@ -113,10 +119,6 @@ GaugeMetricProducer::GaugeMetricProducer(
mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
}
- if (metric.has_dimensions_in_condition()) {
- translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
- }
-
if (metric.links().size() > 0) {
for (const auto& link : metric.links()) {
Metric2Condition mc;
@@ -125,19 +127,18 @@ GaugeMetricProducer::GaugeMetricProducer(
translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
mMetric2ConditionLinks.push_back(mc);
}
+ mConditionSliced = true;
}
- mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
- mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
- HasPositionALL(metric.dimensions_in_condition());
+ mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what());
flushIfNeededLocked(startTimeNs);
// Kicks off the puller immediately.
if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
- mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
+ mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
mBucketSizeNs);
}
- // Adjust start for partial bucket
+ // Adjust start for partial first bucket and then pull if needed
mCurrentBucketStartTimeNs = startTimeNs;
VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
@@ -148,7 +149,7 @@ GaugeMetricProducer::GaugeMetricProducer(
GaugeMetricProducer::~GaugeMetricProducer() {
VLOG("~GaugeMetricProducer() called");
if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
- mPullerManager->UnRegisterReceiver(mPullTagId, this);
+ mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
}
}
@@ -162,10 +163,9 @@ void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
(unsigned long)mCurrentSlicedBucket->size());
if (verbose) {
for (const auto& it : *mCurrentSlicedBucket) {
- fprintf(out, "\t(what)%s\t(condition)%s %d atoms\n",
- it.first.getDimensionKeyInWhat().toString().c_str(),
- it.first.getDimensionKeyInCondition().toString().c_str(),
- (int)it.second.size());
+ fprintf(out, "\t(what)%s\t(states)%s %d atoms\n",
+ it.first.getDimensionKeyInWhat().toString().c_str(),
+ it.first.getStateValuesKey().toString().c_str(), (int)it.second.size());
}
}
}
@@ -192,7 +192,7 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
- if (mPastBuckets.empty()) {
+ if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
return;
}
@@ -207,23 +207,25 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
protoOutput->end(dimenPathToken);
}
- if (!mDimensionsInCondition.empty()) {
- uint64_t dimenPathToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
- writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
- protoOutput->end(dimenPathToken);
- }
}
uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
- for (const auto& pair : mSkippedBuckets) {
+ for (const auto& skippedBucket : mSkippedBuckets) {
uint64_t wrapperToken =
protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
- (long long)(NanoToMillis(pair.first)));
+ (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
- (long long)(NanoToMillis(pair.second)));
+ (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
+
+ for (const auto& dropEvent : skippedBucket.dropEvents) {
+ uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
+ FIELD_ID_SKIPPED_DROP_EVENT);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME, (long long) (NanoToMillis(dropEvent.dropTimeNs)));
+ protoOutput->end(dropEventToken);
+ }
protoOutput->end(wrapperToken);
}
@@ -240,22 +242,9 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
protoOutput->end(dimensionToken);
-
- if (dimensionKey.hasDimensionKeyInCondition()) {
- uint64_t dimensionInConditionToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
- writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(),
- str_set, protoOutput);
- protoOutput->end(dimensionInConditionToken);
- }
} else {
writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
- if (dimensionKey.hasDimensionKeyInCondition()) {
- writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
- FIELD_ID_DIMENSION_LEAF_IN_CONDITION,
- str_set, protoOutput);
- }
}
// Then fill bucket_info (GaugeBucketInfo).
@@ -282,11 +271,9 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
protoOutput->end(atomsToken);
}
for (const auto& atom : bucket.mGaugeAtoms) {
- const int64_t elapsedTimestampNs =
- truncateTimestampIfNecessary(mAtomId, atom.mElapsedTimestamps);
- protoOutput->write(
- FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
- (long long)elapsedTimestampNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED |
+ FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
+ (long long)atom.mElapsedTimestampNs);
}
}
protoOutput->end(bucketInfoToken);
@@ -335,18 +322,17 @@ void GaugeMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
return;
}
vector<std::shared_ptr<LogEvent>> allData;
- if (!mPullerManager->Pull(mPullTagId, &allData)) {
+ if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
return;
}
const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
if (pullDelayNs > mMaxPullDelayNs) {
ALOGE("Pull finish too late for atom %d", mPullTagId);
StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
- StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
return;
}
- StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
for (const auto& data : allData) {
LogEvent localCopy = data->makeCopy();
localCopy.setElapsedTimestampNs(timestampNs);
@@ -429,6 +415,13 @@ void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
if (!pullSuccess || allData.size() == 0) {
return;
}
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+ if (pullDelayNs > mMaxPullDelayNs) {
+ ALOGE("Pull finish too late for atom %d", mPullTagId);
+ StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+ return;
+ }
for (const auto& data : allData) {
if (mEventMatcherWizard->matchLogEvent(
*data, mWhatMatcherIndex) == MatchingState::kMatched) {
@@ -449,6 +442,7 @@ bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
if (newTupleCount > mDimensionHardLimit) {
ALOGE("GaugeMetric %lld dropping data for dimension key %s",
(long long)mMetricId, newKey.toString().c_str());
+ StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
return true;
}
}
@@ -458,8 +452,8 @@ bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
void GaugeMetricProducer::onMatchedLogEventInternalLocked(
const size_t matcherIndex, const MetricDimensionKey& eventKey,
- const ConditionKey& conditionKey, bool condition,
- const LogEvent& event) {
+ const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+ const map<int, HashableDimensionKey>& statePrimaryKeys) {
if (condition == false) {
return;
}
@@ -488,7 +482,9 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked(
if ((*mCurrentSlicedBucket)[eventKey].size() >= mGaugeAtomsPerDimensionLimit) {
return;
}
- GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs);
+
+ const int64_t truncatedElapsedTimestampNs = truncateTimestampIfNecessary(event);
+ GaugeAtom gaugeAtom(getGaugeFields(event), truncatedElapsedTimestampNs);
(*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
// Anomaly detection on gauge metric only works when there is one numeric
// field specified.
@@ -558,16 +554,16 @@ void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
const int64_t& nextBucketStartTimeNs) {
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
+ int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
GaugeBucket info;
info.mBucketStartNs = mCurrentBucketStartTimeNs;
- if (eventTimeNs < fullBucketEndTimeNs) {
- info.mBucketEndNs = eventTimeNs;
- } else {
- info.mBucketEndNs = fullBucketEndTimeNs;
- }
+ info.mBucketEndNs = bucketEndTime;
- if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+ // Add bucket to mPastBuckets if bucket is large enough.
+ // Otherwise, drop the bucket data and add bucket metadata to mSkippedBuckets.
+ bool isBucketLargeEnough = info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
+ if (isBucketLargeEnough) {
for (const auto& slice : *mCurrentSlicedBucket) {
info.mGaugeAtoms = slice.second;
auto& bucketList = mPastBuckets[slice.first];
@@ -576,7 +572,13 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
slice.first.toString().c_str());
}
} else {
- mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
+ mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
+ mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
+ if (!maxDropEventsReached()) {
+ mCurrentSkippedBucket.dropEvents.emplace_back(
+ buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL));
+ }
+ mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
}
// If we have anomaly trackers, we need to update the partial bucket values.
@@ -595,6 +597,7 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
StatsdStats::getInstance().noteBucketCount(mMetricId);
mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
+ mCurrentSkippedBucket.reset();
}
size_t GaugeMetricProducer::byteSizeLocked() const {