diff options
author | Alex Vakulenko <avakulenko@google.com> | 2017-01-27 14:41:04 -0800 |
---|---|---|
committer | Jiwen 'Steve' Cai <jwcai@google.com> | 2017-01-28 15:04:54 -0800 |
commit | e4eec20f6263f4a42ae462456f60ea6c4518bb0a (patch) | |
tree | 306fdfb6c03485758748b180f98839e32461ce3f /libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp | |
parent | c34e059b3b044ec5346838e5b3d467c4f4bb6d65 (diff) |
Add DaydreamVR native libraries and services
Upstreaming the main VR system components from master-dreamos-dev
into goog/master.
Bug: None
Test: `m -j32` succeeds. Sailfish boots and basic_vr sample app works
Change-Id: I853015872afc443aecee10411ef2d6b79184d051
Diffstat (limited to 'libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp')
-rw-r--r-- | libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp | 414 |
1 files changed, 414 insertions, 0 deletions
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp new file mode 100644 index 0000000000..4fbfcf680c --- /dev/null +++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp @@ -0,0 +1,414 @@ +#include "include/private/dvr/buffer_hub_queue_client.h" + +#include <base/logging.h> +#include <sys/epoll.h> + +#include <array> + +#include <pdx/default_transport/client_channel.h> +#include <pdx/default_transport/client_channel_factory.h> +#include <pdx/file_handle.h> +#include <private/dvr/bufferhub_rpc.h> + +using android::pdx::LocalHandle; +using android::pdx::LocalChannelHandle; + +namespace android { +namespace dvr { + +BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle, + size_t meta_size) + : Client{pdx::default_transport::ClientChannel::Create( + std::move(channel_handle))}, + meta_size_(meta_size), + meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr), + buffers_(BufferHubQueue::kMaxQueueCapacity), + available_buffers_(BufferHubQueue::kMaxQueueCapacity), + capacity_(0) { + Initialize(); +} + +BufferHubQueue::BufferHubQueue(const std::string& endpoint_path, + size_t meta_size) + : Client{pdx::default_transport::ClientChannelFactory::Create( + endpoint_path)}, + meta_size_(meta_size), + meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr), + buffers_(BufferHubQueue::kMaxQueueCapacity), + available_buffers_(BufferHubQueue::kMaxQueueCapacity), + capacity_(0) { + Initialize(); +} + +void BufferHubQueue::Initialize() { + int ret = epoll_fd_.Create(); + if (ret < 0) { + LOG(ERROR) << "BufferHubQueue::BufferHubQueue: Failed to create epoll fd:" + << strerror(-ret); + return; + } + + epoll_event event = {.events = EPOLLIN | EPOLLET, + .data = {.u64 = static_cast<uint64_t>( + BufferHubQueue::kEpollQueueEventIndex)}}; + ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event); + if (ret < 0) { + LOG(ERROR) << "Failed to register ConsumerQueue into epoll event: " + << strerror(-ret); + } +} + +std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() { + Status<std::pair<LocalChannelHandle, size_t>> status = + InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(); + + if (!status) { + LOG(ERROR) << "Cannot create ConsumerQueue: " << status.GetErrorMessage(); + return nullptr; + } + + auto return_value = status.take(); + + VLOG(1) << "CreateConsumerQueue: meta_size_bytes=" << return_value.second; + return ConsumerQueue::Create(std::move(return_value.first), + return_value.second); +} + +bool BufferHubQueue::WaitForBuffers(int timeout) { + std::array<epoll_event, kMaxEvents> events; + + while (count() == 0) { + int ret = epoll_fd_.Wait(events.data(), events.size(), timeout); + + if (ret == 0) { + VLOG(1) << "Wait on epoll returns nothing before timeout."; + return false; + } + + if (ret < 0 && ret != -EINTR) { + LOG(ERROR) << "Failed to wait for buffers:" << strerror(-ret); + return false; + } + + const int num_events = ret; + + // A BufferQueue's epoll fd tracks N+1 events, where there are N events, + // one for each buffer, in the queue and one extra event for the queue + // client itself. + for (int i = 0; i < num_events; i++) { + int64_t index = static_cast<int64_t>(events[i].data.u64); + + VLOG(1) << "New BufferHubQueue event " << i << ": index=" << index; + + if (is_buffer_event_index(index) && (events[i].events & EPOLLIN)) { + auto buffer = buffers_[index]; + ret = OnBufferReady(buffer); + if (ret < 0) { + LOG(ERROR) << "Failed to set buffer ready:" << strerror(-ret); + continue; + } + Enqueue(buffer, index); + } else if (is_buffer_event_index(index) && + (events[i].events & EPOLLHUP)) { + // This maybe caused by producer replacing an exising buffer slot. + // Currently the epoll FD is cleaned up when the replacement consumer + // client is imported. + LOG(WARNING) << "Receives EPOLLHUP at slot: " << index; + } else if (is_queue_event_index(index) && (events[i].events & EPOLLIN)) { + // Note that after buffer imports, if |count()| still returns 0, epoll + // wait will be tried again to acquire the newly imported buffer. + ret = OnBufferAllocated(); + if (ret < 0) { + LOG(ERROR) << "Failed to import buffer:" << strerror(-ret); + continue; + } + } else { + LOG(WARNING) << "Unknown event " << i << ": u64=" << index + << ": events=" << events[i].events; + } + } + } + + return true; +} + +int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf, + size_t slot) { + if (is_full()) { + // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's + // import buffer. + LOG(ERROR) << "BufferHubQueue::AddBuffer queue is at maximum capacity: " + << capacity_; + return -E2BIG; + } + + if (buffers_[slot] != nullptr) { + // Replace the buffer if the slot is preoccupied. This could happen when the + // producer side replaced the slot with a newly allocated buffer. Detach the + // buffer and set up with the new one. + DetachBuffer(slot); + } + + epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}}; + const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event); + if (ret < 0) { + LOG(ERROR) + << "BufferHubQueue::AddBuffer: Failed to add buffer to epoll set:" + << strerror(-ret); + return ret; + } + + buffers_[slot] = buf; + capacity_++; + return 0; +} + +int BufferHubQueue::DetachBuffer(size_t slot) { + auto& buf = buffers_[slot]; + if (buf == nullptr) { + LOG(ERROR) << "BufferHubQueue::DetachBuffer: Invalid slot: " << slot; + return -EINVAL; + } + + const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr); + if (ret < 0) { + LOG(ERROR) << "BufferHubQueue::DetachBuffer: Failed to detach buffer from " + "epoll set:" + << strerror(-ret); + return ret; + } + + buffers_[slot] = nullptr; + capacity_--; + return 0; +} + +void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf, + size_t slot) { + if (count() == capacity_) { + LOG(ERROR) << "Buffer queue is full!"; + return; + } + + // Set slot buffer back to vector. + // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to + // the limitation of the RingBuffer we are using. Would be better to refactor + // that. + BufferInfo buffer_info(slot, meta_size_); + // Swap buffer into vector. + std::swap(buffer_info.buffer, buf); + // Swap metadata loaded during onBufferReady into vector. + std::swap(buffer_info.metadata, meta_buffer_tmp_); + + available_buffers_.Append(std::move(buffer_info)); +} + +std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout, + size_t* slot, + void* meta) { + VLOG(1) << "Dequeue: count=" << count() << ", timeout=" << timeout; + + if (count() == 0 && !WaitForBuffers(timeout)) + return nullptr; + + std::shared_ptr<BufferHubBuffer> buf; + BufferInfo& buffer_info = available_buffers_.Front(); + + // Report current pos as the output slot. + std::swap(buffer_info.slot, *slot); + // Swap buffer from vector to be returned later. + std::swap(buffer_info.buffer, buf); + // Swap metadata from vector into tmp so that we can write out to |meta|. + std::swap(buffer_info.metadata, meta_buffer_tmp_); + + available_buffers_.PopFront(); + + if (!buf) { + LOG(ERROR) << "Dequeue: Buffer to be dequeued is nullptr"; + return nullptr; + } + + if (meta) { + std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_, + reinterpret_cast<uint8_t*>(meta)); + } + + return buf; +} + +ProducerQueue::ProducerQueue(size_t meta_size) + : ProducerQueue(meta_size, 0, 0, 0, 0) {} + +ProducerQueue::ProducerQueue(LocalChannelHandle handle, size_t meta_size) + : BASE(std::move(handle), meta_size) {} + +ProducerQueue::ProducerQueue(size_t meta_size, int usage_set_mask, + int usage_clear_mask, int usage_deny_set_mask, + int usage_deny_clear_mask) + : BASE(BufferHubRPC::kClientPath, meta_size) { + auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>( + meta_size_, usage_set_mask, usage_clear_mask, usage_deny_set_mask, + usage_deny_clear_mask); + if (!status) { + LOG(ERROR) + << "ProducerQueue::ProducerQueue: Failed to create producer queue: %s" + << status.GetErrorMessage(); + Close(-status.error()); + return; + } +} + +int ProducerQueue::AllocateBuffer(int width, int height, int format, int usage, + size_t slice_count, size_t* out_slot) { + if (out_slot == nullptr) { + LOG(ERROR) << "Parameter out_slot cannot be null."; + return -EINVAL; + } + + if (is_full()) { + LOG(ERROR) << "ProducerQueue::AllocateBuffer queue is at maximum capacity: " + << capacity(); + return -E2BIG; + } + + const size_t kBufferCount = 1U; + + Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status = + InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>( + width, height, format, usage, slice_count, kBufferCount); + if (!status) { + LOG(ERROR) << "ProducerQueue::AllocateBuffer failed to create producer " + "buffer through BufferHub."; + return -status.error(); + } + + auto buffer_handle_slots = status.take(); + CHECK_EQ(buffer_handle_slots.size(), kBufferCount) + << "BufferHubRPC::ProducerQueueAllocateBuffers should return one and " + "only one buffer handle."; + + // We only allocate one buffer at a time. + auto& buffer_handle = buffer_handle_slots[0].first; + size_t buffer_slot = buffer_handle_slots[0].second; + VLOG(1) << "ProducerQueue::AllocateBuffer, new buffer, channel_handle: " + << buffer_handle.value(); + + *out_slot = buffer_slot; + return AddBuffer(BufferProducer::Import(std::move(buffer_handle)), + buffer_slot); +} + +int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf, + size_t slot) { + // For producer buffer, we need to enqueue the newly added buffer + // immediately. Producer queue starts with all buffers in available state. + const int ret = BufferHubQueue::AddBuffer(buf, slot); + if (ret < 0) + return ret; + + Enqueue(buf, slot); + return 0; +} + +int ProducerQueue::DetachBuffer(size_t slot) { + Status<int> status = + InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot); + if (!status) { + LOG(ERROR) << "ProducerQueue::DetachBuffer failed to detach producer " + "buffer through BufferHub, error: " + << status.GetErrorMessage(); + return -status.error(); + } + + return BufferHubQueue::DetachBuffer(slot); +} + +std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(int timeout, + size_t* slot) { + auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr); + return std::static_pointer_cast<BufferProducer>(buf); +} + +int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) { + auto buffer = std::static_pointer_cast<BufferProducer>(buf); + return buffer->GainAsync(); +} + +ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size) + : BASE(std::move(handle), meta_size) { + // TODO(b/34387835) Import consumer queue in case the ProducerQueue we are + // based on was not empty. +} + +int ConsumerQueue::ImportBuffers() { + Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status = + InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); + if (!status) { + LOG(ERROR) << "ConsumerQueue::ImportBuffers failed to import consumer " + "buffer through BufferBub, error: " + << status.GetErrorMessage(); + return -status.error(); + } + + int last_error = 0; + int imported_buffers = 0; + + auto buffer_handle_slots = status.take(); + for (auto& buffer_handle_slot : buffer_handle_slots) { + VLOG(1) << "ConsumerQueue::ImportBuffers, new buffer, buffer_handle: " + << buffer_handle_slot.first.value(); + + std::unique_ptr<BufferConsumer> buffer_consumer = + BufferConsumer::Import(std::move(buffer_handle_slot.first)); + int ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second); + if (ret < 0) { + LOG(ERROR) << "ConsumerQueue::ImportBuffers failed to add buffer, ret: " + << strerror(-ret); + last_error = ret; + continue; + } else { + imported_buffers++; + } + } + + return imported_buffers > 0 ? imported_buffers : last_error; +} + +int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf, + size_t slot) { + // Consumer queue starts with all buffers in unavailable state. + return BufferHubQueue::AddBuffer(buf, slot); +} + +std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout, + size_t* slot, void* meta, + size_t meta_size) { + if (meta_size != meta_size_) { + LOG(ERROR) << "metadata size (" << meta_size + << ") for the dequeuing buffer does not match metadata size (" + << meta_size_ << ") for the queue."; + return nullptr; + } + auto buf = BufferHubQueue::Dequeue(timeout, slot, meta); + return std::static_pointer_cast<BufferConsumer>(buf); +} + +int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) { + auto buffer = std::static_pointer_cast<BufferConsumer>(buf); + LocalHandle fence; + return buffer->Acquire(&fence, meta_buffer_tmp_.get(), meta_size_); +} + +int ConsumerQueue::OnBufferAllocated() { + const int ret = ImportBuffers(); + if (ret == 0) { + LOG(WARNING) << "No new buffer can be imported on buffer allocated event."; + } else if (ret < 0) { + LOG(ERROR) << "Failed to import buffers on buffer allocated event."; + } + VLOG(1) << "Imported " << ret << " consumer buffers."; + return ret; +} + +} // namespace dvr +} // namespace android |