1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define DEBUG true // STOPSHIP if true
#include "Log.h"
#include "ShellSubscriber.h"
#include "matchers/matcher_util.h"
#include <android-base/file.h>
using android::util::ProtoOutputStream;
namespace android {
namespace os {
namespace statsd {
/**
 * Registers the caller as the active shell subscriber and blocks until that
 * subscriber is cleared again.
 *
 * Only one subscriber may be active: if mResultReceiver is already set, the
 * call logs and returns immediately without touching any state.
 *
 * @param in             descriptor the subscription configs are read from.
 * @param out            descriptor matching events are written to.
 * @param resultReceiver binder object whose death ends the subscription.
 */
void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
    VLOG("start new shell subscription");
    {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mResultReceiver != nullptr) {
            VLOG("Only one shell subscriber is allowed.");
            return;
        }
        mInput = in;
        mOutput = out;
        mResultReceiver = resultReceiver;
        // binderDied() is invoked on this object when the receiver's binder dies.
        IInterface::asBinder(mResultReceiver)->linkToDeath(this);
    }

    // Spawn another thread to read the config updates from the input file descriptor.
    // The thread is detached, so it must not be joined afterwards: a detached
    // std::thread is never joinable(). Joining here would also have to happen
    // without mMutex held, because readConfig() -> updateConfig() takes mMutex.
    // NOTE(review): the detached reader keeps using `in` and `this` on its own
    // schedule, possibly after this function has returned - confirm both outlive it.
    std::thread reader([in, this] { readConfig(in); });
    reader.detach();

    // Block until the receiver registered above has been replaced or cleared;
    // binderDied() resets mResultReceiver and then notifies mShellDied.
    std::unique_lock<std::mutex> lk(mMutex);
    mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
}
/**
 * Replaces the current list of pushed-atom matchers with the ones carried by
 * |config|. Runs with mMutex held for the whole update.
 */
void ShellSubscriber::updateConfig(const ShellSubscription& config) {
    std::lock_guard<std::mutex> guard(mMutex);

    // Forget whatever an earlier config installed before adopting the new list.
    mPushedMatchers.clear();

    const auto& requested = config.pushed();
    for (auto it = requested.begin(); it != requested.end(); ++it) {
        mPushedMatchers.push_back(*it);
        VLOG("adding matcher for atom %d", it->atom_id());
    }
}
void ShellSubscriber::readConfig(int in) {
if (in <= 0) {
return;
}
while (1) {
size_t bufferSize = 0;
int result = 0;
if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
VLOG("Done reading");
break;
} else if (result < 0 || result != sizeof(bufferSize)) {
ALOGE("Error reading config size");
break;
}
vector<uint8_t> buffer(bufferSize);
if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
ShellSubscription config;
if (config.ParseFromArray(buffer.data(), bufferSize)) {
updateConfig(config);
} else {
ALOGE("error parsing the config");
break;
}
} else {
VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
break;
}
}
}
/**
 * Resets all per-subscription state. The "Locked" suffix and the call site in
 * binderDied() indicate that the caller is expected to hold mMutex.
 */
void ShellSubscriber::cleanUpLocked() {
    mPushedMatchers.clear();
    mResultReceiver = nullptr;

    // The file descriptors will be closed by binder, so they are only forgotten
    // here; 0 is the value onLogEvent()/readConfig() treat as "no descriptor".
    mOutput = 0;
    mInput = 0;

    VLOG("done clean up");
}
/**
 * Forwards |event| to the shell subscriber if at least one pushed matcher
 * accepts it. Does nothing while no output descriptor is set (mOutput <= 0).
 *
 * Each forwarded event is framed as a native size_t payload length followed by
 * the serialized event. At most one copy of an event is written, even when
 * several matchers accept it.
 */
void ShellSubscriber::onLogEvent(const LogEvent& event) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mOutput <= 0) {
        return;
    }
    for (const auto& matcher : mPushedMatchers) {
        if (!matchesSimple(*mUidMap, matcher, event)) {
            continue;
        }
        // Serialize first: the size prefix must describe this event's payload,
        // so it can only be taken after the event has been written into mProto.
        event.ToProto(mProto);

        // First write the payload size.
        // NOTE(review): the result of write() is not checked here.
        size_t bufferSize = mProto.size();
        write(mOutput, &bufferSize, sizeof(bufferSize));

        // Then write the payload and reset the stream for the next event.
        mProto.flush(mOutput);
        mProto.clear();
        break;
    }
}
/**
 * Death-recipient callback: clears the subscription state under mMutex and then
 * wakes every thread waiting on mShellDied. |who| is not used.
 */
void ShellSubscriber::binderDied(const wp<IBinder>& who) {
    VLOG("Shell exits");

    std::unique_lock<std::mutex> guard(mMutex);
    cleanUpLocked();
    // Release the mutex before notifying, so woken waiters can take it right away.
    guard.unlock();

    mShellDied.notify_all();
}
} // namespace statsd
} // namespace os
} // namespace android
|