diff options
-rw-r--r-- | logd/SerializedFlushToState.cpp | 62 | ||||
-rw-r--r-- | logd/SerializedFlushToState.h | 40 | ||||
-rw-r--r-- | logd/SerializedFlushToStateTest.cpp | 19 | ||||
-rw-r--r-- | logd/SerializedLogBuffer.cpp | 2 | ||||
-rw-r--r-- | logd/SerializedLogChunk.h | 3 |
5 files changed, 69 insertions, 57 deletions
diff --git a/logd/SerializedFlushToState.cpp b/logd/SerializedFlushToState.cpp index b02ccc349..378cf20fe 100644 --- a/logd/SerializedFlushToState.cpp +++ b/logd/SerializedFlushToState.cpp @@ -16,6 +16,8 @@ #include "SerializedFlushToState.h" +#include <limits> + #include <android-base/logging.h> SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask) @@ -63,14 +65,13 @@ void SerializedFlushToState::CreateLogPosition(log_id_t log_id) { log_positions_[log_id].emplace(log_position); } -void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) { +void SerializedFlushToState::UpdateLogsNeeded(log_id_t log_id) { auto& buffer_it = log_positions_[log_id]->buffer_it; auto read_offset = log_positions_[log_id]->read_offset; - // If there is another log to read in this buffer, add it to the min heap. + // If there is another log to read in this buffer, let it be read. if (read_offset < buffer_it->write_offset()) { - auto* entry = buffer_it->log_entry(read_offset); - min_heap_.emplace(log_id, entry); + logs_needed_from_next_position_[log_id] = false; } else if (read_offset == buffer_it->write_offset()) { // If there are no more logs to read in this buffer and it's the last buffer, then // set logs_needed_from_next_position_ to wait until more logs get logged. @@ -85,8 +86,7 @@ void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) { if (buffer_it->write_offset() == 0) { logs_needed_from_next_position_[log_id] = true; } else { - auto* entry = buffer_it->log_entry(0); - min_heap_.emplace(log_id, entry); + logs_needed_from_next_position_[log_id] = false; } } } else { @@ -106,24 +106,41 @@ void SerializedFlushToState::CheckForNewLogs() { } CreateLogPosition(i); } - logs_needed_from_next_position_[i] = false; - // If it wasn't possible to insert, logs_needed_from_next_position will be set back to true. - AddMinHeapEntry(i); + UpdateLogsNeeded(i); } } -MinHeapElement SerializedFlushToState::PopNextUnreadLog() { - auto top = min_heap_.top(); - min_heap_.pop(); +bool SerializedFlushToState::HasUnreadLogs() { + CheckForNewLogs(); + log_id_for_each(i) { + if (log_positions_[i] && !logs_needed_from_next_position_[i]) { + return true; + } + } + return false; +} - auto* entry = top.entry; - auto log_id = top.log_id; +LogWithId SerializedFlushToState::PopNextUnreadLog() { + uint64_t min_sequence = std::numeric_limits<uint64_t>::max(); + log_id_t log_id; + const SerializedLogEntry* entry = nullptr; + log_id_for_each(i) { + if (!log_positions_[i] || logs_needed_from_next_position_[i]) { + continue; + } + if (log_positions_[i]->log_entry()->sequence() < min_sequence) { + log_id = i; + entry = log_positions_[i]->log_entry(); + min_sequence = entry->sequence(); + } + } + CHECK_NE(nullptr, entry); log_positions_[log_id]->read_offset += entry->total_len(); logs_needed_from_next_position_[log_id] = true; - return top; + return {log_id, entry}; } void SerializedFlushToState::Prune(log_id_t log_id, @@ -133,25 +150,12 @@ void SerializedFlushToState::Prune(log_id_t log_id, return; } - // // Decrease the ref count since we're deleting our reference. + // Decrease the ref count since we're deleting our reference. buffer_it->DecReaderRefCount(); // Delete in the reference. log_positions_[log_id].reset(); - // Remove the MinHeapElement referencing log_id, if it exists, but retain the others. - std::vector<MinHeapElement> old_elements; - while (!min_heap_.empty()) { - auto& element = min_heap_.top(); - if (element.log_id != log_id) { - old_elements.emplace_back(element); - } - min_heap_.pop(); - } - for (auto&& element : old_elements) { - min_heap_.emplace(element); - } - // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the // log_position_ object during the next read. logs_needed_from_next_position_[log_id] = true; diff --git a/logd/SerializedFlushToState.h b/logd/SerializedFlushToState.h index 0b20822dd..c953a16df 100644 --- a/logd/SerializedFlushToState.h +++ b/logd/SerializedFlushToState.h @@ -27,26 +27,19 @@ struct LogPosition { std::list<SerializedLogChunk>::iterator buffer_it; int read_offset; + + const SerializedLogEntry* log_entry() const { return buffer_it->log_entry(read_offset); } }; -struct MinHeapElement { - MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry) - : log_id(log_id), entry(entry) {} +struct LogWithId { log_id_t log_id; const SerializedLogEntry* entry; - // The change of comparison operators is intentional, std::priority_queue uses operator<() to - // compare but creates a max heap. Since we want a min heap, we return the opposite result. - bool operator<(const MinHeapElement& rhs) const { - return entry->sequence() > rhs.entry->sequence(); - } }; // This class tracks the specific point where a FlushTo client has read through the logs. It // directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset // into each log chunk where it has last read. All interactions with this class, except for its -// construction, must be done with SerializedLogBuffer::lock_ held. No log chunks that it -// references may be pruned, which is handled by ensuring prune does not touch any log chunk with -// highest sequence number greater or equal to start(). +// construction, must be done with SerializedLogBuffer::lock_ held. class SerializedFlushToState : public FlushToState { public: // Initializes this state object. For each log buffer set in log_mask, this sets @@ -61,31 +54,29 @@ class SerializedFlushToState : public FlushToState { if (logs_ == nullptr) logs_ = logs; } - bool HasUnreadLogs() { - CheckForNewLogs(); - return !min_heap_.empty(); - } + // Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if + // there are any unread logs, false otherwise. + bool HasUnreadLogs(); - // Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to - // indicate that we're waiting for more logs from the associated log buffer. - MinHeapElement PopNextUnreadLog(); + // Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're + // waiting for more logs from the associated log buffer. + LogWithId PopNextUnreadLog(); // If the parent log buffer prunes logs, the reference that this class contains may become // invalid, so this must be called first to drop the reference to buffer_it, if any. void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it); private: - // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the - // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're - // waiting for the next log. - void AddMinHeapEntry(log_id_t log_id); + // Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread + // log or to the point at which the next log will appear. + void UpdateLogsNeeded(log_id_t log_id); // Create a LogPosition object for the given log_id by searching through the log chunks for the // first chunk and then first log entry within that chunk that is greater or equal to start(). void CreateLogPosition(log_id_t log_id); // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and - // calls AddMinHeapEntry() if so. + // calls UpdateLogsNeeded() if so. void CheckForNewLogs(); std::list<SerializedLogChunk>* logs_ = nullptr; @@ -97,7 +88,4 @@ class SerializedFlushToState : public FlushToState { // next_log_position == logs_write_position_)`. These will be re-checked in each // loop in case new logs came in. std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {}; - // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next - // element that this reader should read. - std::priority_queue<MinHeapElement> min_heap_; }; diff --git a/logd/SerializedFlushToStateTest.cpp b/logd/SerializedFlushToStateTest.cpp index f4515c8e3..88f4052a7 100644 --- a/logd/SerializedFlushToStateTest.cpp +++ b/logd/SerializedFlushToStateTest.cpp @@ -287,4 +287,21 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) { EXPECT_EQ(second_chunk->reader_ref_count(), 1U); EXPECT_FALSE(state.HasUnreadLogs()); -}
\ No newline at end of file +} + +TEST(SerializedFlushToState, Prune) { + auto chunk = SerializedLogChunk{kChunkSize}; + chunk.Log(1, log_time(), 0, 1, 1, "abc", 3); + chunk.Log(2, log_time(), 0, 1, 1, "abc", 3); + chunk.Log(3, log_time(), 0, 1, 1, "abc", 3); + chunk.FinishWriting(); + + std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX]; + log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk)); + + auto state = SerializedFlushToState{1, kLogMaskAll}; + state.InitializeLogs(log_chunks); + ASSERT_TRUE(state.HasUnreadLogs()); + + state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin()); +} diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp index 5012d3d2b..acd093b25 100644 --- a/logd/SerializedLogBuffer.cpp +++ b/logd/SerializedLogBuffer.cpp @@ -211,7 +211,7 @@ bool SerializedLogBuffer::FlushTo( state.InitializeLogs(logs_); while (state.HasUnreadLogs()) { - MinHeapElement top = state.PopNextUnreadLog(); + LogWithId top = state.PopNextUnreadLog(); auto* entry = top.entry; auto log_id = top.log_id; diff --git a/logd/SerializedLogChunk.h b/logd/SerializedLogChunk.h index 0991eacb6..645433d2b 100644 --- a/logd/SerializedLogChunk.h +++ b/logd/SerializedLogChunk.h @@ -18,6 +18,8 @@ #include <sys/types.h> +#include <android-base/logging.h> + #include "LogWriter.h" #include "SerializedData.h" #include "SerializedLogEntry.h" @@ -55,6 +57,7 @@ class SerializedLogChunk { } const SerializedLogEntry* log_entry(int offset) const { + CHECK(writer_active_ || reader_ref_count_ > 0); return reinterpret_cast<const SerializedLogEntry*>(data() + offset); } const uint8_t* data() const { return contents_.data(); } |