summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/shell/ShellSubscriber.h
diff options
context:
space:
mode:
authorXin Li <delphij@google.com>2020-09-09 20:21:10 -0700
committerXin Li <delphij@google.com>2020-09-09 20:21:10 -0700
commitc64112eb974e9aa7638aead998f07a868acfb5a7 (patch)
tree503334edcee47bfd9f7a76d987d881992ecae9aa /cmds/statsd/src/shell/ShellSubscriber.h
parent104d2f92b3911576c284ddb0adf78148359883d2 (diff)
parent14a6871e432e163533a320516ace97bd67d9c3a0 (diff)
Merge Android R
Bug: 168057903 Merged-In: Ice3e441cc9c0df8d0a6acc016bb74375e081bd67 Change-Id: I1d85742f594be2007c99841b290e502b6ede624e
Diffstat (limited to 'cmds/statsd/src/shell/ShellSubscriber.h')
-rw-r--r--cmds/statsd/src/shell/ShellSubscriber.h93
1 files changed, 60 insertions, 33 deletions
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 86d85901083a..4c05fa7f71c2 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -16,16 +16,17 @@
#pragma once
-#include "logd/LogEvent.h"
-
#include <android/util/ProtoOutputStream.h>
-#include <binder/IResultReceiver.h>
+#include <private/android_filesystem_config.h>
+
#include <condition_variable>
#include <mutex>
#include <thread>
+
#include "external/StatsPullerManager.h"
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
+#include "logd/LogEvent.h"
#include "packages/UidMap.h"
namespace android {
@@ -37,11 +38,11 @@ namespace statsd {
*
* A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
* communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
- * The atoms are sent back to the client in real time, as opposed to
- * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are
- * responsible for doing the aggregation after receiving the atom events.
+ * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
+ * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
+ * aggregation after receiving the atom events.
*
- * Shell client pass ShellSubscription in the proto binary format. Client can update the
+ * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
* subscription by sending a new subscription. The new subscription would replace the old one.
* Input data stream format is:
*
@@ -53,43 +54,70 @@ namespace statsd {
* The stream would be in the following format:
* |size_t|shellData proto|size_t|shellData proto|....
*
- * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
+ * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
* until it exits.
*/
-class ShellSubscriber : public virtual IBinder::DeathRecipient {
+class ShellSubscriber : public virtual RefBase {
public:
ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
: mUidMap(uidMap), mPullerMgr(pullerMgr){};
- /**
- * Start a new subscription.
- */
- void startNewSubscription(int inFd, int outFd, sp<IResultReceiver> resultReceiver,
- int timeoutSec);
-
- void binderDied(const wp<IBinder>& who);
+ void startNewSubscription(int inFd, int outFd, int timeoutSec);
void onLogEvent(const LogEvent& event);
private:
struct PullInfo {
- PullInfo(const SimpleAtomMatcher& matcher, int64_t interval)
- : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) {
+ PullInfo(const SimpleAtomMatcher& matcher, int64_t interval,
+ const std::vector<std::string>& packages, const std::vector<int32_t>& uids)
+ : mPullerMatcher(matcher),
+ mInterval(interval),
+ mPrevPullElapsedRealtimeMs(0),
+ mPullPackages(packages),
+ mPullUids(uids) {
}
SimpleAtomMatcher mPullerMatcher;
int64_t mInterval;
int64_t mPrevPullElapsedRealtimeMs;
+ std::vector<std::string> mPullPackages;
+ std::vector<int32_t> mPullUids;
+ };
+
+ struct SubscriptionInfo {
+ SubscriptionInfo(const int& inputFd, const int& outputFd)
+ : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) {
+ }
+
+ int mInputFd;
+ int mOutputFd;
+ std::vector<SimpleAtomMatcher> mPushedMatchers;
+ std::vector<PullInfo> mPulledInfo;
+ bool mClientAlive;
};
- void readConfig(int in);
- void updateConfig(const ShellSubscription& config);
+ int claimToken();
+
+ bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
+
+ void spawnHelperThread(int myToken);
- void startPull(int64_t token, int64_t intervalMillis);
+ void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
+ int myToken,
+ std::unique_lock<std::mutex>& lock,
+ int timeoutSec);
- void cleanUpLocked();
+ // Helper thread that pulls atoms at a regular frequency and sends
+ // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must
+ // send heartbeats for perfd to escape a blocking read call and recheck if
+ // the user has terminated the subscription.
+ void pullAndSendHeartbeats(int myToken);
- void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
- const SimpleAtomMatcher& matcher);
+ void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher);
+
+ void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);
+
+ void attemptWriteToPipeLocked(size_t dataSize);
sp<UidMap> mUidMap;
@@ -99,19 +127,18 @@ private:
mutable std::mutex mMutex;
- std::condition_variable mShellDied; // semaphore for waiting until shell exits.
-
- int mInput; // The input file descriptor
-
- int mOutput; // The output file descriptor
+ std::condition_variable mSubscriptionShouldEnd;
- sp<IResultReceiver> mResultReceiver;
+ std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr;
- std::vector<SimpleAtomMatcher> mPushedMatchers;
+ int mToken = 0;
- std::vector<PullInfo> mPulledInfo;
+ const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
- int64_t mPullToken = 0; // A unique token to identify a puller thread.
+ // Tracks when we last send data to perfd. We need that time to determine
+ // when next to send a heartbeat.
+ int64_t mLastWriteMs = 0;
+ const int64_t kMsBetweenHeartbeats = 1000;
};
} // namespace statsd