diff options
Diffstat (limited to 'cmds/statsd/src/metrics/ValueMetricProducer.h')
-rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.h | 125 |
1 files changed, 83 insertions, 42 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 784ac64880a3..b359af745c91 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -50,12 +50,18 @@ struct ValueBucket { // - an alarm set to the end of the bucket class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver { public: - ValueMetricProducer(const ConfigKey& key, const ValueMetric& valueMetric, - const int conditionIndex, const sp<ConditionWizard>& conditionWizard, - const int whatMatcherIndex, - const sp<EventMatcherWizard>& matcherWizard, - const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs, - const sp<StatsPullerManager>& pullerManager); + ValueMetricProducer( + const ConfigKey& key, const ValueMetric& valueMetric, const int conditionIndex, + const vector<ConditionState>& initialConditionCache, + const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, + const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, + const int64_t timeBaseNs, const int64_t startTimeNs, + const sp<StatsPullerManager>& pullerManager, + const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap = {}, + const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>& + eventDeactivationMap = {}, + const vector<int>& slicedStateAtoms = {}, + const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap = {}); virtual ~ValueMetricProducer(); @@ -64,23 +70,34 @@ public: bool pullSuccess, int64_t originalPullTimeNs) override; // ValueMetric needs special logic if it's a pulled atom. - void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, - const int64_t version) override { + void notifyAppUpgrade(const int64_t& eventTimeNs) override { std::lock_guard<std::mutex> lock(mMutex); if (!mSplitBucketForAppUpgrade) { return; } - if (mIsPulled && mCondition) { - pullAndMatchEventsLocked(eventTimeNs, mCondition); + if (mIsPulled && mCondition == ConditionState::kTrue) { + pullAndMatchEventsLocked(eventTimeNs); } flushCurrentBucketLocked(eventTimeNs, eventTimeNs); }; + // ValueMetric needs special logic if it's a pulled atom. + void onStatsdInitCompleted(const int64_t& eventTimeNs) override { + std::lock_guard<std::mutex> lock(mMutex); + if (mIsPulled && mCondition == ConditionState::kTrue) { + pullAndMatchEventsLocked(eventTimeNs); + } + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); + }; + + void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey, + const FieldValue& oldState, const FieldValue& newState) override; + protected: void onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, - const ConditionKey& conditionKey, bool condition, - const LogEvent& event) override; + const ConditionKey& conditionKey, bool condition, const LogEvent& event, + const std::map<int, HashableDimensionKey>& statePrimaryKeys) override; private: void onDumpReportLocked(const int64_t dumpTimeNs, @@ -125,8 +142,15 @@ private: int64_t calcBucketsForwardCount(const int64_t& eventTimeNs) const; // Mark the data as invalid. - void invalidateCurrentBucket(); - void invalidateCurrentBucketWithoutResetBase(); + void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); + + void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, + const BucketDropReason reason); + + // Skips the current bucket without notifying StatsdStats of the skipped bucket. + // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that + // causes the bucket to be invalidated will not notify StatsdStats. + void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); const int mWhatMatcherIndex; @@ -138,7 +162,10 @@ private: std::vector<Matcher> mFieldMatchers; // Value fields for matching. - std::set<MetricDimensionKey> mMatchedMetricDimensionKeys; + std::set<HashableDimensionKey> mMatchedMetricDimensionKeys; + + // Holds the atom id, primary key pair from a state change. + pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey; // tagId for pulled data. -1 if this is not pulled const int mPullTagId; @@ -150,10 +177,6 @@ private: typedef struct { // Index in multi value aggregation. int valueIndex; - // Holds current base value of the dimension. Take diff and update if necessary. - Value base; - // Whether there is a base to diff to. - bool hasBase; // Current value, depending on the aggregation type. Value value; // Number of samples collected. @@ -165,34 +188,46 @@ private: bool seenNewData = false; } Interval; + typedef struct { + // Holds current base value of the dimension. Take diff and update if necessary. + Value base; + // Whether there is a base to diff to. + bool hasBase; + // Last seen state value(s). + HashableDimensionKey currentState; + // Whether this dimensions in what key has a current state key. + bool hasCurrentState; + } BaseInfo; + std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket; + std::unordered_map<HashableDimensionKey, std::vector<BaseInfo>> mCurrentBaseInfo; + std::unordered_map<MetricDimensionKey, int64_t> mCurrentFullBucket; // Save the past buckets and we can clear when the StatsLogReport is dumped. std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets; - // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. - std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; - const int64_t mMinBucketSizeNs; // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); + bool hasReachedGuardRailLimit() const; bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey); - void pullAndMatchEventsLocked(const int64_t timestampNs, ConditionState condition); + void pullAndMatchEventsLocked(const int64_t timestampNs); void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, - int64_t originalPullTimeNs, int64_t eventElapsedTimeNs, - ConditionState condition); + int64_t originalPullTimeNs, int64_t eventElapsedTimeNs); ValueBucket buildPartialBucket(int64_t bucketEndTime, const std::vector<Interval>& intervals); + void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs); - void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs); + + void appendToFullBucket(const bool isFullBucketReached); // Reset diff base and mHasGlobalBase void resetBase(); @@ -225,11 +260,9 @@ private: // diff against. bool mHasGlobalBase; - // Invalid bucket. There was a problem in collecting data in the current bucket so we cannot - // trust any of the data in this bucket. - // - // For instance, one pull failed. - bool mCurrentBucketIsInvalid; + // This is to track whether or not the bucket is skipped for any of the reasons listed in + // BucketDropReason, many of which make the bucket potentially invalid. + bool mCurrentBucketIsSkipped; const int64_t mMaxPullDelayNs; @@ -239,12 +272,10 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange); - FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); - FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid); FRIEND_TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet); FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); FRIEND_TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged); @@ -253,14 +284,8 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled); FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); - FRIEND_TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed); FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff); FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff); - FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated); FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries); FRIEND_TEST(ValueMetricProducerTest, TestPulledData_noDiff_bucketBoundaryFalse); FRIEND_TEST(ValueMetricProducerTest, TestPulledData_noDiff_bucketBoundaryTrue); @@ -271,15 +296,12 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering); - FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); - FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse); FRIEND_TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition); - FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded); FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange); @@ -288,9 +310,28 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue); + FRIEND_TEST(ValueMetricProducerTest, TestSlicedState); + FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMap); + FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithPrimaryField_WithDimensions); + FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithCondition); FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); + + FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenOneConditionFailed); + FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenInitialPullFailed); + FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenLastPullFailed); + FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenGuardRailHit); + FRIEND_TEST(ValueMetricProducerTest_BucketDrop, + TestInvalidBucketWhenAccumulateEventWrongBucket); + + FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestBucketBoundariesOnPartialBucket); + FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestFullBucketResetWhenLastBucketInvalid); + FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPartialBucketCreated); + FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPushedEvents); + FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPulledValue); + FRIEND_TEST(ValueMetricProducerTest_PartialBucket, TestPulledValueWhileConditionFalse); + friend class ValueMetricProducerTestHelper; }; |