diff options
author | Baligh Uddin <baligh@google.com> | 2020-12-03 04:43:33 +0000 |
---|---|---|
committer | Baligh Uddin <baligh@google.com> | 2020-12-03 04:43:33 +0000 |
commit | b74ba2454caf3a69324c18d5a4a8183439e7d900 (patch) | |
tree | 7c3dfa8774188e7bcc2c769d5a9632f0e0dcd7ba /cmds/statsd/src/shell/ShellSubscriber.cpp | |
parent | 45b88d5001e9bcfc6673b1778da32de3cb85c729 (diff) |
Migrate frameworks/base/cmds/statsd to packages/modules/StatsD/bin
Add statsd protos to platform_protos rule
BUG: 167962588
TEST: TH
TEST: Local build [ crosshatch-userdebug ]
Change-Id: I053f2a211ea28c2f181937af3d58ad16b235d096
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 245 |
1 files changed, 0 insertions, 245 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp deleted file mode 100644 index 9d8f0c24e253..000000000000 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Copyright (C) 2018 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#define DEBUG false // STOPSHIP if true -#include "Log.h" - -#include "ShellSubscriber.h" - -#include <android-base/file.h> - -#include "matchers/matcher_util.h" -#include "stats_log_util.h" - -using android::util::ProtoOutputStream; - -namespace android { -namespace os { -namespace statsd { - -const static int FIELD_ID_ATOM = 1; - -void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { - int myToken = claimToken(); - VLOG("ShellSubscriber: new subscription %d has come in", myToken); - mSubscriptionShouldEnd.notify_one(); - - shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); - if (!readConfig(mySubscriptionInfo)) return; - - { - std::unique_lock<std::mutex> lock(mMutex); - mSubscriptionInfo = mySubscriptionInfo; - spawnHelperThread(myToken); - waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); - - if (mSubscriptionInfo == mySubscriptionInfo) { - mSubscriptionInfo = nullptr; - } - - } -} - -void ShellSubscriber::spawnHelperThread(int myToken) { - std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); }); - t.detach(); -} - -void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo, - int myToken, - std::unique_lock<std::mutex>& lock, - int timeoutSec) { - if (timeoutSec > 0) { - mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] { - return mToken != myToken || !myInfo->mClientAlive; - }); - } else { - mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] { - return mToken != myToken || !myInfo->mClientAlive; - }); - } -} - -// Atomically claim the next token. Token numbers denote subscriber ordering. -int ShellSubscriber::claimToken() { - std::unique_lock<std::mutex> lock(mMutex); - int myToken = ++mToken; - return myToken; -} - -// Read and parse single config. There should only one config per input. -bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) { - // Read the size of the config. - size_t bufferSize; - if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { - return false; - } - - // Read the config. - vector<uint8_t> buffer(bufferSize); - if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { - return false; - } - - // Parse the config. - ShellSubscription config; - if (!config.ParseFromArray(buffer.data(), bufferSize)) { - return false; - } - - // Update SubscriptionInfo with state from config - for (const auto& pushed : config.pushed()) { - subscriptionInfo->mPushedMatchers.push_back(pushed); - } - - for (const auto& pulled : config.pulled()) { - vector<string> packages; - vector<int32_t> uids; - for (const string& pkg : pulled.packages()) { - auto it = UidMap::sAidToUidMapping.find(pkg); - if (it != UidMap::sAidToUidMapping.end()) { - uids.push_back(it->second); - } else { - packages.push_back(pkg); - } - } - - subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages, - uids); - VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); - } - - return true; -} - -void ShellSubscriber::pullAndSendHeartbeats(int myToken) { - VLOG("ShellSubscriber: helper thread %d starting", myToken); - while (true) { - int64_t sleepTimeMs = INT_MAX; - { - std::lock_guard<std::mutex> lock(mMutex); - if (!mSubscriptionInfo || mToken != myToken) { - VLOG("ShellSubscriber: helper thread %d done!", myToken); - return; - } - - int64_t nowMillis = getElapsedRealtimeMillis(); - int64_t nowNanos = getElapsedRealtimeNs(); - for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { - if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) { - continue; - } - - vector<int32_t> uids; - getUidsForPullAtom(&uids, pullInfo); - - vector<std::shared_ptr<LogEvent>> data; - mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data); - VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); - writePulledAtomsLocked(data, pullInfo.mPullerMatcher); - - pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; - } - - // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received - // data from statsd. When it receives the data size of 0, perfd will not expect any - // atoms and recheck whether the subscription should end. - if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) { - attemptWriteToPipeLocked(/*dataSize=*/0); - } - - // Determine how long to sleep before doing more work. - for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { - int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval; - int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative - if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull; - } - int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis; - if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat; - } - - VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken, - (long long)sleepTimeMs); - std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs)); - } -} - -void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) { - uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); - // This is slow. Consider storing the uids per app and listening to uidmap updates. - for (const string& pkg : pullInfo.mPullPackages) { - set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg); - uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); - } - uids->push_back(DEFAULT_PULL_UID); -} - -void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, - const SimpleAtomMatcher& matcher) { - mProto.clear(); - int count = 0; - for (const auto& event : data) { - if (matchesSimple(mUidMap, matcher, *event)) { - count++; - uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | - util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); - event->ToProto(mProto); - mProto.end(atomToken); - } - } - - if (count > 0) attemptWriteToPipeLocked(mProto.size()); -} - -void ShellSubscriber::onLogEvent(const LogEvent& event) { - std::lock_guard<std::mutex> lock(mMutex); - if (!mSubscriptionInfo) return; - - mProto.clear(); - for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { - if (matchesSimple(mUidMap, matcher, event)) { - uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | - util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); - event.ToProto(mProto); - mProto.end(atomToken); - attemptWriteToPipeLocked(mProto.size()); - } - } -} - -// Tries to write the atom encoded in mProto to the pipe. If the write fails -// because the read end of the pipe has closed, signals to other threads that -// the subscription should end. -void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) { - // First, write the payload size. - if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); - return; - } - - // Then, write the payload if this is not just a heartbeat. - if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); - return; - } - - mLastWriteMs = getElapsedRealtimeMillis(); -} - -} // namespace statsd -} // namespace os -} // namespace android |