summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/shell/ShellSubscriber.cpp
diff options
context:
space:
mode:
authorRuchir Rastogi <ruchirr@google.com>2020-02-10 17:40:09 -0800
committerRuchir Rastogi <ruchirr@google.com>2020-02-14 18:07:37 -0800
commite449b0c1855b9a8d967d8926bc295c1c1c16922c (patch)
tree72d7e8a428a1a5c8de8335abf08bc199a84410f8 /cmds/statsd/src/shell/ShellSubscriber.cpp
parent22b21595af9f70454607b7c806374cf80a0410bd (diff)
Move statsd (and tests) to libbinder_ndk
Major changes include: - Removing unused permission checks within StatsService. These include ENFORCE_DUMP_AND_USAGE_STATS, checkDumpAndUsageStats, kOpUsage, and kPermissionUsage. - Converting from sp to shared_ptr - Using libbinder_ndk functions instead of libbinder functions (e.g. for installing death recipients, getting calling uids, etc.) - New death recipients were added in StatsService, ConfigManager, and SubscriberReporter. - Using a unique token (timestamp) to identify shell subscribers instead of IResultReceiver because IResultReceiver is not exposed by libbinder_ndk. Currently, statsd cannot detect if perfd dies; we will fix that later. Bug: 145232107 Bug: 148609603 Test: m statsd Test: m statsd_test Test: bit stastd_test:* Test: atest GtsStatsdHostTestCases Change-Id: Ia1fda7280c22320bc4ebc8371acaadbe8eabcbd2
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r--cmds/statsd/src/shell/ShellSubscriber.cpp199
1 files changed, 103 insertions, 96 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index d6a04336bc46..a861a3b76868 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -29,49 +29,88 @@ namespace statsd {
const static int FIELD_ID_ATOM = 1;
-void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
- int timeoutSec) {
+void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
VLOG("start new shell subscription");
+ int64_t subscriberId = getElapsedRealtimeNs();
+
{
std::lock_guard<std::mutex> lock(mMutex);
- if (mResultReceiver != nullptr) {
+ if (mSubscriberId> 0) {
VLOG("Only one shell subscriber is allowed.");
return;
}
+ mSubscriberId = subscriberId;
mInput = in;
mOutput = out;
- mResultReceiver = resultReceiver;
- IInterface::asBinder(mResultReceiver)->linkToDeath(this);
}
- // Note that the following is blocking, and it's intended as we cannot return until the shell
- // cmd exits, otherwise all resources & FDs will be automatically closed.
-
- // Read config forever until EOF is reached. Clients may send multiple configs -- each new
- // config replace the previous one.
- readConfig(in);
- VLOG("timeout : %d", timeoutSec);
+ bool success = readConfig();
+ if (!success) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ cleanUpLocked();
+ }
- // Now we have read an EOF we now wait for the semaphore until the client exits.
- VLOG("Now wait for client to exit");
+ VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
std::unique_lock<std::mutex> lk(mMutex);
+ // Note that the following is blocking, and it's intended as we cannot return until the shell
+ // cmd exits or we time out.
if (timeoutSec > 0) {
mShellDied.wait_for(lk, timeoutSec * 1s,
- [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+ [this, subscriberId] { return mSubscriberId != subscriberId; });
} else {
- mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+ mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
+ }
+}
+
+
+// Read configs until EOF is reached. There may be multiple configs in the input
+// -- each new config should replace the previous one.
+//
+// Returns a boolean indicating whether the input was read successfully.
+bool ShellSubscriber::readConfig() {
+ if (mInput < 0) {
+ return false;
+ }
+
+ while (true) {
+ // Read the size of the config.
+ size_t bufferSize = 0;
+ ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
+ if (bytesRead == 0) {
+ VLOG("We have reached the end of the input.");
+ return true;
+ } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
+ ALOGE("Error reading config size");
+ return false;
+ }
+
+ // Read and parse the config.
+ vector<uint8_t> buffer(bufferSize);
+ bytesRead = read(mInput, buffer.data(), bufferSize);
+ if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
+ ShellSubscription config;
+ if (config.ParseFromArray(buffer.data(), bufferSize)) {
+ updateConfig(config);
+ } else {
+ ALOGE("Error parsing the config");
+ return false;
+ }
+ } else {
+ VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize,
+ bytesRead);
+ return false;
+ }
}
}
void ShellSubscriber::updateConfig(const ShellSubscription& config) {
- std::lock_guard<std::mutex> lock(mMutex);
mPushedMatchers.clear();
mPulledInfo.clear();
for (const auto& pushed : config.pushed()) {
mPushedMatchers.push_back(pushed);
- VLOG("adding matcher for atom %d", pushed.atom_id());
+ VLOG("adding matcher for pushed atom %d", pushed.atom_id());
}
int64_t token = getElapsedRealtimeNs();
@@ -89,46 +128,20 @@ void ShellSubscriber::updateConfig(const ShellSubscription& config) {
}
if (mPulledInfo.size() > 0 && minInterval > 0) {
- // This thread is guaranteed to terminate after it detects the token is different or
- // cleaned up.
+ // This thread is guaranteed to terminate after it detects the token is
+ // different.
std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
puller.detach();
}
}
-void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
- const SimpleAtomMatcher& matcher) {
- if (mOutput == 0) return;
- int count = 0;
- mProto.clear();
- for (const auto& event : data) {
- VLOG("%s", event->ToString().c_str());
- if (matchesSimple(*mUidMap, matcher, *event)) {
- VLOG("matched");
- count++;
- uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
- util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
- event->ToProto(mProto);
- mProto.end(atomToken);
- }
- }
-
- if (count > 0) {
- // First write the payload size.
- size_t bufferSize = mProto.size();
- write(mOutput, &bufferSize, sizeof(bufferSize));
- VLOG("%d atoms, proto size: %zu", count, bufferSize);
- // Then write the payload.
- mProto.flush(mOutput);
- }
- mProto.clear();
-}
-
void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
- while (1) {
+ while (true) {
int64_t nowMillis = getElapsedRealtimeMillis();
{
std::lock_guard<std::mutex> lock(mMutex);
+ // If the token has changed, the config has changed, so this
+ // puller can now stop.
if (mPulledInfo.size() == 0 || mPullToken != token) {
VLOG("Pulling thread %lld done!", (long long)token);
return;
@@ -152,55 +165,47 @@ void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
}
}
-void ShellSubscriber::readConfig(int in) {
- if (in <= 0) {
+// Must be called with the lock acquired, so that mProto isn't being written to
+// at the same time by multiple threads.
+void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher) {
+ if (mOutput < 0) {
return;
}
-
- while (1) {
- size_t bufferSize = 0;
- int result = 0;
- if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
- VLOG("Done reading");
- break;
- } else if (result < 0 || result != sizeof(bufferSize)) {
- ALOGE("Error reading config size");
- break;
- }
-
- vector<uint8_t> buffer(bufferSize);
- if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
- ShellSubscription config;
- if (config.ParseFromArray(buffer.data(), bufferSize)) {
- updateConfig(config);
- } else {
- ALOGE("error parsing the config");
- break;
- }
- } else {
- VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
- break;
+ int count = 0;
+ mProto.clear();
+ for (const auto& event : data) {
+ VLOG("%s", event->ToString().c_str());
+ if (matchesSimple(*mUidMap, matcher, *event)) {
+ VLOG("matched");
+ count++;
+ uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+ util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+ event->ToProto(mProto);
+ mProto.end(atomToken);
}
}
-}
-void ShellSubscriber::cleanUpLocked() {
- // The file descriptors will be closed by binder.
- mInput = 0;
- mOutput = 0;
- mResultReceiver = nullptr;
- mPushedMatchers.clear();
- mPulledInfo.clear();
- mPullToken = 0;
- VLOG("done clean up");
+ if (count > 0) {
+ // First write the payload size.
+ size_t bufferSize = mProto.size();
+ write(mOutput, &bufferSize, sizeof(bufferSize));
+
+ VLOG("%d atoms, proto size: %zu", count, bufferSize);
+ // Then write the payload.
+ mProto.flush(mOutput);
+ }
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
+ // Acquire a lock to prevent corruption from multiple threads writing to
+ // mProto.
std::lock_guard<std::mutex> lock(mMutex);
-
- if (mOutput <= 0) {
+ if (mOutput < 0) {
return;
}
+
+ mProto.clear();
for (const auto& matcher : mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
VLOG("%s", event.ToString().c_str());
@@ -208,25 +213,27 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) {
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
+
// First write the payload size.
size_t bufferSize = mProto.size();
write(mOutput, &bufferSize, sizeof(bufferSize));
// Then write the payload.
mProto.flush(mOutput);
- mProto.clear();
- break;
}
}
}
-void ShellSubscriber::binderDied(const wp<IBinder>& who) {
- {
- VLOG("Shell exits");
- std::lock_guard<std::mutex> lock(mMutex);
- cleanUpLocked();
- }
- mShellDied.notify_all();
+void ShellSubscriber::cleanUpLocked() {
+ // The file descriptors will be closed by binder.
+ mInput = -1;
+ mOutput = -1;
+ mSubscriberId = 0;
+ mPushedMatchers.clear();
+ mPulledInfo.clear();
+ // Setting mPullToken == 0 tells pull thread that its work is done.
+ mPullToken = 0;
+ VLOG("done clean up");
}
} // namespace statsd