diff options
Diffstat (limited to 'services/incremental/IncrementalService.cpp')
-rw-r--r-- | services/incremental/IncrementalService.cpp | 111 |
1 files changed, 36 insertions, 75 deletions
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp index 2fcb005e6df8..885f4d2d34d7 100644 --- a/services/incremental/IncrementalService.cpp +++ b/services/incremental/IncrementalService.cpp @@ -268,22 +268,14 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mAppOpsManager(sm.getAppOpsManager()), mJni(sm.getJni()), mLooper(sm.getLooper()), + mTimedQueue(sm.getTimedQueue()), mIncrementalDir(rootDir) { - if (!mVold) { - LOG(FATAL) << "Vold service is unavailable"; - } - if (!mDataLoaderManager) { - LOG(FATAL) << "DataLoaderManagerService is unavailable"; - } - if (!mAppOpsManager) { - LOG(FATAL) << "AppOpsManager is unavailable"; - } - if (!mJni) { - LOG(FATAL) << "JNI is unavailable"; - } - if (!mLooper) { - LOG(FATAL) << "Looper is unavailable"; - } + CHECK(mVold) << "Vold service is unavailable"; + CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable"; + CHECK(mAppOpsManager) << "AppOpsManager is unavailable"; + CHECK(mJni) << "JNI is unavailable"; + CHECK(mLooper) << "Looper is unavailable"; + CHECK(mTimedQueue) << "TimedQueue is unavailable"; mJobQueue.reserve(16); mJobProcessor = std::thread([this]() { @@ -294,10 +286,6 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mJni->initializeForCurrentThread(); runCmdLooper(); }); - mTimerThread = std::thread([this]() { - mJni->initializeForCurrentThread(); - runTimers(); - }); const auto mountedRootNames = adoptMountedInstances(); mountExistingImages(mountedRootNames); @@ -310,10 +298,8 @@ IncrementalService::~IncrementalService() { } mJobCondition.notify_all(); mJobProcessor.join(); - mTimerCondition.notify_all(); - mTimerThread.join(); mCmdLooperThread.join(); - mTimedJobs.clear(); + mTimedQueue->stop(); // Ensure that mounts are destroyed while the service is still valid. mBindsByPath.clear(); mMounts.clear(); @@ -1710,53 +1696,18 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) { } } -void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) { +void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) { if (id == kInvalidStorageId) { return; } - { - std::unique_lock lock(mTimerMutex); - mTimedJobs.insert(TimedJob{id, when, std::move(what)}); - } - mTimerCondition.notify_all(); + mTimedQueue->addJob(id, after, std::move(what)); } void IncrementalService::removeTimedJobs(MountId id) { if (id == kInvalidStorageId) { return; } - { - std::unique_lock lock(mTimerMutex); - std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; }); - } -} - -void IncrementalService::runTimers() { - static constexpr TimePoint kInfinityTs{Clock::duration::max()}; - TimePoint nextTaskTs = kInfinityTs; - for (;;) { - std::unique_lock lock(mTimerMutex); - mTimerCondition.wait_until(lock, nextTaskTs, [this]() { - auto now = Clock::now(); - return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now); - }); - if (!mRunning) { - return; - } - - auto now = Clock::now(); - auto it = mTimedJobs.begin(); - // Always acquire begin(). We can't use it after unlock as mTimedJobs can change. - for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) { - auto job = it->what; - mTimedJobs.erase(it); - - lock.unlock(); - job(); - lock.lock(); - } - nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs; - } + mTimedQueue->removeJobs(id); } IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id, @@ -2029,8 +1980,8 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { mHealthBase = {now, kernelTsUs}; } - if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now || - mHealthBase.kernelTsUs > kernelTsUs) { + if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs || + mHealthBase.userTs > now) { LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait."; registerForPendingReads(); healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK; @@ -2056,6 +2007,9 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { return; } + // Don't schedule timer job less than 500ms in advance. + static constexpr auto kTolerance = 500ms; + const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs); const auto unhealthyTimeout = std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs); @@ -2065,31 +2019,28 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs; const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs); - const auto delta = now - userTs; + const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs); - TimePoint whenToCheckBack; - if (delta < blockedTimeout) { + Milliseconds checkBackAfter; + if (delta + kTolerance < blockedTimeout) { LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status."; - whenToCheckBack = userTs + blockedTimeout; + checkBackAfter = blockedTimeout - delta; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; - } else if (delta < unhealthyTimeout) { + } else if (delta + kTolerance < unhealthyTimeout) { LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy."; - whenToCheckBack = userTs + unhealthyTimeout; + checkBackAfter = unhealthyTimeout - delta; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED; } else { LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring."; - whenToCheckBack = now + unhealthyMonitoring; + checkBackAfter = unhealthyMonitoring; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY; } - LOG(DEBUG) << id() << ": updateHealthStatus in " - << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack - - now) - .count()) / - 1000.0 + LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0 << "secs"; - mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); }); + mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); }); } + // With kTolerance we are expecting these to execute before the next update. if (healthStatusToReport != -1) { onHealthStatus(healthListener, healthStatusToReport); } @@ -2178,6 +2129,16 @@ void IncrementalService::DataLoaderStub::onDump(int fd) { dprintf(fd, " targetStatus: %d\n", mTargetStatus); dprintf(fd, " targetStatusTs: %lldmcs\n", (long long)(elapsedMcs(mTargetStatusTs, Clock::now()))); + dprintf(fd, " health: {\n"); + dprintf(fd, " path: %s\n", mHealthPath.c_str()); + dprintf(fd, " base: %lldmcs (%lld)\n", + (long long)(elapsedMcs(mHealthBase.userTs, Clock::now())), + (long long)mHealthBase.kernelTsUs); + dprintf(fd, " blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs)); + dprintf(fd, " unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs)); + dprintf(fd, " unhealthyMonitoringMs: %d\n", + int(mHealthCheckParams.unhealthyMonitoringMs)); + dprintf(fd, " }\n"); const auto& params = mParams; dprintf(fd, " dataLoaderParams: {\n"); dprintf(fd, " type: %s\n", toString(params.type).c_str()); |