Diffstat (limited to 'cmds/statsd/src/metrics/CountMetricProducer.cpp')
-rw-r--r-- cmds/statsd/src/metrics/CountMetricProducer.cpp | 298
1 file changed, 298 insertions(+), 0 deletions(-)
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
new file mode 100644
index 000000000000..bd2674b86a46
--- /dev/null
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -0,0 +1,298 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define DEBUG false // STOPSHIP if true
+#include "Log.h"
+
+#include "CountMetricProducer.h"
+#include "guardrail/StatsdStats.h"
+#include "stats_util.h"
+#include "stats_log_util.h"
+
+#include <limits.h>
+#include <stdlib.h>
+
+using android::util::FIELD_COUNT_REPEATED;
+using android::util::FIELD_TYPE_BOOL;
+using android::util::FIELD_TYPE_FLOAT;
+using android::util::FIELD_TYPE_INT32;
+using android::util::FIELD_TYPE_INT64;
+using android::util::FIELD_TYPE_MESSAGE;
+using android::util::FIELD_TYPE_STRING;
+using android::util::ProtoOutputStream;
+using std::map;
+using std::string;
+using std::unordered_map;
+using std::vector;
+
+namespace android {
+namespace os {
+namespace statsd {
+
+// for StatsLogReport
+const int FIELD_ID_ID = 1;
+const int FIELD_ID_COUNT_METRICS = 5;
+// for CountMetricDataWrapper
+const int FIELD_ID_DATA = 1;
+// for CountMetricData
+const int FIELD_ID_DIMENSION_IN_WHAT = 1;
+const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
+const int FIELD_ID_BUCKET_INFO = 3;
+// for CountBucketInfo
+const int FIELD_ID_START_BUCKET_NANOS = 1;
+const int FIELD_ID_END_BUCKET_NANOS = 2;
+const int FIELD_ID_COUNT = 3;
+
+CountMetricProducer::CountMetricProducer(const ConfigKey& key, const CountMetric& metric,
+ const int conditionIndex,
+ const sp<ConditionWizard>& wizard,
+ const uint64_t startTimeNs)
+ : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard) {
+ // TODO: evaluate initial conditions and set mConditionMet.
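+ // The bucket size comes from the config's TimeUnit, converted from milliseconds to
+ // nanoseconds; if no bucket is configured, a single effectively unbounded bucket is used.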
+ if (metric.has_bucket()) {
+ mBucketSizeNs = TimeUnitToBucketSizeInMillis(metric.bucket()) * 1000000;
+ } else {
+ mBucketSizeNs = LLONG_MAX;
+ }
+
+ if (metric.has_dimensions_in_what()) {
+ translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
+ }
+
+ if (metric.has_dimensions_in_condition()) {
+ translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
+ }
+
+ if (metric.links().size() > 0) {
+ for (const auto& link : metric.links()) {
+ Metric2Condition mc;
+ mc.conditionId = link.condition();
+ translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
+ translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
+ mMetric2ConditionLinks.push_back(mc);
+ }
+ mConditionSliced = true;
+ }
+
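+ // The condition is sliced if this metric links to conditions or declares dimensions
+ // in the condition.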
+ mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
+
+ VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
+ (long long)mBucketSizeNs, (long long)mStartTimeNs);
+}
+
+CountMetricProducer::~CountMetricProducer() {
+ VLOG("~CountMetricProducer() called");
+}
+
+void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
+ VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
+}
+
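+// Serializes all completed buckets into the StatsLogReport proto and then clears them;
+// nothing is written when there are no past buckets.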
+void CountMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
+ flushIfNeededLocked(dumpTimeNs);
+ if (mPastBuckets.empty()) {
+ return;
+ }
+
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
+ long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);
+
+ for (const auto& counter : mPastBuckets) {
+ const MetricDimensionKey& dimensionKey = counter.first;
+ VLOG(" dimension key %s", dimensionKey.c_str());
+
+ long long wrapperToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+
+ // First fill dimension.
+ long long dimensionInWhatToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
+ writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), protoOutput);
+ protoOutput->end(dimensionInWhatToken);
+
+ if (dimensionKey.hasDimensionKeyInCondition()) {
+ long long dimensionInConditionToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+ writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), protoOutput);
+ protoOutput->end(dimensionInConditionToken);
+ }
+
+ // Then fill bucket_info (CountBucketInfo).
+ for (const auto& bucket : counter.second) {
+ long long bucketInfoToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount);
+ protoOutput->end(bucketInfoToken);
+ VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, (long long)bucket.mCount);
+ }
+ protoOutput->end(wrapperToken);
+ }
+
+ protoOutput->end(protoToken);
+
+ mPastBuckets.clear();
+
+ // TODO: Clear mDimensionKeyMap once the report is dumped.
+}
+
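+// Records the latest unsliced condition state; counting is gated on the condition in
+// onMatchedLogEventInternalLocked.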
+void CountMetricProducer::onConditionChangedLocked(const bool conditionMet,
+ const uint64_t eventTime) {
+ VLOG("Metric %lld onConditionChanged", (long long)mMetricId);
+ mCondition = conditionMet;
+}
+
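+// Guardrail for previously unseen dimension keys: past the soft limit the tuple count
+// is reported to StatsdStats, and past the hard limit the new key's data is dropped
+// (returns true).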
+bool CountMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
+ if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
+ return false;
+ }
+ // ===========GuardRail==============
+ // 1. Report the tuple count if it exceeds the soft limit.
+ if (mCurrentSlicedCounter->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
+ size_t newTupleCount = mCurrentSlicedCounter->size() + 1;
+ StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
+ // 2. Don't add more tuples; we are above the allowed threshold. Drop the data.
+ if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
+ ALOGE("CountMetric %lld dropping data for dimension key %s",
+ (long long)mMetricId, newKey.c_str());
+ return true;
+ }
+ }
+
+ return false;
+}
+
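+// Handles a matched event: rolls the bucket forward if needed, skips the event when the
+// condition is false, increments the counter for the event's dimension key (subject to
+// the guardrail), and feeds the running bucket total to any anomaly trackers.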
+void CountMetricProducer::onMatchedLogEventInternalLocked(
+ const size_t matcherIndex, const MetricDimensionKey& eventKey,
+ const ConditionKey& conditionKey, bool condition,
+ const LogEvent& event) {
+ uint64_t eventTimeNs = event.GetTimestampNs();
+ flushIfNeededLocked(eventTimeNs);
+
+ if (!condition) {
+ return;
+ }
+
+ auto it = mCurrentSlicedCounter->find(eventKey);
+ if (it == mCurrentSlicedCounter->end()) {
+ // ===========GuardRail==============
+ if (hitGuardRailLocked(eventKey)) {
+ return;
+ }
+ // create a counter for the new key
+ (*mCurrentSlicedCounter)[eventKey] = 1;
+ } else {
+ // increment the existing value
+ auto& count = it->second;
+ count++;
+ }
+ for (auto& tracker : mAnomalyTrackers) {
+ int64_t countWholeBucket = mCurrentSlicedCounter->find(eventKey)->second;
+ auto prev = mCurrentFullCounters->find(eventKey);
+ if (prev != mCurrentFullCounters->end()) {
+ countWholeBucket += prev->second;
+ }
+ tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
+ countWholeBucket);
+ }
+
+ VLOG("metric %lld %s->%lld", (long long)mMetricId, eventKey.c_str(),
+ (long long)(*mCurrentSlicedCounter)[eventKey]);
+}
+
+// When a new matched event comes in, we check whether it falls into the current
+// bucket. If not, flush the old counters to past buckets and initialize the new bucket.
+void CountMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
+ uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
+ if (eventTimeNs < currentBucketEndTimeNs) {
+ return;
+ }
+
+ flushCurrentBucketLocked(eventTimeNs);
+ // Set up the bucket start time and bucket number.
+ uint64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
+ mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
+ mCurrentBucketNum += numBucketsForward;
+ VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
+ (long long)mCurrentBucketStartTimeNs);
+}
+
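+// Closes the current bucket: snapshots each dimension's count into mPastBuckets, hands
+// completed full buckets to the anomaly trackers (accumulating partial buckets in
+// mCurrentFullCounters first), and resets the sliced counter map.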
+void CountMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
+ uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
+ CountBucket info;
+ info.mBucketStartNs = mCurrentBucketStartTimeNs;
+ if (eventTimeNs < fullBucketEndTimeNs) {
+ info.mBucketEndNs = eventTimeNs;
+ } else {
+ info.mBucketEndNs = fullBucketEndTimeNs;
+ }
+ info.mBucketNum = mCurrentBucketNum;
+ for (const auto& counter : *mCurrentSlicedCounter) {
+ info.mCount = counter.second;
+ auto& bucketList = mPastBuckets[counter.first];
+ bucketList.push_back(info);
+ VLOG("metric %lld, dump key value: %s -> %lld", (long long)mMetricId, counter.first.c_str(),
+ (long long)counter.second);
+ }
+
+ // If we have finished a full bucket, then send this to anomaly tracker.
+ if (eventTimeNs > fullBucketEndTimeNs) {
+ // Accumulate partial buckets with current value and then send to anomaly tracker.
+ if (mCurrentFullCounters->size() > 0) {
+ for (const auto& keyValuePair : *mCurrentSlicedCounter) {
+ (*mCurrentFullCounters)[keyValuePair.first] += keyValuePair.second;
+ }
+ for (auto& tracker : mAnomalyTrackers) {
+ tracker->addPastBucket(mCurrentFullCounters, mCurrentBucketNum);
+ }
+ mCurrentFullCounters = std::make_shared<DimToValMap>();
+ } else {
+ // Skip aggregating the partial buckets since there's no previous partial bucket.
+ for (auto& tracker : mAnomalyTrackers) {
+ tracker->addPastBucket(mCurrentSlicedCounter, mCurrentBucketNum);
+ }
+ }
+ } else {
+ // Accumulate partial bucket.
+ for (const auto& keyValuePair : *mCurrentSlicedCounter) {
+ (*mCurrentFullCounters)[keyValuePair.first] += keyValuePair.second;
+ }
+ }
+
+ // Only reset the counters; don't set up the bucket start time or bucket number here.
+ // (Do not clear the old map, since it is still referenced by mAnomalyTrackers.)
+ mCurrentSlicedCounter = std::make_shared<DimToValMap>();
+}
+
+// Rough estimate of the CountMetricProducer buffer size. This number will be
+// greater than the actual data size, as each dimension of CountMetricData is
+// duplicated.
+size_t CountMetricProducer::byteSizeLocked() const {
+ size_t totalSize = 0;
+ for (const auto& pair : mPastBuckets) {
+ totalSize += pair.second.size() * kBucketSize;
+ }
+ return totalSize;
+}
+
+} // namespace statsd
+} // namespace os
+} // namespace android