summaryrefslogtreecommitdiff
path: root/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
diff options
context:
space:
mode:
authorCorey Tabaka <eieio@google.com>2017-09-13 18:02:48 -0700
committerJiwen 'Steve' Cai <jwcai@google.com>2017-10-10 20:39:56 -0700
commit52ea25cf06cef250ec73052611b48556b3fce4d5 (patch)
tree6595f49407fbe45702f943d913a1d34bd1910fb8 /libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
parent35b5114be8da71c69fc8a1ff8fb457c912c0992f (diff)
Add shared memory based buffer metadata
This CLs reduces BufferHub CPU consumption by adding asynchronous state transition so that out-of-process VR composition can run on 2016 pixel devices smoothly. In addition, this CL addresses a couple corner cases in the existing bufferhub logic, which fixes various blackscreen issues. 1/ Tracks buffer transition states (gained, posted, acquired, released) from the client side via atomic shared memory and adds PostAsync/AcquireAsync/ReleaseAsync/GainAsync with metadata and fence support. 2/ Adds dequeue order guarantee for buffers enqueued with dvrWriteBufferQueuePostBuffer. 3/ Synchronous BuffeHub operations are still supported. 4/ Bump up the bufferhubd's soft limit of open file descriptor. 5/ Handle orphaned consumer in acquired state. This is a corner case that consumer process goes aways (most likely due to a crash) leaving buffer stuck in acquired state with inconsistent buffer state. 6/ Fixes a race condition for released buffer to be Gain'ed and Acquire'd when a new consumer is created in released state. 7/ Improve silent consumer queue efficiency: Silent queues no longer import buffers or receive signals about new buffers and they are limited to only spawning other consumers and notifications about producers hanging up. 8/ Modify PDX/UDS channel event signaling to work around epoll behavior. PDX UDS uses a combination of an eventfd and an epoll set to simulate the original PDX transport channel events. An odd behavior discovered in the kernel implementation of epoll was found that causes the epoll fd to "unsignal" itself whenever epoll_wait() is called on it, regardless of whether it should still be pending. This breaks the edge triggerd behavior in nested epoll sets that channel events depend on. Since this is unlikely to ever be fixed in the kernel we work around the behavior by using the epoll set only as a logical OR of two eventfds and never calling epoll_wait() on it. When polling is required we use regluar poll() with the eventfds and data fd to avoid the bad behavior in epoll_wait(). 9/ Keep reading data after PDX hangup signal. UDS will signal hangup when the other end of the socket closes. However, data could still be in the kerenl buffer and should be consumed. Fix an issue where the service misses an impulse sent right before the socket is closed. Bug: 65455724 Bug: 65458354 Bug: 65458312 Bug: 64027135 Bug: 67424527 Test: libpdx_uds_tests bufferhub_tests buffer_hub_queue-test buffer_hub_queue_producer-test dvr_api-test Change-Id: Id07db1f206ccf4e06f7ee3c671193334408971ca
Diffstat (limited to 'libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp')
-rw-r--r--libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp300
1 files changed, 126 insertions, 174 deletions
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index f9f87ff1b8..8bea0cde7a 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -10,6 +10,7 @@
#include <pdx/default_transport/client_channel.h>
#include <pdx/default_transport/client_channel_factory.h>
#include <pdx/file_handle.h>
+#include <pdx/trace.h>
#define RETRY_EINTR(fnc_call) \
([&]() -> decltype(fnc_call) { \
@@ -44,17 +45,6 @@ Status<int> PollEvents(int fd, short events) {
}
}
-// Polls a buffer for the given events, taking care to do the proper
-// translation.
-Status<int> PollEvents(const std::shared_ptr<BufferHubBuffer>& buffer,
- short events) {
- auto poll_status = PollEvents(buffer->event_fd(), events);
- if (!poll_status)
- return poll_status;
-
- return buffer->GetEventMask(poll_status.get());
-}
-
std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
return {static_cast<int32_t>(value >> 32),
static_cast<int32_t>(value & ((1ull << 32) - 1))};
@@ -115,27 +105,27 @@ void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
default_width_ = queue_info.producer_config.default_width;
default_height_ = queue_info.producer_config.default_height;
default_format_ = queue_info.producer_config.default_format;
- meta_size_ = queue_info.producer_config.meta_size_bytes;
+ user_metadata_size_ = queue_info.producer_config.user_metadata_size;
id_ = queue_info.id;
}
std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
- if (auto status = CreateConsumerQueueHandle())
+ if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
else
return nullptr;
}
std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
- if (auto status = CreateConsumerQueueHandle())
- return std::unique_ptr<ConsumerQueue>(
- new ConsumerQueue(status.take(), true));
+ if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
+ return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
else
return nullptr;
}
-Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
- auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
+Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
+ bool silent) {
+ auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
if (!status) {
ALOGE(
"BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
@@ -148,6 +138,7 @@ Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
}
bool BufferHubQueue::WaitForBuffers(int timeout) {
+ ATRACE_NAME("BufferHubQueue::WaitForBuffers");
std::array<epoll_event, kMaxEvents> events;
// Loop at least once to check for hangups.
@@ -178,13 +169,18 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
const int num_events = ret;
// A BufferQueue's epoll fd tracks N+1 events, where there are N events,
- // one for each buffer, in the queue and one extra event for the queue
+ // one for each buffer in the queue, and one extra event for the queue
// client itself.
for (int i = 0; i < num_events; i++) {
int32_t event_fd;
int32_t index;
std::tie(event_fd, index) = Unstuff(events[i].data.u64);
+ PDX_TRACE_FORMAT(
+ "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
+ "slot=%d|",
+ id(), num_events, i, event_fd, index);
+
ALOGD_IF(TRACE,
"BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
i, event_fd, index);
@@ -208,6 +204,7 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
int poll_events) {
+ ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
if (!buffers_[slot]) {
ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
return ErrorStatus(ENOENT);
@@ -221,58 +218,19 @@ Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
}
const int events = status.get();
+ PDX_TRACE_FORMAT(
+ "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
+ "events=%d|",
+ id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
+
if (events & EPOLLIN) {
- auto entry_status = OnBufferReady(buffers_[slot], slot);
- if (entry_status.ok() || entry_status.error() == EALREADY) {
- // Only enqueue the buffer if it moves to or is already in the state
- // requested in OnBufferReady().
- return Enqueue(entry_status.take());
- } else if (entry_status.error() == EBUSY) {
- // If the buffer is busy this means that the buffer moved from released to
- // posted when a new consumer was created before the ProducerQueue had a
- // chance to regain it. This is a valid transition that we have to handle
- // because edge triggered poll events latch the ready state even if it is
- // later de-asserted -- don't enqueue or print an error log in this case.
- } else {
- ALOGE(
- "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
- "queue_id=%d buffer_id=%d: %s",
- id(), buffers_[slot]->id(), entry_status.GetErrorMessage().c_str());
- }
+ return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
} else if (events & EPOLLHUP) {
- // Check to see if the current buffer in the slot hung up. This is a bit of
- // paranoia to deal with the epoll set getting out of sync with the buffer
- // slots.
- auto poll_status = PollEvents(buffers_[slot], POLLIN);
- if (!poll_status && poll_status.error() != ETIMEDOUT) {
- ALOGE("BufferHubQueue::HandleBufferEvent: Failed to poll buffer: %s",
- poll_status.GetErrorMessage().c_str());
- return poll_status.error_status();
- }
-
- const bool hangup_pending = status.ok() && (poll_status.get() & EPOLLHUP);
-
ALOGW(
"BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
- "event_fd=%d buffer_id=%d hangup_pending=%d poll_status=%x",
- slot, buffers_[slot]->event_fd(), buffers_[slot]->id(), hangup_pending,
- poll_status.get());
-
- if (hangup_pending) {
- return RemoveBuffer(slot);
- } else {
- // Clean up the bookkeeping for the event fd. This is a bit of paranoia to
- // deal with the epoll set getting out of sync with the buffer slots.
- // Hitting this path should be very unusual.
- const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_fd, nullptr);
- if (ret < 0) {
- ALOGE(
- "BufferHubQueue::HandleBufferEvent: Failed to remove fd=%d from "
- "epoll set: %s",
- event_fd, strerror(-ret));
- return ErrorStatus(-ret);
- }
- }
+ "event_fd=%d buffer_id=%d",
+ slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
+ return RemoveBuffer(slot);
} else {
ALOGW(
"BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
@@ -284,6 +242,7 @@ Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
}
Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
+ ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
auto status = GetEventMask(poll_event);
if (!status) {
ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
@@ -330,13 +289,16 @@ Status<void> BufferHubQueue::AddBuffer(
return remove_status.error_status();
}
- epoll_event event = {.events = EPOLLIN | EPOLLET,
- .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
- const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buffer->event_fd(), &event);
- if (ret < 0) {
- ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
- strerror(-ret));
- return ErrorStatus(-ret);
+ for (const auto& event_source : buffer->GetEventSources()) {
+ epoll_event event = {.events = event_source.event_mask | EPOLLET,
+ .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
+ const int ret =
+ epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
+ if (ret < 0) {
+ ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
+ strerror(-ret));
+ return ErrorStatus(-ret);
+ }
}
buffers_[slot] = buffer;
@@ -348,15 +310,16 @@ Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
if (buffers_[slot]) {
- const int ret =
- epoll_fd_.Control(EPOLL_CTL_DEL, buffers_[slot]->event_fd(), nullptr);
- if (ret < 0) {
- ALOGE(
- "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
- "set: "
- "%s",
- strerror(-ret));
- return ErrorStatus(-ret);
+ for (const auto& event_source : buffers_[slot]->GetEventSources()) {
+ const int ret =
+ epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
+ if (ret < 0) {
+ ALOGE(
+ "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
+ "set: %s",
+ strerror(-ret));
+ return ErrorStatus(-ret);
+ }
}
// Trigger OnBufferRemoved callback if registered.
@@ -372,7 +335,7 @@ Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
Status<void> BufferHubQueue::Enqueue(Entry entry) {
if (!is_full()) {
- available_buffers_.Append(std::move(entry));
+ available_buffers_.push(std::move(entry));
// Trigger OnBufferAvailable callback if registered.
if (on_buffer_available_)
@@ -385,25 +348,26 @@ Status<void> BufferHubQueue::Enqueue(Entry entry) {
}
}
-Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
- int timeout, size_t* slot, void* meta, LocalHandle* fence) {
+Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout,
+ size_t* slot) {
ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
timeout);
- if (!WaitForBuffers(timeout))
- return ErrorStatus(ETIMEDOUT);
+ PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
+
+ if (count() == 0) {
+ if (!WaitForBuffers(timeout))
+ return ErrorStatus(ETIMEDOUT);
+ }
- auto& entry = available_buffers_.Front();
+ auto& entry = available_buffers_.top();
+ PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
+ entry.slot);
std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
*slot = entry.slot;
- *fence = std::move(entry.fence);
- if (meta && entry.metadata) {
- std::copy(entry.metadata.get(), entry.metadata.get() + meta_size_,
- reinterpret_cast<uint8_t*>(meta));
- }
- available_buffers_.PopFront();
+ available_buffers_.pop();
return {std::move(buffer)};
}
@@ -419,7 +383,8 @@ void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
// Clear all available buffers.
- available_buffers_.Clear();
+ while (!available_buffers_.empty())
+ available_buffers_.pop();
pdx::Status<void> last_error; // No error.
// Clear all buffers this producer queue is tracking.
@@ -429,7 +394,7 @@ pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
if (!status) {
ALOGE(
"ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
- "slot=%d.",
+ "slot=%zu.",
slot);
last_error = status.error_status();
}
@@ -548,7 +513,7 @@ Status<void> ProducerQueue::AddBuffer(
if (!status)
return status;
- return Enqueue(buffer, slot);
+ return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
}
Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
@@ -565,40 +530,33 @@ Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
int timeout, size_t* slot, LocalHandle* release_fence) {
- if (slot == nullptr || release_fence == nullptr) {
- ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
- slot, release_fence);
- return ErrorStatus(EINVAL);
- }
-
- auto buffer_status =
- BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
- if (!buffer_status)
- return buffer_status.error_status();
-
- return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
+ DvrNativeBufferMetadata canonical_meta;
+ return Dequeue(timeout, slot, &canonical_meta, release_fence);
}
-Status<BufferHubQueue::Entry> ProducerQueue::OnBufferReady(
- const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
- ALOGD_IF(TRACE,
- "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
- id(), buffer->id(), slot);
+pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
+ int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
+ pdx::LocalHandle* release_fence) {
+ ATRACE_NAME("ProducerQueue::Dequeue");
+ if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
+ ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
+ return ErrorStatus(EINVAL);
+ }
- // Avoid taking a transient reference, buffer is valid for the duration of
- // this method call.
- auto* producer_buffer = static_cast<BufferProducer*>(buffer.get());
- LocalHandle release_fence;
+ auto status = BufferHubQueue::Dequeue(timeout, slot);
+ if (!status)
+ return status.error_status();
- const int ret = producer_buffer->Gain(&release_fence);
- if (ret < 0)
+ auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
+ const int ret = buffer->GainAsync(out_meta, release_fence);
+ if (ret < 0 && ret != -EALREADY)
return ErrorStatus(-ret);
- else
- return {{buffer, nullptr, std::move(release_fence), slot}};
+
+ return {std::move(buffer)};
}
-ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
- : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
+ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
+ : BufferHubQueue(std::move(handle)) {
auto status = ImportQueue();
if (!status) {
ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
@@ -619,9 +577,17 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
Status<size_t> ConsumerQueue::ImportBuffers() {
auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
if (!status) {
- ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
+ if (status.error() == EBADR) {
+ ALOGI(
+ "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
+ "imported.");
+ return {0};
+ } else {
+ ALOGE(
+ "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
status.GetErrorMessage().c_str());
- return status.error_status();
+ return status.error_status();
+ }
}
int ret;
@@ -642,22 +608,6 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
continue;
}
- // Setup ignore state before adding buffer to the queue.
- if (ignore_on_import_) {
- ALOGD_IF(TRACE,
- "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
- "buffer_id=%d",
- buffer_consumer->id());
- ret = buffer_consumer->SetIgnore(true);
- if (ret < 0) {
- ALOGE(
- "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
- "imported buffer buffer_id=%d: %s",
- buffer_consumer->id(), strerror(-ret));
- last_error = ErrorStatus(-ret);
- }
- }
-
auto add_status =
AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
if (!add_status) {
@@ -685,7 +635,7 @@ Status<void> ConsumerQueue::AddBuffer(
// Check to see if the buffer is already signaled. This is necessary to catch
// cases where buffers are already available; epoll edge triggered mode does
- // not fire until and edge transition when adding new buffers to the epoll
+ // not fire until an edge transition when adding new buffers to the epoll
// set. Note that we only poll the fd events because HandleBufferEvent() takes
// care of checking the translated buffer events.
auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
@@ -703,51 +653,53 @@ Status<void> ConsumerQueue::AddBuffer(
}
Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
- int timeout, size_t* slot, void* meta, size_t meta_size,
+ int timeout, size_t* slot, void* meta, size_t user_metadata_size,
LocalHandle* acquire_fence) {
- if (meta_size != meta_size_) {
+ if (user_metadata_size != user_metadata_size_) {
ALOGE(
"ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
"does not match metadata size (%zu) for the queue.",
- meta_size, meta_size_);
+ user_metadata_size, user_metadata_size_);
return ErrorStatus(EINVAL);
}
- if (slot == nullptr || acquire_fence == nullptr) {
- ALOGE(
- "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
- "acquire_fence=%p",
- slot, meta, acquire_fence);
- return ErrorStatus(EINVAL);
- }
+ DvrNativeBufferMetadata canonical_meta;
+ auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
+ if (!status)
+ return status.error_status();
- auto buffer_status =
- BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
- if (!buffer_status)
- return buffer_status.error_status();
+ if (meta && user_metadata_size) {
+ void* metadata_src =
+ reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
+ if (metadata_src) {
+ memcpy(meta, metadata_src, user_metadata_size);
+ } else {
+ ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
+ }
+ }
- return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
+ return status;
}
-Status<BufferHubQueue::Entry> ConsumerQueue::OnBufferReady(
- const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
- ALOGD_IF(TRACE,
- "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
- id(), buffer->id(), slot);
+Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
+ int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
+ pdx::LocalHandle* acquire_fence) {
+ ATRACE_NAME("ConsumerQueue::Dequeue");
+ if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
+ ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
+ return ErrorStatus(EINVAL);
+ }
- // Avoid taking a transient reference, buffer is valid for the duration of
- // this method call.
- auto* consumer_buffer = static_cast<BufferConsumer*>(buffer.get());
- std::unique_ptr<uint8_t[]> metadata(meta_size_ ? new uint8_t[meta_size_]
- : nullptr);
- LocalHandle acquire_fence;
+ auto status = BufferHubQueue::Dequeue(timeout, slot);
+ if (!status)
+ return status.error_status();
- const int ret =
- consumer_buffer->Acquire(&acquire_fence, metadata.get(), meta_size_);
+ auto buffer = std::static_pointer_cast<BufferConsumer>(status.take());
+ const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
if (ret < 0)
return ErrorStatus(-ret);
- else
- return {{buffer, std::move(metadata), std::move(acquire_fence), slot}};
+
+ return {std::move(buffer)};
}
Status<void> ConsumerQueue::OnBufferAllocated() {