diff options
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.h')
-rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.h | 93 |
1 files changed, 60 insertions, 33 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h index 86d85901083a..4c05fa7f71c2 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.h +++ b/cmds/statsd/src/shell/ShellSubscriber.h @@ -16,16 +16,17 @@ #pragma once -#include "logd/LogEvent.h" - #include <android/util/ProtoOutputStream.h> -#include <binder/IResultReceiver.h> +#include <private/android_filesystem_config.h> + #include <condition_variable> #include <mutex> #include <thread> + #include "external/StatsPullerManager.h" #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" +#include "logd/LogEvent.h" #include "packages/UidMap.h" namespace android { @@ -37,11 +38,11 @@ namespace statsd { * * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms. - * The atoms are sent back to the client in real time, as opposed to - * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are - * responsible for doing the aggregation after receiving the atom events. + * The atoms are sent back to the client in real time, as opposed to keeping the data in memory. + * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the + * aggregation after receiving the atom events. * - * Shell client pass ShellSubscription in the proto binary format. Client can update the + * Shell clients pass ShellSubscription in the proto binary format. Clients can update the * subscription by sending a new subscription. The new subscription would replace the old one. * Input data stream format is: * @@ -53,43 +54,70 @@ namespace statsd { * The stream would be in the following format: * |size_t|shellData proto|size_t|shellData proto|.... * - * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread + * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread * until it exits. */ -class ShellSubscriber : public virtual IBinder::DeathRecipient { +class ShellSubscriber : public virtual RefBase { public: ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr) : mUidMap(uidMap), mPullerMgr(pullerMgr){}; - /** - * Start a new subscription. - */ - void startNewSubscription(int inFd, int outFd, sp<IResultReceiver> resultReceiver, - int timeoutSec); - - void binderDied(const wp<IBinder>& who); + void startNewSubscription(int inFd, int outFd, int timeoutSec); void onLogEvent(const LogEvent& event); private: struct PullInfo { - PullInfo(const SimpleAtomMatcher& matcher, int64_t interval) - : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) { + PullInfo(const SimpleAtomMatcher& matcher, int64_t interval, + const std::vector<std::string>& packages, const std::vector<int32_t>& uids) + : mPullerMatcher(matcher), + mInterval(interval), + mPrevPullElapsedRealtimeMs(0), + mPullPackages(packages), + mPullUids(uids) { } SimpleAtomMatcher mPullerMatcher; int64_t mInterval; int64_t mPrevPullElapsedRealtimeMs; + std::vector<std::string> mPullPackages; + std::vector<int32_t> mPullUids; + }; + + struct SubscriptionInfo { + SubscriptionInfo(const int& inputFd, const int& outputFd) + : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) { + } + + int mInputFd; + int mOutputFd; + std::vector<SimpleAtomMatcher> mPushedMatchers; + std::vector<PullInfo> mPulledInfo; + bool mClientAlive; }; - void readConfig(int in); - void updateConfig(const ShellSubscription& config); + int claimToken(); + + bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo); + + void spawnHelperThread(int myToken); - void startPull(int64_t token, int64_t intervalMillis); + void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo, + int myToken, + std::unique_lock<std::mutex>& lock, + int timeoutSec); - void cleanUpLocked(); + // Helper thread that pulls atoms at a regular frequency and sends + // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must + // send heartbeats for perfd to escape a blocking read call and recheck if + // the user has terminated the subscription. + void pullAndSendHeartbeats(int myToken); - void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, - const SimpleAtomMatcher& matcher); + void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, + const SimpleAtomMatcher& matcher); + + void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo); + + void attemptWriteToPipeLocked(size_t dataSize); sp<UidMap> mUidMap; @@ -99,19 +127,18 @@ private: mutable std::mutex mMutex; - std::condition_variable mShellDied; // semaphore for waiting until shell exits. - - int mInput; // The input file descriptor - - int mOutput; // The output file descriptor + std::condition_variable mSubscriptionShouldEnd; - sp<IResultReceiver> mResultReceiver; + std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr; - std::vector<SimpleAtomMatcher> mPushedMatchers; + int mToken = 0; - std::vector<PullInfo> mPulledInfo; + const int32_t DEFAULT_PULL_UID = AID_SYSTEM; - int64_t mPullToken = 0; // A unique token to identify a puller thread. + // Tracks when we last send data to perfd. We need that time to determine + // when next to send a heartbeat. + int64_t mLastWriteMs = 0; + const int64_t kMsBetweenHeartbeats = 1000; }; } // namespace statsd |