summaryrefslogtreecommitdiff
path: root/adb/client/incremental_server.cpp
diff options
context:
space:
mode:
authorSteven Laver <lavers@google.com>2020-04-14 08:17:16 -0700
committerSteven Laver <lavers@google.com>2020-04-14 08:17:16 -0700
commit6377bb8fa531e15ee5fa10ffbcd1882fda46a1cc (patch)
tree5e1c70a0dc02885ec5c47ecc270f4a7b65eb0513 /adb/client/incremental_server.cpp
parentd9f287a3f37aae38c47f76e18b6f9d0d300c9247 (diff)
parent9403fa4ac16cece623145b139c523fe684cad6ee (diff)
Merge RP1A.200414.001
Change-Id: I5c1d78b2229f3cd244dc4cb29c8f21ae0d1b2fe8
Diffstat (limited to 'adb/client/incremental_server.cpp')
-rw-r--r--adb/client/incremental_server.cpp260
1 files changed, 195 insertions, 65 deletions
diff --git a/adb/client/incremental_server.cpp b/adb/client/incremental_server.cpp
index 4a131ce20f..bfe18c0b29 100644
--- a/adb/client/incremental_server.cpp
+++ b/adb/client/incremental_server.cpp
@@ -1,4 +1,4 @@
-/*
+/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -44,9 +44,10 @@
namespace incremental {
-static constexpr int kBlockSize = 4096;
+static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
static constexpr int8_t kTypeData = 0;
+static constexpr int8_t kTypeHash = 1;
static constexpr int8_t kCompressionNone = 0;
static constexpr int8_t kCompressionLZ4 = 1;
static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
@@ -132,41 +133,64 @@ struct ResponseHeader {
CompressionType compression_type; // 1 byte
BlockIdx block_idx; // 4 bytes
BlockSize block_size; // 2 bytes
+
+ static constexpr size_t responseSizeFor(size_t dataSize) {
+ return dataSize + sizeof(ResponseHeader);
+ }
+} __attribute__((packed));
+
+template <size_t Size = kBlockSize>
+struct BlockBuffer {
+ ResponseHeader header;
+ char data[Size];
} __attribute__((packed));
// Holds streaming state for a file
class File {
public:
// Plain file
- File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) {
+ File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
+ unique_fd tree_fd)
+ : File(filepath, id, size, tree_offset) {
this->fd_ = std::move(fd);
+ this->tree_fd_ = std::move(tree_fd);
priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
}
- int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed,
- std::string* error) const {
- char* buf_ptr = static_cast<char*>(buf);
+ int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
int64_t bytes_read = -1;
const off64_t offsetStart = blockIndexToOffset(block_idx);
- bytes_read = adb_pread(fd_, &buf_ptr[sizeof(ResponseHeader)], kBlockSize, offsetStart);
+ bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
+ return bytes_read;
+ }
+ int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
+ int64_t bytes_read = -1;
+ const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
+ bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
return bytes_read;
}
- const unique_fd& RawFd() const { return fd_; }
const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }
std::vector<bool> sentBlocks;
NumBlocks sentBlocksCount = 0;
+ std::vector<bool> sentTreeBlocks;
+
const char* const filepath;
const FileId id;
const int64_t size;
private:
- File(const char* filepath, FileId id, int64_t size) : filepath(filepath), id(id), size(size) {
+ File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
+ : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
sentBlocks.resize(numBytesToNumBlocks(size));
+ sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
}
unique_fd fd_;
std::vector<BlockIdx> priority_blocks_;
+
+ unique_fd tree_fd_;
+ const int64_t tree_offset_;
};
class IncrementalServer {
@@ -174,6 +198,8 @@ class IncrementalServer {
IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
: adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
buffer_.reserve(kReadBufferSize);
+ pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
+ pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
}
bool Serve();
@@ -208,7 +234,11 @@ class IncrementalServer {
void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }
enum class SendResult { Sent, Skipped, Error };
- SendResult SendBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);
+ SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);
+
+ bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
+ bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);
+
bool SendDone();
void RunPrefetching();
@@ -228,7 +258,10 @@ class IncrementalServer {
int compressed_ = 0, uncompressed_ = 0;
long long sentSize_ = 0;
- std::vector<char> pendingBlocks_;
+ static constexpr auto kChunkFlushSize = 31 * kBlockSize;
+
+ std::vector<char> pendingBlocksBuffer_;
+ char* pendingBlocks_ = nullptr;
// True when client notifies that all the data has been received
bool servingComplete_ = false;
@@ -250,7 +283,7 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
if (bcur > 0) {
// output the rest.
- WriteFdExactly(output_fd_, buffer_.data(), bcur);
+ (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
erase_buffer_head(bcur);
}
@@ -265,9 +298,10 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);
if (res != 1) {
- WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
+ auto err = errno;
+ (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
if (res < 0) {
- D("Failed to poll: %s\n", strerror(errno));
+ D("Failed to poll: %s", strerror(err));
return false;
}
if (blocking) {
@@ -289,7 +323,7 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
continue;
}
- D("Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno);
+ D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
break;
}
// socket is closed. print remaining messages
@@ -313,56 +347,113 @@ std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
return request;
}
-auto IncrementalServer::SendBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
+bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
auto& file = files_[fileId];
- if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
- fprintf(stderr, "Failed to read file %s at block %" PRId32 " (past end).\n", file.filepath,
+ const int32_t data_block_count = numBytesToNumBlocks(file.size);
+
+ const int32_t total_nodes_count(file.sentTreeBlocks.size());
+ const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
+
+ const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;
+
+ // Leaf level, sending only 1 block.
+ const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
+ if (file.sentTreeBlocks[leaf_idx]) {
+ return true;
+ }
+ if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
+ return false;
+ }
+ file.sentTreeBlocks[leaf_idx] = true;
+
+ // Non-leaf, sending EVERYTHING. This should be done only once.
+ if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
+ return true;
+ }
+
+ for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
+ if (!SendTreeBlock(fileId, blockIdx, i)) {
+ return false;
+ }
+ file.sentTreeBlocks[i] = true;
+ }
+ return true;
+}
+
+bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
+ const auto& file = files_[fileId];
+
+ BlockBuffer buffer;
+ const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
+ if (bytesRead <= 0) {
+ fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
blockIdx);
- return SendResult::Error;
+ return false;
+ }
+
+ buffer.header.compression_type = kCompressionNone;
+ buffer.header.block_type = kTypeHash;
+ buffer.header.file_id = toBigEndian(fileId);
+ buffer.header.block_size = toBigEndian(int16_t(bytesRead));
+ buffer.header.block_idx = toBigEndian(blockIdx);
+
+ Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);
+
+ return true;
+}
+
+auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
+ auto& file = files_[fileId];
+ if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
+ // may happen as we schedule some extra blocks for reported page misses
+ D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
+ return SendResult::Skipped;
}
if (file.sentBlocks[blockIdx]) {
return SendResult::Skipped;
}
- std::string error;
- char raw[sizeof(ResponseHeader) + kBlockSize];
+
+ if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
+ return SendResult::Error;
+ }
+
+ BlockBuffer raw;
bool isZipCompressed = false;
- const int64_t bytesRead = file.ReadBlock(blockIdx, &raw, &isZipCompressed, &error);
+ const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
if (bytesRead < 0) {
- fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%s).\n", file.filepath, blockIdx,
- error.c_str());
+ fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
+ errno);
return SendResult::Error;
}
- ResponseHeader* header = nullptr;
- char data[sizeof(ResponseHeader) + kCompressBound];
- char* compressed = data + sizeof(*header);
+ BlockBuffer<kCompressBound> compressed;
int16_t compressedSize = 0;
if (!isZipCompressed) {
- compressedSize =
- LZ4_compress_default(raw + sizeof(*header), compressed, bytesRead, kCompressBound);
+ compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
}
int16_t blockSize;
+ ResponseHeader* header;
if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
++compressed_;
blockSize = compressedSize;
- header = reinterpret_cast<ResponseHeader*>(data);
+ header = &compressed.header;
header->compression_type = kCompressionLZ4;
} else {
++uncompressed_;
blockSize = bytesRead;
- header = reinterpret_cast<ResponseHeader*>(raw);
+ header = &raw.header;
header->compression_type = kCompressionNone;
}
header->block_type = kTypeData;
-
header->file_id = toBigEndian(fileId);
header->block_size = toBigEndian(blockSize);
header->block_idx = toBigEndian(blockIdx);
file.sentBlocks[blockIdx] = true;
file.sentBlocksCount += 1;
- Send(header, sizeof(*header) + blockSize, flush);
+ Send(header, ResponseHeader::responseSizeFor(blockSize), flush);
+
return SendResult::Sent;
}
@@ -388,7 +479,8 @@ void IncrementalServer::RunPrefetching() {
if (!priority_blocks.empty()) {
for (auto& i = prefetch.priorityIndex;
blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
- if (auto res = SendBlock(file.id, priority_blocks[i]); res == SendResult::Sent) {
+ if (auto res = SendDataBlock(file.id, priority_blocks[i]);
+ res == SendResult::Sent) {
--blocksToSend;
} else if (res == SendResult::Error) {
fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
@@ -396,7 +488,7 @@ void IncrementalServer::RunPrefetching() {
}
}
for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
- if (auto res = SendBlock(file.id, i); res == SendResult::Sent) {
+ if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
--blocksToSend;
} else if (res == SendResult::Error) {
fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
@@ -409,30 +501,25 @@ void IncrementalServer::RunPrefetching() {
}
void IncrementalServer::Send(const void* data, size_t size, bool flush) {
- constexpr auto kChunkFlushSize = 31 * kBlockSize;
-
- if (pendingBlocks_.empty()) {
- pendingBlocks_.resize(sizeof(ChunkHeader));
- }
- pendingBlocks_.insert(pendingBlocks_.end(), static_cast<const char*>(data),
- static_cast<const char*>(data) + size);
- if (flush || pendingBlocks_.size() > kChunkFlushSize) {
+ pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);
+ if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
Flush();
}
}
void IncrementalServer::Flush() {
- if (pendingBlocks_.empty()) {
+ auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
+ if (dataBytes == 0) {
return;
}
- *(ChunkHeader*)pendingBlocks_.data() =
- toBigEndian<int32_t>(pendingBlocks_.size() - sizeof(ChunkHeader));
- if (!WriteFdExactly(adb_fd_, pendingBlocks_.data(), pendingBlocks_.size())) {
- fprintf(stderr, "Failed to write %d bytes\n", int(pendingBlocks_.size()));
+ *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
+ auto totalBytes = sizeof(ChunkHeader) + dataBytes;
+ if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
+ fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
}
- sentSize_ += pendingBlocks_.size();
- pendingBlocks_.clear();
+ sentSize_ += totalBytes;
+ pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
}
bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
@@ -443,7 +530,7 @@ bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int
D("Streaming completed.\n"
"Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
"%d, mb: %.3f\n"
- "Total time taken: %.3fms\n",
+ "Total time taken: %.3fms",
missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
return true;
@@ -510,9 +597,21 @@ bool IncrementalServer::Serve() {
fileId, blockIdx);
break;
}
- // fprintf(stderr, "\treading file %d block %04d\n", (int)fileId,
- // (int)blockIdx);
- if (auto res = SendBlock(fileId, blockIdx, true); res == SendResult::Error) {
+
+ if (VLOG_IS_ON(INCREMENTAL)) {
+ auto& file = files_[fileId];
+ auto posP = std::find(file.PriorityBlocks().begin(),
+ file.PriorityBlocks().end(), blockIdx);
+ D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
+ (int)fileId, (int)blockIdx,
+ posP == file.PriorityBlocks().end()
+ ? -1
+ : int(posP - file.PriorityBlocks().begin()),
+ int(file.PriorityBlocks().size()));
+ }
+
+ if (auto res = SendDataBlock(fileId, blockIdx, true);
+ res == SendResult::Error) {
fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
} else if (res == SendResult::Sent) {
++missesSent;
@@ -536,7 +635,7 @@ bool IncrementalServer::Serve() {
fileId);
break;
}
- D("Received prefetch request for file_id %" PRId16 ".\n", fileId);
+ D("Received prefetch request for file_id %" PRId16 ".", fileId);
prefetches_.emplace_back(files_[fileId]);
break;
}
@@ -551,6 +650,43 @@ bool IncrementalServer::Serve() {
}
}
+static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
+ struct stat st;
+ if (stat(filepath, &st)) {
+ error_exit("inc-server: failed to stat input file '%s'.", filepath);
+ }
+
+ unique_fd fd(adb_open(filepath, O_RDONLY));
+ if (fd < 0) {
+ error_exit("inc-server: failed to open file '%s'.", filepath);
+ }
+
+ return {std::move(fd), st.st_size};
+}
+
+static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
+ std::string signature_file(filepath);
+ signature_file += IDSIG;
+
+ unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
+ if (fd < 0) {
+ error_exit("inc-server: failed to open file '%s'.", signature_file.c_str());
+ }
+
+ auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
+ if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
+ error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
+ signature_file.c_str(), (long long)tree_size, (long long)expected);
+ }
+
+ int32_t data_block_count = numBytesToNumBlocks(file_size);
+ int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
+ D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(),
+ int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));
+
+ return {std::move(fd), tree_offset};
+}
+
bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
auto connection_ufd = unique_fd(connection_fd);
auto output_ufd = unique_fd(output_fd);
@@ -563,17 +699,11 @@ bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
for (int i = 0; i < argc; ++i) {
auto filepath = argv[i];
- struct stat st;
- if (stat(filepath, &st)) {
- fprintf(stderr, "Failed to stat input file %s. Abort.\n", filepath);
- return {};
- }
+ auto [file_fd, file_size] = open_fd(filepath);
+ auto [sign_fd, sign_offset] = open_signature(file_size, filepath);
- unique_fd fd(adb_open(filepath, O_RDONLY));
- if (fd < 0) {
- error_exit("inc-server: failed to open file '%s'.", filepath);
- }
- files.emplace_back(filepath, i, st.st_size, std::move(fd));
+ files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
+ std::move(sign_fd));
}
IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));