diff options
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 92 |
1 files changed, 34 insertions, 58 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index 361b161c76ac..fd883c29dba0 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -41,13 +41,8 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { { std::unique_lock<std::mutex> lock(mMutex); - if (myToken != mToken) { - // Some other subscription has already come in. Stop. - return; - } mSubscriptionInfo = mySubscriptionInfo; - - spawnHelperThreadsLocked(mySubscriptionInfo, myToken); + spawnHelperThread(myToken); waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); if (mSubscriptionInfo == mySubscriptionInfo) { @@ -57,14 +52,9 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { } } -void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) { - if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) { - std::thread puller([this, myToken] { startPull(myToken); }); - puller.detach(); - } - - std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); }); - heartbeatSender.detach(); +void ShellSubscriber::spawnHelperThread(int myToken) { + std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); }); + t.detach(); } void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo, @@ -114,13 +104,7 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) subscriptionInfo->mPushedMatchers.push_back(pushed); } - int minInterval = -1; for (const auto& pulled : config.pulled()) { - // All intervals need to be multiples of the min interval. - if (minInterval < 0 || pulled.freq_millis() < minInterval) { - minInterval = pulled.freq_millis(); - } - vector<string> packages; vector<int32_t> uids; for (const string& pkg : pulled.packages()) { @@ -136,18 +120,18 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) uids); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } - subscriptionInfo->mPullIntervalMin = minInterval; return true; } -void ShellSubscriber::startPull(int myToken) { - VLOG("ShellSubscriber: pull thread %d starting", myToken); +void ShellSubscriber::pullAndSendHeartbeats(int myToken) { + VLOG("ShellSubscriber: helper thread %d starting", myToken); while (true) { + int64_t sleepTimeMs = INT_MAX; { std::lock_guard<std::mutex> lock(mMutex); if (!mSubscriptionInfo || mToken != myToken) { - VLOG("ShellSubscriber: pulling thread %d done!", myToken); + VLOG("ShellSubscriber: helper thread %d done!", myToken); return; } @@ -168,11 +152,27 @@ void ShellSubscriber::startPull(int myToken) { pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } + + // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received + // data from statsd. When it receives the data size of 0, perfd will not expect any + // atoms and recheck whether the subscription should end. + if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) { + attemptWriteToPipeLocked(/*dataSize=*/0); + } + + // Determine how long to sleep before doing more work. + for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { + int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval; + int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative + if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull; + } + int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis; + if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat; } - VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken, - mSubscriptionInfo->mPullIntervalMin); - std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); + VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken, + (long long)sleepTimeMs); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs)); } } @@ -200,7 +200,7 @@ void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve } } - if (count > 0) attemptWriteToSocketLocked(mProto.size()); + if (count > 0) attemptWriteToPipeLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { @@ -214,26 +214,24 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); - attemptWriteToSocketLocked(mProto.size()); + attemptWriteToPipeLocked(mProto.size()); } } } -// Tries to write the atom encoded in mProto to the socket. If the write fails +// Tries to write the atom encoded in mProto to the pipe. If the write fails // because the read end of the pipe has closed, signals to other threads that // the subscription should end. -void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { - // First write the payload size. +void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) { + // First, write the payload size. if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } - if (dataSize == 0) return; - - // Then, write the payload. - if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { + // Then, write the payload if this is not just a heartbeat. + if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; @@ -242,28 +240,6 @@ void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { mLastWriteMs = getElapsedRealtimeMillis(); } -// Send a heartbeat, consisting solely of a data size of 0, if perfd has not -// recently received any writes from statsd. When it receives the data size of -// 0, perfd will not expect any data and recheck whether the shell command is -// still running. -void ShellSubscriber::sendHeartbeats(int myToken) { - while (true) { - { - std::lock_guard<std::mutex> lock(mMutex); - if (!mSubscriptionInfo || myToken != mToken) { - VLOG("ShellSubscriber: heartbeat thread %d done!", myToken); - return; - } - - if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) { - VLOG("ShellSubscriber: sending a heartbeat to perfd"); - attemptWriteToSocketLocked(/*dataSize=*/0); - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats)); - } -} - } // namespace statsd } // namespace os } // namespace android |