/* * Copyright (C) 2018 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define DEBUG false // STOPSHIP if true #include "Log.h" #include "ShellSubscriber.h" #include #include "matchers/matcher_util.h" #include "stats_log_util.h" using android::util::ProtoOutputStream; namespace android { namespace os { namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { int myToken = claimToken(); mSubscriptionShouldEnd.notify_one(); shared_ptr mySubscriptionInfo = make_shared(in, out); if (!readConfig(mySubscriptionInfo)) { return; } // critical-section std::unique_lock lock(mMutex); if (myToken != mToken) { // Some other subscription has already come in. Stop. return; } mSubscriptionInfo = mySubscriptionInfo; if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { // This thread terminates after it detects that mToken has changed. std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } // Block until subscription has ended. if (timeoutSec > 0) { mSubscriptionShouldEnd.wait_for( lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] { return mToken != myToken || !mySubscriptionInfo->mClientAlive; }); } else { mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] { return mToken != myToken || !mySubscriptionInfo->mClientAlive; }); } if (mSubscriptionInfo == mySubscriptionInfo) { mSubscriptionInfo = nullptr; } } // Atomically claim the next token. Token numbers denote subscriber ordering. int ShellSubscriber::claimToken() { std::unique_lock lock(mMutex); int myToken = ++mToken; return myToken; } // Read and parse single config. There should only one config per input. bool ShellSubscriber::readConfig(shared_ptr subscriptionInfo) { // Read the size of the config. size_t bufferSize; if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { return false; } // Read the config. vector buffer(bufferSize); if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { return false; } // Parse the config. ShellSubscription config; if (!config.ParseFromArray(buffer.data(), bufferSize)) { return false; } // Update SubscriptionInfo with state from config for (const auto& pushed : config.pushed()) { subscriptionInfo->mPushedMatchers.push_back(pushed); } int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); } subscriptionInfo->mPullIntervalMin = minInterval; return true; } void ShellSubscriber::startPull(int64_t myToken) { while (true) { std::lock_guard lock(mMutex); if (!mSubscriptionInfo || mToken != myToken) { VLOG("Pulling thread %lld done!", (long long)myToken); return; } int64_t nowMillis = getElapsedRealtimeMillis(); for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { vector> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); // TODO(b/150969574): Don't write to a pipe while holding a lock. if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } VLOG("Pulling thread %lld sleep....", (long long)myToken); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } // \return boolean indicating if writes were successful (will return false if // client dies) bool ShellSubscriber::writePulledAtomsLocked(const vector>& data, const SimpleAtomMatcher& matcher) { mProto.clear(); int count = 0; for (const auto& event : data) { VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event->ToProto(mProto); mProto.end(atomToken); } } if (count > 0) { // First write the payload size. size_t bufferSize = mProto.size(); if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, sizeof(bufferSize))) { return false; } VLOG("%d atoms, proto size: %zu", count, bufferSize); // Then write the payload. if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { return false; } } return true; } void ShellSubscriber::onLogEvent(const LogEvent& event) { std::lock_guard lock(mMutex); if (!mSubscriptionInfo) { return; } mProto.clear(); for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); // First write the payload size. size_t bufferSize = mProto.size(); if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, sizeof(bufferSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } // Then write the payload. if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } } } } } // namespace statsd } // namespace os } // namespace android