summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/shell/ShellSubscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r--cmds/statsd/src/shell/ShellSubscriber.cpp190
1 files changed, 109 insertions, 81 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index bed836a1bd90..7b687210ce33 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -19,6 +19,7 @@
#include "ShellSubscriber.h"
#include <android-base/file.h>
+
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
@@ -32,42 +33,53 @@ const static int FIELD_ID_ATOM = 1;
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
int myToken = claimToken();
+ VLOG("ShellSubscriber: new subscription %d has come in", myToken);
mSubscriptionShouldEnd.notify_one();
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
- if (!readConfig(mySubscriptionInfo)) {
- return;
- }
+ if (!readConfig(mySubscriptionInfo)) return;
+
+ {
+ std::unique_lock<std::mutex> lock(mMutex);
+ if (myToken != mToken) {
+ // Some other subscription has already come in. Stop.
+ return;
+ }
+ mSubscriptionInfo = mySubscriptionInfo;
+
+ spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
+ waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
+
+ if (mSubscriptionInfo == mySubscriptionInfo) {
+ mSubscriptionInfo = nullptr;
+ }
- // critical-section
- std::unique_lock<std::mutex> lock(mMutex);
- if (myToken != mToken) {
- // Some other subscription has already come in. Stop.
- return;
}
- mSubscriptionInfo = mySubscriptionInfo;
+}
- if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
- // This thread terminates after it detects that mToken has changed.
+void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
+ if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
std::thread puller([this, myToken] { startPull(myToken); });
puller.detach();
}
- // Block until subscription has ended.
+ std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
+ heartbeatSender.detach();
+}
+
+void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
+ int myToken,
+ std::unique_lock<std::mutex>& lock,
+ int timeoutSec) {
if (timeoutSec > 0) {
- mSubscriptionShouldEnd.wait_for(
- lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
- return mToken != myToken || !mySubscriptionInfo->mClientAlive;
- });
+ mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
+ return mToken != myToken || !myInfo->mClientAlive;
+ });
} else {
- mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
- return mToken != myToken || !mySubscriptionInfo->mClientAlive;
+ mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
+ return mToken != myToken || !myInfo->mClientAlive;
});
}
-
- if (mSubscriptionInfo == mySubscriptionInfo) {
- mSubscriptionInfo = nullptr;
- }
}
// Atomically claim the next token. Token numbers denote subscriber ordering.
@@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
return true;
}
-void ShellSubscriber::startPull(int64_t myToken) {
+void ShellSubscriber::startPull(int myToken) {
+ VLOG("ShellSubscriber: pull thread %d starting", myToken);
while (true) {
- std::lock_guard<std::mutex> lock(mMutex);
- if (!mSubscriptionInfo || mToken != myToken) {
- VLOG("Pulling thread %lld done!", (long long)myToken);
- return;
- }
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (!mSubscriptionInfo || mToken != myToken) {
+ VLOG("ShellSubscriber: pulling thread %d done!", myToken);
+ return;
+ }
- int64_t nowMillis = getElapsedRealtimeMillis();
- for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
- if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
- vector<std::shared_ptr<LogEvent>> data;
- vector<int32_t> uids;
- uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
- // This is slow. Consider storing the uids per app and listening to uidmap updates.
- for (const string& pkg : pullInfo.mPullPackages) {
- set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
- uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end());
+ int64_t nowMillis = getElapsedRealtimeMillis();
+ for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
+ if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
+ continue;
}
- uids.push_back(DEFAULT_PULL_UID);
+
+ vector<int32_t> uids;
+ getUidsForPullAtom(&uids, pullInfo);
+
+ vector<std::shared_ptr<LogEvent>> data;
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
- VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+ VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+ writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
- if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
- mSubscriptionInfo->mClientAlive = false;
- mSubscriptionShouldEnd.notify_one();
- return;
- }
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
}
- VLOG("Pulling thread %lld sleep....", (long long)myToken);
+ VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
+ mSubscriptionInfo->mPullIntervalMin);
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
}
}
-// \return boolean indicating if writes were successful (will return false if
-// client dies)
-bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
+ uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
+ // This is slow. Consider storing the uids per app and listening to uidmap updates.
+ for (const string& pkg : pullInfo.mPullPackages) {
+ set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
+ uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
+ }
+ uids->push_back(DEFAULT_PULL_UID);
+}
+
+void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) {
mProto.clear();
int count = 0;
for (const auto& event : data) {
- VLOG("%s", event->ToString().c_str());
if (matchesSimple(*mUidMap, matcher, *event)) {
count++;
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
}
}
- if (count > 0) {
- // First write the payload size.
- size_t bufferSize = mProto.size();
- if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
- sizeof(bufferSize))) {
- return false;
- }
-
- VLOG("%d atoms, proto size: %zu", count, bufferSize);
- // Then write the payload.
- if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
- return false;
- }
- }
-
- return true;
+ if (count > 0) attemptWriteToSocketLocked(mProto.size());
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
std::lock_guard<std::mutex> lock(mMutex);
- if (!mSubscriptionInfo) {
- return;
- }
+ if (!mSubscriptionInfo) return;
mProto.clear();
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
- VLOG("%s", event.ToString().c_str());
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
+ attemptWriteToSocketLocked(mProto.size());
+ }
+ }
+}
- // First write the payload size.
- size_t bufferSize = mProto.size();
- if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
- sizeof(bufferSize))) {
- mSubscriptionInfo->mClientAlive = false;
- mSubscriptionShouldEnd.notify_one();
+// Tries to write the atom encoded in mProto to the socket. If the write fails
+// because the read end of the pipe has closed, signals to other threads that
+// the subscription should end.
+void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
+ // First write the payload size.
+ if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
+ }
+
+ if (dataSize == 0) return;
+
+ // Then, write the payload.
+ if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
+ }
+
+ mLastWriteMs = getElapsedRealtimeMillis();
+}
+
+// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
+// recently received any writes from statsd. When it receives the data size of
+// 0, perfd will not expect any data and recheck whether the shell command is
+// still running.
+void ShellSubscriber::sendHeartbeats(int myToken) {
+ while (true) {
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (!mSubscriptionInfo || myToken != mToken) {
+ VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
return;
}
- // Then write the payload.
- if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
- mSubscriptionInfo->mClientAlive = false;
- mSubscriptionShouldEnd.notify_one();
- return;
+ if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
+ VLOG("ShellSubscriber: sending a heartbeat to perfd");
+ attemptWriteToSocketLocked(/*dataSize=*/0);
}
}
+ std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
}
}