summaryrefslogtreecommitdiff
path: root/cmds/statsd/src/external/StatsCallbackPuller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cmds/statsd/src/external/StatsCallbackPuller.cpp')
-rw-r--r--cmds/statsd/src/external/StatsCallbackPuller.cpp89
1 files changed, 72 insertions, 17 deletions
diff --git a/cmds/statsd/src/external/StatsCallbackPuller.cpp b/cmds/statsd/src/external/StatsCallbackPuller.cpp
index d718273e9b85..78e6f094db7e 100644
--- a/cmds/statsd/src/external/StatsCallbackPuller.cpp
+++ b/cmds/statsd/src/external/StatsCallbackPuller.cpp
@@ -17,43 +17,98 @@
#define DEBUG false // STOPSHIP if true
#include "Log.h"
-#include <android/os/IStatsPullerCallback.h>
-
#include "StatsCallbackPuller.h"
+#include "PullResultReceiver.h"
+#include "StatsPullerManager.h"
#include "logd/LogEvent.h"
#include "stats_log_util.h"
-using namespace android::binder;
+#include <aidl/android/util/StatsEventParcel.h>
+
+using namespace std;
+
+using Status = ::ndk::ScopedAStatus;
+using aidl::android::util::StatsEventParcel;
+using ::ndk::SharedRefBase;
namespace android {
namespace os {
namespace statsd {
-StatsCallbackPuller::StatsCallbackPuller(int tagId, const sp<IStatsPullerCallback>& callback) :
- StatsPuller(tagId), mCallback(callback) {
- VLOG("StatsCallbackPuller created for tag %d", tagId);
+StatsCallbackPuller::StatsCallbackPuller(int tagId, const shared_ptr<IPullAtomCallback>& callback,
+ const int64_t coolDownNs, int64_t timeoutNs,
+ const vector<int> additiveFields)
+ : StatsPuller(tagId, coolDownNs, timeoutNs, additiveFields), mCallback(callback) {
+ VLOG("StatsCallbackPuller created for tag %d", tagId);
}
bool StatsCallbackPuller::PullInternal(vector<shared_ptr<LogEvent>>* data) {
- VLOG("StatsCallbackPuller called for tag %d", mTagId)
+ VLOG("StatsCallbackPuller called for tag %d", mTagId);
if(mCallback == nullptr) {
ALOGW("No callback registered");
return false;
}
- int64_t wallClockTimeNs = getWallClockNs();
- int64_t elapsedTimeNs = getElapsedRealtimeNs();
- vector<StatsLogEventWrapper> returned_value;
- Status status = mCallback->pullData(mTagId, elapsedTimeNs, wallClockTimeNs, &returned_value);
+
+ // Shared variables needed in the result receiver.
+ shared_ptr<mutex> cv_mutex = make_shared<mutex>();
+ shared_ptr<condition_variable> cv = make_shared<condition_variable>();
+ shared_ptr<bool> pullFinish = make_shared<bool>(false);
+ shared_ptr<bool> pullSuccess = make_shared<bool>(false);
+ shared_ptr<vector<shared_ptr<LogEvent>>> sharedData =
+ make_shared<vector<shared_ptr<LogEvent>>>();
+
+ shared_ptr<PullResultReceiver> resultReceiver = SharedRefBase::make<PullResultReceiver>(
+ [cv_mutex, cv, pullFinish, pullSuccess, sharedData](
+ int32_t atomTag, bool success, const vector<StatsEventParcel>& output) {
+ // This is the result of the pull, executing in a statsd binder thread.
+ // The pull could have taken a long time, and we should only modify
+ // data (the output param) if the pointer is in scope and the pull did not time out.
+ {
+ lock_guard<mutex> lk(*cv_mutex);
+ for (const StatsEventParcel& parcel: output) {
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(/*uid=*/-1, /*pid=*/-1);
+ bool valid = event->parseBuffer((uint8_t*)parcel.buffer.data(),
+ parcel.buffer.size());
+ if (valid) {
+ sharedData->push_back(event);
+ } else {
+ StatsdStats::getInstance().noteAtomError(event->GetTagId(),
+ /*pull=*/true);
+ }
+ }
+ *pullSuccess = success;
+ *pullFinish = true;
+ }
+ cv->notify_one();
+ });
+
+ // Initiate the pull. This is a oneway call to a different process, except
+ // in unit tests. In process calls are not oneway.
+ Status status = mCallback->onPullAtom(mTagId, resultReceiver);
if (!status.isOk()) {
- ALOGW("StatsCallbackPuller::pull failed for %d", mTagId);
+ StatsdStats::getInstance().notePullBinderCallFailed(mTagId);
return false;
}
- data->clear();
- for (const StatsLogEventWrapper& it: returned_value) {
- LogEvent::createLogEvents(it, *data);
+
+ {
+ unique_lock<mutex> unique_lk(*cv_mutex);
+ // Wait until the pull finishes, or until the pull timeout.
+ cv->wait_for(unique_lk, chrono::nanoseconds(mPullTimeoutNs),
+ [pullFinish] { return *pullFinish; });
+ if (!*pullFinish) {
+ // Note: The parent stats puller will also note that there was a timeout and that the
+ // cache should be cleared. Once we migrate all pullers to this callback, we could
+ // consolidate the logic.
+ return true;
+ } else {
+ // Only copy the data if we did not timeout and the pull was successful.
+ if (*pullSuccess) {
+ *data = std::move(*sharedData);
+ }
+ VLOG("StatsCallbackPuller::pull succeeded for %d", mTagId);
+ return *pullSuccess;
+ }
}
- VLOG("StatsCallbackPuller::pull succeeded for %d", mTagId);
- return true;
}
} // namespace statsd