summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/shell/ShellSubscriber.h
blob: 4c05fa7f71c2678d8692fa00a260cd3592ebe419 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <android/util/ProtoOutputStream.h>
#include <private/android_filesystem_config.h>

#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "external/StatsPullerManager.h"
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
#include "logd/LogEvent.h"
#include "packages/UidMap.h"

namespace android {
namespace os {
namespace statsd {

/**
 * Handles atoms subscription via shell cmd.
 *
 * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
 * communicates with statsd via file descriptors. They can subscribe to pushed and pulled atoms.
 * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
 * Shell clients do not subscribe to aggregated metrics, as they are responsible for doing the
 * aggregation after receiving the atom events.
 *
 * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
 * subscription by sending a new subscription. The new subscription would replace the old one.
 * Input data stream format is:
 *
 * |size_t|subscription proto|size_t|subscription proto|....
 *
 * statsd sends the events back in Atom proto binary format. Each Atom message is preceded
 * by sizeof(size_t) bytes indicating the size of the proto message payload.
 *
 * The stream would be in the following format:
 * |size_t|shellData proto|size_t|shellData proto|....
 *
 * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
 * until it exits.
 */
class ShellSubscriber : public virtual RefBase {
public:
    ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
        : mUidMap(uidMap), mPullerMgr(pullerMgr){};

    void startNewSubscription(int inFd, int outFd, int timeoutSec);

    void onLogEvent(const LogEvent& event);

private:
    struct PullInfo {
        PullInfo(const SimpleAtomMatcher& matcher, int64_t interval,
                 const std::vector<std::string>& packages, const std::vector<int32_t>& uids)
            : mPullerMatcher(matcher),
              mInterval(interval),
              mPrevPullElapsedRealtimeMs(0),
              mPullPackages(packages),
              mPullUids(uids) {
        }
        SimpleAtomMatcher mPullerMatcher;
        int64_t mInterval;
        int64_t mPrevPullElapsedRealtimeMs;
        std::vector<std::string> mPullPackages;
        std::vector<int32_t> mPullUids;
    };

    struct SubscriptionInfo {
        SubscriptionInfo(const int& inputFd, const int& outputFd)
            : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) {
        }

        int mInputFd;
        int mOutputFd;
        std::vector<SimpleAtomMatcher> mPushedMatchers;
        std::vector<PullInfo> mPulledInfo;
        bool mClientAlive;
    };

    int claimToken();

    bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);

    void spawnHelperThread(int myToken);

    void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
                                        int myToken,
                                        std::unique_lock<std::mutex>& lock,
                                        int timeoutSec);

    // Helper thread that pulls atoms at a regular frequency and sends
    // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must
    // send heartbeats for perfd to escape a blocking read call and recheck if
    // the user has terminated the subscription.
    void pullAndSendHeartbeats(int myToken);

    void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                const SimpleAtomMatcher& matcher);

    void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);

    void attemptWriteToPipeLocked(size_t dataSize);

    sp<UidMap> mUidMap;

    sp<StatsPullerManager> mPullerMgr;

    android::util::ProtoOutputStream mProto;

    mutable std::mutex mMutex;

    std::condition_variable mSubscriptionShouldEnd;

    std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr;

    int mToken = 0;

    const int32_t DEFAULT_PULL_UID = AID_SYSTEM;

    // Tracks when we last send data to perfd. We need that time to determine
    // when next to send a heartbeat.
    int64_t mLastWriteMs = 0;
    const int64_t kMsBetweenHeartbeats = 1000;
};

}  // namespace statsd
}  // namespace os
}  // namespace android