diff options
Diffstat (limited to 'cmds/statsd/src/metrics/MetricProducer.h')
-rw-r--r-- | cmds/statsd/src/metrics/MetricProducer.h | 376 |
1 files changed, 226 insertions, 150 deletions
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h index c77bc0135d86..be4cd6724bb1 100644 --- a/cmds/statsd/src/metrics/MetricProducer.h +++ b/cmds/statsd/src/metrics/MetricProducer.h @@ -28,6 +28,8 @@ #include "config/ConfigKey.h" #include "matchers/matcher_util.h" #include "packages/PackageInfoListener.h" +#include "state/StateListener.h" +#include "state/StateManager.h" namespace android { namespace os { @@ -67,61 +69,101 @@ enum DumpLatency { NO_TIME_CONSTRAINTS = 2 }; +// Keep this in sync with BucketDropReason enum in stats_log.proto +enum BucketDropReason { + // For ValueMetric, a bucket is dropped during a dump report request iff + // current bucket should be included, a pull is needed (pulled metric and + // condition is true), and we are under fast time constraints. + DUMP_REPORT_REQUESTED = 1, + EVENT_IN_WRONG_BUCKET = 2, + CONDITION_UNKNOWN = 3, + PULL_FAILED = 4, + PULL_DELAYED = 5, + DIMENSION_GUARDRAIL_REACHED = 6, + MULTIPLE_BUCKETS_SKIPPED = 7, + // Not an invalid bucket case, but the bucket is dropped. + BUCKET_TOO_SMALL = 8, + // Not an invalid bucket case, but the bucket is skipped. + NO_DATA = 9 +}; + +struct Activation { + Activation(const ActivationType& activationType, const int64_t ttlNs) + : ttl_ns(ttlNs), + start_ns(0), + state(ActivationState::kNotActive), + activationType(activationType) {} + + const int64_t ttl_ns; + int64_t start_ns; + ActivationState state; + const ActivationType activationType; +}; + +struct DropEvent { + // Reason for dropping the bucket and/or marking the bucket invalid. + BucketDropReason reason; + // The timestamp of the drop event. + int64_t dropTimeNs; +}; + +struct SkippedBucket { + // Start time of the dropped bucket. + int64_t bucketStartTimeNs; + // End time of the dropped bucket. + int64_t bucketEndTimeNs; + // List of events that invalidated this bucket. + std::vector<DropEvent> dropEvents; + + void reset() { + bucketStartTimeNs = 0; + bucketEndTimeNs = 0; + dropEvents.clear(); + } +}; + // A MetricProducer is responsible for compute one single metrics, creating stats log report, and // writing the report to dropbox. MetricProducers should respond to package changes as required in // PackageInfoListener, but if none of the metrics are slicing by package name, then the update can // be a no-op. -class MetricProducer : public virtual android::RefBase { +class MetricProducer : public virtual android::RefBase, public virtual StateListener { public: MetricProducer(const int64_t& metricId, const ConfigKey& key, const int64_t timeBaseNs, - const int conditionIndex, const sp<ConditionWizard>& wizard) - : mMetricId(metricId), - mConfigKey(key), - mTimeBaseNs(timeBaseNs), - mCurrentBucketStartTimeNs(timeBaseNs), - mCurrentBucketNum(0), - mCondition(initialCondition(conditionIndex)), - mConditionSliced(false), - mWizard(wizard), - mConditionTrackerIndex(conditionIndex), - mContainANYPositionInDimensionsInWhat(false), - mSliceByPositionALL(false), - mSameConditionDimensionsInTracker(false), - mHasLinksToAllConditionDimensionsInTracker(false), - mIsActive(true) { - } + const int conditionIndex, const vector<ConditionState>& initialConditionCache, + const sp<ConditionWizard>& wizard, + const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap, + const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>& + eventDeactivationMap, + const vector<int>& slicedStateAtoms, + const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap); virtual ~MetricProducer(){}; - ConditionState initialCondition(const int conditionIndex) const { - return conditionIndex >= 0 ? ConditionState::kUnknown : ConditionState::kTrue; + ConditionState initialCondition(const int conditionIndex, + const vector<ConditionState>& initialConditionCache) const { + return conditionIndex >= 0 ? initialConditionCache[conditionIndex] : ConditionState::kTrue; } /** - * Forces this metric to split into a partial bucket right now. If we're past a full bucket, we - * first call the standard flushing code to flush up to the latest full bucket. Then we call - * the flush again when the end timestamp is forced to be now, and then after flushing, update - * the start timestamp to be now. + * Force a partial bucket split on app upgrade */ - virtual void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, - const int64_t version) { + virtual void notifyAppUpgrade(const int64_t& eventTimeNs) { std::lock_guard<std::mutex> lock(mMutex); - - if (eventTimeNs > getCurrentBucketEndTimeNs()) { - // Flush full buckets on the normal path up to the latest bucket boundary. - flushIfNeededLocked(eventTimeNs); - } - // Now flush a partial bucket. - flushCurrentBucketLocked(eventTimeNs, eventTimeNs); - // Don't update the current bucket number so that the anomaly tracker knows this bucket - // is a partial bucket and can merge it with the previous bucket. + flushLocked(eventTimeNs); }; - void notifyAppRemoved(const int64_t& eventTimeNs, const string& apk, const int uid) { + void notifyAppRemoved(const int64_t& eventTimeNs) { // Force buckets to split on removal also. - notifyAppUpgrade(eventTimeNs, apk, uid, 0); + notifyAppUpgrade(eventTimeNs); }; + /** + * Force a partial bucket split on boot complete. + */ + virtual void onStatsdInitCompleted(const int64_t& eventTimeNs) { + std::lock_guard<std::mutex> lock(mMutex); + flushLocked(eventTimeNs); + } // Consume the parsed stats log entry that already matched the "what" of the metric. void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event) { std::lock_guard<std::mutex> lock(mMutex); @@ -143,6 +185,10 @@ public: return mConditionSliced; }; + void onStateChanged(const int64_t eventTimeNs, const int32_t atomId, + const HashableDimensionKey& primaryKey, const FieldValue& oldState, + const FieldValue& newState){}; + // Output the metrics data to [protoOutput]. All metrics reports end with the same timestamp. // This method clears all the past buckets. void onDumpReport(const int64_t dumpTimeNs, @@ -161,9 +207,9 @@ public: return clearPastBucketsLocked(dumpTimeNs); } - void dumpStates(FILE* out, bool verbose) const { + void prepareFirstBucket() { std::lock_guard<std::mutex> lock(mMutex); - dumpStatesLocked(out, verbose); + prepareFirstBucketLocked(); } // Returns the memory in bytes currently used to store this metric's data. Does not change @@ -173,34 +219,9 @@ public: return byteSizeLocked(); } - /* If alert is valid, adds an AnomalyTracker and returns it. If invalid, returns nullptr. */ - virtual sp<AnomalyTracker> addAnomalyTracker(const Alert &alert, - const sp<AlarmMonitor>& anomalyAlarmMonitor) { - std::lock_guard<std::mutex> lock(mMutex); - sp<AnomalyTracker> anomalyTracker = new AnomalyTracker(alert, mConfigKey); - if (anomalyTracker != nullptr) { - mAnomalyTrackers.push_back(anomalyTracker); - } - return anomalyTracker; - } - - int64_t getBuckeSizeInNs() const { - std::lock_guard<std::mutex> lock(mMutex); - return mBucketSizeNs; - } - - // Only needed for unit-testing to override guardrail. - void setBucketSize(int64_t bucketSize) { - mBucketSizeNs = bucketSize; - } - - inline const int64_t& getMetricId() const { - return mMetricId; - } - - void loadActiveMetric(const ActiveMetric& activeMetric, int64_t currentTimeNs) { + void dumpStates(FILE* out, bool verbose) const { std::lock_guard<std::mutex> lock(mMutex); - loadActiveMetricLocked(activeMetric, currentTimeNs); + dumpStatesLocked(out, verbose); } // Let MetricProducer drop in-memory data to save memory. @@ -212,9 +233,9 @@ public: dropDataLocked(dropTimeNs); } - // For test only. - inline int64_t getCurrentBucketNum() const { - return mCurrentBucketNum; + void loadActiveMetric(const ActiveMetric& activeMetric, int64_t currentTimeNs) { + std::lock_guard<std::mutex> lock(mMutex); + loadActiveMetricLocked(activeMetric, currentTimeNs); } void activate(int activationTrackerIndex, int64_t elapsedTimestampNs) { @@ -232,59 +253,49 @@ public: return isActiveLocked(); } - void addActivation(int activationTrackerIndex, const ActivationType& activationType, - int64_t ttl_seconds, int deactivationTrackerIndex = -1); - - void prepareFirstBucket() { - std::lock_guard<std::mutex> lock(mMutex); - prepareFirstBucketLocked(); - } - void flushIfExpire(int64_t elapsedTimestampNs); void writeActiveMetricToProtoOutputStream( int64_t currentTimeNs, const DumpReportReason reason, ProtoOutputStream* proto); -protected: - virtual void onConditionChangedLocked(const bool condition, const int64_t eventTime) = 0; - virtual void onSlicedConditionMayChangeLocked(bool overallCondition, - const int64_t eventTime) = 0; - virtual void onDumpReportLocked(const int64_t dumpTimeNs, - const bool include_current_partial_bucket, - const bool erase_data, - const DumpLatency dumpLatency, - std::set<string> *str_set, - android::util::ProtoOutputStream* protoOutput) = 0; - virtual void clearPastBucketsLocked(const int64_t dumpTimeNs) = 0; - virtual size_t byteSizeLocked() const = 0; - virtual void dumpStatesLocked(FILE* out, bool verbose) const = 0; - bool evaluateActiveStateLocked(int64_t elapsedTimestampNs); + // Start: getters/setters + inline const int64_t& getMetricId() const { + return mMetricId; + } - void activateLocked(int activationTrackerIndex, int64_t elapsedTimestampNs); - void cancelEventActivationLocked(int deactivationTrackerIndex); + // For test only. + inline int64_t getCurrentBucketNum() const { + return mCurrentBucketNum; + } - inline bool isActiveLocked() const { - return mIsActive; + int64_t getBucketSizeInNs() const { + std::lock_guard<std::mutex> lock(mMutex); + return mBucketSizeNs; } - void loadActiveMetricLocked(const ActiveMetric& activeMetric, int64_t currentTimeNs); + inline const std::vector<int> getSlicedStateAtoms() { + std::lock_guard<std::mutex> lock(mMutex); + return mSlicedStateAtoms; + } - virtual void prepareFirstBucketLocked() {}; + /* If alert is valid, adds an AnomalyTracker and returns it. If invalid, returns nullptr. */ + virtual sp<AnomalyTracker> addAnomalyTracker(const Alert &alert, + const sp<AlarmMonitor>& anomalyAlarmMonitor) { + std::lock_guard<std::mutex> lock(mMutex); + sp<AnomalyTracker> anomalyTracker = new AnomalyTracker(alert, mConfigKey); + if (anomalyTracker != nullptr) { + mAnomalyTrackers.push_back(anomalyTracker); + } + return anomalyTracker; + } + // End: getters/setters +protected: /** - * Flushes the current bucket if the eventTime is after the current bucket's end time. This will - also flush the current partial bucket in memory. + * Flushes the current bucket if the eventTime is after the current bucket's end time. */ virtual void flushIfNeededLocked(const int64_t& eventTime){}; /** - * Flushes all the data including the current partial bucket. - */ - virtual void flushLocked(const int64_t& eventTimeNs) { - flushIfNeededLocked(eventTimeNs); - flushCurrentBucketLocked(eventTimeNs, eventTimeNs); - }; - - /** * For metrics that aggregate (ie, every metric producer except for EventMetricProducer), * we need to be able to flush the current buckets on demand (ie, end the current bucket and * start new bucket). If this function is called when eventTimeNs is greater than the current @@ -297,12 +308,66 @@ protected: virtual void flushCurrentBucketLocked(const int64_t& eventTimeNs, const int64_t& nextBucketStartTimeNs) {}; + /** + * Flushes all the data including the current partial bucket. + */ + virtual void flushLocked(const int64_t& eventTimeNs) { + flushIfNeededLocked(eventTimeNs); + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); + }; + + /* + * Individual metrics can implement their own business logic here. All pre-processing is done. + * + * [matcherIndex]: the index of the matcher which matched this event. This is interesting to + * DurationMetric, because it has start/stop/stop_all 3 matchers. + * [eventKey]: the extracted dimension key for the final output. if the metric doesn't have + * dimensions, it will be DEFAULT_DIMENSION_KEY + * [conditionKey]: the keys of conditions which should be used to query the condition for this + * target event (from MetricConditionLink). This is passed to individual metrics + * because DurationMetric needs it to be cached. + * [condition]: whether condition is met. If condition is sliced, this is the result coming from + * query with ConditionWizard; If condition is not sliced, this is the + * nonSlicedCondition. + * [event]: the log event, just in case the metric needs its data, e.g., EventMetric. + */ + virtual void onMatchedLogEventInternalLocked( + const size_t matcherIndex, const MetricDimensionKey& eventKey, + const ConditionKey& conditionKey, bool condition, const LogEvent& event, + const map<int, HashableDimensionKey>& statePrimaryKeys) = 0; + + // Consume the parsed stats log entry that already matched the "what" of the metric. + virtual void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event); + virtual void onConditionChangedLocked(const bool condition, const int64_t eventTime) = 0; + virtual void onSlicedConditionMayChangeLocked(bool overallCondition, + const int64_t eventTime) = 0; + virtual void onDumpReportLocked(const int64_t dumpTimeNs, + const bool include_current_partial_bucket, + const bool erase_data, + const DumpLatency dumpLatency, + std::set<string> *str_set, + android::util::ProtoOutputStream* protoOutput) = 0; + virtual void clearPastBucketsLocked(const int64_t dumpTimeNs) = 0; + virtual void prepareFirstBucketLocked(){}; + virtual size_t byteSizeLocked() const = 0; + virtual void dumpStatesLocked(FILE* out, bool verbose) const = 0; + virtual void dropDataLocked(const int64_t dropTimeNs) = 0; + void loadActiveMetricLocked(const ActiveMetric& activeMetric, int64_t currentTimeNs); + void activateLocked(int activationTrackerIndex, int64_t elapsedTimestampNs); + void cancelEventActivationLocked(int deactivationTrackerIndex); + + bool evaluateActiveStateLocked(int64_t elapsedTimestampNs); + virtual void onActiveStateChangedLocked(const int64_t& eventTimeNs) { if (!mIsActive) { flushLocked(eventTimeNs); } } + inline bool isActiveLocked() const { + return mIsActive; + } + // Convenience to compute the current bucket's end time, which is always aligned with the // start time of the metric. int64_t getCurrentBucketEndTimeNs() const { @@ -313,7 +378,25 @@ protected: return (endNs - mTimeBaseNs) / mBucketSizeNs - 1; } - virtual void dropDataLocked(const int64_t dropTimeNs) = 0; + // Query StateManager for original state value using the queryKey. + // The field and value are output. + void queryStateValue(const int32_t atomId, const HashableDimensionKey& queryKey, + FieldValue* value); + + // If a state map exists for the given atom, replace the original state + // value with the group id mapped to the value. + // If no state map exists, keep the original state value. + void mapStateValue(const int32_t atomId, FieldValue* value); + + // Returns a HashableDimensionKey with unknown state value for each state + // atom. + HashableDimensionKey getUnknownStateKey(); + + DropEvent buildDropEvent(const int64_t dropTimeNs, const BucketDropReason reason); + + // Returns true if the number of drop events in the current bucket has + // exceeded the maximum number allowed, which is currently capped at 10. + bool maxDropEventsReached(); const int64_t mMetricId; @@ -335,21 +418,17 @@ protected: ConditionState mCondition; + int mConditionTrackerIndex; + bool mConditionSliced; sp<ConditionWizard> mWizard; - int mConditionTrackerIndex; - - vector<Matcher> mDimensionsInWhat; // The dimensions_in_what defined in statsd_config - vector<Matcher> mDimensionsInCondition; // The dimensions_in_condition defined in statsd_config - bool mContainANYPositionInDimensionsInWhat; + bool mSliceByPositionALL; - // True iff the condition dimensions equal to the sliced dimensions in the simple condition - // tracker. This field is always false for combinational condition trackers. - bool mSameConditionDimensionsInTracker; + vector<Matcher> mDimensionsInWhat; // The dimensions_in_what defined in statsd_config // True iff the metric to condition links cover all dimension fields in the condition tracker. // This field is always false for combinational condition trackers. @@ -359,43 +438,8 @@ protected: std::vector<sp<AnomalyTracker>> mAnomalyTrackers; - /* - * Individual metrics can implement their own business logic here. All pre-processing is done. - * - * [matcherIndex]: the index of the matcher which matched this event. This is interesting to - * DurationMetric, because it has start/stop/stop_all 3 matchers. - * [eventKey]: the extracted dimension key for the final output. if the metric doesn't have - * dimensions, it will be DEFAULT_DIMENSION_KEY - * [conditionKey]: the keys of conditions which should be used to query the condition for this - * target event (from MetricConditionLink). This is passed to individual metrics - * because DurationMetric needs it to be cached. - * [condition]: whether condition is met. If condition is sliced, this is the result coming from - * query with ConditionWizard; If condition is not sliced, this is the - * nonSlicedCondition. - * [event]: the log event, just in case the metric needs its data, e.g., EventMetric. - */ - virtual void onMatchedLogEventInternalLocked( - const size_t matcherIndex, const MetricDimensionKey& eventKey, - const ConditionKey& conditionKey, bool condition, - const LogEvent& event) = 0; - - // Consume the parsed stats log entry that already matched the "what" of the metric. - virtual void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event); - mutable std::mutex mMutex; - struct Activation { - Activation(const ActivationType& activationType, const int64_t ttlNs) - : ttl_ns(ttlNs), - start_ns(0), - state(ActivationState::kNotActive), - activationType(activationType) {} - - const int64_t ttl_ns; - int64_t start_ns; - ActivationState state; - const ActivationType activationType; - }; // When the metric producer has multiple activations, these activations are ORed to determine // whether the metric producer is ready to generate metrics. std::unordered_map<int, std::shared_ptr<Activation>> mEventActivationMap; @@ -405,12 +449,37 @@ protected: bool mIsActive; + // The slice_by_state atom ids defined in statsd_config. + const std::vector<int32_t> mSlicedStateAtoms; + + // Maps atom ids and state values to group_ids (<atom_id, <value, group_id>>). + const std::unordered_map<int32_t, std::unordered_map<int, int64_t>> mStateGroupMap; + + // MetricStateLinks defined in statsd_config that link fields in the state + // atom to fields in the "what" atom. + std::vector<Metric2State> mMetric2StateLinks; + + SkippedBucket mCurrentSkippedBucket; + // Buckets that were invalidated and had their data dropped. + std::vector<SkippedBucket> mSkippedBuckets; + + FRIEND_TEST(CountMetricE2eTest, TestSlicedState); + FRIEND_TEST(CountMetricE2eTest, TestSlicedStateWithMap); + FRIEND_TEST(CountMetricE2eTest, TestMultipleSlicedStates); + FRIEND_TEST(CountMetricE2eTest, TestSlicedStateWithPrimaryFields); + FRIEND_TEST(CountMetricE2eTest, TestInitialConditionChanges); + FRIEND_TEST(DurationMetricE2eTest, TestOneBucket); FRIEND_TEST(DurationMetricE2eTest, TestTwoBuckets); FRIEND_TEST(DurationMetricE2eTest, TestWithActivation); FRIEND_TEST(DurationMetricE2eTest, TestWithCondition); FRIEND_TEST(DurationMetricE2eTest, TestWithSlicedCondition); FRIEND_TEST(DurationMetricE2eTest, TestWithActivationAndSlicedCondition); + FRIEND_TEST(DurationMetricE2eTest, TestWithSlicedState); + FRIEND_TEST(DurationMetricE2eTest, TestWithConditionAndSlicedState); + FRIEND_TEST(DurationMetricE2eTest, TestWithSlicedStateMapped); + FRIEND_TEST(DurationMetricE2eTest, TestSlicedStatePrimaryFieldsNotSubsetDimInWhat); + FRIEND_TEST(DurationMetricE2eTest, TestWithSlicedStatePrimaryFieldsSubset); FRIEND_TEST(MetricActivationE2eTest, TestCountMetric); FRIEND_TEST(MetricActivationE2eTest, TestCountMetricWithOneDeactivation); @@ -424,6 +493,13 @@ protected: FRIEND_TEST(StatsLogProcessorTest, TestActivationOnBootMultipleActivationsDifferentActivationTypes); FRIEND_TEST(StatsLogProcessorTest, TestActivationsPersistAcrossSystemServerRestart); + + FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState); + FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithDimensions); + FRIEND_TEST(ValueMetricE2eTest, TestInitWithSlicedState_WithIncorrectDimensions); + FRIEND_TEST(ValueMetricE2eTest, TestInitialConditionChanges); + + FRIEND_TEST(MetricsManagerTest, TestInitialConditions); }; } // namespace statsd |