summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/shell/ShellSubscriber.cpp
diff options
context:
space:
mode:
authorXin Li <delphij@google.com>2020-09-10 17:22:01 +0000
committerGerrit Code Review <noreply-gerritcodereview@google.com>2020-09-10 17:22:01 +0000
commit8ac6741e47c76bde065f868ea64d2f04541487b9 (patch)
tree1a679458fdbd8d370692d56791e2bf83acee35b5 /cmds/statsd/src/shell/ShellSubscriber.cpp
parent3de940cc40b1e3fdf8224e18a8308a16768cbfa8 (diff)
parentc64112eb974e9aa7638aead998f07a868acfb5a7 (diff)
Merge "Merge Android R"
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r--cmds/statsd/src/shell/ShellSubscriber.cpp301
1 files changed, 156 insertions, 145 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index d6a04336bc46..fd883c29dba0 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -18,6 +18,8 @@
#include "ShellSubscriber.h"
+#include <android-base/file.h>
+
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
@@ -29,204 +31,213 @@ namespace statsd {
const static int FIELD_ID_ATOM = 1;
-void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
- int timeoutSec) {
- VLOG("start new shell subscription");
+void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
+ int myToken = claimToken();
+ VLOG("ShellSubscriber: new subscription %d has come in", myToken);
+ mSubscriptionShouldEnd.notify_one();
+
+ shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
+ if (!readConfig(mySubscriptionInfo)) return;
+
{
- std::lock_guard<std::mutex> lock(mMutex);
- if (mResultReceiver != nullptr) {
- VLOG("Only one shell subscriber is allowed.");
- return;
- }
- mInput = in;
- mOutput = out;
- mResultReceiver = resultReceiver;
- IInterface::asBinder(mResultReceiver)->linkToDeath(this);
- }
+ std::unique_lock<std::mutex> lock(mMutex);
+ mSubscriptionInfo = mySubscriptionInfo;
+ spawnHelperThread(myToken);
+ waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
- // Note that the following is blocking, and it's intended as we cannot return until the shell
- // cmd exits, otherwise all resources & FDs will be automatically closed.
+ if (mSubscriptionInfo == mySubscriptionInfo) {
+ mSubscriptionInfo = nullptr;
+ }
- // Read config forever until EOF is reached. Clients may send multiple configs -- each new
- // config replace the previous one.
- readConfig(in);
- VLOG("timeout : %d", timeoutSec);
+ }
+}
- // Now we have read an EOF we now wait for the semaphore until the client exits.
- VLOG("Now wait for client to exit");
- std::unique_lock<std::mutex> lk(mMutex);
+void ShellSubscriber::spawnHelperThread(int myToken) {
+ std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); });
+ t.detach();
+}
+void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
+ int myToken,
+ std::unique_lock<std::mutex>& lock,
+ int timeoutSec) {
if (timeoutSec > 0) {
- mShellDied.wait_for(lk, timeoutSec * 1s,
- [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+ mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
+ return mToken != myToken || !myInfo->mClientAlive;
+ });
} else {
- mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+ mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
+ return mToken != myToken || !myInfo->mClientAlive;
+ });
}
}
-void ShellSubscriber::updateConfig(const ShellSubscription& config) {
- std::lock_guard<std::mutex> lock(mMutex);
- mPushedMatchers.clear();
- mPulledInfo.clear();
+// Atomically claim the next token. Token numbers denote subscriber ordering.
+int ShellSubscriber::claimToken() {
+ std::unique_lock<std::mutex> lock(mMutex);
+ int myToken = ++mToken;
+ return myToken;
+}
- for (const auto& pushed : config.pushed()) {
- mPushedMatchers.push_back(pushed);
- VLOG("adding matcher for atom %d", pushed.atom_id());
+// Read and parse single config. There should only one config per input.
+bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
+ // Read the size of the config.
+ size_t bufferSize;
+ if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
+ return false;
}
- int64_t token = getElapsedRealtimeNs();
- mPullToken = token;
-
- int64_t minInterval = -1;
- for (const auto& pulled : config.pulled()) {
- // All intervals need to be multiples of the min interval.
- if (minInterval < 0 || pulled.freq_millis() < minInterval) {
- minInterval = pulled.freq_millis();
- }
+ // Read the config.
+ vector<uint8_t> buffer(bufferSize);
+ if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
+ return false;
+ }
- mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
- VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
+ // Parse the config.
+ ShellSubscription config;
+ if (!config.ParseFromArray(buffer.data(), bufferSize)) {
+ return false;
}
- if (mPulledInfo.size() > 0 && minInterval > 0) {
- // This thread is guaranteed to terminate after it detects the token is different or
- // cleaned up.
- std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
- puller.detach();
+ // Update SubscriptionInfo with state from config
+ for (const auto& pushed : config.pushed()) {
+ subscriptionInfo->mPushedMatchers.push_back(pushed);
}
-}
-void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
- const SimpleAtomMatcher& matcher) {
- if (mOutput == 0) return;
- int count = 0;
- mProto.clear();
- for (const auto& event : data) {
- VLOG("%s", event->ToString().c_str());
- if (matchesSimple(*mUidMap, matcher, *event)) {
- VLOG("matched");
- count++;
- uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
- util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
- event->ToProto(mProto);
- mProto.end(atomToken);
+ for (const auto& pulled : config.pulled()) {
+ vector<string> packages;
+ vector<int32_t> uids;
+ for (const string& pkg : pulled.packages()) {
+ auto it = UidMap::sAidToUidMapping.find(pkg);
+ if (it != UidMap::sAidToUidMapping.end()) {
+ uids.push_back(it->second);
+ } else {
+ packages.push_back(pkg);
+ }
}
- }
- if (count > 0) {
- // First write the payload size.
- size_t bufferSize = mProto.size();
- write(mOutput, &bufferSize, sizeof(bufferSize));
- VLOG("%d atoms, proto size: %zu", count, bufferSize);
- // Then write the payload.
- mProto.flush(mOutput);
+ subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages,
+ uids);
+ VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
}
- mProto.clear();
+
+ return true;
}
-void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
- while (1) {
- int64_t nowMillis = getElapsedRealtimeMillis();
+void ShellSubscriber::pullAndSendHeartbeats(int myToken) {
+ VLOG("ShellSubscriber: helper thread %d starting", myToken);
+ while (true) {
+ int64_t sleepTimeMs = INT_MAX;
{
std::lock_guard<std::mutex> lock(mMutex);
- if (mPulledInfo.size() == 0 || mPullToken != token) {
- VLOG("Pulling thread %lld done!", (long long)token);
+ if (!mSubscriptionInfo || mToken != myToken) {
+ VLOG("ShellSubscriber: helper thread %d done!", myToken);
return;
}
- for (auto& pullInfo : mPulledInfo) {
- if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
- VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
-
- vector<std::shared_ptr<LogEvent>> data;
- mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
- VLOG("pulled %zu atoms", data.size());
- if (data.size() > 0) {
- writeToOutputLocked(data, pullInfo.mPullerMatcher);
- }
- pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
+
+ int64_t nowMillis = getElapsedRealtimeMillis();
+ int64_t nowNanos = getElapsedRealtimeNs();
+ for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
+ if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
+ continue;
}
+
+ vector<int32_t> uids;
+ getUidsForPullAtom(&uids, pullInfo);
+
+ vector<std::shared_ptr<LogEvent>> data;
+ mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
+ VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+ writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
+
+ pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
+
+ // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received
+ // data from statsd. When it receives the data size of 0, perfd will not expect any
+ // atoms and recheck whether the subscription should end.
+ if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) {
+ attemptWriteToPipeLocked(/*dataSize=*/0);
+ }
+
+ // Determine how long to sleep before doing more work.
+ for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
+ int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
+ int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
+ if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull;
+ }
+ int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
+ if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat;
}
- VLOG("Pulling thread %lld sleep....", (long long)token);
- std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
+
+ VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken,
+ (long long)sleepTimeMs);
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
}
}
-void ShellSubscriber::readConfig(int in) {
- if (in <= 0) {
- return;
+void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
+ uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
+ // This is slow. Consider storing the uids per app and listening to uidmap updates.
+ for (const string& pkg : pullInfo.mPullPackages) {
+ set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
+ uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
}
+ uids->push_back(DEFAULT_PULL_UID);
+}
- while (1) {
- size_t bufferSize = 0;
- int result = 0;
- if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
- VLOG("Done reading");
- break;
- } else if (result < 0 || result != sizeof(bufferSize)) {
- ALOGE("Error reading config size");
- break;
- }
-
- vector<uint8_t> buffer(bufferSize);
- if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
- ShellSubscription config;
- if (config.ParseFromArray(buffer.data(), bufferSize)) {
- updateConfig(config);
- } else {
- ALOGE("error parsing the config");
- break;
- }
- } else {
- VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
- break;
+void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher) {
+ mProto.clear();
+ int count = 0;
+ for (const auto& event : data) {
+ if (matchesSimple(*mUidMap, matcher, *event)) {
+ count++;
+ uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+ util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+ event->ToProto(mProto);
+ mProto.end(atomToken);
}
}
-}
-void ShellSubscriber::cleanUpLocked() {
- // The file descriptors will be closed by binder.
- mInput = 0;
- mOutput = 0;
- mResultReceiver = nullptr;
- mPushedMatchers.clear();
- mPulledInfo.clear();
- mPullToken = 0;
- VLOG("done clean up");
+ if (count > 0) attemptWriteToPipeLocked(mProto.size());
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
std::lock_guard<std::mutex> lock(mMutex);
+ if (!mSubscriptionInfo) return;
- if (mOutput <= 0) {
- return;
- }
- for (const auto& matcher : mPushedMatchers) {
+ mProto.clear();
+ for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
- VLOG("%s", event.ToString().c_str());
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
- // First write the payload size.
- size_t bufferSize = mProto.size();
- write(mOutput, &bufferSize, sizeof(bufferSize));
-
- // Then write the payload.
- mProto.flush(mOutput);
- mProto.clear();
- break;
+ attemptWriteToPipeLocked(mProto.size());
}
}
}
-void ShellSubscriber::binderDied(const wp<IBinder>& who) {
- {
- VLOG("Shell exits");
- std::lock_guard<std::mutex> lock(mMutex);
- cleanUpLocked();
+// Tries to write the atom encoded in mProto to the pipe. If the write fails
+// because the read end of the pipe has closed, signals to other threads that
+// the subscription should end.
+void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) {
+ // First, write the payload size.
+ if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
}
- mShellDied.notify_all();
+
+ // Then, write the payload if this is not just a heartbeat.
+ if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
+ }
+
+ mLastWriteMs = getElapsedRealtimeMillis();
}
} // namespace statsd