summaryrefslogtreecommitdiff
path: root/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
diff options
context:
space:
mode:
authorAlex Vakulenko <avakulenko@google.com>2017-01-27 14:41:04 -0800
committerJiwen 'Steve' Cai <jwcai@google.com>2017-01-28 15:04:54 -0800
commite4eec20f6263f4a42ae462456f60ea6c4518bb0a (patch)
tree306fdfb6c03485758748b180f98839e32461ce3f /libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
parentc34e059b3b044ec5346838e5b3d467c4f4bb6d65 (diff)
Add DaydreamVR native libraries and services
Upstreaming the main VR system components from master-dreamos-dev into goog/master. Bug: None Test: `m -j32` succeeds. Sailfish boots and basic_vr sample app works Change-Id: I853015872afc443aecee10411ef2d6b79184d051
Diffstat (limited to 'libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp')
-rw-r--r--libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp414
1 files changed, 414 insertions, 0 deletions
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
new file mode 100644
index 0000000000..4fbfcf680c
--- /dev/null
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -0,0 +1,414 @@
+#include "include/private/dvr/buffer_hub_queue_client.h"
+
+#include <base/logging.h>
+#include <sys/epoll.h>
+
+#include <array>
+
+#include <pdx/default_transport/client_channel.h>
+#include <pdx/default_transport/client_channel_factory.h>
+#include <pdx/file_handle.h>
+#include <private/dvr/bufferhub_rpc.h>
+
+using android::pdx::LocalHandle;
+using android::pdx::LocalChannelHandle;
+
+namespace android {
+namespace dvr {
+
+BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle,
+ size_t meta_size)
+ : Client{pdx::default_transport::ClientChannel::Create(
+ std::move(channel_handle))},
+ meta_size_(meta_size),
+ meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
+ buffers_(BufferHubQueue::kMaxQueueCapacity),
+ available_buffers_(BufferHubQueue::kMaxQueueCapacity),
+ capacity_(0) {
+ Initialize();
+}
+
+BufferHubQueue::BufferHubQueue(const std::string& endpoint_path,
+ size_t meta_size)
+ : Client{pdx::default_transport::ClientChannelFactory::Create(
+ endpoint_path)},
+ meta_size_(meta_size),
+ meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
+ buffers_(BufferHubQueue::kMaxQueueCapacity),
+ available_buffers_(BufferHubQueue::kMaxQueueCapacity),
+ capacity_(0) {
+ Initialize();
+}
+
+void BufferHubQueue::Initialize() {
+ int ret = epoll_fd_.Create();
+ if (ret < 0) {
+ LOG(ERROR) << "BufferHubQueue::BufferHubQueue: Failed to create epoll fd:"
+ << strerror(-ret);
+ return;
+ }
+
+ epoll_event event = {.events = EPOLLIN | EPOLLET,
+ .data = {.u64 = static_cast<uint64_t>(
+ BufferHubQueue::kEpollQueueEventIndex)}};
+ ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
+ if (ret < 0) {
+ LOG(ERROR) << "Failed to register ConsumerQueue into epoll event: "
+ << strerror(-ret);
+ }
+}
+
+std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
+ Status<std::pair<LocalChannelHandle, size_t>> status =
+ InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
+
+ if (!status) {
+ LOG(ERROR) << "Cannot create ConsumerQueue: " << status.GetErrorMessage();
+ return nullptr;
+ }
+
+ auto return_value = status.take();
+
+ VLOG(1) << "CreateConsumerQueue: meta_size_bytes=" << return_value.second;
+ return ConsumerQueue::Create(std::move(return_value.first),
+ return_value.second);
+}
+
+bool BufferHubQueue::WaitForBuffers(int timeout) {
+ std::array<epoll_event, kMaxEvents> events;
+
+ while (count() == 0) {
+ int ret = epoll_fd_.Wait(events.data(), events.size(), timeout);
+
+ if (ret == 0) {
+ VLOG(1) << "Wait on epoll returns nothing before timeout.";
+ return false;
+ }
+
+ if (ret < 0 && ret != -EINTR) {
+ LOG(ERROR) << "Failed to wait for buffers:" << strerror(-ret);
+ return false;
+ }
+
+ const int num_events = ret;
+
+ // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
+ // one for each buffer, in the queue and one extra event for the queue
+ // client itself.
+ for (int i = 0; i < num_events; i++) {
+ int64_t index = static_cast<int64_t>(events[i].data.u64);
+
+ VLOG(1) << "New BufferHubQueue event " << i << ": index=" << index;
+
+ if (is_buffer_event_index(index) && (events[i].events & EPOLLIN)) {
+ auto buffer = buffers_[index];
+ ret = OnBufferReady(buffer);
+ if (ret < 0) {
+ LOG(ERROR) << "Failed to set buffer ready:" << strerror(-ret);
+ continue;
+ }
+ Enqueue(buffer, index);
+ } else if (is_buffer_event_index(index) &&
+ (events[i].events & EPOLLHUP)) {
+ // This maybe caused by producer replacing an exising buffer slot.
+ // Currently the epoll FD is cleaned up when the replacement consumer
+ // client is imported.
+ LOG(WARNING) << "Receives EPOLLHUP at slot: " << index;
+ } else if (is_queue_event_index(index) && (events[i].events & EPOLLIN)) {
+ // Note that after buffer imports, if |count()| still returns 0, epoll
+ // wait will be tried again to acquire the newly imported buffer.
+ ret = OnBufferAllocated();
+ if (ret < 0) {
+ LOG(ERROR) << "Failed to import buffer:" << strerror(-ret);
+ continue;
+ }
+ } else {
+ LOG(WARNING) << "Unknown event " << i << ": u64=" << index
+ << ": events=" << events[i].events;
+ }
+ }
+ }
+
+ return true;
+}
+
+int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
+ size_t slot) {
+ if (is_full()) {
+ // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
+ // import buffer.
+ LOG(ERROR) << "BufferHubQueue::AddBuffer queue is at maximum capacity: "
+ << capacity_;
+ return -E2BIG;
+ }
+
+ if (buffers_[slot] != nullptr) {
+ // Replace the buffer if the slot is preoccupied. This could happen when the
+ // producer side replaced the slot with a newly allocated buffer. Detach the
+ // buffer and set up with the new one.
+ DetachBuffer(slot);
+ }
+
+ epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
+ const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
+ if (ret < 0) {
+ LOG(ERROR)
+ << "BufferHubQueue::AddBuffer: Failed to add buffer to epoll set:"
+ << strerror(-ret);
+ return ret;
+ }
+
+ buffers_[slot] = buf;
+ capacity_++;
+ return 0;
+}
+
+int BufferHubQueue::DetachBuffer(size_t slot) {
+ auto& buf = buffers_[slot];
+ if (buf == nullptr) {
+ LOG(ERROR) << "BufferHubQueue::DetachBuffer: Invalid slot: " << slot;
+ return -EINVAL;
+ }
+
+ const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
+ if (ret < 0) {
+ LOG(ERROR) << "BufferHubQueue::DetachBuffer: Failed to detach buffer from "
+ "epoll set:"
+ << strerror(-ret);
+ return ret;
+ }
+
+ buffers_[slot] = nullptr;
+ capacity_--;
+ return 0;
+}
+
+void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf,
+ size_t slot) {
+ if (count() == capacity_) {
+ LOG(ERROR) << "Buffer queue is full!";
+ return;
+ }
+
+ // Set slot buffer back to vector.
+ // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
+ // the limitation of the RingBuffer we are using. Would be better to refactor
+ // that.
+ BufferInfo buffer_info(slot, meta_size_);
+ // Swap buffer into vector.
+ std::swap(buffer_info.buffer, buf);
+ // Swap metadata loaded during onBufferReady into vector.
+ std::swap(buffer_info.metadata, meta_buffer_tmp_);
+
+ available_buffers_.Append(std::move(buffer_info));
+}
+
+std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
+ size_t* slot,
+ void* meta) {
+ VLOG(1) << "Dequeue: count=" << count() << ", timeout=" << timeout;
+
+ if (count() == 0 && !WaitForBuffers(timeout))
+ return nullptr;
+
+ std::shared_ptr<BufferHubBuffer> buf;
+ BufferInfo& buffer_info = available_buffers_.Front();
+
+ // Report current pos as the output slot.
+ std::swap(buffer_info.slot, *slot);
+ // Swap buffer from vector to be returned later.
+ std::swap(buffer_info.buffer, buf);
+ // Swap metadata from vector into tmp so that we can write out to |meta|.
+ std::swap(buffer_info.metadata, meta_buffer_tmp_);
+
+ available_buffers_.PopFront();
+
+ if (!buf) {
+ LOG(ERROR) << "Dequeue: Buffer to be dequeued is nullptr";
+ return nullptr;
+ }
+
+ if (meta) {
+ std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
+ reinterpret_cast<uint8_t*>(meta));
+ }
+
+ return buf;
+}
+
+ProducerQueue::ProducerQueue(size_t meta_size)
+ : ProducerQueue(meta_size, 0, 0, 0, 0) {}
+
+ProducerQueue::ProducerQueue(LocalChannelHandle handle, size_t meta_size)
+ : BASE(std::move(handle), meta_size) {}
+
+ProducerQueue::ProducerQueue(size_t meta_size, int usage_set_mask,
+ int usage_clear_mask, int usage_deny_set_mask,
+ int usage_deny_clear_mask)
+ : BASE(BufferHubRPC::kClientPath, meta_size) {
+ auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
+ meta_size_, usage_set_mask, usage_clear_mask, usage_deny_set_mask,
+ usage_deny_clear_mask);
+ if (!status) {
+ LOG(ERROR)
+ << "ProducerQueue::ProducerQueue: Failed to create producer queue: %s"
+ << status.GetErrorMessage();
+ Close(-status.error());
+ return;
+ }
+}
+
+int ProducerQueue::AllocateBuffer(int width, int height, int format, int usage,
+ size_t slice_count, size_t* out_slot) {
+ if (out_slot == nullptr) {
+ LOG(ERROR) << "Parameter out_slot cannot be null.";
+ return -EINVAL;
+ }
+
+ if (is_full()) {
+ LOG(ERROR) << "ProducerQueue::AllocateBuffer queue is at maximum capacity: "
+ << capacity();
+ return -E2BIG;
+ }
+
+ const size_t kBufferCount = 1U;
+
+ Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
+ InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
+ width, height, format, usage, slice_count, kBufferCount);
+ if (!status) {
+ LOG(ERROR) << "ProducerQueue::AllocateBuffer failed to create producer "
+ "buffer through BufferHub.";
+ return -status.error();
+ }
+
+ auto buffer_handle_slots = status.take();
+ CHECK_EQ(buffer_handle_slots.size(), kBufferCount)
+ << "BufferHubRPC::ProducerQueueAllocateBuffers should return one and "
+ "only one buffer handle.";
+
+ // We only allocate one buffer at a time.
+ auto& buffer_handle = buffer_handle_slots[0].first;
+ size_t buffer_slot = buffer_handle_slots[0].second;
+ VLOG(1) << "ProducerQueue::AllocateBuffer, new buffer, channel_handle: "
+ << buffer_handle.value();
+
+ *out_slot = buffer_slot;
+ return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
+ buffer_slot);
+}
+
+int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
+ size_t slot) {
+ // For producer buffer, we need to enqueue the newly added buffer
+ // immediately. Producer queue starts with all buffers in available state.
+ const int ret = BufferHubQueue::AddBuffer(buf, slot);
+ if (ret < 0)
+ return ret;
+
+ Enqueue(buf, slot);
+ return 0;
+}
+
+int ProducerQueue::DetachBuffer(size_t slot) {
+ Status<int> status =
+ InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
+ if (!status) {
+ LOG(ERROR) << "ProducerQueue::DetachBuffer failed to detach producer "
+ "buffer through BufferHub, error: "
+ << status.GetErrorMessage();
+ return -status.error();
+ }
+
+ return BufferHubQueue::DetachBuffer(slot);
+}
+
+std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(int timeout,
+ size_t* slot) {
+ auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr);
+ return std::static_pointer_cast<BufferProducer>(buf);
+}
+
+int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
+ auto buffer = std::static_pointer_cast<BufferProducer>(buf);
+ return buffer->GainAsync();
+}
+
+ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size)
+ : BASE(std::move(handle), meta_size) {
+ // TODO(b/34387835) Import consumer queue in case the ProducerQueue we are
+ // based on was not empty.
+}
+
+int ConsumerQueue::ImportBuffers() {
+ Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
+ InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
+ if (!status) {
+ LOG(ERROR) << "ConsumerQueue::ImportBuffers failed to import consumer "
+ "buffer through BufferBub, error: "
+ << status.GetErrorMessage();
+ return -status.error();
+ }
+
+ int last_error = 0;
+ int imported_buffers = 0;
+
+ auto buffer_handle_slots = status.take();
+ for (auto& buffer_handle_slot : buffer_handle_slots) {
+ VLOG(1) << "ConsumerQueue::ImportBuffers, new buffer, buffer_handle: "
+ << buffer_handle_slot.first.value();
+
+ std::unique_ptr<BufferConsumer> buffer_consumer =
+ BufferConsumer::Import(std::move(buffer_handle_slot.first));
+ int ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
+ if (ret < 0) {
+ LOG(ERROR) << "ConsumerQueue::ImportBuffers failed to add buffer, ret: "
+ << strerror(-ret);
+ last_error = ret;
+ continue;
+ } else {
+ imported_buffers++;
+ }
+ }
+
+ return imported_buffers > 0 ? imported_buffers : last_error;
+}
+
+int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
+ size_t slot) {
+ // Consumer queue starts with all buffers in unavailable state.
+ return BufferHubQueue::AddBuffer(buf, slot);
+}
+
+std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout,
+ size_t* slot, void* meta,
+ size_t meta_size) {
+ if (meta_size != meta_size_) {
+ LOG(ERROR) << "metadata size (" << meta_size
+ << ") for the dequeuing buffer does not match metadata size ("
+ << meta_size_ << ") for the queue.";
+ return nullptr;
+ }
+ auto buf = BufferHubQueue::Dequeue(timeout, slot, meta);
+ return std::static_pointer_cast<BufferConsumer>(buf);
+}
+
+int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
+ auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
+ LocalHandle fence;
+ return buffer->Acquire(&fence, meta_buffer_tmp_.get(), meta_size_);
+}
+
+int ConsumerQueue::OnBufferAllocated() {
+ const int ret = ImportBuffers();
+ if (ret == 0) {
+ LOG(WARNING) << "No new buffer can be imported on buffer allocated event.";
+ } else if (ret < 0) {
+ LOG(ERROR) << "Failed to import buffers on buffer allocated event.";
+ }
+ VLOG(1) << "Imported " << ret << " consumer buffers.";
+ return ret;
+}
+
+} // namespace dvr
+} // namespace android