diff options
Diffstat (limited to 'cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp')
-rw-r--r-- | cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp | 194 |
1 files changed, 121 insertions, 73 deletions
diff --git a/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp b/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp index 956383a99eea..0d49bbc269a3 100644 --- a/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp +++ b/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp @@ -26,27 +26,15 @@ using std::pair; OringDurationTracker::OringDurationTracker( const ConfigKey& key, const int64_t& id, const MetricDimensionKey& eventKey, - sp<ConditionWizard> wizard, int conditionIndex, const vector<Matcher>& dimensionInCondition, - bool nesting, int64_t currentBucketStartNs, int64_t currentBucketNum, - int64_t startTimeNs, int64_t bucketSizeNs, bool conditionSliced, bool fullLink, - const vector<sp<DurationAnomalyTracker>>& anomalyTrackers) - : DurationTracker(key, id, eventKey, wizard, conditionIndex, dimensionInCondition, nesting, - currentBucketStartNs, currentBucketNum, startTimeNs, bucketSizeNs, - conditionSliced, fullLink, anomalyTrackers), + sp<ConditionWizard> wizard, int conditionIndex, bool nesting, int64_t currentBucketStartNs, + int64_t currentBucketNum, int64_t startTimeNs, int64_t bucketSizeNs, bool conditionSliced, + bool fullLink, const vector<sp<DurationAnomalyTracker>>& anomalyTrackers) + : DurationTracker(key, id, eventKey, wizard, conditionIndex, nesting, currentBucketStartNs, + currentBucketNum, startTimeNs, bucketSizeNs, conditionSliced, fullLink, + anomalyTrackers), mStarted(), mPaused() { mLastStartTime = 0; - if (mWizard != nullptr) { - mSameConditionDimensionsInTracker = - mWizard->equalOutputDimensions(conditionIndex, mDimensionInCondition); - } -} - -unique_ptr<DurationTracker> OringDurationTracker::clone(const int64_t eventTime) { - auto clonedTracker = make_unique<OringDurationTracker>(*this); - clonedTracker->mLastStartTime = eventTime; - clonedTracker->mDuration = 0; - return clonedTracker; } bool OringDurationTracker::hitGuardRail(const HashableDimensionKey& newKey) { @@ -101,10 +89,14 @@ void OringDurationTracker::noteStop(const HashableDimensionKey& key, const int64 mConditionKeyMap.erase(key); } if (mStarted.empty()) { - mDuration += (timestamp - mLastStartTime); - detectAndDeclareAnomaly(timestamp, mCurrentBucketNum, mDuration + mDurationFullBucket); - VLOG("record duration %lld, total %lld ", (long long)timestamp - mLastStartTime, - (long long)mDuration); + mStateKeyDurationMap[mEventKey.getStateValuesKey()].mDuration += + (timestamp - mLastStartTime); + detectAndDeclareAnomaly( + timestamp, mCurrentBucketNum, + getCurrentStateKeyDuration() + getCurrentStateKeyFullBucketDuration()); + VLOG("record duration %lld, total duration %lld for state key %s", + (long long)timestamp - mLastStartTime, (long long)getCurrentStateKeyDuration(), + mEventKey.getStateValuesKey().toString().c_str()); } } @@ -123,10 +115,14 @@ void OringDurationTracker::noteStop(const HashableDimensionKey& key, const int64 void OringDurationTracker::noteStopAll(const int64_t timestamp) { if (!mStarted.empty()) { - mDuration += (timestamp - mLastStartTime); - VLOG("Oring Stop all: record duration %lld %lld ", (long long)timestamp - mLastStartTime, - (long long)mDuration); - detectAndDeclareAnomaly(timestamp, mCurrentBucketNum, mDuration + mDurationFullBucket); + mStateKeyDurationMap[mEventKey.getStateValuesKey()].mDuration += + (timestamp - mLastStartTime); + VLOG("Oring Stop all: record duration %lld, total duration %lld for state key %s", + (long long)timestamp - mLastStartTime, (long long)getCurrentStateKeyDuration(), + mEventKey.getStateValuesKey().toString().c_str()); + detectAndDeclareAnomaly( + timestamp, mCurrentBucketNum, + getCurrentStateKeyDuration() + getCurrentStateKeyFullBucketDuration()); } stopAnomalyAlarm(timestamp); @@ -157,21 +153,36 @@ bool OringDurationTracker::flushCurrentBucket( // Process the current bucket. if (mStarted.size() > 0) { - mDuration += (currentBucketEndTimeNs - mLastStartTime); - } - if (mDuration > 0) { - DurationBucket current_info; - current_info.mBucketStartNs = mCurrentBucketStartTimeNs; - current_info.mBucketEndNs = currentBucketEndTimeNs; - current_info.mDuration = mDuration; - (*output)[mEventKey].push_back(current_info); - mDurationFullBucket += mDuration; - VLOG(" duration: %lld", (long long)current_info.mDuration); + // Calculate the duration for the current state key. + mStateKeyDurationMap[mEventKey.getStateValuesKey()].mDuration += + (currentBucketEndTimeNs - mLastStartTime); } - if (eventTimeNs > fullBucketEnd) { - // End of full bucket, can send to anomaly tracker now. - addPastBucketToAnomalyTrackers(mDurationFullBucket, mCurrentBucketNum); - mDurationFullBucket = 0; + // Store DurationBucket info for each whatKey, stateKey pair. + // Note: The whatKey stored in mEventKey is constant for each DurationTracker, while the + // stateKey stored in mEventKey is only the current stateKey. mStateKeyDurationMap is used to + // store durations for each stateKey, so we need to flush the bucket by creating a + // DurationBucket for each stateKey. + for (auto& durationIt : mStateKeyDurationMap) { + if (durationIt.second.mDuration > 0) { + DurationBucket current_info; + current_info.mBucketStartNs = mCurrentBucketStartTimeNs; + current_info.mBucketEndNs = currentBucketEndTimeNs; + current_info.mDuration = durationIt.second.mDuration; + (*output)[MetricDimensionKey(mEventKey.getDimensionKeyInWhat(), durationIt.first)] + .push_back(current_info); + + durationIt.second.mDurationFullBucket += durationIt.second.mDuration; + VLOG(" duration: %lld", (long long)current_info.mDuration); + } + + if (eventTimeNs > fullBucketEnd) { + // End of full bucket, can send to anomaly tracker now. + addPastBucketToAnomalyTrackers( + MetricDimensionKey(mEventKey.getDimensionKeyInWhat(), durationIt.first), + getCurrentStateKeyFullBucketDuration(), mCurrentBucketNum); + durationIt.second.mDurationFullBucket = 0; + } + durationIt.second.mDuration = 0; } if (mStarted.size() > 0) { @@ -180,20 +191,19 @@ bool OringDurationTracker::flushCurrentBucket( info.mBucketStartNs = fullBucketEnd + mBucketSizeNs * (i - 1); info.mBucketEndNs = info.mBucketStartNs + mBucketSizeNs; info.mDuration = mBucketSizeNs; + // Full duration buckets are attributed to the current stateKey. (*output)[mEventKey].push_back(info); // Safe to send these buckets to anomaly tracker since they must be full buckets. // If it's a partial bucket, numBucketsForward would be 0. - addPastBucketToAnomalyTrackers(info.mDuration, mCurrentBucketNum + i); + addPastBucketToAnomalyTrackers(mEventKey, info.mDuration, mCurrentBucketNum + i); VLOG(" add filling bucket with duration %lld", (long long)info.mDuration); } } else { if (numBucketsForward >= 2) { - addPastBucketToAnomalyTrackers(0, mCurrentBucketNum + numBucketsForward - 1); + addPastBucketToAnomalyTrackers(mEventKey, 0, mCurrentBucketNum + numBucketsForward - 1); } } - mDuration = 0; - if (numBucketsForward > 0) { mCurrentBucketStartTimeNs = fullBucketEnd + (numBucketsForward - 1) * mBucketSizeNs; mCurrentBucketNum += numBucketsForward; @@ -227,17 +237,10 @@ void OringDurationTracker::onSlicedConditionMayChange(bool overallCondition, ++it; continue; } - std::unordered_set<HashableDimensionKey> conditionDimensionKeySet; ConditionState conditionState = mWizard->query(mConditionTrackerIndex, condIt->second, - mDimensionInCondition, - !mSameConditionDimensionsInTracker, - !mHasLinksToAllConditionDimensionsInTracker, - &conditionDimensionKeySet); - if (conditionState != ConditionState::kTrue || - (mDimensionInCondition.size() != 0 && - conditionDimensionKeySet.find(mEventKey.getDimensionKeyInCondition()) == - conditionDimensionKeySet.end())) { + !mHasLinksToAllConditionDimensionsInTracker); + if (conditionState != ConditionState::kTrue) { startedToPaused.push_back(*it); it = mStarted.erase(it); VLOG("Key %s started -> paused", key.toString().c_str()); @@ -247,10 +250,14 @@ void OringDurationTracker::onSlicedConditionMayChange(bool overallCondition, } if (mStarted.empty()) { - mDuration += (timestamp - mLastStartTime); - VLOG("Duration add %lld , to %lld ", (long long)(timestamp - mLastStartTime), - (long long)mDuration); - detectAndDeclareAnomaly(timestamp, mCurrentBucketNum, mDuration + mDurationFullBucket); + mStateKeyDurationMap[mEventKey.getStateValuesKey()].mDuration += + (timestamp - mLastStartTime); + VLOG("record duration %lld, total duration %lld for state key %s", + (long long)(timestamp - mLastStartTime), (long long)getCurrentStateKeyDuration(), + mEventKey.getStateValuesKey().toString().c_str()); + detectAndDeclareAnomaly( + timestamp, mCurrentBucketNum, + getCurrentStateKeyDuration() + getCurrentStateKeyFullBucketDuration()); } } @@ -262,17 +269,10 @@ void OringDurationTracker::onSlicedConditionMayChange(bool overallCondition, ++it; continue; } - std::unordered_set<HashableDimensionKey> conditionDimensionKeySet; ConditionState conditionState = mWizard->query(mConditionTrackerIndex, mConditionKeyMap[key], - mDimensionInCondition, - !mSameConditionDimensionsInTracker, - !mHasLinksToAllConditionDimensionsInTracker, - &conditionDimensionKeySet); - if (conditionState == ConditionState::kTrue && - (mDimensionInCondition.size() == 0 || - conditionDimensionKeySet.find(mEventKey.getDimensionKeyInCondition()) != - conditionDimensionKeySet.end())) { + !mHasLinksToAllConditionDimensionsInTracker); + if (conditionState == ConditionState::kTrue) { pausedToStarted.push_back(*it); it = mPaused.erase(it); VLOG("Key %s paused -> started", key.toString().c_str()); @@ -313,10 +313,13 @@ void OringDurationTracker::onConditionChanged(bool condition, const int64_t time } else { if (!mStarted.empty()) { VLOG("Condition false, all paused"); - mDuration += (timestamp - mLastStartTime); + mStateKeyDurationMap[mEventKey.getStateValuesKey()].mDuration += + (timestamp - mLastStartTime); mPaused.insert(mStarted.begin(), mStarted.end()); mStarted.clear(); - detectAndDeclareAnomaly(timestamp, mCurrentBucketNum, mDuration + mDurationFullBucket); + detectAndDeclareAnomaly( + timestamp, mCurrentBucketNum, + getCurrentStateKeyDuration() + getCurrentStateKeyFullBucketDuration()); } } if (mStarted.empty()) { @@ -324,6 +327,23 @@ void OringDurationTracker::onConditionChanged(bool condition, const int64_t time } } +void OringDurationTracker::onStateChanged(const int64_t timestamp, const int32_t atomId, + const FieldValue& newState) { + // Nothing needs to be done on a state change if we have not seen a start + // event, the metric is currently not active, or condition is false. + // For these cases, no keys are being tracked in mStarted, so update + // the current state key and return. + if (mStarted.empty()) { + updateCurrentStateKey(atomId, newState); + return; + } + // Add the current duration length to the previous state key and then update + // the last start time and current state key. + mStateKeyDurationMap[mEventKey.getStateValuesKey()].mDuration += (timestamp - mLastStartTime); + mLastStartTime = timestamp; + updateCurrentStateKey(atomId, newState); +} + int64_t OringDurationTracker::predictAnomalyTimestampNs( const DurationAnomalyTracker& anomalyTracker, const int64_t eventTimestampNs) const { @@ -333,12 +353,13 @@ int64_t OringDurationTracker::predictAnomalyTimestampNs( // The timestamp of the current bucket end. const int64_t currentBucketEndNs = getCurrentBucketEndTimeNs(); - // The past duration ns for the current bucket. - int64_t currentBucketPastNs = mDuration + mDurationFullBucket; + // The past duration ns for the current bucket of the current stateKey. + int64_t currentStateBucketPastNs = + getCurrentStateKeyDuration() + getCurrentStateKeyFullBucketDuration(); // As we move into the future, old buckets get overwritten (so their old data is erased). // Sum of past durations. Will change as we overwrite old buckets. - int64_t pastNs = currentBucketPastNs + anomalyTracker.getSumOverPastBuckets(mEventKey); + int64_t pastNs = currentStateBucketPastNs + anomalyTracker.getSumOverPastBuckets(mEventKey); // The refractory period end timestamp for dimension mEventKey. const int64_t refractoryPeriodEndNs = @@ -397,7 +418,7 @@ int64_t OringDurationTracker::predictAnomalyTimestampNs( mEventKey, mCurrentBucketNum - anomalyTracker.getNumOfPastBuckets() + futureBucketIdx); } else if (futureBucketIdx == anomalyTracker.getNumOfPastBuckets()) { - pastNs -= (currentBucketPastNs + (currentBucketEndNs - eventTimestampNs)); + pastNs -= (currentStateBucketPastNs + (currentBucketEndNs - eventTimestampNs)); } } @@ -407,7 +428,34 @@ int64_t OringDurationTracker::predictAnomalyTimestampNs( void OringDurationTracker::dumpStates(FILE* out, bool verbose) const { fprintf(out, "\t\t started count %lu\n", (unsigned long)mStarted.size()); fprintf(out, "\t\t paused count %lu\n", (unsigned long)mPaused.size()); - fprintf(out, "\t\t current duration %lld\n", (long long)mDuration); + fprintf(out, "\t\t current duration %lld\n", (long long)getCurrentStateKeyDuration()); +} + +int64_t OringDurationTracker::getCurrentStateKeyDuration() const { + auto it = mStateKeyDurationMap.find(mEventKey.getStateValuesKey()); + if (it == mStateKeyDurationMap.end()) { + return 0; + } else { + return it->second.mDuration; + } +} + +int64_t OringDurationTracker::getCurrentStateKeyFullBucketDuration() const { + auto it = mStateKeyDurationMap.find(mEventKey.getStateValuesKey()); + if (it == mStateKeyDurationMap.end()) { + return 0; + } else { + return it->second.mDurationFullBucket; + } +} + +void OringDurationTracker::updateCurrentStateKey(const int32_t atomId, const FieldValue& newState) { + HashableDimensionKey* stateValuesKey = mEventKey.getMutableStateValuesKey(); + for (size_t i = 0; i < stateValuesKey->getValues().size(); i++) { + if (stateValuesKey->getValues()[i].mField.getTag() == atomId) { + stateValuesKey->mutableValue(i)->mValue = newState.mValue; + } + } } } // namespace statsd |