diff options
Diffstat (limited to 'adb/client/incremental_server.cpp')
-rw-r--r-- | adb/client/incremental_server.cpp | 260 |
1 files changed, 195 insertions, 65 deletions
diff --git a/adb/client/incremental_server.cpp b/adb/client/incremental_server.cpp index 4a131ce20f..bfe18c0b29 100644 --- a/adb/client/incremental_server.cpp +++ b/adb/client/incremental_server.cpp @@ -1,4 +1,4 @@ -/* +/* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -44,9 +44,10 @@ namespace incremental { -static constexpr int kBlockSize = 4096; +static constexpr int kHashesPerBlock = kBlockSize / kDigestSize; static constexpr int kCompressedSizeMax = kBlockSize * 0.95; static constexpr int8_t kTypeData = 0; +static constexpr int8_t kTypeHash = 1; static constexpr int8_t kCompressionNone = 0; static constexpr int8_t kCompressionLZ4 = 1; static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize)); @@ -132,41 +133,64 @@ struct ResponseHeader { CompressionType compression_type; // 1 byte BlockIdx block_idx; // 4 bytes BlockSize block_size; // 2 bytes + + static constexpr size_t responseSizeFor(size_t dataSize) { + return dataSize + sizeof(ResponseHeader); + } +} __attribute__((packed)); + +template <size_t Size = kBlockSize> +struct BlockBuffer { + ResponseHeader header; + char data[Size]; } __attribute__((packed)); // Holds streaming state for a file class File { public: // Plain file - File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) { + File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset, + unique_fd tree_fd) + : File(filepath, id, size, tree_offset) { this->fd_ = std::move(fd); + this->tree_fd_ = std::move(tree_fd); priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size); } - int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed, - std::string* error) const { - char* buf_ptr = static_cast<char*>(buf); + int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const { int64_t bytes_read = -1; const off64_t offsetStart = blockIndexToOffset(block_idx); - bytes_read = adb_pread(fd_, &buf_ptr[sizeof(ResponseHeader)], kBlockSize, offsetStart); + bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart); + return bytes_read; + } + int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const { + int64_t bytes_read = -1; + const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx); + bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart); return bytes_read; } - const unique_fd& RawFd() const { return fd_; } const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; } std::vector<bool> sentBlocks; NumBlocks sentBlocksCount = 0; + std::vector<bool> sentTreeBlocks; + const char* const filepath; const FileId id; const int64_t size; private: - File(const char* filepath, FileId id, int64_t size) : filepath(filepath), id(id), size(size) { + File(const char* filepath, FileId id, int64_t size, int64_t tree_offset) + : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) { sentBlocks.resize(numBytesToNumBlocks(size)); + sentTreeBlocks.resize(verity_tree_blocks_for_file(size)); } unique_fd fd_; std::vector<BlockIdx> priority_blocks_; + + unique_fd tree_fd_; + const int64_t tree_offset_; }; class IncrementalServer { @@ -174,6 +198,8 @@ class IncrementalServer { IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files) : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) { buffer_.reserve(kReadBufferSize); + pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize); + pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader); } bool Serve(); @@ -208,7 +234,11 @@ class IncrementalServer { void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); } enum class SendResult { Sent, Skipped, Error }; - SendResult SendBlock(FileId fileId, BlockIdx blockIdx, bool flush = false); + SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false); + + bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx); + bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx); + bool SendDone(); void RunPrefetching(); @@ -228,7 +258,10 @@ class IncrementalServer { int compressed_ = 0, uncompressed_ = 0; long long sentSize_ = 0; - std::vector<char> pendingBlocks_; + static constexpr auto kChunkFlushSize = 31 * kBlockSize; + + std::vector<char> pendingBlocksBuffer_; + char* pendingBlocks_ = nullptr; // True when client notifies that all the data has been received bool servingComplete_ = false; @@ -250,7 +283,7 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) if (bcur > 0) { // output the rest. - WriteFdExactly(output_fd_, buffer_.data(), bcur); + (void)WriteFdExactly(output_fd_, buffer_.data(), bcur); erase_buffer_head(bcur); } @@ -265,9 +298,10 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0); if (res != 1) { - WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); + auto err = errno; + (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); if (res < 0) { - D("Failed to poll: %s\n", strerror(errno)); + D("Failed to poll: %s", strerror(err)); return false; } if (blocking) { @@ -289,7 +323,7 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) continue; } - D("Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno); + D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno); break; } // socket is closed. print remaining messages @@ -313,56 +347,113 @@ std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) { return request; } -auto IncrementalServer::SendBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult { +bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) { auto& file = files_[fileId]; - if (blockIdx >= static_cast<long>(file.sentBlocks.size())) { - fprintf(stderr, "Failed to read file %s at block %" PRId32 " (past end).\n", file.filepath, + const int32_t data_block_count = numBytesToNumBlocks(file.size); + + const int32_t total_nodes_count(file.sentTreeBlocks.size()); + const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock; + + const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count; + + // Leaf level, sending only 1 block. + const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock; + if (file.sentTreeBlocks[leaf_idx]) { + return true; + } + if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) { + return false; + } + file.sentTreeBlocks[leaf_idx] = true; + + // Non-leaf, sending EVERYTHING. This should be done only once. + if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) { + return true; + } + + for (int32_t i = 0; i < leaf_nodes_offset; ++i) { + if (!SendTreeBlock(fileId, blockIdx, i)) { + return false; + } + file.sentTreeBlocks[i] = true; + } + return true; +} + +bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) { + const auto& file = files_[fileId]; + + BlockBuffer buffer; + const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data); + if (bytesRead <= 0) { + fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath, blockIdx); - return SendResult::Error; + return false; + } + + buffer.header.compression_type = kCompressionNone; + buffer.header.block_type = kTypeHash; + buffer.header.file_id = toBigEndian(fileId); + buffer.header.block_size = toBigEndian(int16_t(bytesRead)); + buffer.header.block_idx = toBigEndian(blockIdx); + + Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false); + + return true; +} + +auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult { + auto& file = files_[fileId]; + if (blockIdx >= static_cast<long>(file.sentBlocks.size())) { + // may happen as we schedule some extra blocks for reported page misses + D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx); + return SendResult::Skipped; } if (file.sentBlocks[blockIdx]) { return SendResult::Skipped; } - std::string error; - char raw[sizeof(ResponseHeader) + kBlockSize]; + + if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) { + return SendResult::Error; + } + + BlockBuffer raw; bool isZipCompressed = false; - const int64_t bytesRead = file.ReadBlock(blockIdx, &raw, &isZipCompressed, &error); + const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed); if (bytesRead < 0) { - fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%s).\n", file.filepath, blockIdx, - error.c_str()); + fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx, + errno); return SendResult::Error; } - ResponseHeader* header = nullptr; - char data[sizeof(ResponseHeader) + kCompressBound]; - char* compressed = data + sizeof(*header); + BlockBuffer<kCompressBound> compressed; int16_t compressedSize = 0; if (!isZipCompressed) { - compressedSize = - LZ4_compress_default(raw + sizeof(*header), compressed, bytesRead, kCompressBound); + compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound); } int16_t blockSize; + ResponseHeader* header; if (compressedSize > 0 && compressedSize < kCompressedSizeMax) { ++compressed_; blockSize = compressedSize; - header = reinterpret_cast<ResponseHeader*>(data); + header = &compressed.header; header->compression_type = kCompressionLZ4; } else { ++uncompressed_; blockSize = bytesRead; - header = reinterpret_cast<ResponseHeader*>(raw); + header = &raw.header; header->compression_type = kCompressionNone; } header->block_type = kTypeData; - header->file_id = toBigEndian(fileId); header->block_size = toBigEndian(blockSize); header->block_idx = toBigEndian(blockIdx); file.sentBlocks[blockIdx] = true; file.sentBlocksCount += 1; - Send(header, sizeof(*header) + blockSize, flush); + Send(header, ResponseHeader::responseSizeFor(blockSize), flush); + return SendResult::Sent; } @@ -388,7 +479,8 @@ void IncrementalServer::RunPrefetching() { if (!priority_blocks.empty()) { for (auto& i = prefetch.priorityIndex; blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) { - if (auto res = SendBlock(file.id, priority_blocks[i]); res == SendResult::Sent) { + if (auto res = SendDataBlock(file.id, priority_blocks[i]); + res == SendResult::Sent) { --blocksToSend; } else if (res == SendResult::Error) { fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i); @@ -396,7 +488,7 @@ void IncrementalServer::RunPrefetching() { } } for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) { - if (auto res = SendBlock(file.id, i); res == SendResult::Sent) { + if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) { --blocksToSend; } else if (res == SendResult::Error) { fprintf(stderr, "Failed to send block %" PRId32 "\n", i); @@ -409,30 +501,25 @@ void IncrementalServer::RunPrefetching() { } void IncrementalServer::Send(const void* data, size_t size, bool flush) { - constexpr auto kChunkFlushSize = 31 * kBlockSize; - - if (pendingBlocks_.empty()) { - pendingBlocks_.resize(sizeof(ChunkHeader)); - } - pendingBlocks_.insert(pendingBlocks_.end(), static_cast<const char*>(data), - static_cast<const char*>(data) + size); - if (flush || pendingBlocks_.size() > kChunkFlushSize) { + pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_); + if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) { Flush(); } } void IncrementalServer::Flush() { - if (pendingBlocks_.empty()) { + auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader)); + if (dataBytes == 0) { return; } - *(ChunkHeader*)pendingBlocks_.data() = - toBigEndian<int32_t>(pendingBlocks_.size() - sizeof(ChunkHeader)); - if (!WriteFdExactly(adb_fd_, pendingBlocks_.data(), pendingBlocks_.size())) { - fprintf(stderr, "Failed to write %d bytes\n", int(pendingBlocks_.size())); + *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes); + auto totalBytes = sizeof(ChunkHeader) + dataBytes; + if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) { + fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes)); } - sentSize_ += pendingBlocks_.size(); - pendingBlocks_.clear(); + sentSize_ += totalBytes; + pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader); } bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount, @@ -443,7 +530,7 @@ bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int D("Streaming completed.\n" "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: " "%d, mb: %.3f\n" - "Total time taken: %.3fms\n", + "Total time taken: %.3fms", missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0, duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0); return true; @@ -510,9 +597,21 @@ bool IncrementalServer::Serve() { fileId, blockIdx); break; } - // fprintf(stderr, "\treading file %d block %04d\n", (int)fileId, - // (int)blockIdx); - if (auto res = SendBlock(fileId, blockIdx, true); res == SendResult::Error) { + + if (VLOG_IS_ON(INCREMENTAL)) { + auto& file = files_[fileId]; + auto posP = std::find(file.PriorityBlocks().begin(), + file.PriorityBlocks().end(), blockIdx); + D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)", + (int)fileId, (int)blockIdx, + posP == file.PriorityBlocks().end() + ? -1 + : int(posP - file.PriorityBlocks().begin()), + int(file.PriorityBlocks().size())); + } + + if (auto res = SendDataBlock(fileId, blockIdx, true); + res == SendResult::Error) { fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx); } else if (res == SendResult::Sent) { ++missesSent; @@ -536,7 +635,7 @@ bool IncrementalServer::Serve() { fileId); break; } - D("Received prefetch request for file_id %" PRId16 ".\n", fileId); + D("Received prefetch request for file_id %" PRId16 ".", fileId); prefetches_.emplace_back(files_[fileId]); break; } @@ -551,6 +650,43 @@ bool IncrementalServer::Serve() { } } +static std::pair<unique_fd, int64_t> open_fd(const char* filepath) { + struct stat st; + if (stat(filepath, &st)) { + error_exit("inc-server: failed to stat input file '%s'.", filepath); + } + + unique_fd fd(adb_open(filepath, O_RDONLY)); + if (fd < 0) { + error_exit("inc-server: failed to open file '%s'.", filepath); + } + + return {std::move(fd), st.st_size}; +} + +static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) { + std::string signature_file(filepath); + signature_file += IDSIG; + + unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY)); + if (fd < 0) { + error_exit("inc-server: failed to open file '%s'.", signature_file.c_str()); + } + + auto [tree_offset, tree_size] = skip_id_sig_headers(fd); + if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) { + error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n", + signature_file.c_str(), (long long)tree_size, (long long)expected); + } + + int32_t data_block_count = numBytesToNumBlocks(file_size); + int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock; + D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(), + int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count)); + + return {std::move(fd), tree_offset}; +} + bool serve(int connection_fd, int output_fd, int argc, const char** argv) { auto connection_ufd = unique_fd(connection_fd); auto output_ufd = unique_fd(output_fd); @@ -563,17 +699,11 @@ bool serve(int connection_fd, int output_fd, int argc, const char** argv) { for (int i = 0; i < argc; ++i) { auto filepath = argv[i]; - struct stat st; - if (stat(filepath, &st)) { - fprintf(stderr, "Failed to stat input file %s. Abort.\n", filepath); - return {}; - } + auto [file_fd, file_size] = open_fd(filepath); + auto [sign_fd, sign_offset] = open_signature(file_size, filepath); - unique_fd fd(adb_open(filepath, O_RDONLY)); - if (fd < 0) { - error_exit("inc-server: failed to open file '%s'.", filepath); - } - files.emplace_back(filepath, i, st.st_size, std::move(fd)); + files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset, + std::move(sign_fd)); } IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files)); |