diff options
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.cpp')
-rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 301 |
1 files changed, 156 insertions, 145 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index d6a04336bc46..fd883c29dba0 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -18,6 +18,8 @@ #include "ShellSubscriber.h" +#include <android-base/file.h> + #include "matchers/matcher_util.h" #include "stats_log_util.h" @@ -29,204 +31,213 @@ namespace statsd { const static int FIELD_ID_ATOM = 1; -void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver, - int timeoutSec) { - VLOG("start new shell subscription"); +void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { + int myToken = claimToken(); + VLOG("ShellSubscriber: new subscription %d has come in", myToken); + mSubscriptionShouldEnd.notify_one(); + + shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); + if (!readConfig(mySubscriptionInfo)) return; + { - std::lock_guard<std::mutex> lock(mMutex); - if (mResultReceiver != nullptr) { - VLOG("Only one shell subscriber is allowed."); - return; - } - mInput = in; - mOutput = out; - mResultReceiver = resultReceiver; - IInterface::asBinder(mResultReceiver)->linkToDeath(this); - } + std::unique_lock<std::mutex> lock(mMutex); + mSubscriptionInfo = mySubscriptionInfo; + spawnHelperThread(myToken); + waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); - // Note that the following is blocking, and it's intended as we cannot return until the shell - // cmd exits, otherwise all resources & FDs will be automatically closed. + if (mSubscriptionInfo == mySubscriptionInfo) { + mSubscriptionInfo = nullptr; + } - // Read config forever until EOF is reached. Clients may send multiple configs -- each new - // config replace the previous one. - readConfig(in); - VLOG("timeout : %d", timeoutSec); + } +} - // Now we have read an EOF we now wait for the semaphore until the client exits. - VLOG("Now wait for client to exit"); - std::unique_lock<std::mutex> lk(mMutex); +void ShellSubscriber::spawnHelperThread(int myToken) { + std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); }); + t.detach(); +} +void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo, + int myToken, + std::unique_lock<std::mutex>& lock, + int timeoutSec) { if (timeoutSec > 0) { - mShellDied.wait_for(lk, timeoutSec * 1s, - [this, resultReceiver] { return mResultReceiver != resultReceiver; }); + mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] { + return mToken != myToken || !myInfo->mClientAlive; + }); } else { - mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; }); + mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] { + return mToken != myToken || !myInfo->mClientAlive; + }); } } -void ShellSubscriber::updateConfig(const ShellSubscription& config) { - std::lock_guard<std::mutex> lock(mMutex); - mPushedMatchers.clear(); - mPulledInfo.clear(); +// Atomically claim the next token. Token numbers denote subscriber ordering. +int ShellSubscriber::claimToken() { + std::unique_lock<std::mutex> lock(mMutex); + int myToken = ++mToken; + return myToken; +} - for (const auto& pushed : config.pushed()) { - mPushedMatchers.push_back(pushed); - VLOG("adding matcher for atom %d", pushed.atom_id()); +// Read and parse single config. There should only one config per input. +bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) { + // Read the size of the config. + size_t bufferSize; + if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { + return false; } - int64_t token = getElapsedRealtimeNs(); - mPullToken = token; - - int64_t minInterval = -1; - for (const auto& pulled : config.pulled()) { - // All intervals need to be multiples of the min interval. - if (minInterval < 0 || pulled.freq_millis() < minInterval) { - minInterval = pulled.freq_millis(); - } + // Read the config. + vector<uint8_t> buffer(bufferSize); + if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { + return false; + } - mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); - VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); + // Parse the config. + ShellSubscription config; + if (!config.ParseFromArray(buffer.data(), bufferSize)) { + return false; } - if (mPulledInfo.size() > 0 && minInterval > 0) { - // This thread is guaranteed to terminate after it detects the token is different or - // cleaned up. - std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); - puller.detach(); + // Update SubscriptionInfo with state from config + for (const auto& pushed : config.pushed()) { + subscriptionInfo->mPushedMatchers.push_back(pushed); } -} -void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, - const SimpleAtomMatcher& matcher) { - if (mOutput == 0) return; - int count = 0; - mProto.clear(); - for (const auto& event : data) { - VLOG("%s", event->ToString().c_str()); - if (matchesSimple(*mUidMap, matcher, *event)) { - VLOG("matched"); - count++; - uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | - util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); - event->ToProto(mProto); - mProto.end(atomToken); + for (const auto& pulled : config.pulled()) { + vector<string> packages; + vector<int32_t> uids; + for (const string& pkg : pulled.packages()) { + auto it = UidMap::sAidToUidMapping.find(pkg); + if (it != UidMap::sAidToUidMapping.end()) { + uids.push_back(it->second); + } else { + packages.push_back(pkg); + } } - } - if (count > 0) { - // First write the payload size. - size_t bufferSize = mProto.size(); - write(mOutput, &bufferSize, sizeof(bufferSize)); - VLOG("%d atoms, proto size: %zu", count, bufferSize); - // Then write the payload. - mProto.flush(mOutput); + subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages, + uids); + VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } - mProto.clear(); + + return true; } -void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { - while (1) { - int64_t nowMillis = getElapsedRealtimeMillis(); +void ShellSubscriber::pullAndSendHeartbeats(int myToken) { + VLOG("ShellSubscriber: helper thread %d starting", myToken); + while (true) { + int64_t sleepTimeMs = INT_MAX; { std::lock_guard<std::mutex> lock(mMutex); - if (mPulledInfo.size() == 0 || mPullToken != token) { - VLOG("Pulling thread %lld done!", (long long)token); + if (!mSubscriptionInfo || mToken != myToken) { + VLOG("ShellSubscriber: helper thread %d done!", myToken); return; } - for (auto& pullInfo : mPulledInfo) { - if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { - VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); - - vector<std::shared_ptr<LogEvent>> data; - mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); - VLOG("pulled %zu atoms", data.size()); - if (data.size() > 0) { - writeToOutputLocked(data, pullInfo.mPullerMatcher); - } - pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; + + int64_t nowMillis = getElapsedRealtimeMillis(); + int64_t nowNanos = getElapsedRealtimeNs(); + for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { + if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) { + continue; } + + vector<int32_t> uids; + getUidsForPullAtom(&uids, pullInfo); + + vector<std::shared_ptr<LogEvent>> data; + mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data); + VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); + writePulledAtomsLocked(data, pullInfo.mPullerMatcher); + + pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } + + // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received + // data from statsd. When it receives the data size of 0, perfd will not expect any + // atoms and recheck whether the subscription should end. + if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) { + attemptWriteToPipeLocked(/*dataSize=*/0); + } + + // Determine how long to sleep before doing more work. + for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { + int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval; + int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative + if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull; + } + int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis; + if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat; } - VLOG("Pulling thread %lld sleep....", (long long)token); - std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); + + VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken, + (long long)sleepTimeMs); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs)); } } -void ShellSubscriber::readConfig(int in) { - if (in <= 0) { - return; +void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) { + uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); + // This is slow. Consider storing the uids per app and listening to uidmap updates. + for (const string& pkg : pullInfo.mPullPackages) { + set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg); + uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); } + uids->push_back(DEFAULT_PULL_UID); +} - while (1) { - size_t bufferSize = 0; - int result = 0; - if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) { - VLOG("Done reading"); - break; - } else if (result < 0 || result != sizeof(bufferSize)) { - ALOGE("Error reading config size"); - break; - } - - vector<uint8_t> buffer(bufferSize); - if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) { - ShellSubscription config; - if (config.ParseFromArray(buffer.data(), bufferSize)) { - updateConfig(config); - } else { - ALOGE("error parsing the config"); - break; - } - } else { - VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize); - break; +void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, + const SimpleAtomMatcher& matcher) { + mProto.clear(); + int count = 0; + for (const auto& event : data) { + if (matchesSimple(*mUidMap, matcher, *event)) { + count++; + uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | + util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); + event->ToProto(mProto); + mProto.end(atomToken); } } -} -void ShellSubscriber::cleanUpLocked() { - // The file descriptors will be closed by binder. - mInput = 0; - mOutput = 0; - mResultReceiver = nullptr; - mPushedMatchers.clear(); - mPulledInfo.clear(); - mPullToken = 0; - VLOG("done clean up"); + if (count > 0) attemptWriteToPipeLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { std::lock_guard<std::mutex> lock(mMutex); + if (!mSubscriptionInfo) return; - if (mOutput <= 0) { - return; - } - for (const auto& matcher : mPushedMatchers) { + mProto.clear(); + for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { - VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); - // First write the payload size. - size_t bufferSize = mProto.size(); - write(mOutput, &bufferSize, sizeof(bufferSize)); - - // Then write the payload. - mProto.flush(mOutput); - mProto.clear(); - break; + attemptWriteToPipeLocked(mProto.size()); } } } -void ShellSubscriber::binderDied(const wp<IBinder>& who) { - { - VLOG("Shell exits"); - std::lock_guard<std::mutex> lock(mMutex); - cleanUpLocked(); +// Tries to write the atom encoded in mProto to the pipe. If the write fails +// because the read end of the pipe has closed, signals to other threads that +// the subscription should end. +void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) { + // First, write the payload size. + if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; } - mShellDied.notify_all(); + + // Then, write the payload if this is not just a heartbeat. + if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } + + mLastWriteMs = getElapsedRealtimeMillis(); } } // namespace statsd |