diff options
author | Alex Buynytskyy <alexbuy@google.com> | 2020-06-01 16:04:42 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2020-06-01 16:04:42 +0000 |
commit | 4115191cc48c433d294307a946e02655753b795f (patch) | |
tree | 03431b14fa4d668027c7d4115d0a6f39aff3364f | |
parent | 9710f8eaab61a368cebaaeb0b15cd0892394b04a (diff) | |
parent | c4ac9d2321dfced8ffd398da827ad7e56e242991 (diff) |
Merge "Healthcheck: proper job allocation and test." into rvc-dev am: c4ac9d2321
Change-Id: I9257064a3c0a064f2217adbe8ab5c00f26027896
-rw-r--r-- | services/incremental/IncrementalService.cpp | 111 | ||||
-rw-r--r-- | services/incremental/IncrementalService.h | 21 | ||||
-rw-r--r-- | services/incremental/ServiceWrappers.cpp | 88 | ||||
-rw-r--r-- | services/incremental/ServiceWrappers.h | 15 | ||||
-rw-r--r-- | services/incremental/test/IncrementalServiceTest.cpp | 227 |
5 files changed, 352 insertions, 110 deletions
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp index 2fcb005e6df8..885f4d2d34d7 100644 --- a/services/incremental/IncrementalService.cpp +++ b/services/incremental/IncrementalService.cpp @@ -268,22 +268,14 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mAppOpsManager(sm.getAppOpsManager()), mJni(sm.getJni()), mLooper(sm.getLooper()), + mTimedQueue(sm.getTimedQueue()), mIncrementalDir(rootDir) { - if (!mVold) { - LOG(FATAL) << "Vold service is unavailable"; - } - if (!mDataLoaderManager) { - LOG(FATAL) << "DataLoaderManagerService is unavailable"; - } - if (!mAppOpsManager) { - LOG(FATAL) << "AppOpsManager is unavailable"; - } - if (!mJni) { - LOG(FATAL) << "JNI is unavailable"; - } - if (!mLooper) { - LOG(FATAL) << "Looper is unavailable"; - } + CHECK(mVold) << "Vold service is unavailable"; + CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable"; + CHECK(mAppOpsManager) << "AppOpsManager is unavailable"; + CHECK(mJni) << "JNI is unavailable"; + CHECK(mLooper) << "Looper is unavailable"; + CHECK(mTimedQueue) << "TimedQueue is unavailable"; mJobQueue.reserve(16); mJobProcessor = std::thread([this]() { @@ -294,10 +286,6 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mJni->initializeForCurrentThread(); runCmdLooper(); }); - mTimerThread = std::thread([this]() { - mJni->initializeForCurrentThread(); - runTimers(); - }); const auto mountedRootNames = adoptMountedInstances(); mountExistingImages(mountedRootNames); @@ -310,10 +298,8 @@ IncrementalService::~IncrementalService() { } mJobCondition.notify_all(); mJobProcessor.join(); - mTimerCondition.notify_all(); - mTimerThread.join(); mCmdLooperThread.join(); - mTimedJobs.clear(); + mTimedQueue->stop(); // Ensure that mounts are destroyed while the service is still valid. mBindsByPath.clear(); mMounts.clear(); @@ -1710,53 +1696,18 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) { } } -void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) { +void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) { if (id == kInvalidStorageId) { return; } - { - std::unique_lock lock(mTimerMutex); - mTimedJobs.insert(TimedJob{id, when, std::move(what)}); - } - mTimerCondition.notify_all(); + mTimedQueue->addJob(id, after, std::move(what)); } void IncrementalService::removeTimedJobs(MountId id) { if (id == kInvalidStorageId) { return; } - { - std::unique_lock lock(mTimerMutex); - std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; }); - } -} - -void IncrementalService::runTimers() { - static constexpr TimePoint kInfinityTs{Clock::duration::max()}; - TimePoint nextTaskTs = kInfinityTs; - for (;;) { - std::unique_lock lock(mTimerMutex); - mTimerCondition.wait_until(lock, nextTaskTs, [this]() { - auto now = Clock::now(); - return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now); - }); - if (!mRunning) { - return; - } - - auto now = Clock::now(); - auto it = mTimedJobs.begin(); - // Always acquire begin(). We can't use it after unlock as mTimedJobs can change. - for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) { - auto job = it->what; - mTimedJobs.erase(it); - - lock.unlock(); - job(); - lock.lock(); - } - nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs; - } + mTimedQueue->removeJobs(id); } IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id, @@ -2029,8 +1980,8 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { mHealthBase = {now, kernelTsUs}; } - if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now || - mHealthBase.kernelTsUs > kernelTsUs) { + if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs || + mHealthBase.userTs > now) { LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait."; registerForPendingReads(); healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK; @@ -2056,6 +2007,9 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { return; } + // Don't schedule timer job less than 500ms in advance. + static constexpr auto kTolerance = 500ms; + const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs); const auto unhealthyTimeout = std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs); @@ -2065,31 +2019,28 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs; const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs); - const auto delta = now - userTs; + const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs); - TimePoint whenToCheckBack; - if (delta < blockedTimeout) { + Milliseconds checkBackAfter; + if (delta + kTolerance < blockedTimeout) { LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status."; - whenToCheckBack = userTs + blockedTimeout; + checkBackAfter = blockedTimeout - delta; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; - } else if (delta < unhealthyTimeout) { + } else if (delta + kTolerance < unhealthyTimeout) { LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy."; - whenToCheckBack = userTs + unhealthyTimeout; + checkBackAfter = unhealthyTimeout - delta; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED; } else { LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring."; - whenToCheckBack = now + unhealthyMonitoring; + checkBackAfter = unhealthyMonitoring; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY; } - LOG(DEBUG) << id() << ": updateHealthStatus in " - << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack - - now) - .count()) / - 1000.0 + LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0 << "secs"; - mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); }); + mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); }); } + // With kTolerance we are expecting these to execute before the next update. if (healthStatusToReport != -1) { onHealthStatus(healthListener, healthStatusToReport); } @@ -2178,6 +2129,16 @@ void IncrementalService::DataLoaderStub::onDump(int fd) { dprintf(fd, " targetStatus: %d\n", mTargetStatus); dprintf(fd, " targetStatusTs: %lldmcs\n", (long long)(elapsedMcs(mTargetStatusTs, Clock::now()))); + dprintf(fd, " health: {\n"); + dprintf(fd, " path: %s\n", mHealthPath.c_str()); + dprintf(fd, " base: %lldmcs (%lld)\n", + (long long)(elapsedMcs(mHealthBase.userTs, Clock::now())), + (long long)mHealthBase.kernelTsUs); + dprintf(fd, " blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs)); + dprintf(fd, " unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs)); + dprintf(fd, " unhealthyMonitoringMs: %d\n", + int(mHealthCheckParams.unhealthyMonitoringMs)); + dprintf(fd, " }\n"); const auto& params = mParams; dprintf(fd, " dataLoaderParams: {\n"); dprintf(fd, " type: %s\n", toString(params.type).c_str()); diff --git a/services/incremental/IncrementalService.h b/services/incremental/IncrementalService.h index 57e4669d53de..918531b7921c 100644 --- a/services/incremental/IncrementalService.h +++ b/services/incremental/IncrementalService.h @@ -56,8 +56,6 @@ using StorageId = int; using FileId = incfs::FileId; using BlockIndex = incfs::BlockIndex; using RawMetadata = incfs::RawMetadata; -using Clock = std::chrono::steady_clock; -using TimePoint = std::chrono::time_point<Clock>; using Seconds = std::chrono::seconds; using BootClockTsUs = uint64_t; @@ -338,8 +336,6 @@ private: bool unregisterAppOpsCallback(const std::string& packageName); void onAppOpChanged(const std::string& packageName); - using Job = std::function<void()>; - void runJobProcessing(); void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry, const incfs::FileId& libFileId, std::string_view targetLibPath, @@ -347,9 +343,8 @@ private: void runCmdLooper(); - void addTimedJob(MountId id, TimePoint when, Job what); + void addTimedJob(MountId id, Milliseconds after, Job what); void removeTimedJobs(MountId id); - void runTimers(); private: const std::unique_ptr<VoldServiceWrapper> mVold; @@ -358,6 +353,7 @@ private: const std::unique_ptr<AppOpsManagerWrapper> mAppOpsManager; const std::unique_ptr<JniWrapper> mJni; const std::unique_ptr<LooperWrapper> mLooper; + const std::unique_ptr<TimedQueueWrapper> mTimedQueue; const std::string mIncrementalDir; mutable std::mutex mLock; @@ -380,19 +376,6 @@ private: std::thread mJobProcessor; std::thread mCmdLooperThread; - - struct TimedJob { - MountId id; - TimePoint when; - Job what; - friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) { - return lhs.when < rhs.when; - } - }; - std::set<TimedJob> mTimedJobs; - std::condition_variable mTimerCondition; - std::mutex mTimerMutex; - std::thread mTimerThread; }; } // namespace android::incremental diff --git a/services/incremental/ServiceWrappers.cpp b/services/incremental/ServiceWrappers.cpp index a76aa625ebc6..99a35adb5074 100644 --- a/services/incremental/ServiceWrappers.cpp +++ b/services/incremental/ServiceWrappers.cpp @@ -25,6 +25,8 @@ #include <binder/AppOpsManager.h> #include <utils/String16.h> +#include <thread> + #include "IncrementalServiceValidation.h" using namespace std::literals; @@ -181,6 +183,88 @@ public: } }; +static JNIEnv* getOrAttachJniEnv(JavaVM* jvm); + +class RealTimedQueueWrapper : public TimedQueueWrapper { +public: + RealTimedQueueWrapper(JavaVM* jvm) { + mThread = std::thread([this, jvm]() { + (void)getOrAttachJniEnv(jvm); + runTimers(); + }); + } + ~RealTimedQueueWrapper() final { + CHECK(!mRunning) << "call stop first"; + CHECK(!mThread.joinable()) << "call stop first"; + } + + void addJob(MountId id, Milliseconds after, Job what) final { + const auto now = Clock::now(); + { + std::unique_lock lock(mMutex); + mJobs.insert(TimedJob{id, now + after, std::move(what)}); + } + mCondition.notify_all(); + } + void removeJobs(MountId id) final { + std::unique_lock lock(mMutex); + std::erase_if(mJobs, [id](auto&& item) { return item.id == id; }); + } + void stop() final { + { + std::unique_lock lock(mMutex); + mRunning = false; + } + mCondition.notify_all(); + mThread.join(); + mJobs.clear(); + } + +private: + void runTimers() { + static constexpr TimePoint kInfinityTs{Clock::duration::max()}; + TimePoint nextJobTs = kInfinityTs; + std::unique_lock lock(mMutex); + for (;;) { + mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() { + const auto now = Clock::now(); + const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs; + return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs; + }); + if (!mRunning) { + return; + } + + const auto now = Clock::now(); + auto it = mJobs.begin(); + // Always acquire begin(). We can't use it after unlock as mTimedJobs can change. + for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) { + auto job = std::move(it->what); + mJobs.erase(it); + + lock.unlock(); + job(); + lock.lock(); + } + nextJobTs = it != mJobs.end() ? it->when : kInfinityTs; + } + } + + struct TimedJob { + MountId id; + TimePoint when; + Job what; + friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) { + return lhs.when < rhs.when; + } + }; + bool mRunning = true; + std::set<TimedJob> mJobs; + std::condition_variable mCondition; + std::mutex mMutex; + std::thread mThread; +}; + RealServiceManager::RealServiceManager(sp<IServiceManager> serviceManager, JNIEnv* env) : mServiceManager(std::move(serviceManager)), mJvm(RealJniWrapper::getJvm(env)) {} @@ -228,6 +312,10 @@ std::unique_ptr<LooperWrapper> RealServiceManager::getLooper() { return std::make_unique<RealLooperWrapper>(); } +std::unique_ptr<TimedQueueWrapper> RealServiceManager::getTimedQueue() { + return std::make_unique<RealTimedQueueWrapper>(mJvm); +} + static JavaVM* getJavaVm(JNIEnv* env) { CHECK(env); JavaVM* jvm = nullptr; diff --git a/services/incremental/ServiceWrappers.h b/services/incremental/ServiceWrappers.h index a935ab99267e..8cd726fdc0f1 100644 --- a/services/incremental/ServiceWrappers.h +++ b/services/incremental/ServiceWrappers.h @@ -35,6 +35,11 @@ namespace android::incremental { +using Clock = std::chrono::steady_clock; +using TimePoint = std::chrono::time_point<Clock>; +using Milliseconds = std::chrono::milliseconds; +using Job = std::function<void()>; + // --- Wrapper interfaces --- using MountId = int32_t; @@ -121,6 +126,14 @@ public: virtual int pollAll(int timeoutMillis) = 0; }; +class TimedQueueWrapper { +public: + virtual ~TimedQueueWrapper() = default; + virtual void addJob(MountId id, Milliseconds after, Job what) = 0; + virtual void removeJobs(MountId id) = 0; + virtual void stop() = 0; +}; + class ServiceManagerWrapper { public: virtual ~ServiceManagerWrapper() = default; @@ -130,6 +143,7 @@ public: virtual std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() = 0; virtual std::unique_ptr<JniWrapper> getJni() = 0; virtual std::unique_ptr<LooperWrapper> getLooper() = 0; + virtual std::unique_ptr<TimedQueueWrapper> getTimedQueue() = 0; }; // --- Real stuff --- @@ -144,6 +158,7 @@ public: std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final; std::unique_ptr<JniWrapper> getJni() final; std::unique_ptr<LooperWrapper> getLooper() final; + std::unique_ptr<TimedQueueWrapper> getTimedQueue() final; private: template <class INTERFACE> diff --git a/services/incremental/test/IncrementalServiceTest.cpp b/services/incremental/test/IncrementalServiceTest.cpp index 84ec7d3b2c24..26b5094a795a 100644 --- a/services/incremental/test/IncrementalServiceTest.cpp +++ b/services/incremental/test/IncrementalServiceTest.cpp @@ -22,6 +22,7 @@ #include <gtest/gtest.h> #include <utils/Log.h> +#include <chrono> #include <future> #include "IncrementalService.h" @@ -295,9 +296,21 @@ public: void openMountSuccess() { ON_CALL(*this, openMount(_)).WillByDefault(Invoke(this, &MockIncFs::openMountForHealth)); } - void waitForPendingReadsSuccess() { + + // 1000ms + void waitForPendingReadsSuccess(uint64_t ts = 0) { + ON_CALL(*this, waitForPendingReads(_, _, _)) + .WillByDefault( + Invoke([ts](const Control& control, std::chrono::milliseconds timeout, + std::vector<incfs::ReadInfo>* pendingReadsBuffer) { + pendingReadsBuffer->push_back({.bootClockTsUs = ts}); + return android::incfs::WaitResult::HaveData; + })); + } + + void waitForPendingReadsTimeout() { ON_CALL(*this, waitForPendingReads(_, _, _)) - .WillByDefault(Invoke(this, &MockIncFs::waitForPendingReadsForHealth)); + .WillByDefault(Return(android::incfs::WaitResult::Timeout)); } static constexpr auto kPendingReadsFd = 42; @@ -305,13 +318,6 @@ public: return UniqueControl(IncFs_CreateControl(-1, kPendingReadsFd, -1)); } - WaitResult waitForPendingReadsForHealth( - const Control& control, std::chrono::milliseconds timeout, - std::vector<incfs::ReadInfo>* pendingReadsBuffer) const { - pendingReadsBuffer->push_back({.bootClockTsUs = 0}); - return android::incfs::WaitResult::HaveData; - } - RawMetadata getMountInfoMetadata(const Control& control, std::string_view path) { metadata::Mount m; m.mutable_storage()->set_id(100); @@ -371,7 +377,7 @@ class MockJniWrapper : public JniWrapper { public: MOCK_CONST_METHOD0(initializeForCurrentThread, void()); - MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); } + MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); } }; class MockLooperWrapper : public LooperWrapper { @@ -385,7 +391,7 @@ public: ON_CALL(*this, addFd(_, _, _, _, _)) .WillByDefault(Invoke(this, &MockLooperWrapper::storeCallback)); ON_CALL(*this, removeFd(_)).WillByDefault(Invoke(this, &MockLooperWrapper::clearCallback)); - ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::sleepFor)); + ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::wait10Ms)); } int storeCallback(int, int, int, android::Looper_callbackFunc callback, void* data) { @@ -400,8 +406,10 @@ public: return 0; } - int sleepFor(int timeoutMillis) { - std::this_thread::sleep_for(std::chrono::milliseconds(timeoutMillis)); + int wait10Ms(int) { + // This is called from a loop in runCmdLooper. + // Sleeping for 10ms only to avoid busy looping. + std::this_thread::sleep_for(10ms); return 0; } @@ -409,6 +417,55 @@ public: void* mCallbackData = nullptr; }; +class MockTimedQueueWrapper : public TimedQueueWrapper { +public: + MOCK_METHOD3(addJob, void(MountId, Milliseconds, Job)); + MOCK_METHOD1(removeJobs, void(MountId)); + MOCK_METHOD0(stop, void()); + + MockTimedQueueWrapper() { + ON_CALL(*this, addJob(_, _, _)) + .WillByDefault(Invoke(this, &MockTimedQueueWrapper::storeJob)); + ON_CALL(*this, removeJobs(_)).WillByDefault(Invoke(this, &MockTimedQueueWrapper::clearJob)); + } + + void storeJob(MountId id, Milliseconds after, Job what) { + mId = id; + mAfter = after; + mWhat = std::move(what); + } + + void clearJob(MountId id) { + if (mId == id) { + mAfter = {}; + mWhat = {}; + } + } + + MountId mId = -1; + Milliseconds mAfter; + Job mWhat; +}; + +class MockStorageHealthListener : public os::incremental::BnStorageHealthListener { +public: + MOCK_METHOD2(onHealthStatus, binder::Status(int32_t storageId, int32_t status)); + + MockStorageHealthListener() { + ON_CALL(*this, onHealthStatus(_, _)) + .WillByDefault(Invoke(this, &MockStorageHealthListener::storeStorageIdAndStatus)); + } + + binder::Status storeStorageIdAndStatus(int32_t storageId, int32_t status) { + mStorageId = storageId; + mStatus = status; + return binder::Status::ok(); + } + + int32_t mStorageId = -1; + int32_t mStatus = -1; +}; + class MockServiceManager : public ServiceManagerWrapper { public: MockServiceManager(std::unique_ptr<MockVoldService> vold, @@ -416,13 +473,15 @@ public: std::unique_ptr<MockIncFs> incfs, std::unique_ptr<MockAppOpsManager> appOpsManager, std::unique_ptr<MockJniWrapper> jni, - std::unique_ptr<MockLooperWrapper> looper) + std::unique_ptr<MockLooperWrapper> looper, + std::unique_ptr<MockTimedQueueWrapper> timedQueue) : mVold(std::move(vold)), mDataLoaderManager(std::move(dataLoaderManager)), mIncFs(std::move(incfs)), mAppOpsManager(std::move(appOpsManager)), mJni(std::move(jni)), - mLooper(std::move(looper)) {} + mLooper(std::move(looper)), + mTimedQueue(std::move(timedQueue)) {} std::unique_ptr<VoldServiceWrapper> getVoldService() final { return std::move(mVold); } std::unique_ptr<DataLoaderManagerWrapper> getDataLoaderManager() final { return std::move(mDataLoaderManager); @@ -431,6 +490,7 @@ public: std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final { return std::move(mAppOpsManager); } std::unique_ptr<JniWrapper> getJni() final { return std::move(mJni); } std::unique_ptr<LooperWrapper> getLooper() final { return std::move(mLooper); } + std::unique_ptr<TimedQueueWrapper> getTimedQueue() final { return std::move(mTimedQueue); } private: std::unique_ptr<MockVoldService> mVold; @@ -439,6 +499,7 @@ private: std::unique_ptr<MockAppOpsManager> mAppOpsManager; std::unique_ptr<MockJniWrapper> mJni; std::unique_ptr<MockLooperWrapper> mLooper; + std::unique_ptr<MockTimedQueueWrapper> mTimedQueue; }; // --- IncrementalServiceTest --- @@ -460,6 +521,8 @@ public: mJni = jni.get(); auto looper = std::make_unique<NiceMock<MockLooperWrapper>>(); mLooper = looper.get(); + auto timedQueue = std::make_unique<NiceMock<MockTimedQueueWrapper>>(); + mTimedQueue = timedQueue.get(); mIncrementalService = std::make_unique<IncrementalService>(MockServiceManager(std::move(vold), std::move( @@ -467,7 +530,8 @@ public: std::move(incFs), std::move(appOps), std::move(jni), - std::move(looper)), + std::move(looper), + std::move(timedQueue)), mRootDir.path); mDataLoaderParcel.packageName = "com.test"; mDataLoaderParcel.arguments = "uri"; @@ -503,6 +567,7 @@ protected: NiceMock<MockAppOpsManager>* mAppOpsManager = nullptr; NiceMock<MockJniWrapper>* mJni = nullptr; NiceMock<MockLooperWrapper>* mLooper = nullptr; + NiceMock<MockTimedQueueWrapper>* mTimedQueue = nullptr; NiceMock<MockDataLoader>* mDataLoader = nullptr; std::unique_ptr<IncrementalService> mIncrementalService; TemporaryDir mRootDir; @@ -710,6 +775,136 @@ TEST_F(IncrementalServiceTest, testStartDataLoaderRecreateOnPendingReads) { mLooper->mCallback(-1, -1, mLooper->mCallbackData); } +TEST_F(IncrementalServiceTest, testStartDataLoaderUnhealthyStorage) { + mVold->mountIncFsSuccess(); + mIncFs->makeFileSuccess(); + mIncFs->openMountSuccess(); + mVold->bindMountSuccess(); + mDataLoaderManager->bindToDataLoaderSuccess(); + mDataLoaderManager->getDataLoaderSuccess(); + EXPECT_CALL(*mDataLoaderManager, bindToDataLoader(_, _, _, _)).Times(1); + EXPECT_CALL(*mDataLoaderManager, unbindFromDataLoader(_)).Times(1); + EXPECT_CALL(*mDataLoader, create(_, _, _, _)).Times(1); + EXPECT_CALL(*mDataLoader, start(_)).Times(1); + EXPECT_CALL(*mDataLoader, destroy(_)).Times(1); + EXPECT_CALL(*mVold, unmountIncFs(_)).Times(2); + EXPECT_CALL(*mLooper, addFd(MockIncFs::kPendingReadsFd, _, _, _, _)).Times(2); + EXPECT_CALL(*mLooper, removeFd(MockIncFs::kPendingReadsFd)).Times(2); + EXPECT_CALL(*mTimedQueue, addJob(_, _, _)).Times(4); + + sp<NiceMock<MockStorageHealthListener>> listener{new NiceMock<MockStorageHealthListener>}; + NiceMock<MockStorageHealthListener>* listenerMock = listener.get(); + EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_OK)) + .Times(2); + EXPECT_CALL(*listenerMock, + onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_READS_PENDING)) + .Times(1); + EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_BLOCKED)) + .Times(1); + EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_UNHEALTHY)) + .Times(2); + + StorageHealthCheckParams params; + params.blockedTimeoutMs = 10000; + params.unhealthyTimeoutMs = 20000; + params.unhealthyMonitoringMs = 30000; + + using MS = std::chrono::milliseconds; + using MCS = std::chrono::microseconds; + + const auto blockedTimeout = MS(params.blockedTimeoutMs); + const auto unhealthyTimeout = MS(params.unhealthyTimeoutMs); + const auto unhealthyMonitoring = MS(params.unhealthyMonitoringMs); + + const uint64_t kFirstTimestampUs = 1000000000ll; + const uint64_t kBlockedTimestampUs = + kFirstTimestampUs - std::chrono::duration_cast<MCS>(blockedTimeout).count(); + const uint64_t kUnhealthyTimestampUs = + kFirstTimestampUs - std::chrono::duration_cast<MCS>(unhealthyTimeout).count(); + + TemporaryDir tempDir; + int storageId = mIncrementalService->createStorage(tempDir.path, std::move(mDataLoaderParcel), + IncrementalService::CreateOptions::CreateNew, + {}, std::move(params), listener); + ASSERT_GE(storageId, 0); + + // Healthy state, registered for pending reads. + ASSERT_NE(nullptr, mLooper->mCallback); + ASSERT_NE(nullptr, mLooper->mCallbackData); + ASSERT_EQ(storageId, listener->mStorageId); + ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus); + + // Looper/epoll callback. + mIncFs->waitForPendingReadsSuccess(kFirstTimestampUs); + mLooper->mCallback(-1, -1, mLooper->mCallbackData); + + // Unregister from pending reads and wait. + ASSERT_EQ(nullptr, mLooper->mCallback); + ASSERT_EQ(nullptr, mLooper->mCallbackData); + ASSERT_EQ(storageId, listener->mStorageId); + ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_READS_PENDING, listener->mStatus); + // Timed callback present. + ASSERT_EQ(storageId, mTimedQueue->mId); + ASSERT_GE(mTimedQueue->mAfter, blockedTimeout); + auto timedCallback = mTimedQueue->mWhat; + mTimedQueue->clearJob(storageId); + + // Timed job callback for blocked. + mIncFs->waitForPendingReadsSuccess(kBlockedTimestampUs); + timedCallback(); + + // Still not registered, and blocked. + ASSERT_EQ(nullptr, mLooper->mCallback); + ASSERT_EQ(nullptr, mLooper->mCallbackData); + ASSERT_EQ(storageId, listener->mStorageId); + ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_BLOCKED, listener->mStatus); + // Timed callback present. + ASSERT_EQ(storageId, mTimedQueue->mId); + ASSERT_GE(mTimedQueue->mAfter, 1000ms); + timedCallback = mTimedQueue->mWhat; + mTimedQueue->clearJob(storageId); + + // Timed job callback for unhealthy. + mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs); + timedCallback(); + + // Still not registered, and blocked. + ASSERT_EQ(nullptr, mLooper->mCallback); + ASSERT_EQ(nullptr, mLooper->mCallbackData); + ASSERT_EQ(storageId, listener->mStorageId); + ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus); + // Timed callback present. + ASSERT_EQ(storageId, mTimedQueue->mId); + ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring); + timedCallback = mTimedQueue->mWhat; + mTimedQueue->clearJob(storageId); + + // One more unhealthy. + mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs); + timedCallback(); + + // Still not registered, and blocked. + ASSERT_EQ(nullptr, mLooper->mCallback); + ASSERT_EQ(nullptr, mLooper->mCallbackData); + ASSERT_EQ(storageId, listener->mStorageId); + ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus); + // Timed callback present. + ASSERT_EQ(storageId, mTimedQueue->mId); + ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring); + timedCallback = mTimedQueue->mWhat; + mTimedQueue->clearJob(storageId); + + // And now healthy. + mIncFs->waitForPendingReadsTimeout(); + timedCallback(); + + // Healthy state, registered for pending reads. + ASSERT_NE(nullptr, mLooper->mCallback); + ASSERT_NE(nullptr, mLooper->mCallbackData); + ASSERT_EQ(storageId, listener->mStorageId); + ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus); +} + TEST_F(IncrementalServiceTest, testSetIncFsMountOptionsSuccess) { mVold->mountIncFsSuccess(); mIncFs->makeFileSuccess(); |