summaryrefslogtreecommitdiff
path: root/services/incremental/IncrementalService.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'services/incremental/IncrementalService.cpp')
-rw-r--r--services/incremental/IncrementalService.cpp79
1 files changed, 79 insertions, 0 deletions
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 5f145f33f628..599ac9344e73 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -1801,6 +1801,31 @@ bool IncrementalService::unregisterLoadingProgressListener(StorageId storage) {
return removeTimedJobs(*mProgressUpdateJobQueue, storage);
}
+bool IncrementalService::registerStorageHealthListener(
+ StorageId storage, StorageHealthCheckParams&& healthCheckParams,
+ const StorageHealthListener& healthListener) {
+ DataLoaderStubPtr dataLoaderStub;
+ {
+ std::unique_lock l(mLock);
+ const auto& ifs = getIfsLocked(storage);
+ if (!ifs) {
+ return false;
+ }
+ dataLoaderStub = ifs->dataLoaderStub;
+ if (!dataLoaderStub) {
+ return false;
+ }
+ }
+ dataLoaderStub->setHealthListener(std::move(healthCheckParams), &healthListener);
+ return true;
+}
+
+void IncrementalService::unregisterStorageHealthListener(StorageId storage) {
+ StorageHealthCheckParams invalidCheckParams;
+ invalidCheckParams.blockedTimeoutMs = -1;
+ registerStorageHealthListener(storage, std::move(invalidCheckParams), {});
+}
+
bool IncrementalService::perfLoggingEnabled() {
static const bool enabled = base::GetBoolProperty("incremental.perflogging", false);
return enabled;
@@ -2137,6 +2162,19 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount
binder::Status IncrementalService::DataLoaderStub::reportStreamHealth(MountId mountId,
int newStatus) {
+ if (!isValid()) {
+ return binder::Status::
+ fromServiceSpecificError(-EINVAL,
+ "reportStreamHealth came to invalid DataLoaderStub");
+ }
+ if (id() != mountId) {
+ LOG(ERROR) << "Mount ID mismatch: expected " << id() << ", but got: " << mountId;
+ return binder::Status::fromServiceSpecificError(-EPERM, "Mount ID mismatch.");
+ }
+ {
+ std::lock_guard lock(mMutex);
+ mStreamStatus = newStatus;
+ }
return binder::Status::ok();
}
@@ -2153,6 +2191,33 @@ void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener he
}
}
+static int adjustHealthStatus(int healthStatus, int streamStatus) {
+ if (healthStatus == IStorageHealthListener::HEALTH_STATUS_OK) {
+ // everything is good; no need to change status
+ return healthStatus;
+ }
+ int newHeathStatus = healthStatus;
+ switch (streamStatus) {
+ case IDataLoaderStatusListener::STREAM_STORAGE_ERROR:
+ // storage is limited and storage not healthy
+ newHeathStatus = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_STORAGE;
+ break;
+ case IDataLoaderStatusListener::STREAM_INTEGRITY_ERROR:
+ // fall through
+ case IDataLoaderStatusListener::STREAM_SOURCE_ERROR:
+ // fall through
+ case IDataLoaderStatusListener::STREAM_TRANSPORT_ERROR:
+ if (healthStatus == IStorageHealthListener::HEALTH_STATUS_UNHEALTHY) {
+ newHeathStatus = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_TRANSPORT;
+ }
+ // pending/blocked status due to transportation issues is not regarded as unhealthy
+ break;
+ default:
+ break;
+ }
+ return newHeathStatus;
+}
+
void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : "");
@@ -2232,6 +2297,8 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
checkBackAfter = unhealthyMonitoring;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
}
+ // Adjust health status based on stream status
+ healthStatusToReport = adjustHealthStatus(healthStatusToReport, mStreamStatus);
LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
<< "secs";
mService.addTimedJob(*mService.mTimedQueue, id(), checkBackAfter,
@@ -2321,6 +2388,18 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() {
mService.mLooper->wake();
}
+void IncrementalService::DataLoaderStub::setHealthListener(
+ StorageHealthCheckParams&& healthCheckParams, const StorageHealthListener* healthListener) {
+ std::lock_guard lock(mMutex);
+ mHealthCheckParams = std::move(healthCheckParams);
+ if (healthListener == nullptr) {
+ // reset listener and params
+ mHealthListener = {};
+ } else {
+ mHealthListener = *healthListener;
+ }
+}
+
void IncrementalService::DataLoaderStub::onDump(int fd) {
dprintf(fd, " dataLoader: {\n");
dprintf(fd, " currentStatus: %d\n", mCurrentStatus);