diff options
author | TreeHugger Robot <treehugger-gerrit@google.com> | 2020-05-29 19:19:06 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2020-05-29 19:19:06 +0000 |
commit | 27ee72a7943b96e0465fa093606c7c2c07aa09be (patch) | |
tree | c7f2a48e184592fa2c9cc548c40f23ee0daaf55d | |
parent | 4f919cdef00e4ce0b850214189609736cc0821ee (diff) | |
parent | 825ad11167f28574c76ae239396877a5a4115857 (diff) |
Merge "Lifecycle: detecting blocked and unhealthy, part 2." into rvc-dev am: 825ad11167
Change-Id: I5dd609fb62c658b056c2b32efe85354ec90ffea3
-rw-r--r-- | services/incremental/IncrementalService.cpp | 288 | ||||
-rw-r--r-- | services/incremental/IncrementalService.h | 59 | ||||
-rw-r--r-- | services/incremental/test/IncrementalServiceTest.cpp | 2 |
3 files changed, 282 insertions, 67 deletions
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp index 66c7717d7987..2fcb005e6df8 100644 --- a/services/incremental/IncrementalService.cpp +++ b/services/incremental/IncrementalService.cpp @@ -294,6 +294,10 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mJni->initializeForCurrentThread(); runCmdLooper(); }); + mTimerThread = std::thread([this]() { + mJni->initializeForCurrentThread(); + runTimers(); + }); const auto mountedRootNames = adoptMountedInstances(); mountExistingImages(mountedRootNames); @@ -306,7 +310,13 @@ IncrementalService::~IncrementalService() { } mJobCondition.notify_all(); mJobProcessor.join(); + mTimerCondition.notify_all(); + mTimerThread.join(); mCmdLooperThread.join(); + mTimedJobs.clear(); + // Ensure that mounts are destroyed while the service is still valid. + mBindsByPath.clear(); + mMounts.clear(); } static const char* toString(IncrementalService::BindKind kind) { @@ -1700,6 +1710,55 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) { } } +void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) { + if (id == kInvalidStorageId) { + return; + } + { + std::unique_lock lock(mTimerMutex); + mTimedJobs.insert(TimedJob{id, when, std::move(what)}); + } + mTimerCondition.notify_all(); +} + +void IncrementalService::removeTimedJobs(MountId id) { + if (id == kInvalidStorageId) { + return; + } + { + std::unique_lock lock(mTimerMutex); + std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; }); + } +} + +void IncrementalService::runTimers() { + static constexpr TimePoint kInfinityTs{Clock::duration::max()}; + TimePoint nextTaskTs = kInfinityTs; + for (;;) { + std::unique_lock lock(mTimerMutex); + mTimerCondition.wait_until(lock, nextTaskTs, [this]() { + auto now = Clock::now(); + return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now); + }); + if (!mRunning) { + return; + } + + auto now = Clock::now(); + auto it = mTimedJobs.begin(); + // Always acquire begin(). We can't use it after unlock as mTimedJobs can change. + for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) { + auto job = it->what; + mTimedJobs.erase(it); + + lock.unlock(); + job(); + lock.lock(); + } + nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs; + } +} + IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id, DataLoaderParamsParcel&& params, FileSystemControlParcel&& control, @@ -1713,10 +1772,17 @@ IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, mControl(std::move(control)), mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()), mHealthListener(healthListener ? *healthListener : StorageHealthListener()), - mHealthPath(std::move(healthPath)) { - // TODO(b/153874006): enable external health listener. - mHealthListener = {}; - healthStatusOk(); + mHealthPath(std::move(healthPath)), + mHealthCheckParams(std::move(healthCheckParams)) { + if (mHealthListener) { + if (!isHealthParamsValid()) { + mHealthListener = {}; + } + } else { + // Disable advanced health check statuses. + mHealthCheckParams.blockedTimeoutMs = -1; + } + updateHealthStatus(); } IncrementalService::DataLoaderStub::~DataLoaderStub() { @@ -1726,21 +1792,29 @@ IncrementalService::DataLoaderStub::~DataLoaderStub() { } void IncrementalService::DataLoaderStub::cleanupResources() { - requestDestroy(); - auto now = Clock::now(); - std::unique_lock lock(mMutex); + { + std::unique_lock lock(mMutex); + mHealthPath.clear(); + unregisterFromPendingReads(); + resetHealthControl(); + mService.removeTimedJobs(mId); + } - unregisterFromPendingReads(); + requestDestroy(); - mParams = {}; - mControl = {}; - mStatusCondition.wait_until(lock, now + 60s, [this] { - return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED; - }); - mStatusListener = {}; - mHealthListener = {}; - mId = kInvalidStorageId; + { + std::unique_lock lock(mMutex); + mParams = {}; + mControl = {}; + mHealthControl = {}; + mHealthListener = {}; + mStatusCondition.wait_until(lock, now + 60s, [this] { + return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED; + }); + mStatusListener = {}; + mId = kInvalidStorageId; + } } sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader() { @@ -1838,7 +1912,7 @@ bool IncrementalService::DataLoaderStub::fsmStep() { targetStatus = mTargetStatus; } - LOG(DEBUG) << "fsmStep: " << mId << ": " << currentStatus << " -> " << targetStatus; + LOG(DEBUG) << "fsmStep: " << id() << ": " << currentStatus << " -> " << targetStatus; if (currentStatus == targetStatus) { return true; @@ -1920,42 +1994,167 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount return binder::Status::ok(); } -void IncrementalService::DataLoaderStub::healthStatusOk() { - LOG(DEBUG) << "healthStatusOk: " << mId; - std::unique_lock lock(mMutex); - registerForPendingReads(); +bool IncrementalService::DataLoaderStub::isHealthParamsValid() const { + return mHealthCheckParams.blockedTimeoutMs > 0 && + mHealthCheckParams.blockedTimeoutMs < mHealthCheckParams.unhealthyTimeoutMs; +} + +void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener healthListener, + int healthStatus) { + LOG(DEBUG) << id() << ": healthStatus: " << healthStatus; + if (healthListener) { + healthListener->onHealthStatus(id(), healthStatus); + } +} + +void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { + LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : ""); + + int healthStatusToReport = -1; + StorageHealthListener healthListener; + + { + std::unique_lock lock(mMutex); + unregisterFromPendingReads(); + + healthListener = mHealthListener; + + // Healthcheck depends on timestamp of the oldest pending read. + // To get it, we need to re-open a pendingReads FD to get a full list of reads. + // Additionally we need to re-register for epoll with fresh FDs in case there are no reads. + const auto now = Clock::now(); + const auto kernelTsUs = getOldestPendingReadTs(); + if (baseline) { + // Updating baseline only on looper/epoll callback, i.e. on new set of pending reads. + mHealthBase = {now, kernelTsUs}; + } + + if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now || + mHealthBase.kernelTsUs > kernelTsUs) { + LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait."; + registerForPendingReads(); + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK; + lock.unlock(); + onHealthStatus(healthListener, healthStatusToReport); + return; + } + + resetHealthControl(); + + // Always make sure the data loader is started. + setTargetStatusLocked(IDataLoaderStatusListener::DATA_LOADER_STARTED); + + // Skip any further processing if health check params are invalid. + if (!isHealthParamsValid()) { + LOG(DEBUG) << id() + << ": Skip any further processing if health check params are invalid."; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; + lock.unlock(); + onHealthStatus(healthListener, healthStatusToReport); + // Triggering data loader start. This is a one-time action. + fsmStep(); + return; + } + + const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs); + const auto unhealthyTimeout = + std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs); + const auto unhealthyMonitoring = + std::max(1000ms, + std::chrono::milliseconds(mHealthCheckParams.unhealthyMonitoringMs)); + + const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs; + const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs); + const auto delta = now - userTs; + + TimePoint whenToCheckBack; + if (delta < blockedTimeout) { + LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status."; + whenToCheckBack = userTs + blockedTimeout; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; + } else if (delta < unhealthyTimeout) { + LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy."; + whenToCheckBack = userTs + unhealthyTimeout; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED; + } else { + LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring."; + whenToCheckBack = now + unhealthyMonitoring; + healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY; + } + LOG(DEBUG) << id() << ": updateHealthStatus in " + << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack - + now) + .count()) / + 1000.0 + << "secs"; + mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); }); + } + + if (healthStatusToReport != -1) { + onHealthStatus(healthListener, healthStatusToReport); + } + + fsmStep(); } -void IncrementalService::DataLoaderStub::healthStatusReadsPending() { - LOG(DEBUG) << "healthStatusReadsPending: " << mId; - requestStart(); +const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl() { + if (mHealthPath.empty()) { + resetHealthControl(); + return mHealthControl; + } + if (mHealthControl.pendingReads() < 0) { + mHealthControl = mService.mIncFs->openMount(mHealthPath); + } + if (mHealthControl.pendingReads() < 0) { + LOG(ERROR) << "Failed to open health control for: " << id() << ", path: " << mHealthPath + << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":" + << mHealthControl.logs() << ")"; + } + return mHealthControl; +} - std::unique_lock lock(mMutex); - unregisterFromPendingReads(); +void IncrementalService::DataLoaderStub::resetHealthControl() { + mHealthControl = {}; } -void IncrementalService::DataLoaderStub::healthStatusBlocked() {} +BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs() { + auto result = kMaxBootClockTsUs; + + const auto& control = initializeHealthControl(); + if (control.pendingReads() < 0) { + return result; + } + + std::vector<incfs::ReadInfo> pendingReads; + if (mService.mIncFs->waitForPendingReads(control, 0ms, &pendingReads) != + android::incfs::WaitResult::HaveData || + pendingReads.empty()) { + return result; + } + + LOG(DEBUG) << id() << ": pendingReads: " << control.pendingReads() << ", " + << pendingReads.size() << ": " << pendingReads.front().bootClockTsUs; -void IncrementalService::DataLoaderStub::healthStatusUnhealthy() {} + for (auto&& pendingRead : pendingReads) { + result = std::min(result, pendingRead.bootClockTsUs); + } + return result; +} void IncrementalService::DataLoaderStub::registerForPendingReads() { - auto pendingReadsFd = mHealthControl.pendingReads(); + const auto pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { - mHealthControl = mService.mIncFs->openMount(mHealthPath); - pendingReadsFd = mHealthControl.pendingReads(); - if (pendingReadsFd < 0) { - LOG(ERROR) << "Failed to open health control for: " << mId << ", path: " << mHealthPath - << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":" - << mHealthControl.logs() << ")"; - return; - } + return; } + LOG(DEBUG) << id() << ": addFd(pendingReadsFd): " << pendingReadsFd; + mService.mLooper->addFd( pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT, [](int, int, void* data) -> int { auto&& self = (DataLoaderStub*)data; - return self->onPendingReads(); + self->updateHealthStatus(/*baseline=*/true); + return 0; }, this); mService.mLooper->wake(); @@ -1967,19 +2166,10 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() { return; } + LOG(DEBUG) << id() << ": removeFd(pendingReadsFd): " << pendingReadsFd; + mService.mLooper->removeFd(pendingReadsFd); mService.mLooper->wake(); - - mHealthControl = {}; -} - -int IncrementalService::DataLoaderStub::onPendingReads() { - if (!mService.mRunning.load(std::memory_order_relaxed)) { - return 0; - } - - healthStatusReadsPending(); - return 0; } void IncrementalService::DataLoaderStub::onDump(int fd) { diff --git a/services/incremental/IncrementalService.h b/services/incremental/IncrementalService.h index 05f62b977a85..57e4669d53de 100644 --- a/services/incremental/IncrementalService.h +++ b/services/incremental/IncrementalService.h @@ -35,6 +35,7 @@ #include <limits> #include <map> #include <mutex> +#include <set> #include <span> #include <string> #include <string_view> @@ -186,17 +187,12 @@ private: void onDump(int fd); - MountId id() const { return mId; } + MountId id() const { return mId.load(std::memory_order_relaxed); } const content::pm::DataLoaderParamsParcel& params() const { return mParams; } private: binder::Status onStatusChanged(MountId mount, int newStatus) final; - void registerForPendingReads(); - void unregisterFromPendingReads(); - int onPendingReads(); - - bool isValid() const { return mId != kInvalidStorageId; } sp<content::pm::IDataLoader> getDataLoader(); bool bind(); @@ -208,21 +204,27 @@ private: void setTargetStatusLocked(int status); bool fsmStep(); + bool fsmStep(int currentStatus, int targetStatus); + + void onHealthStatus(StorageHealthListener healthListener, int healthStatus); + void updateHealthStatus(bool baseline = false); + + bool isValid() const { return id() != kInvalidStorageId; } + + bool isHealthParamsValid() const; + + const incfs::UniqueControl& initializeHealthControl(); + void resetHealthControl(); + + BootClockTsUs getOldestPendingReadTs(); - // Watching for pending reads. - void healthStatusOk(); - // Pending reads detected, waiting for Xsecs to confirm blocked state. - void healthStatusReadsPending(); - // There are reads pending for X+secs, waiting for additional Ysecs to confirm unhealthy - // state. - void healthStatusBlocked(); - // There are reads pending for X+Ysecs, marking storage as unhealthy. - void healthStatusUnhealthy(); + void registerForPendingReads(); + void unregisterFromPendingReads(); IncrementalService& mService; std::mutex mMutex; - MountId mId = kInvalidStorageId; + std::atomic<MountId> mId = kInvalidStorageId; content::pm::DataLoaderParamsParcel mParams; content::pm::FileSystemControlParcel mControl; DataLoaderStatusListener mStatusListener; @@ -235,6 +237,11 @@ private: std::string mHealthPath; incfs::UniqueControl mHealthControl; + struct { + TimePoint userTs; + BootClockTsUs kernelTsUs; + } mHealthBase = {TimePoint::max(), kMaxBootClockTsUs}; + StorageHealthCheckParams mHealthCheckParams; }; using DataLoaderStubPtr = sp<DataLoaderStub>; @@ -331,6 +338,8 @@ private: bool unregisterAppOpsCallback(const std::string& packageName); void onAppOpChanged(const std::string& packageName); + using Job = std::function<void()>; + void runJobProcessing(); void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry, const incfs::FileId& libFileId, std::string_view targetLibPath, @@ -338,6 +347,10 @@ private: void runCmdLooper(); + void addTimedJob(MountId id, TimePoint when, Job what); + void removeTimedJobs(MountId id); + void runTimers(); + private: const std::unique_ptr<VoldServiceWrapper> mVold; const std::unique_ptr<DataLoaderManagerWrapper> mDataLoaderManager; @@ -360,7 +373,6 @@ private: std::atomic_bool mRunning{true}; - using Job = std::function<void()>; std::unordered_map<MountId, std::vector<Job>> mJobQueue; MountId mPendingJobsMount = kInvalidStorageId; std::condition_variable mJobCondition; @@ -368,6 +380,19 @@ private: std::thread mJobProcessor; std::thread mCmdLooperThread; + + struct TimedJob { + MountId id; + TimePoint when; + Job what; + friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) { + return lhs.when < rhs.when; + } + }; + std::set<TimedJob> mTimedJobs; + std::condition_variable mTimerCondition; + std::mutex mTimerMutex; + std::thread mTimerThread; }; } // namespace android::incremental diff --git a/services/incremental/test/IncrementalServiceTest.cpp b/services/incremental/test/IncrementalServiceTest.cpp index 2948b6a0f293..84ec7d3b2c24 100644 --- a/services/incremental/test/IncrementalServiceTest.cpp +++ b/services/incremental/test/IncrementalServiceTest.cpp @@ -371,7 +371,7 @@ class MockJniWrapper : public JniWrapper { public: MOCK_CONST_METHOD0(initializeForCurrentThread, void()); - MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); } + MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); } }; class MockLooperWrapper : public LooperWrapper { |