summaryrefslogtreecommitdiff
path: root/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
diff options
context:
space:
mode:
authorTianyu Jiang <tianyuj@google.com>2018-11-07 14:56:56 -0800
committerTianyu Jiang <tianyuj@google.com>2018-11-08 13:19:43 -0800
commitf09f63f716de45558fa56870a8881305b91a67ce (patch)
tree846cf842eef49fa4aea857ad0e8e500fef75274c /libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
parentcc8d6c1aba0c4b66f6c6c7ceeeab9ace3c3cc491 (diff)
Let producer gain posted buffer if no released buffer exist.
ProducerQueue::Dequeue functionality does not change with the default param gain_posted_buffer = false. It does the following otherwise: 1. try to gain a released buffer and return. 2. try to find the oldest posted buffer and return. Bug: 80164475 Test: on Taimen with the following tests buffer_hub-test buffer_hub_queue-test buffer_hub_queue_producer-test dvr_api-test dvr_buffer_queue-test dvr_display-test Change-Id: I6b27740388e53348024054396b784dbc6c08ca7e
Diffstat (limited to 'libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp')
-rw-r--r--libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp170
1 files changed, 109 insertions, 61 deletions
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index e1c1aa96c0..b8e2f9dec0 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -69,7 +69,7 @@ void BufferHubQueue::Initialize() {
.data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
if (ret < 0) {
- ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
+ ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__,
strerror(-ret));
}
}
@@ -77,7 +77,7 @@ void BufferHubQueue::Initialize() {
Status<void> BufferHubQueue::ImportQueue() {
auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
if (!status) {
- ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
+ ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
status.GetErrorMessage().c_str());
return ErrorStatus(status.error());
} else {
@@ -136,9 +136,7 @@ BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
consumer_queue->GetChannel()->TakeChannelParcelable());
if (!queue_parcelable.IsValid()) {
- ALOGE(
- "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create "
- "consumer queue parcelable.");
+ ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__);
return ErrorStatus(EINVAL);
}
@@ -169,8 +167,7 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
}
if (ret < 0 && ret != -EINTR) {
- ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
- strerror(-ret));
+ ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret));
return false;
}
@@ -264,14 +261,14 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
// wait will be tried again to acquire the newly imported buffer.
auto buffer_status = OnBufferAllocated();
if (!buffer_status) {
- ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
+ ALOGE("%s: Failed to import buffer: %s", __FUNCTION__,
buffer_status.GetErrorMessage().c_str());
}
} else if (events & EPOLLHUP) {
- ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
+ ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__);
hung_up_ = true;
} else {
- ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
+ ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events);
}
return {};
@@ -279,12 +276,11 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
Status<void> BufferHubQueue::AddBuffer(
const std::shared_ptr<BufferHubBase>& buffer, size_t slot) {
- ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
- buffer->id(), slot);
+ ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(),
+ slot);
if (is_full()) {
- ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
- capacity_);
+ ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_);
return ErrorStatus(E2BIG);
}
@@ -303,7 +299,7 @@ Status<void> BufferHubQueue::AddBuffer(
const int ret =
epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
if (ret < 0) {
- ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
+ ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__,
strerror(-ret));
return ErrorStatus(-ret);
}
@@ -315,17 +311,15 @@ Status<void> BufferHubQueue::AddBuffer(
}
Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
- ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
+ ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot);
if (buffers_[slot]) {
for (const auto& event_source : buffers_[slot]->GetEventSources()) {
const int ret =
epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
if (ret < 0) {
- ALOGE(
- "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
- "set: %s",
- strerror(-ret));
+ ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__,
+ strerror(-ret));
return ErrorStatus(-ret);
}
}
@@ -345,23 +339,31 @@ Status<void> BufferHubQueue::Enqueue(Entry entry) {
if (!is_full()) {
available_buffers_.push(std::move(entry));
+ // Find and remove the enqueued buffer from unavailable_buffers_slot if
+ // exist.
+ auto enqueued_buffer_iter = std::find_if(
+ unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(),
+ [&entry](size_t slot) -> bool { return slot == entry.slot; });
+ if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) {
+ unavailable_buffers_slot_.erase(enqueued_buffer_iter);
+ }
+
// Trigger OnBufferAvailable callback if registered.
if (on_buffer_available_)
on_buffer_available_();
return {};
} else {
- ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
+ ALOGE("%s: Buffer queue is full!", __FUNCTION__);
return ErrorStatus(E2BIG);
}
}
Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
size_t* slot) {
- ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
- timeout);
+ ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout);
- PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
+ PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count());
if (count() == 0) {
if (!WaitForBuffers(timeout))
@@ -376,6 +378,7 @@ Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
*slot = entry.slot;
available_buffers_.pop();
+ unavailable_buffers_slot_.push_back(*slot);
return {std::move(buffer)};
}
@@ -564,7 +567,7 @@ Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
auto status =
InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
if (!status) {
- ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
+ ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__,
status.GetErrorMessage().c_str());
return status.error_status();
}
@@ -580,31 +583,81 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
- pdx::LocalHandle* release_fence) {
+ pdx::LocalHandle* release_fence, bool gain_posted_buffer) {
ATRACE_NAME("ProducerQueue::Dequeue");
if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
- ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
+ ALOGE("%s: Invalid parameter.", __FUNCTION__);
return ErrorStatus(EINVAL);
}
- auto status = BufferHubQueue::Dequeue(timeout, slot);
- if (!status)
- return status.error_status();
-
- auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
- const int ret = buffer->GainAsync(out_meta, release_fence);
+ std::shared_ptr<BufferProducer> buffer;
+ Status<std::shared_ptr<BufferHubBase>> dequeue_status =
+ BufferHubQueue::Dequeue(timeout, slot);
+ if (dequeue_status.ok()) {
+ buffer = std::static_pointer_cast<BufferProducer>(dequeue_status.take());
+ } else {
+ if (gain_posted_buffer) {
+ Status<std::shared_ptr<BufferProducer>> dequeue_unacquired_status =
+ ProducerQueue::DequeueUnacquiredBuffer(slot);
+ if (!dequeue_unacquired_status.ok()) {
+ ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__,
+ dequeue_unacquired_status.error());
+ return dequeue_unacquired_status.error_status();
+ }
+ buffer = dequeue_unacquired_status.take();
+ } else {
+ return dequeue_status.error_status();
+ }
+ }
+ const int ret =
+ buffer->GainAsync(out_meta, release_fence, gain_posted_buffer);
if (ret < 0 && ret != -EALREADY)
return ErrorStatus(-ret);
return {std::move(buffer)};
}
+Status<std::shared_ptr<BufferProducer>> ProducerQueue::DequeueUnacquiredBuffer(
+ size_t* slot) {
+ if (unavailable_buffers_slot_.size() < 1) {
+ ALOGE(
+ "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in "
+ "acquired state if exist.",
+ __FUNCTION__);
+ return ErrorStatus(ENOMEM);
+ }
+
+ // Find the first buffer that is not in acquired state from
+ // unavailable_buffers_slot_.
+ for (auto iter = unavailable_buffers_slot_.begin();
+ iter != unavailable_buffers_slot_.end(); iter++) {
+ std::shared_ptr<BufferProducer> buffer = ProducerQueue::GetBuffer(*iter);
+ if (buffer == nullptr) {
+ ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__,
+ static_cast<int>(*slot));
+ return ErrorStatus(EIO);
+ }
+ if (!BufferHubDefs::IsBufferAcquired(buffer->buffer_state())) {
+ *slot = *iter;
+ unavailable_buffers_slot_.erase(iter);
+ unavailable_buffers_slot_.push_back(*slot);
+ ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d",
+ __FUNCTION__, static_cast<int>(*slot));
+ return {std::move(buffer)};
+ }
+ }
+ ALOGE(
+ "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.",
+ __FUNCTION__);
+ return ErrorStatus(EBUSY);
+}
+
pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
if (capacity() != 0) {
ALOGE(
- "ProducerQueue::TakeAsParcelable: producer queue can only be taken out"
- " as a parcelable when empty. Current queue capacity: %zu",
- capacity());
+ "%s: producer queue can only be taken out as a parcelable when empty. "
+ "Current queue capacity: %zu",
+ __FUNCTION__, capacity());
return ErrorStatus(EINVAL);
}
@@ -628,17 +681,16 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
: BufferHubQueue(std::move(handle)) {
auto status = ImportQueue();
if (!status) {
- ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
+ ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
status.GetErrorMessage().c_str());
Close(-status.error());
}
auto import_status = ImportBuffers();
if (import_status) {
- ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
- import_status.get());
+ ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get());
} else {
- ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
+ ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
import_status.GetErrorMessage().c_str());
}
}
@@ -647,14 +699,11 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
if (!status) {
if (status.error() == EBADR) {
- ALOGI(
- "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
- "imported.");
+ ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__);
return {0};
} else {
- ALOGE(
- "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
- status.GetErrorMessage().c_str());
+ ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__,
+ status.GetErrorMessage().c_str());
return status.error_status();
}
}
@@ -665,13 +714,13 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
auto buffer_handle_slots = status.take();
for (auto& buffer_handle_slot : buffer_handle_slots) {
- ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
+ ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__,
buffer_handle_slot.first.value());
std::unique_ptr<BufferConsumer> buffer_consumer =
BufferConsumer::Import(std::move(buffer_handle_slot.first));
if (!buffer_consumer) {
- ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
+ ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__,
buffer_handle_slot.second);
last_error = ErrorStatus(EPIPE);
continue;
@@ -680,7 +729,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
auto add_status =
AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
if (!add_status) {
- ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
+ ALOGE("%s: Failed to add buffer: %s", __FUNCTION__,
add_status.GetErrorMessage().c_str());
last_error = add_status;
} else {
@@ -696,8 +745,8 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
Status<void> ConsumerQueue::AddBuffer(
const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
- ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
- id(), buffer->id(), slot);
+ ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(),
+ buffer->id(), slot);
return BufferHubQueue::AddBuffer(buffer, slot);
}
@@ -706,9 +755,9 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
LocalHandle* acquire_fence) {
if (user_metadata_size != user_metadata_size_) {
ALOGE(
- "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
- "does not match metadata size (%zu) for the queue.",
- user_metadata_size, user_metadata_size_);
+ "%s: Metadata size (%zu) for the dequeuing buffer does not match "
+ "metadata size (%zu) for the queue.",
+ __FUNCTION__, user_metadata_size, user_metadata_size_);
return ErrorStatus(EINVAL);
}
@@ -723,7 +772,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
if (metadata_src) {
memcpy(meta, metadata_src, user_metadata_size);
} else {
- ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
+ ALOGW("%s: no user-defined metadata.", __FUNCTION__);
}
}
@@ -735,7 +784,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
pdx::LocalHandle* acquire_fence) {
ATRACE_NAME("ConsumerQueue::Dequeue");
if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
- ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
+ ALOGE("%s: Invalid parameter.", __FUNCTION__);
return ErrorStatus(EINVAL);
}
@@ -752,19 +801,18 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
}
Status<void> ConsumerQueue::OnBufferAllocated() {
- ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
+ ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id());
auto status = ImportBuffers();
if (!status) {
- ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
+ ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
status.GetErrorMessage().c_str());
return ErrorStatus(status.error());
} else if (status.get() == 0) {
- ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
+ ALOGW("%s: No new buffers allocated!", __FUNCTION__);
return ErrorStatus(ENOBUFS);
} else {
- ALOGD_IF(TRACE,
- "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
+ ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__,
status.get());
return {};
}