diff options
Diffstat (limited to 'services/incremental/IncrementalService.cpp')
-rw-r--r-- | services/incremental/IncrementalService.cpp | 288 |
1 files changed, 239 insertions, 49 deletions
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp index 66c7717d7987..2fcb005e6df8 100644 --- a/services/incremental/IncrementalService.cpp +++ b/services/incremental/IncrementalService.cpp @@ -294,6 +294,10 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mJni->initializeForCurrentThread(); runCmdLooper(); }); + mTimerThread = std::thread([this]() { + mJni->initializeForCurrentThread(); + runTimers(); + }); const auto mountedRootNames = adoptMountedInstances(); mountExistingImages(mountedRootNames); @@ -306,7 +310,13 @@ IncrementalService::~IncrementalService() { } mJobCondition.notify_all(); mJobProcessor.join(); + mTimerCondition.notify_all(); + mTimerThread.join(); mCmdLooperThread.join(); + mTimedJobs.clear(); + // Ensure that mounts are destroyed while the service is still valid. + mBindsByPath.clear(); + mMounts.clear(); } static const char* toString(IncrementalService::BindKind kind) { @@ -1700,6 +1710,55 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) { } } +void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) { + if (id == kInvalidStorageId) { + return; + } + { + std::unique_lock lock(mTimerMutex); + mTimedJobs.insert(TimedJob{id, when, std::move(what)}); + } + mTimerCondition.notify_all(); +} + +void IncrementalService::removeTimedJobs(MountId id) { + if (id == kInvalidStorageId) { + return; + } + { + std::unique_lock lock(mTimerMutex); + std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; }); + } +} + +void IncrementalService::runTimers() { + static constexpr TimePoint kInfinityTs{Clock::duration::max()}; + TimePoint nextTaskTs = kInfinityTs; + for (;;) { + std::unique_lock lock(mTimerMutex); + mTimerCondition.wait_until(lock, nextTaskTs, [this]() { + auto now = Clock::now(); + return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now); + }); + if (!mRunning) { + return; + } + + auto now = Clock::now(); + auto it = mTimedJobs.begin(); + // Always acquire begin(). We can't use it after unlock as mTimedJobs can change. + for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) { + auto job = it->what; + mTimedJobs.erase(it); + + lock.unlock(); + job(); + lock.lock(); + } + nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs; + } +} + IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id, DataLoaderParamsParcel&& params, FileSystemControlParcel&& control, @@ -1713,10 +1772,17 @@ IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, mControl(std::move(control)), mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()), mHealthListener(healthListener ? *healthListener : StorageHealthListener()), - mHealthPath(std::move(healthPath)) { - // TODO(b/153874006): enable external health listener. - mHealthListener = {}; - healthStatusOk(); + mHealthPath(std::move(healthPath)), + mHealthCheckParams(std::move(healthCheckParams)) { + if (mHealthListener) { + if (!isHealthParamsValid()) { + mHealthListener = {}; + } + } else { + // Disable advanced health check statuses. + mHealthCheckParams.blockedTimeoutMs = -1; + } + updateHealthStatus(); } IncrementalService::DataLoaderStub::~DataLoaderStub() { @@ -1726,21 +1792,29 @@ IncrementalService::DataLoaderStub::~DataLoaderStub() { } void IncrementalService::DataLoaderStub::cleanupResources() { - requestDestroy(); - auto now = Clock::now(); - std::unique_lock lock(mMutex); + { + std::unique_lock lock(mMutex); + mHealthPath.clear(); + unregisterFromPendingReads(); + resetHealthControl(); + mService.removeTimedJobs(mId); + } - unregisterFromPendingReads(); + requestDestroy(); - mParams = {}; - mControl = {}; - mStatusCondition.wait_until(lock, now + 60s, [this] { - return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED; - }); - mStatusListener = {}; - mHealthListener = {}; - mId = kInvalidStorageId; + { + std::unique_lock lock(mMutex); + mParams = {}; + mControl = {}; + mHealthControl = {}; + mHealthListener = {}; + mStatusCondition.wait_until(lock, now + 60s, [this] { + return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED; + }); + mStatusListener = {}; + mId = kInvalidStorageId; + } } sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader() { @@ -1838,7 +1912,7 @@ bool IncrementalService::DataLoaderStub::fsmStep() { targetStatus = mTargetStatus; } - LOG(DEBUG) << "fsmStep: " << mId << ": " << currentStatus << " -> " << targetStatus; + LOG(DEBUG) << "fsmStep: " << id() << ": " << currentStatus << " -> " << targetStatus; if (currentStatus == targetStatus) { return true; @@ -1920,42 +1994,167 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount return binder::Status::ok(); } -void IncrementalService::DataLoaderStub::healthStatusOk() { - LOG(DEBUG) << "healthStatusOk: " << mId; - std::unique_lock lock(mMutex); - registerForPendingReads(); +bool IncrementalService::DataLoaderStub::isHealthParamsValid() const { + return mHealthCheckParams.blockedTimeoutMs > 0 && + mHealthCheckParams.blockedTimeoutMs < mHealthCheckParams.unhealthyTimeoutMs; +} + +void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener healthListener, + int healthStatus) { + LOG(DEBUG) << id() << ": healthStatus: " << healthStatus; + if (healthListener) { + healthListener->onHealthStatus(id(), healthStatus); + } +} + +void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { + LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : ""); + + int healthStatusToReport = -1; + StorageHealthListener healthListener; + + { + std::unique_lock lock(mMutex); + unregisterFromPendingReads(); + + healthListener = mHealthListener; + + // Healthcheck depends on timestamp of the oldest pending read. + // To get it, we need to re-open a pendingReads FD to get a full list of reads. + // Additionally we need to re-register for epoll with fresh FDs in case there are no reads. + const auto now = Clock::now(); + const auto kernelTsUs = getOldestPendingReadTs(); + if (baseline) { + // Updating baseline only on looper/epoll callback, i.e. on new set of pending reads. + mHealthBase = {now, kernelTsUs}; + } + + if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now || + mHealthBase.kernelTsUs > kernelTsUs) { + LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait."; + registerForPendingReads(); + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK; + lock.unlock(); + onHealthStatus(healthListener, healthStatusToReport); + return; + } + + resetHealthControl(); + + // Always make sure the data loader is started. + setTargetStatusLocked(IDataLoaderStatusListener::DATA_LOADER_STARTED); + + // Skip any further processing if health check params are invalid. + if (!isHealthParamsValid()) { + LOG(DEBUG) << id() + << ": Skip any further processing if health check params are invalid."; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; + lock.unlock(); + onHealthStatus(healthListener, healthStatusToReport); + // Triggering data loader start. This is a one-time action. + fsmStep(); + return; + } + + const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs); + const auto unhealthyTimeout = + std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs); + const auto unhealthyMonitoring = + std::max(1000ms, + std::chrono::milliseconds(mHealthCheckParams.unhealthyMonitoringMs)); + + const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs; + const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs); + const auto delta = now - userTs; + + TimePoint whenToCheckBack; + if (delta < blockedTimeout) { + LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status."; + whenToCheckBack = userTs + blockedTimeout; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; + } else if (delta < unhealthyTimeout) { + LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy."; + whenToCheckBack = userTs + unhealthyTimeout; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED; + } else { + LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring."; + whenToCheckBack = now + unhealthyMonitoring; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY; + } + LOG(DEBUG) << id() << ": updateHealthStatus in " + << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack - + now) + .count()) / + 1000.0 + << "secs"; + mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); }); + } + + if (healthStatusToReport != -1) { + onHealthStatus(healthListener, healthStatusToReport); + } + + fsmStep(); } -void IncrementalService::DataLoaderStub::healthStatusReadsPending() { - LOG(DEBUG) << "healthStatusReadsPending: " << mId; - requestStart(); +const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl() { + if (mHealthPath.empty()) { + resetHealthControl(); + return mHealthControl; + } + if (mHealthControl.pendingReads() < 0) { + mHealthControl = mService.mIncFs->openMount(mHealthPath); + } + if (mHealthControl.pendingReads() < 0) { + LOG(ERROR) << "Failed to open health control for: " << id() << ", path: " << mHealthPath + << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":" + << mHealthControl.logs() << ")"; + } + return mHealthControl; +} - std::unique_lock lock(mMutex); - unregisterFromPendingReads(); +void IncrementalService::DataLoaderStub::resetHealthControl() { + mHealthControl = {}; } -void IncrementalService::DataLoaderStub::healthStatusBlocked() {} +BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs() { + auto result = kMaxBootClockTsUs; + + const auto& control = initializeHealthControl(); + if (control.pendingReads() < 0) { + return result; + } + + std::vector<incfs::ReadInfo> pendingReads; + if (mService.mIncFs->waitForPendingReads(control, 0ms, &pendingReads) != + android::incfs::WaitResult::HaveData || + pendingReads.empty()) { + return result; + } + + LOG(DEBUG) << id() << ": pendingReads: " << control.pendingReads() << ", " + << pendingReads.size() << ": " << pendingReads.front().bootClockTsUs; -void IncrementalService::DataLoaderStub::healthStatusUnhealthy() {} + for (auto&& pendingRead : pendingReads) { + result = std::min(result, pendingRead.bootClockTsUs); + } + return result; +} void IncrementalService::DataLoaderStub::registerForPendingReads() { - auto pendingReadsFd = mHealthControl.pendingReads(); + const auto pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { - mHealthControl = mService.mIncFs->openMount(mHealthPath); - pendingReadsFd = mHealthControl.pendingReads(); - if (pendingReadsFd < 0) { - LOG(ERROR) << "Failed to open health control for: " << mId << ", path: " << mHealthPath - << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":" - << mHealthControl.logs() << ")"; - return; - } + return; } + LOG(DEBUG) << id() << ": addFd(pendingReadsFd): " << pendingReadsFd; + mService.mLooper->addFd( pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT, [](int, int, void* data) -> int { auto&& self = (DataLoaderStub*)data; - return self->onPendingReads(); + self->updateHealthStatus(/*baseline=*/true); + return 0; }, this); mService.mLooper->wake(); @@ -1967,19 +2166,10 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() { return; } + LOG(DEBUG) << id() << ": removeFd(pendingReadsFd): " << pendingReadsFd; + mService.mLooper->removeFd(pendingReadsFd); mService.mLooper->wake(); - - mHealthControl = {}; -} - -int IncrementalService::DataLoaderStub::onPendingReads() { - if (!mService.mRunning.load(std::memory_order_relaxed)) { - return 0; - } - - healthStatusReadsPending(); - return 0; } void IncrementalService::DataLoaderStub::onDump(int fd) { |