summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/metrics/ValueMetricProducer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cmds/statsd/src/metrics/ValueMetricProducer.cpp')
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.cpp540
1 files changed, 342 insertions, 198 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index fa310dc707ec..5987a723a421 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -46,19 +46,22 @@ const int FIELD_ID_VALUE_METRICS = 7;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
-const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
const int FIELD_ID_IS_ACTIVE = 14;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
+// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
+const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
+// for DumpEvent Proto
+const int FIELD_ID_BUCKET_DROP_REASON = 1;
+const int FIELD_ID_DROP_TIME = 2;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
-const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
-const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
+const int FIELD_ID_SLICE_BY_STATE = 6;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
@@ -75,10 +78,17 @@ const Value ZERO_DOUBLE((int64_t)0);
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer::ValueMetricProducer(
const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
+ const vector<ConditionState>& initialConditionCache,
const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
- const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager)
- : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
+ const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager,
+ const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
+ const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
+ const vector<int>& slicedStateAtoms,
+ const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap)
+ : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache,
+ conditionWizard, eventActivationMap, eventDeactivationMap, slicedStateAtoms,
+ stateGroupMap),
mWhatMatcherIndex(whatMatcherIndex),
mEventMatcherWizard(matcherWizard),
mPullerManager(pullerManager),
@@ -100,11 +110,11 @@ ValueMetricProducer::ValueMetricProducer(
mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
mUseZeroDefaultBase(metric.use_zero_default_base()),
mHasGlobalBase(false),
- mCurrentBucketIsInvalid(false),
+ mCurrentBucketIsSkipped(false),
mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
: StatsdStats::kPullMaxDelayNs),
mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
- // Condition timer will be set in prepareFirstBucketLocked.
+ // Condition timer will be set later within the constructor after pulling events
mConditionTimer(false, timeBaseNs) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
@@ -120,10 +130,7 @@ ValueMetricProducer::ValueMetricProducer(
if (metric.has_dimensions_in_what()) {
translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
- }
-
- if (metric.has_dimensions_in_condition()) {
- translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
+ mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what());
}
if (metric.links().size() > 0) {
@@ -134,11 +141,16 @@ ValueMetricProducer::ValueMetricProducer(
translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
mMetric2ConditionLinks.push_back(mc);
}
+ mConditionSliced = true;
}
- mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
- mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
- HasPositionALL(metric.dimensions_in_condition());
+ for (const auto& stateLink : metric.state_link()) {
+ Metric2State ms;
+ ms.stateAtomId = stateLink.state_atom_id();
+ translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
+ translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
+ mMetric2StateLinks.push_back(ms);
+ }
int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
mCurrentBucketNum += numBucketsForward;
@@ -146,7 +158,7 @@ ValueMetricProducer::ValueMetricProducer(
flushIfNeededLocked(startTimeNs);
if (mIsPulled) {
- mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
+ mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
mBucketSizeNs);
}
@@ -155,6 +167,11 @@ ValueMetricProducer::ValueMetricProducer(
// Adjust start for partial bucket
mCurrentBucketStartTimeNs = startTimeNs;
mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);
+
+ // Now that activations are processed, start the condition timer if needed.
+ mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
+ mCurrentBucketStartTimeNs);
+
VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
(long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
@@ -162,18 +179,48 @@ ValueMetricProducer::ValueMetricProducer(
ValueMetricProducer::~ValueMetricProducer() {
VLOG("~ValueMetricProducer() called");
if (mIsPulled) {
- mPullerManager->UnRegisterReceiver(mPullTagId, this);
+ mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
}
}
-void ValueMetricProducer::prepareFirstBucketLocked() {
- // Kicks off the puller immediately if condition is true and diff based.
- if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
- pullAndMatchEventsLocked(mCurrentBucketStartTimeNs, mCondition);
+void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
+ const HashableDimensionKey& primaryKey,
+ const FieldValue& oldState, const FieldValue& newState) {
+ VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
+ (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
+ oldState.mValue.int_value, newState.mValue.int_value);
+
+ // If old and new states are in the same StateGroup, then we do not need to
+ // pull for this state change.
+ FieldValue oldStateCopy = oldState;
+ FieldValue newStateCopy = newState;
+ mapStateValue(atomId, &oldStateCopy);
+ mapStateValue(atomId, &newStateCopy);
+ if (oldStateCopy == newStateCopy) {
+ return;
}
- // Now that activations are processed, start the condition timer if needed.
- mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
- mCurrentBucketStartTimeNs);
+
+ // If condition is not true or metric is not active, we do not need to pull
+ // for this state change.
+ if (mCondition != ConditionState::kTrue || !mIsActive) {
+ return;
+ }
+
+ bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
+ if (isEventLate) {
+ VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+ (long long)mCurrentBucketStartTimeNs);
+ invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
+ return;
+ }
+ mStateChangePrimaryKey.first = atomId;
+ mStateChangePrimaryKey.second = primaryKey;
+ if (mIsPulled) {
+ pullAndMatchEventsLocked(eventTimeNs);
+ }
+ mStateChangePrimaryKey.first = 0;
+ mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
+ flushIfNeededLocked(eventTimeNs);
}
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
@@ -183,11 +230,9 @@ void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition
void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
StatsdStats::getInstance().noteBucketDropped(mMetricId);
- // We are going to flush the data without doing a pull first so we need to invalidte the data.
- bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
- if (pullNeeded) {
- invalidateCurrentBucket();
- }
+
+ // The current partial bucket is not flushed and does not require a pull,
+ // so the data is still valid.
flushIfNeededLocked(dropTimeNs);
clearPastBucketsLocked(dropTimeNs);
}
@@ -213,10 +258,10 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
if (pullNeeded) {
switch (dumpLatency) {
case FAST:
- invalidateCurrentBucket();
+ invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
break;
case NO_TIME_CONSTRAINTS:
- pullAndMatchEventsLocked(dumpTimeNs, mCondition);
+ pullAndMatchEventsLocked(dumpTimeNs);
break;
}
}
@@ -238,23 +283,26 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
protoOutput->end(dimenPathToken);
}
- if (!mDimensionsInCondition.empty()) {
- uint64_t dimenPathToken =
- protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
- writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
- protoOutput->end(dimenPathToken);
- }
}
uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
- for (const auto& pair : mSkippedBuckets) {
+ for (const auto& skippedBucket : mSkippedBuckets) {
uint64_t wrapperToken =
protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
- (long long)(NanoToMillis(pair.first)));
+ (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
- (long long)(NanoToMillis(pair.second)));
+ (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
+ for (const auto& dropEvent : skippedBucket.dropEvents) {
+ uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
+ FIELD_ID_SKIPPED_DROP_EVENT);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
+ (long long)(NanoToMillis(dropEvent.dropTimeNs)));
+ ;
+ protoOutput->end(dropEventToken);
+ }
protoOutput->end(wrapperToken);
}
@@ -270,21 +318,17 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
protoOutput->end(dimensionToken);
- if (dimensionKey.hasDimensionKeyInCondition()) {
- uint64_t dimensionInConditionToken =
- protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
- writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set,
- protoOutput);
- protoOutput->end(dimensionInConditionToken);
- }
} else {
writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
- if (dimensionKey.hasDimensionKeyInCondition()) {
- writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
- FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set,
- protoOutput);
- }
+ }
+
+ // Then fill slice_by_state.
+ for (auto state : dimensionKey.getStateValuesKey().getValues()) {
+ uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
+ FIELD_ID_SLICE_BY_STATE);
+ writeStateToProto(state, protoOutput);
+ protoOutput->end(stateToken);
}
// Then fill bucket_info (ValueBucketInfo).
@@ -306,7 +350,7 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
(long long)bucket.mConditionTrueNs);
}
- for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
+ for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
int index = bucket.valueIndex[i];
const Value& value = bucket.values[i];
uint64_t valueToken = protoOutput->start(
@@ -341,23 +385,34 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
}
}
-void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() {
- if (!mCurrentBucketIsInvalid) {
- // Only report once per invalid bucket.
+void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
+ const BucketDropReason reason) {
+ if (!mCurrentBucketIsSkipped) {
+ // Only report to StatsdStats once per invalid bucket.
StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
}
- mCurrentBucketIsInvalid = true;
+
+ skipCurrentBucket(dropTimeNs, reason);
}
-void ValueMetricProducer::invalidateCurrentBucket() {
- invalidateCurrentBucketWithoutResetBase();
+void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
+ const BucketDropReason reason) {
+ invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason);
resetBase();
}
+void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs,
+ const BucketDropReason reason) {
+ if (!maxDropEventsReached()) {
+ mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason));
+ }
+ mCurrentBucketIsSkipped = true;
+}
+
void ValueMetricProducer::resetBase() {
- for (auto& slice : mCurrentSlicedBucket) {
- for (auto& interval : slice.second) {
- interval.hasBase = false;
+ for (auto& slice : mCurrentBaseInfo) {
+ for (auto& baseInfo : slice.second) {
+ baseInfo.hasBase = false;
}
}
mHasGlobalBase = false;
@@ -369,9 +424,10 @@ void ValueMetricProducer::resetBase() {
// - ConditionTimer tracks changes based on AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;
- if (ConditionState::kTrue == mCondition && isEventTooLate) {
+ if (isEventTooLate) {
// Drop bucket because event arrived too late, ie. we are missing data for this bucket.
- invalidateCurrentBucket();
+ StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+ invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
}
// Call parent method once we've verified the validity of current bucket.
@@ -384,7 +440,7 @@ void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs)
// Pull on active state changes.
if (!isEventTooLate) {
if (mIsPulled) {
- pullAndMatchEventsLocked(eventTimeNs, mCondition);
+ pullAndMatchEventsLocked(eventTimeNs);
}
// When active state changes from true to false, clear diff base but don't
// reset other counters as we may accumulate more value in the bucket.
@@ -404,65 +460,80 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;
- if (mIsActive) {
- if (isEventTooLate) {
- VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
- (long long)mCurrentBucketStartTimeNs);
- StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
- invalidateCurrentBucket();
- } else {
- if (mCondition == ConditionState::kUnknown) {
- // If the condition was unknown, we mark the bucket as invalid since the bucket will
- // contain partial data. For instance, the condition change might happen close to
- // the end of the bucket and we might miss lots of data.
- //
- // We still want to pull to set the base.
- invalidateCurrentBucket();
- }
+ // If the config is not active, skip the event.
+ if (!mIsActive) {
+ mCondition = isEventTooLate ? ConditionState::kUnknown : newCondition;
+ return;
+ }
- // Pull on condition changes.
- bool conditionChanged =
- (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)
- || (mCondition == ConditionState::kFalse &&
- newCondition == ConditionState::kTrue);
- // We do not need to pull when we go from unknown to false.
- //
- // We also pull if the condition was already true in order to be able to flush the
- // bucket at the end if needed.
- //
- // onConditionChangedLocked might happen on bucket boundaries if this is called before
- // #onDataPulled.
- if (mIsPulled && (conditionChanged || condition)) {
- pullAndMatchEventsLocked(eventTimeNs, newCondition);
- }
+ // If the event arrived late, mark the bucket as invalid and skip the event.
+ if (isEventTooLate) {
+ VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+ (long long)mCurrentBucketStartTimeNs);
+ StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+ StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
+ invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
+ mCondition = ConditionState::kUnknown;
+ mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
+ return;
+ }
- // When condition change from true to false, clear diff base but don't
- // reset other counters as we may accumulate more value in the bucket.
- if (mUseDiff && mCondition == ConditionState::kTrue
- && newCondition == ConditionState::kFalse) {
- resetBase();
- }
- }
+ // If the previous condition was unknown, mark the bucket as invalid
+ // because the bucket will contain partial data. For example, the condition
+ // change might happen close to the end of the bucket and we might miss a
+ // lot of data.
+ //
+ // We still want to pull to set the base.
+ if (mCondition == ConditionState::kUnknown) {
+ invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
}
- mCondition = isEventTooLate ? initialCondition(mConditionTrackerIndex) : newCondition;
+ // Pull and match for the following condition change cases:
+ // unknown/false -> true - condition changed
+ // true -> false - condition changed
+ // true -> true - old condition was true so we can flush the bucket at the
+ // end if needed.
+ //
+ // We don’t need to pull for unknown -> false or false -> false.
+ //
+ // onConditionChangedLocked might happen on bucket boundaries if this is
+ // called before #onDataPulled.
+ if (mIsPulled &&
+ (newCondition == ConditionState::kTrue || mCondition == ConditionState::kTrue)) {
+ pullAndMatchEventsLocked(eventTimeNs);
+ }
- if (mIsActive) {
- flushIfNeededLocked(eventTimeNs);
- mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
+ // For metrics that use diff, when condition changes from true to false,
+ // clear diff base but don't reset other counts because we may accumulate
+ // more value in the bucket.
+ if (mUseDiff &&
+ (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
+ resetBase();
}
+
+ // Update condition state after pulling.
+ mCondition = newCondition;
+
+ flushIfNeededLocked(eventTimeNs);
+ mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
}
-void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs,
- ConditionState condition) {
+void ValueMetricProducer::prepareFirstBucketLocked() {
+ // Kicks off the puller immediately if condition is true and diff based.
+ if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
+ pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
+ }
+}
+
+void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
vector<std::shared_ptr<LogEvent>> allData;
- if (!mPullerManager->Pull(mPullTagId, &allData)) {
+ if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
- invalidateCurrentBucket();
+ invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
return;
}
- accumulateEvents(allData, timestampNs, timestampNs, condition);
+ accumulateEvents(allData, timestampNs, timestampNs);
}
int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -475,33 +546,33 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
bool pullSuccess, int64_t originalPullTimeNs) {
std::lock_guard<std::mutex> lock(mMutex);
- if (mCondition == ConditionState::kTrue) {
- // If the pull failed, we won't be able to compute a diff.
- if (!pullSuccess) {
- invalidateCurrentBucket();
+ if (mCondition == ConditionState::kTrue) {
+ // If the pull failed, we won't be able to compute a diff.
+ if (!pullSuccess) {
+ invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
+ } else {
+ bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
+ if (isEventLate) {
+ // If the event is late, we are in the middle of a bucket. Just
+ // process the data without trying to snap the data to the nearest bucket.
+ accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
} else {
- bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
- if (isEventLate) {
- // If the event is late, we are in the middle of a bucket. Just
- // process the data without trying to snap the data to the nearest bucket.
- accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition);
- } else {
- // For scheduled pulled data, the effective event time is snap to the nearest
- // bucket end. In the case of waking up from a deep sleep state, we will
- // attribute to the previous bucket end. If the sleep was long but not very
- // long, we will be in the immediate next bucket. Previous bucket may get a
- // larger number as we pull at a later time than real bucket end.
- //
- // If the sleep was very long, we skip more than one bucket before sleep. In
- // this case, if the diff base will be cleared and this new data will serve as
- // new diff base.
- int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
- StatsdStats::getInstance().noteBucketBoundaryDelayNs(
- mMetricId, originalPullTimeNs - bucketEndTime);
- accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition);
- }
+ // For scheduled pulled data, the effective event time is snap to the nearest
+ // bucket end. In the case of waking up from a deep sleep state, we will
+ // attribute to the previous bucket end. If the sleep was long but not very
+ // long, we will be in the immediate next bucket. Previous bucket may get a
+ // larger number as we pull at a later time than real bucket end.
+ //
+ // If the sleep was very long, we skip more than one bucket before sleep. In
+ // this case, if the diff base will be cleared and this new data will serve as
+ // new diff base.
+ int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
+ StatsdStats::getInstance().noteBucketBoundaryDelayNs(
+ mMetricId, originalPullTimeNs - bucketEndTime);
+ accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
}
}
+ }
// We can probably flush the bucket. Since we used bucketEndTime when calling
// #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
@@ -509,18 +580,18 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
}
void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
- int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
- ConditionState condition) {
+ int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
if (isEventLate) {
VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
(long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
- invalidateCurrentBucket();
+ invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
return;
}
- const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
+ const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
+ const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
if (pullDelayNs > mMaxPullDelayNs) {
ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
@@ -528,15 +599,10 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log
StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
// We are missing one pull from the bucket which means we will not have a complete view of
// what's going on.
- invalidateCurrentBucket();
+ invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
return;
}
- if (allData.size() == 0) {
- VLOG("Data pulled is empty");
- StatsdStats::getInstance().noteEmptyData(mPullTagId);
- }
-
mMatchedMetricDimensionKeys.clear();
for (const auto& data : allData) {
LogEvent localCopy = data->makeCopy();
@@ -546,14 +612,19 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log
onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
- // If the new pulled data does not contains some keys we track in our intervals, we need to
- // reset the base.
+ // If a key that is:
+ // 1. Tracked in mCurrentSlicedBucket and
+ // 2. A superset of the current mStateChangePrimaryKey
+ // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
+ // then we need to reset the base.
for (auto& slice : mCurrentSlicedBucket) {
- bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
- != mMatchedMetricDimensionKeys.end();
- if (!presentInPulledData) {
- for (auto& interval : slice.second) {
- interval.hasBase = false;
+ const auto& whatKey = slice.first.getDimensionKeyInWhat();
+ bool presentInPulledData =
+ mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
+ if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
+ auto it = mCurrentBaseInfo.find(whatKey);
+ for (auto& baseInfo : it->second) {
+ baseInfo.hasBase = false;
}
}
}
@@ -567,7 +638,7 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log
// incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
// might be missing from mCurrentSlicedBucket.
if (hasReachedGuardRailLimit()) {
- invalidateCurrentBucket();
+ invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
mCurrentSlicedBucket.clear();
}
}
@@ -582,10 +653,10 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
if (verbose) {
for (const auto& it : mCurrentSlicedBucket) {
for (const auto& interval : it.second) {
- fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n",
- it.first.getDimensionKeyInWhat().toString().c_str(),
- it.first.getDimensionKeyInCondition().toString().c_str(),
- interval.value.toString().c_str());
+ fprintf(out, "\t(what)%s\t(states)%s (value)%s\n",
+ it.first.getDimensionKeyInWhat().toString().c_str(),
+ it.first.getStateValuesKey().toString().c_str(),
+ interval.value.toString().c_str());
}
}
}
@@ -653,6 +724,7 @@ bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret)
ret.setDouble(value.mValue.double_value);
break;
default:
+ return false;
break;
}
return true;
@@ -661,17 +733,30 @@ bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret)
return false;
}
-void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
- const MetricDimensionKey& eventKey,
- const ConditionKey& conditionKey,
- bool condition, const LogEvent& event) {
+void ValueMetricProducer::onMatchedLogEventInternalLocked(
+ const size_t matcherIndex, const MetricDimensionKey& eventKey,
+ const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+ const map<int, HashableDimensionKey>& statePrimaryKeys) {
+ auto whatKey = eventKey.getDimensionKeyInWhat();
+ auto stateKey = eventKey.getStateValuesKey();
+
+ // Skip this event if a state changed occurred for a different primary key.
+ auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
+ // Check that both the atom id and the primary key are equal.
+ if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
+ VLOG("ValueMetric skip event with primary key %s because state change primary key "
+ "is %s",
+ it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
+ return;
+ }
+
int64_t eventTimeNs = event.GetElapsedTimestampNs();
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
return;
}
- mMatchedMetricDimensionKeys.insert(eventKey);
+ mMatchedMetricDimensionKeys.insert(whatKey);
if (!mIsPulled) {
// We cannot flush without doing a pull first.
@@ -689,17 +774,35 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff
&& mCondition != ConditionState::kTrue;
if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
- VLOG("ValueMetric skip event because condition is false");
+ VLOG("ValueMetric skip event because condition is false and we are not using diff (for "
+ "pulled metric)");
return;
}
if (hitGuardRailLocked(eventKey)) {
return;
}
- vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
- if (multiIntervals.size() < mFieldMatchers.size()) {
+
+ vector<BaseInfo>& baseInfos = mCurrentBaseInfo[whatKey];
+ if (baseInfos.size() < mFieldMatchers.size()) {
+ VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
+ baseInfos.resize(mFieldMatchers.size());
+ }
+
+ for (BaseInfo& baseInfo : baseInfos) {
+ if (!baseInfo.hasCurrentState) {
+ baseInfo.currentState = getUnknownStateKey();
+ baseInfo.hasCurrentState = true;
+ }
+ }
+
+ // We need to get the intervals stored with the previous state key so we can
+ // close these value intervals.
+ const auto oldStateKey = baseInfos[0].currentState;
+ vector<Interval>& intervals = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];
+ if (intervals.size() < mFieldMatchers.size()) {
VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
- multiIntervals.resize(mFieldMatchers.size());
+ intervals.resize(mFieldMatchers.size());
}
// We only use anomaly detection under certain cases.
@@ -712,9 +815,12 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
const Matcher& matcher = mFieldMatchers[i];
- Interval& interval = multiIntervals[i];
+ BaseInfo& baseInfo = baseInfos[i];
+ Interval& interval = intervals[i];
interval.valueIndex = i;
Value value;
+ baseInfo.hasCurrentState = true;
+ baseInfo.currentState = stateKey;
if (!getDoubleOrLong(event, matcher, value)) {
VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
StatsdStats::getInstance().noteBadValueType(mMetricId);
@@ -723,60 +829,61 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
interval.seenNewData = true;
if (mUseDiff) {
- if (!interval.hasBase) {
+ if (!baseInfo.hasBase) {
if (mHasGlobalBase && mUseZeroDefaultBase) {
// The bucket has global base. This key does not.
// Optionally use zero as base.
- interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
- interval.hasBase = true;
+ baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
+ baseInfo.hasBase = true;
} else {
// no base. just update base and return.
- interval.base = value;
- interval.hasBase = true;
+ baseInfo.base = value;
+ baseInfo.hasBase = true;
// If we're missing a base, do not use anomaly detection on incomplete data
useAnomalyDetection = false;
- // Continue (instead of return) here in order to set interval.base and
- // interval.hasBase for other intervals
+ // Continue (instead of return) here in order to set baseInfo.base and
+ // baseInfo.hasBase for other baseInfos
continue;
}
}
+
Value diff;
switch (mValueDirection) {
case ValueMetric::INCREASING:
- if (value >= interval.base) {
- diff = value - interval.base;
+ if (value >= baseInfo.base) {
+ diff = value - baseInfo.base;
} else if (mUseAbsoluteValueOnReset) {
diff = value;
} else {
VLOG("Unexpected decreasing value");
StatsdStats::getInstance().notePullDataError(mPullTagId);
- interval.base = value;
+ baseInfo.base = value;
// If we've got bad data, do not use anomaly detection
useAnomalyDetection = false;
continue;
}
break;
case ValueMetric::DECREASING:
- if (interval.base >= value) {
- diff = interval.base - value;
+ if (baseInfo.base >= value) {
+ diff = baseInfo.base - value;
} else if (mUseAbsoluteValueOnReset) {
diff = value;
} else {
VLOG("Unexpected increasing value");
StatsdStats::getInstance().notePullDataError(mPullTagId);
- interval.base = value;
+ baseInfo.base = value;
// If we've got bad data, do not use anomaly detection
useAnomalyDetection = false;
continue;
}
break;
case ValueMetric::ANY:
- diff = value - interval.base;
+ diff = value - baseInfo.base;
break;
default:
break;
}
- interval.base = value;
+ baseInfo.base = value;
value = diff;
}
@@ -806,7 +913,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
// Only trigger the tracker if all intervals are correct
if (useAnomalyDetection) {
// TODO: propgate proper values down stream when anomaly support doubles
- long wholeBucketVal = multiIntervals[0].value.long_value;
+ long wholeBucketVal = intervals[0].value.long_value;
auto prev = mCurrentFullBucket.find(eventKey);
if (prev != mCurrentFullBucket.end()) {
wholeBucketVal += prev->second;
@@ -823,7 +930,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
if (eventTimeNs < currentBucketEndTimeNs) {
- VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
+ VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
(long long)(currentBucketEndTimeNs));
return;
}
@@ -844,25 +951,39 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
const int64_t& nextBucketStartTimeNs) {
if (mCondition == ConditionState::kUnknown) {
StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
+ invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
}
+ VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
+ (int)mCurrentSlicedBucket.size());
+
+ int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
+ int64_t bucketEndTime = fullBucketEndTimeNs;
int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
- if (numBucketsForward > 1) {
+
+ // Skip buckets if this is a pulled metric or a pushed metric that is diffed.
+ if (numBucketsForward > 1 && (mIsPulled || mUseDiff)) {
+
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
// Something went wrong. Maybe the device was sleeping for a long time. It is better
// to mark the current bucket as invalid. The last pull might have been successful through.
- invalidateCurrentBucketWithoutResetBase();
+ invalidateCurrentBucketWithoutResetBase(eventTimeNs,
+ BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
+ // End the bucket at the next bucket start time so the entire interval is skipped.
+ bucketEndTime = nextBucketStartTimeNs;
+ } else if (eventTimeNs < fullBucketEndTimeNs) {
+ bucketEndTime = eventTimeNs;
}
- VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
- (int)mCurrentSlicedBucket.size());
- int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
- int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
// Close the current bucket.
int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
- if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
+ if (!isBucketLargeEnough) {
+ skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
+ }
+ if (!mCurrentBucketIsSkipped) {
+ bool bucketHasData = false;
// The current bucket is large enough to keep.
for (const auto& slice : mCurrentSlicedBucket) {
ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
@@ -871,13 +992,34 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
if (bucket.valueIndex.size() > 0) {
auto& bucketList = mPastBuckets[slice.first];
bucketList.push_back(bucket);
+ bucketHasData = true;
}
}
- } else {
- mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
+ if (!bucketHasData) {
+ skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
+ }
+ }
+
+ if (mCurrentBucketIsSkipped) {
+ mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
+ mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
+ mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
}
- appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
+ // This means that the current bucket was not flushed before a forced bucket split.
+ // This can happen if an app update or a dump report with include_current_partial_bucket is
+ // requested before we get a chance to flush the bucket due to receiving new data, either from
+ // the statsd socket or the StatsPullerManager.
+ if (bucketEndTime < nextBucketStartTimeNs) {
+ SkippedBucket bucketInGap;
+ bucketInGap.bucketStartTimeNs = bucketEndTime;
+ bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
+ bucketInGap.dropEvents.emplace_back(
+ buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
+ mSkippedBuckets.emplace_back(bucketInGap);
+ }
+
+ appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
initCurrentSlicedBucket(nextBucketStartTimeNs);
// Update the condition timer again, in case we skipped buckets.
mConditionTimer.newBucketStart(nextBucketStartTimeNs);
@@ -927,22 +1069,24 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs)
} else {
it++;
}
+ // TODO(b/157655103): remove mCurrentBaseInfo entries when obsolete
}
- mCurrentBucketIsInvalid = false;
+ mCurrentBucketIsSkipped = false;
+ mCurrentSkippedBucket.reset();
+
// If we do not have a global base when the condition is true,
// we will have incomplete bucket for the next bucket.
if (mUseDiff && !mHasGlobalBase && mCondition) {
- mCurrentBucketIsInvalid = false;
+ mCurrentBucketIsSkipped = false;
}
mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
(long long)mCurrentBucketStartTimeNs);
}
-void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
- bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
- if (mCurrentBucketIsInvalid) {
+void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
+ if (mCurrentBucketIsSkipped) {
if (isFullBucketReached) {
// If the bucket is invalid, we ignore the full bucket since it contains invalid data.
mCurrentFullBucket.clear();