diff options
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 190 |
1 files changed, 109 insertions, 81 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index bed836a1bd90..7b687210ce33 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -19,6 +19,7 @@ #include "ShellSubscriber.h" #include <android-base/file.h> + #include "matchers/matcher_util.h" #include "stats_log_util.h" @@ -32,42 +33,53 @@ const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { int myToken = claimToken(); + VLOG("ShellSubscriber: new subscription %d has come in", myToken); mSubscriptionShouldEnd.notify_one(); shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); - if (!readConfig(mySubscriptionInfo)) { - return; - } + if (!readConfig(mySubscriptionInfo)) return; + + { + std::unique_lock<std::mutex> lock(mMutex); + if (myToken != mToken) { + // Some other subscription has already come in. Stop. + return; + } + mSubscriptionInfo = mySubscriptionInfo; + + spawnHelperThreadsLocked(mySubscriptionInfo, myToken); + waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); + + if (mSubscriptionInfo == mySubscriptionInfo) { + mSubscriptionInfo = nullptr; + } - // critical-section - std::unique_lock<std::mutex> lock(mMutex); - if (myToken != mToken) { - // Some other subscription has already come in. Stop. - return; } - mSubscriptionInfo = mySubscriptionInfo; +} - if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { - // This thread terminates after it detects that mToken has changed. +void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) { + if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) { std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } - // Block until subscription has ended. + std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); }); + heartbeatSender.detach(); +} + +void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo, + int myToken, + std::unique_lock<std::mutex>& lock, + int timeoutSec) { if (timeoutSec > 0) { - mSubscriptionShouldEnd.wait_for( - lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] { - return mToken != myToken || !mySubscriptionInfo->mClientAlive; - }); + mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] { + return mToken != myToken || !myInfo->mClientAlive; + }); } else { - mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] { - return mToken != myToken || !mySubscriptionInfo->mClientAlive; + mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] { + return mToken != myToken || !myInfo->mClientAlive; }); } - - if (mSubscriptionInfo == mySubscriptionInfo) { - mSubscriptionInfo = nullptr; - } } // Atomically claim the next token. Token numbers denote subscriber ordering. @@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) return true; } -void ShellSubscriber::startPull(int64_t myToken) { +void ShellSubscriber::startPull(int myToken) { + VLOG("ShellSubscriber: pull thread %d starting", myToken); while (true) { - std::lock_guard<std::mutex> lock(mMutex); - if (!mSubscriptionInfo || mToken != myToken) { - VLOG("Pulling thread %lld done!", (long long)myToken); - return; - } + { + std::lock_guard<std::mutex> lock(mMutex); + if (!mSubscriptionInfo || mToken != myToken) { + VLOG("ShellSubscriber: pulling thread %d done!", myToken); + return; + } - int64_t nowMillis = getElapsedRealtimeMillis(); - for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) { - if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { - vector<std::shared_ptr<LogEvent>> data; - vector<int32_t> uids; - uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); - // This is slow. Consider storing the uids per app and listening to uidmap updates. - for (const string& pkg : pullInfo.mPullPackages) { - set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg); - uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end()); + int64_t nowMillis = getElapsedRealtimeMillis(); + for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { + if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) { + continue; } - uids.push_back(DEFAULT_PULL_UID); + + vector<int32_t> uids; + getUidsForPullAtom(&uids, pullInfo); + + vector<std::shared_ptr<LogEvent>> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data); - VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); + VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); + writePulledAtomsLocked(data, pullInfo.mPullerMatcher); - if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); - return; - } pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } - VLOG("Pulling thread %lld sleep....", (long long)myToken); + VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken, + mSubscriptionInfo->mPullIntervalMin); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } -// \return boolean indicating if writes were successful (will return false if -// client dies) -bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, +void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) { + uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); + // This is slow. Consider storing the uids per app and listening to uidmap updates. + for (const string& pkg : pullInfo.mPullPackages) { + set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg); + uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); + } + uids->push_back(DEFAULT_PULL_UID); +} + +void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher) { mProto.clear(); int count = 0; for (const auto& event : data) { - VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | @@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve } } - if (count > 0) { - // First write the payload size. - size_t bufferSize = mProto.size(); - if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, - sizeof(bufferSize))) { - return false; - } - - VLOG("%d atoms, proto size: %zu", count, bufferSize); - // Then write the payload. - if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { - return false; - } - } - - return true; + if (count > 0) attemptWriteToSocketLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { std::lock_guard<std::mutex> lock(mMutex); - if (!mSubscriptionInfo) { - return; - } + if (!mSubscriptionInfo) return; mProto.clear(); for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { - VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); + attemptWriteToSocketLocked(mProto.size()); + } + } +} - // First write the payload size. - size_t bufferSize = mProto.size(); - if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, - sizeof(bufferSize))) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); +// Tries to write the atom encoded in mProto to the socket. If the write fails +// because the read end of the pipe has closed, signals to other threads that +// the subscription should end. +void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { + // First write the payload size. + if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } + + if (dataSize == 0) return; + + // Then, write the payload. + if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } + + mLastWriteMs = getElapsedRealtimeMillis(); +} + +// Send a heartbeat, consisting solely of a data size of 0, if perfd has not +// recently received any writes from statsd. When it receives the data size of +// 0, perfd will not expect any data and recheck whether the shell command is +// still running. +void ShellSubscriber::sendHeartbeats(int myToken) { + while (true) { + { + std::lock_guard<std::mutex> lock(mMutex); + if (!mSubscriptionInfo || myToken != mToken) { + VLOG("ShellSubscriber: heartbeat thread %d done!", myToken); return; } - // Then write the payload. - if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); - return; + if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) { + VLOG("ShellSubscriber: sending a heartbeat to perfd"); + attemptWriteToSocketLocked(/*dataSize=*/0); } } + std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats)); } } |