summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/shell/ShellSubscriber.cpp
diff options
context:
space:
mode:
authorRuchir Rastogi <ruchirr@google.com>2020-04-22 09:03:22 -0700
committerRuchir Rastogi <ruchirr@google.com>2020-05-06 19:52:16 -0700
commit1e2405160447116c8fd9b6f09e2245381c55f23e (patch)
tree69aa374f6c4b852f8ba9e25d3970272a0e24a091 /cmds/statsd/src/shell/ShellSubscriber.cpp
parent98f012c3044b7c3c674da91bca80a123005ce998 (diff)
Fix ShellSubscriber concurrency issues
This CL creates a sendHeartbeat thread that ocassionally sends heartbeats, consisting of a dataSize of 0, to perfd. perfd will discard those heartbeats, recheck if the user has canceled the subscription, and if not, wait for more data from statsd. Sending heartbeats solves two big problems: (1) Allows statsd to robustly check if writes to the socket fail because the read end of the pipe has closed. Previously, if no atoms were pushed or pulled, statsd never attempted to write to perfd, so statsd could never detect the end of the subscription. However, now, writes are regularly made regardless of if statsd receives data. Note that even if atoms were pushed or pulled, there is no guarantee that they would have matched the atom matchers sent in perfd's config. (2) Allows perfd to escape a blocking read call and recheck whether the user has canceled the subscription. If no data is sent to perfd, perfd will block in this read call and the AndroidStudio UI will freeze up. Heartbeats are only sent if statsd has not sent any data to perfd within the last second, so we do not spam perfd with writes. + decomposes the startNewSubscription function + prevents startPull from holding the lock while sleeping Test: atest stastd_test Test: atest CtsStatsdHostTestCases Test: manually confirm that AndroidStudio is not freezing Bug: 153595161 Change-Id: I78f0818e8ed29bdadd02c151444ee7c9555623a4
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r--cmds/statsd/src/shell/ShellSubscriber.cpp190
1 files changed, 109 insertions, 81 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index bed836a1bd90..7b687210ce33 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -19,6 +19,7 @@
#include "ShellSubscriber.h"
#include <android-base/file.h>
+
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
@@ -32,42 +33,53 @@ const static int FIELD_ID_ATOM = 1;
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
int myToken = claimToken();
+ VLOG("ShellSubscriber: new subscription %d has come in", myToken);
mSubscriptionShouldEnd.notify_one();
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
- if (!readConfig(mySubscriptionInfo)) {
- return;
- }
+ if (!readConfig(mySubscriptionInfo)) return;
+
+ {
+ std::unique_lock<std::mutex> lock(mMutex);
+ if (myToken != mToken) {
+ // Some other subscription has already come in. Stop.
+ return;
+ }
+ mSubscriptionInfo = mySubscriptionInfo;
+
+ spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
+ waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
+
+ if (mSubscriptionInfo == mySubscriptionInfo) {
+ mSubscriptionInfo = nullptr;
+ }
- // critical-section
- std::unique_lock<std::mutex> lock(mMutex);
- if (myToken != mToken) {
- // Some other subscription has already come in. Stop.
- return;
}
- mSubscriptionInfo = mySubscriptionInfo;
+}
- if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
- // This thread terminates after it detects that mToken has changed.
+void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
+ if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
std::thread puller([this, myToken] { startPull(myToken); });
puller.detach();
}
- // Block until subscription has ended.
+ std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
+ heartbeatSender.detach();
+}
+
+void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
+ int myToken,
+ std::unique_lock<std::mutex>& lock,
+ int timeoutSec) {
if (timeoutSec > 0) {
- mSubscriptionShouldEnd.wait_for(
- lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
- return mToken != myToken || !mySubscriptionInfo->mClientAlive;
- });
+ mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
+ return mToken != myToken || !myInfo->mClientAlive;
+ });
} else {
- mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
- return mToken != myToken || !mySubscriptionInfo->mClientAlive;
+ mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
+ return mToken != myToken || !myInfo->mClientAlive;
});
}
-
- if (mSubscriptionInfo == mySubscriptionInfo) {
- mSubscriptionInfo = nullptr;
- }
}
// Atomically claim the next token. Token numbers denote subscriber ordering.
@@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
return true;
}
-void ShellSubscriber::startPull(int64_t myToken) {
+void ShellSubscriber::startPull(int myToken) {
+ VLOG("ShellSubscriber: pull thread %d starting", myToken);
while (true) {
- std::lock_guard<std::mutex> lock(mMutex);
- if (!mSubscriptionInfo || mToken != myToken) {
- VLOG("Pulling thread %lld done!", (long long)myToken);
- return;
- }
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (!mSubscriptionInfo || mToken != myToken) {
+ VLOG("ShellSubscriber: pulling thread %d done!", myToken);
+ return;
+ }
- int64_t nowMillis = getElapsedRealtimeMillis();
- for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
- if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
- vector<std::shared_ptr<LogEvent>> data;
- vector<int32_t> uids;
- uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
- // This is slow. Consider storing the uids per app and listening to uidmap updates.
- for (const string& pkg : pullInfo.mPullPackages) {
- set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
- uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end());
+ int64_t nowMillis = getElapsedRealtimeMillis();
+ for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
+ if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
+ continue;
}
- uids.push_back(DEFAULT_PULL_UID);
+
+ vector<int32_t> uids;
+ getUidsForPullAtom(&uids, pullInfo);
+
+ vector<std::shared_ptr<LogEvent>> data;
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
- VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+ VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+ writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
- if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
- mSubscriptionInfo->mClientAlive = false;
- mSubscriptionShouldEnd.notify_one();
- return;
- }
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
}
- VLOG("Pulling thread %lld sleep....", (long long)myToken);
+ VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
+ mSubscriptionInfo->mPullIntervalMin);
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
}
}
-// \return boolean indicating if writes were successful (will return false if
-// client dies)
-bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
+ uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
+ // This is slow. Consider storing the uids per app and listening to uidmap updates.
+ for (const string& pkg : pullInfo.mPullPackages) {
+ set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
+ uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
+ }
+ uids->push_back(DEFAULT_PULL_UID);
+}
+
+void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) {
mProto.clear();
int count = 0;
for (const auto& event : data) {
- VLOG("%s", event->ToString().c_str());
if (matchesSimple(*mUidMap, matcher, *event)) {
count++;
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
}
}
- if (count > 0) {
- // First write the payload size.
- size_t bufferSize = mProto.size();
- if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
- sizeof(bufferSize))) {
- return false;
- }
-
- VLOG("%d atoms, proto size: %zu", count, bufferSize);
- // Then write the payload.
- if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
- return false;
- }
- }
-
- return true;
+ if (count > 0) attemptWriteToSocketLocked(mProto.size());
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
std::lock_guard<std::mutex> lock(mMutex);
- if (!mSubscriptionInfo) {
- return;
- }
+ if (!mSubscriptionInfo) return;
mProto.clear();
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
- VLOG("%s", event.ToString().c_str());
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
+ attemptWriteToSocketLocked(mProto.size());
+ }
+ }
+}
- // First write the payload size.
- size_t bufferSize = mProto.size();
- if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
- sizeof(bufferSize))) {
- mSubscriptionInfo->mClientAlive = false;
- mSubscriptionShouldEnd.notify_one();
+// Tries to write the atom encoded in mProto to the socket. If the write fails
+// because the read end of the pipe has closed, signals to other threads that
+// the subscription should end.
+void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
+ // First write the payload size.
+ if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
+ }
+
+ if (dataSize == 0) return;
+
+ // Then, write the payload.
+ if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
+ }
+
+ mLastWriteMs = getElapsedRealtimeMillis();
+}
+
+// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
+// recently received any writes from statsd. When it receives the data size of
+// 0, perfd will not expect any data and recheck whether the shell command is
+// still running.
+void ShellSubscriber::sendHeartbeats(int myToken) {
+ while (true) {
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (!mSubscriptionInfo || myToken != mToken) {
+ VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
return;
}
- // Then write the payload.
- if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
- mSubscriptionInfo->mClientAlive = false;
- mSubscriptionShouldEnd.notify_one();
- return;
+ if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
+ VLOG("ShellSubscriber: sending a heartbeat to perfd");
+ attemptWriteToSocketLocked(/*dataSize=*/0);
}
}
+ std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
}
}