diff options
Diffstat (limited to 'services/incremental/IncrementalService.cpp')
-rw-r--r-- | services/incremental/IncrementalService.cpp | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp index 5f145f33f628..599ac9344e73 100644 --- a/services/incremental/IncrementalService.cpp +++ b/services/incremental/IncrementalService.cpp @@ -1801,6 +1801,31 @@ bool IncrementalService::unregisterLoadingProgressListener(StorageId storage) { return removeTimedJobs(*mProgressUpdateJobQueue, storage); } +bool IncrementalService::registerStorageHealthListener( + StorageId storage, StorageHealthCheckParams&& healthCheckParams, + const StorageHealthListener& healthListener) { + DataLoaderStubPtr dataLoaderStub; + { + std::unique_lock l(mLock); + const auto& ifs = getIfsLocked(storage); + if (!ifs) { + return false; + } + dataLoaderStub = ifs->dataLoaderStub; + if (!dataLoaderStub) { + return false; + } + } + dataLoaderStub->setHealthListener(std::move(healthCheckParams), &healthListener); + return true; +} + +void IncrementalService::unregisterStorageHealthListener(StorageId storage) { + StorageHealthCheckParams invalidCheckParams; + invalidCheckParams.blockedTimeoutMs = -1; + registerStorageHealthListener(storage, std::move(invalidCheckParams), {}); +} + bool IncrementalService::perfLoggingEnabled() { static const bool enabled = base::GetBoolProperty("incremental.perflogging", false); return enabled; @@ -2137,6 +2162,19 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount binder::Status IncrementalService::DataLoaderStub::reportStreamHealth(MountId mountId, int newStatus) { + if (!isValid()) { + return binder::Status:: + fromServiceSpecificError(-EINVAL, + "reportStreamHealth came to invalid DataLoaderStub"); + } + if (id() != mountId) { + LOG(ERROR) << "Mount ID mismatch: expected " << id() << ", but got: " << mountId; + return binder::Status::fromServiceSpecificError(-EPERM, "Mount ID mismatch."); + } + { + std::lock_guard lock(mMutex); + mStreamStatus = newStatus; + } return binder::Status::ok(); } @@ -2153,6 +2191,33 @@ void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener he } } +static int adjustHealthStatus(int healthStatus, int streamStatus) { + if (healthStatus == IStorageHealthListener::HEALTH_STATUS_OK) { + // everything is good; no need to change status + return healthStatus; + } + int newHeathStatus = healthStatus; + switch (streamStatus) { + case IDataLoaderStatusListener::STREAM_STORAGE_ERROR: + // storage is limited and storage not healthy + newHeathStatus = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_STORAGE; + break; + case IDataLoaderStatusListener::STREAM_INTEGRITY_ERROR: + // fall through + case IDataLoaderStatusListener::STREAM_SOURCE_ERROR: + // fall through + case IDataLoaderStatusListener::STREAM_TRANSPORT_ERROR: + if (healthStatus == IStorageHealthListener::HEALTH_STATUS_UNHEALTHY) { + newHeathStatus = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_TRANSPORT; + } + // pending/blocked status due to transportation issues is not regarded as unhealthy + break; + default: + break; + } + return newHeathStatus; +} + void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : ""); @@ -2232,6 +2297,8 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { checkBackAfter = unhealthyMonitoring; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY; } + // Adjust health status based on stream status + healthStatusToReport = adjustHealthStatus(healthStatusToReport, mStreamStatus); LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0 << "secs"; mService.addTimedJob(*mService.mTimedQueue, id(), checkBackAfter, @@ -2321,6 +2388,18 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() { mService.mLooper->wake(); } +void IncrementalService::DataLoaderStub::setHealthListener( + StorageHealthCheckParams&& healthCheckParams, const StorageHealthListener* healthListener) { + std::lock_guard lock(mMutex); + mHealthCheckParams = std::move(healthCheckParams); + if (healthListener == nullptr) { + // reset listener and params + mHealthListener = {}; + } else { + mHealthListener = *healthListener; + } +} + void IncrementalService::DataLoaderStub::onDump(int fd) { dprintf(fd, " dataLoader: {\n"); dprintf(fd, " currentStatus: %d\n", mCurrentStatus); |