/* * Copyright (C) 2018 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define DEBUG false // STOPSHIP if true #include "Log.h" #include "ShellSubscriber.h" #include #include "matchers/matcher_util.h" #include "stats_log_util.h" using android::util::ProtoOutputStream; namespace android { namespace os { namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { int myToken = claimToken(); VLOG("ShellSubscriber: new subscription %d has come in", myToken); mSubscriptionShouldEnd.notify_one(); shared_ptr mySubscriptionInfo = make_shared(in, out); if (!readConfig(mySubscriptionInfo)) return; { std::unique_lock lock(mMutex); if (myToken != mToken) { // Some other subscription has already come in. Stop. return; } mSubscriptionInfo = mySubscriptionInfo; spawnHelperThreadsLocked(mySubscriptionInfo, myToken); waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); if (mSubscriptionInfo == mySubscriptionInfo) { mSubscriptionInfo = nullptr; } } } void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr myInfo, int myToken) { if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) { std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); }); heartbeatSender.detach(); } void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr myInfo, int myToken, std::unique_lock& lock, int timeoutSec) { if (timeoutSec > 0) { mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] { return mToken != myToken || !myInfo->mClientAlive; }); } else { mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] { return mToken != myToken || !myInfo->mClientAlive; }); } } // Atomically claim the next token. Token numbers denote subscriber ordering. int ShellSubscriber::claimToken() { std::unique_lock lock(mMutex); int myToken = ++mToken; return myToken; } // Read and parse single config. There should only one config per input. bool ShellSubscriber::readConfig(shared_ptr subscriptionInfo) { // Read the size of the config. size_t bufferSize; if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { return false; } // Read the config. vector buffer(bufferSize); if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { return false; } // Parse the config. ShellSubscription config; if (!config.ParseFromArray(buffer.data(), bufferSize)) { return false; } // Update SubscriptionInfo with state from config for (const auto& pushed : config.pushed()) { subscriptionInfo->mPushedMatchers.push_back(pushed); } int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } vector packages; vector uids; for (const string& pkg : pulled.packages()) { auto it = UidMap::sAidToUidMapping.find(pkg); if (it != UidMap::sAidToUidMapping.end()) { uids.push_back(it->second); } else { packages.push_back(pkg); } } subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages, uids); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } subscriptionInfo->mPullIntervalMin = minInterval; return true; } void ShellSubscriber::startPull(int myToken) { VLOG("ShellSubscriber: pull thread %d starting", myToken); while (true) { { std::lock_guard lock(mMutex); if (!mSubscriptionInfo || mToken != myToken) { VLOG("ShellSubscriber: pulling thread %d done!", myToken); return; } int64_t nowMillis = getElapsedRealtimeMillis(); for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) { continue; } vector uids; getUidsForPullAtom(&uids, pullInfo); vector> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data); VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); writePulledAtomsLocked(data, pullInfo.mPullerMatcher); pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken, mSubscriptionInfo->mPullIntervalMin); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } void ShellSubscriber::getUidsForPullAtom(vector* uids, const PullInfo& pullInfo) { uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); // This is slow. Consider storing the uids per app and listening to uidmap updates. for (const string& pkg : pullInfo.mPullPackages) { set uidsForPkg = mUidMap->getAppUid(pkg); uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); } uids->push_back(DEFAULT_PULL_UID); } void ShellSubscriber::writePulledAtomsLocked(const vector>& data, const SimpleAtomMatcher& matcher) { mProto.clear(); int count = 0; for (const auto& event : data) { if (matchesSimple(*mUidMap, matcher, *event)) { count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event->ToProto(mProto); mProto.end(atomToken); } } if (count > 0) attemptWriteToSocketLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { std::lock_guard lock(mMutex); if (!mSubscriptionInfo) return; mProto.clear(); for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); attemptWriteToSocketLocked(mProto.size()); } } } // Tries to write the atom encoded in mProto to the socket. If the write fails // because the read end of the pipe has closed, signals to other threads that // the subscription should end. void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { // First write the payload size. if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } if (dataSize == 0) return; // Then, write the payload. if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } mLastWriteMs = getElapsedRealtimeMillis(); } // Send a heartbeat, consisting solely of a data size of 0, if perfd has not // recently received any writes from statsd. When it receives the data size of // 0, perfd will not expect any data and recheck whether the shell command is // still running. void ShellSubscriber::sendHeartbeats(int myToken) { while (true) { { std::lock_guard lock(mMutex); if (!mSubscriptionInfo || myToken != mToken) { VLOG("ShellSubscriber: heartbeat thread %d done!", myToken); return; } if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) { VLOG("ShellSubscriber: sending a heartbeat to perfd"); attemptWriteToSocketLocked(/*dataSize=*/0); } } std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats)); } } } // namespace statsd } // namespace os } // namespace android