diff options
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 230 |
1 files changed, 104 insertions, 126 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index a861a3b76868..c677222f31eb 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -18,6 +18,7 @@ #include "ShellSubscriber.h" +#include <android-base/file.h> #include "matchers/matcher_util.h" #include "stats_log_util.h" @@ -30,154 +31,129 @@ namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { - VLOG("start new shell subscription"); - int64_t subscriberId = getElapsedRealtimeNs(); + int myToken = claimToken(); + mSubscriptionShouldEnd.notify_one(); - { - std::lock_guard<std::mutex> lock(mMutex); - if (mSubscriberId> 0) { - VLOG("Only one shell subscriber is allowed."); - return; - } - mSubscriberId = subscriberId; - mInput = in; - mOutput = out; + shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); + if (!readConfig(mySubscriptionInfo)) { + return; } - bool success = readConfig(); - if (!success) { - std::lock_guard<std::mutex> lock(mMutex); - cleanUpLocked(); + // critical-section + std::unique_lock<std::mutex> lock(mMutex); + if (myToken != mToken) { + // Some other subscription has already come in. Stop. + return; } + mSubscriptionInfo = mySubscriptionInfo; - VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec); - std::unique_lock<std::mutex> lk(mMutex); + if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { + // This thread terminates after it detects that mToken has changed. + std::thread puller([this, myToken] { startPull(myToken); }); + puller.detach(); + } - // Note that the following is blocking, and it's intended as we cannot return until the shell - // cmd exits or we time out. + // Block until subscription has ended. if (timeoutSec > 0) { - mShellDied.wait_for(lk, timeoutSec * 1s, - [this, subscriberId] { return mSubscriberId != subscriberId; }); + mSubscriptionShouldEnd.wait_for( + lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] { + return mToken != myToken || !mySubscriptionInfo->mClientAlive; + }); } else { - mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; }); + mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] { + return mToken != myToken || !mySubscriptionInfo->mClientAlive; + }); + } + + if (mSubscriptionInfo == mySubscriptionInfo) { + mSubscriptionInfo = nullptr; } } +// Atomically claim the next token. Token numbers denote subscriber ordering. +int ShellSubscriber::claimToken() { + std::unique_lock<std::mutex> lock(mMutex); + int myToken = ++mToken; + return myToken; +} -// Read configs until EOF is reached. There may be multiple configs in the input -// -- each new config should replace the previous one. -// -// Returns a boolean indicating whether the input was read successfully. -bool ShellSubscriber::readConfig() { - if (mInput < 0) { +// Read and parse single config. There should only one config per input. +bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) { + // Read the size of the config. + size_t bufferSize; + if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { return false; } - while (true) { - // Read the size of the config. - size_t bufferSize = 0; - ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize)); - if (bytesRead == 0) { - VLOG("We have reached the end of the input."); - return true; - } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) { - ALOGE("Error reading config size"); - return false; - } - - // Read and parse the config. - vector<uint8_t> buffer(bufferSize); - bytesRead = read(mInput, buffer.data(), bufferSize); - if (bytesRead > 0 && (size_t)bytesRead == bufferSize) { - ShellSubscription config; - if (config.ParseFromArray(buffer.data(), bufferSize)) { - updateConfig(config); - } else { - ALOGE("Error parsing the config"); - return false; - } - } else { - VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize, - bytesRead); - return false; - } + // Read the config. + vector<uint8_t> buffer(bufferSize); + if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { + return false; } -} -void ShellSubscriber::updateConfig(const ShellSubscription& config) { - mPushedMatchers.clear(); - mPulledInfo.clear(); + // Parse the config. + ShellSubscription config; + if (!config.ParseFromArray(buffer.data(), bufferSize)) { + return false; + } + // Update SubscriptionInfo with state from config for (const auto& pushed : config.pushed()) { - mPushedMatchers.push_back(pushed); - VLOG("adding matcher for pushed atom %d", pushed.atom_id()); + subscriptionInfo->mPushedMatchers.push_back(pushed); } - int64_t token = getElapsedRealtimeNs(); - mPullToken = token; - - int64_t minInterval = -1; + int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } - - mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); - VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); + subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); } + subscriptionInfo->mPullIntervalMin = minInterval; - if (mPulledInfo.size() > 0 && minInterval > 0) { - // This thread is guaranteed to terminate after it detects the token is - // different. - std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); - puller.detach(); - } + return true; } -void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { +void ShellSubscriber::startPull(int64_t myToken) { while (true) { + std::lock_guard<std::mutex> lock(mMutex); + if (!mSubscriptionInfo || mToken != myToken) { + VLOG("Pulling thread %lld done!", (long long)myToken); + return; + } + int64_t nowMillis = getElapsedRealtimeMillis(); - { - std::lock_guard<std::mutex> lock(mMutex); - // If the token has changed, the config has changed, so this - // puller can now stop. - if (mPulledInfo.size() == 0 || mPullToken != token) { - VLOG("Pulling thread %lld done!", (long long)token); - return; - } - for (auto& pullInfo : mPulledInfo) { - if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { - VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); - - vector<std::shared_ptr<LogEvent>> data; - mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); - VLOG("pulled %zu atoms", data.size()); - if (data.size() > 0) { - writeToOutputLocked(data, pullInfo.mPullerMatcher); - } - pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; + for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) { + if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { + vector<std::shared_ptr<LogEvent>> data; + mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); + VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); + + // TODO(b/150969574): Don't write to a pipe while holding a lock. + if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; } + pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } - VLOG("Pulling thread %lld sleep....", (long long)token); - std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); + + VLOG("Pulling thread %lld sleep....", (long long)myToken); + std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } -// Must be called with the lock acquired, so that mProto isn't being written to -// at the same time by multiple threads. -void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, - const SimpleAtomMatcher& matcher) { - if (mOutput < 0) { - return; - } - int count = 0; +// \return boolean indicating if writes were successful (will return false if +// client dies) +bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, + const SimpleAtomMatcher& matcher) { mProto.clear(); + int count = 0; for (const auto& event : data) { VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { - VLOG("matched"); count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); @@ -189,24 +165,29 @@ void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent> if (count > 0) { // First write the payload size. size_t bufferSize = mProto.size(); - write(mOutput, &bufferSize, sizeof(bufferSize)); + if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, + sizeof(bufferSize))) { + return false; + } VLOG("%d atoms, proto size: %zu", count, bufferSize); // Then write the payload. - mProto.flush(mOutput); + if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { + return false; + } } + + return true; } void ShellSubscriber::onLogEvent(const LogEvent& event) { - // Acquire a lock to prevent corruption from multiple threads writing to - // mProto. std::lock_guard<std::mutex> lock(mMutex); - if (mOutput < 0) { + if (!mSubscriptionInfo) { return; } mProto.clear(); - for (const auto& matcher : mPushedMatchers) { + for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | @@ -216,26 +197,23 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { // First write the payload size. size_t bufferSize = mProto.size(); - write(mOutput, &bufferSize, sizeof(bufferSize)); + if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, + sizeof(bufferSize))) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } // Then write the payload. - mProto.flush(mOutput); + if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } } } } -void ShellSubscriber::cleanUpLocked() { - // The file descriptors will be closed by binder. - mInput = -1; - mOutput = -1; - mSubscriberId = 0; - mPushedMatchers.clear(); - mPulledInfo.clear(); - // Setting mPullToken == 0 tells pull thread that its work is done. - mPullToken = 0; - VLOG("done clean up"); -} - } // namespace statsd } // namespace os } // namespace android |