diff options
author | Ruchir Rastogi <ruchirr@google.com> | 2020-02-10 17:40:09 -0800 |
---|---|---|
committer | Ruchir Rastogi <ruchirr@google.com> | 2020-02-14 18:07:37 -0800 |
commit | e449b0c1855b9a8d967d8926bc295c1c1c16922c (patch) | |
tree | 72d7e8a428a1a5c8de8335abf08bc199a84410f8 /cmds/statsd/src/shell/ShellSubscriber.cpp | |
parent | 22b21595af9f70454607b7c806374cf80a0410bd (diff) |
Move statsd (and tests) to libbinder_ndk
Major changes include:
- Removing unused permission checks within StatsService. These
include ENFORCE_DUMP_AND_USAGE_STATS, checkDumpAndUsageStats,
kOpUsage, and kPermissionUsage.
- Converting from sp to shared_ptr
- Using libbinder_ndk functions instead of libbinder functions
(e.g. for installing death recipients, getting calling uids, etc.)
- New death recipients were added in StatsService,
ConfigManager, and SubscriberReporter.
- Using a unique token (timestamp) to identify shell subscribers
instead of IResultReceiver because IResultReceiver is not exposed by
libbinder_ndk. Currently, statsd cannot detect if perfd dies; we
will fix that later.
Bug: 145232107
Bug: 148609603
Test: m statsd
Test: m statsd_test
Test: bit stastd_test:*
Test: atest GtsStatsdHostTestCases
Change-Id: Ia1fda7280c22320bc4ebc8371acaadbe8eabcbd2
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 199 |
1 files changed, 103 insertions, 96 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index d6a04336bc46..a861a3b76868 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -29,49 +29,88 @@ namespace statsd { const static int FIELD_ID_ATOM = 1; -void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver, - int timeoutSec) { +void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { VLOG("start new shell subscription"); + int64_t subscriberId = getElapsedRealtimeNs(); + { std::lock_guard<std::mutex> lock(mMutex); - if (mResultReceiver != nullptr) { + if (mSubscriberId> 0) { VLOG("Only one shell subscriber is allowed."); return; } + mSubscriberId = subscriberId; mInput = in; mOutput = out; - mResultReceiver = resultReceiver; - IInterface::asBinder(mResultReceiver)->linkToDeath(this); } - // Note that the following is blocking, and it's intended as we cannot return until the shell - // cmd exits, otherwise all resources & FDs will be automatically closed. - - // Read config forever until EOF is reached. Clients may send multiple configs -- each new - // config replace the previous one. - readConfig(in); - VLOG("timeout : %d", timeoutSec); + bool success = readConfig(); + if (!success) { + std::lock_guard<std::mutex> lock(mMutex); + cleanUpLocked(); + } - // Now we have read an EOF we now wait for the semaphore until the client exits. - VLOG("Now wait for client to exit"); + VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec); std::unique_lock<std::mutex> lk(mMutex); + // Note that the following is blocking, and it's intended as we cannot return until the shell + // cmd exits or we time out. if (timeoutSec > 0) { mShellDied.wait_for(lk, timeoutSec * 1s, - [this, resultReceiver] { return mResultReceiver != resultReceiver; }); + [this, subscriberId] { return mSubscriberId != subscriberId; }); } else { - mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; }); + mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; }); + } +} + + +// Read configs until EOF is reached. There may be multiple configs in the input +// -- each new config should replace the previous one. +// +// Returns a boolean indicating whether the input was read successfully. +bool ShellSubscriber::readConfig() { + if (mInput < 0) { + return false; + } + + while (true) { + // Read the size of the config. + size_t bufferSize = 0; + ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize)); + if (bytesRead == 0) { + VLOG("We have reached the end of the input."); + return true; + } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) { + ALOGE("Error reading config size"); + return false; + } + + // Read and parse the config. + vector<uint8_t> buffer(bufferSize); + bytesRead = read(mInput, buffer.data(), bufferSize); + if (bytesRead > 0 && (size_t)bytesRead == bufferSize) { + ShellSubscription config; + if (config.ParseFromArray(buffer.data(), bufferSize)) { + updateConfig(config); + } else { + ALOGE("Error parsing the config"); + return false; + } + } else { + VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize, + bytesRead); + return false; + } } } void ShellSubscriber::updateConfig(const ShellSubscription& config) { - std::lock_guard<std::mutex> lock(mMutex); mPushedMatchers.clear(); mPulledInfo.clear(); for (const auto& pushed : config.pushed()) { mPushedMatchers.push_back(pushed); - VLOG("adding matcher for atom %d", pushed.atom_id()); + VLOG("adding matcher for pushed atom %d", pushed.atom_id()); } int64_t token = getElapsedRealtimeNs(); @@ -89,46 +128,20 @@ void ShellSubscriber::updateConfig(const ShellSubscription& config) { } if (mPulledInfo.size() > 0 && minInterval > 0) { - // This thread is guaranteed to terminate after it detects the token is different or - // cleaned up. + // This thread is guaranteed to terminate after it detects the token is + // different. std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); puller.detach(); } } -void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, - const SimpleAtomMatcher& matcher) { - if (mOutput == 0) return; - int count = 0; - mProto.clear(); - for (const auto& event : data) { - VLOG("%s", event->ToString().c_str()); - if (matchesSimple(*mUidMap, matcher, *event)) { - VLOG("matched"); - count++; - uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | - util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); - event->ToProto(mProto); - mProto.end(atomToken); - } - } - - if (count > 0) { - // First write the payload size. - size_t bufferSize = mProto.size(); - write(mOutput, &bufferSize, sizeof(bufferSize)); - VLOG("%d atoms, proto size: %zu", count, bufferSize); - // Then write the payload. - mProto.flush(mOutput); - } - mProto.clear(); -} - void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { - while (1) { + while (true) { int64_t nowMillis = getElapsedRealtimeMillis(); { std::lock_guard<std::mutex> lock(mMutex); + // If the token has changed, the config has changed, so this + // puller can now stop. if (mPulledInfo.size() == 0 || mPullToken != token) { VLOG("Pulling thread %lld done!", (long long)token); return; @@ -152,55 +165,47 @@ void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { } } -void ShellSubscriber::readConfig(int in) { - if (in <= 0) { +// Must be called with the lock acquired, so that mProto isn't being written to +// at the same time by multiple threads. +void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, + const SimpleAtomMatcher& matcher) { + if (mOutput < 0) { return; } - - while (1) { - size_t bufferSize = 0; - int result = 0; - if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) { - VLOG("Done reading"); - break; - } else if (result < 0 || result != sizeof(bufferSize)) { - ALOGE("Error reading config size"); - break; - } - - vector<uint8_t> buffer(bufferSize); - if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) { - ShellSubscription config; - if (config.ParseFromArray(buffer.data(), bufferSize)) { - updateConfig(config); - } else { - ALOGE("error parsing the config"); - break; - } - } else { - VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize); - break; + int count = 0; + mProto.clear(); + for (const auto& event : data) { + VLOG("%s", event->ToString().c_str()); + if (matchesSimple(*mUidMap, matcher, *event)) { + VLOG("matched"); + count++; + uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | + util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); + event->ToProto(mProto); + mProto.end(atomToken); } } -} -void ShellSubscriber::cleanUpLocked() { - // The file descriptors will be closed by binder. - mInput = 0; - mOutput = 0; - mResultReceiver = nullptr; - mPushedMatchers.clear(); - mPulledInfo.clear(); - mPullToken = 0; - VLOG("done clean up"); + if (count > 0) { + // First write the payload size. + size_t bufferSize = mProto.size(); + write(mOutput, &bufferSize, sizeof(bufferSize)); + + VLOG("%d atoms, proto size: %zu", count, bufferSize); + // Then write the payload. + mProto.flush(mOutput); + } } void ShellSubscriber::onLogEvent(const LogEvent& event) { + // Acquire a lock to prevent corruption from multiple threads writing to + // mProto. std::lock_guard<std::mutex> lock(mMutex); - - if (mOutput <= 0) { + if (mOutput < 0) { return; } + + mProto.clear(); for (const auto& matcher : mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); @@ -208,25 +213,27 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); + // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); // Then write the payload. mProto.flush(mOutput); - mProto.clear(); - break; } } } -void ShellSubscriber::binderDied(const wp<IBinder>& who) { - { - VLOG("Shell exits"); - std::lock_guard<std::mutex> lock(mMutex); - cleanUpLocked(); - } - mShellDied.notify_all(); +void ShellSubscriber::cleanUpLocked() { + // The file descriptors will be closed by binder. + mInput = -1; + mOutput = -1; + mSubscriberId = 0; + mPushedMatchers.clear(); + mPulledInfo.clear(); + // Setting mPullToken == 0 tells pull thread that its work is done. + mPullToken = 0; + VLOG("done clean up"); } } // namespace statsd |