summaryrefslogtreecommitdiff
path: root/audio/2.0/default/StreamIn.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'audio/2.0/default/StreamIn.cpp')
-rw-r--r--audio/2.0/default/StreamIn.cpp114
1 files changed, 80 insertions, 34 deletions
diff --git a/audio/2.0/default/StreamIn.cpp b/audio/2.0/default/StreamIn.cpp
index ad18986401..1cde4ac1b9 100644
--- a/audio/2.0/default/StreamIn.cpp
+++ b/audio/2.0/default/StreamIn.cpp
@@ -38,6 +38,7 @@ class ReadThread : public Thread {
// ReadThread's lifespan never exceeds StreamIn's lifespan.
ReadThread(std::atomic<bool>* stop,
audio_stream_in_t* stream,
+ StreamIn::CommandMQ* commandMQ,
StreamIn::DataMQ* dataMQ,
StreamIn::StatusMQ* statusMQ,
EventFlag* efGroup,
@@ -45,6 +46,7 @@ class ReadThread : public Thread {
: Thread(false /*canCallJava*/),
mStop(stop),
mStream(stream),
+ mCommandMQ(commandMQ),
mDataMQ(dataMQ),
mStatusMQ(statusMQ),
mEfGroup(efGroup),
@@ -58,13 +60,19 @@ class ReadThread : public Thread {
private:
std::atomic<bool>* mStop;
audio_stream_in_t* mStream;
+ StreamIn::CommandMQ* mCommandMQ;
StreamIn::DataMQ* mDataMQ;
StreamIn::StatusMQ* mStatusMQ;
EventFlag* mEfGroup;
ThreadPriority mThreadPriority;
std::unique_ptr<uint8_t[]> mBuffer;
+ IStreamIn::ReadParameters mParameters;
+ IStreamIn::ReadStatus mStatus;
bool threadLoop() override;
+
+ void doGetCapturePosition();
+ void doRead();
};
status_t ReadThread::readyToRun() {
@@ -77,6 +85,32 @@ status_t ReadThread::readyToRun() {
return OK;
}
+void ReadThread::doRead() {
+ size_t availableToWrite = mDataMQ->availableToWrite();
+ size_t requestedToRead = mParameters.params.read;
+ if (requestedToRead > availableToWrite) {
+ ALOGW("truncating read data from %d to %d due to insufficient data queue space",
+ (int32_t)requestedToRead, (int32_t)availableToWrite);
+ requestedToRead = availableToWrite;
+ }
+ ssize_t readResult = mStream->read(mStream, &mBuffer[0], requestedToRead);
+ mStatus.retval = Result::OK;
+ uint64_t read = 0;
+ if (readResult >= 0) {
+ mStatus.reply.read = readResult;
+ if (!mDataMQ->write(&mBuffer[0], readResult)) {
+ ALOGW("data message queue write failed");
+ }
+ } else {
+ mStatus.retval = Stream::analyzeStatus("read", readResult);
+ }
+}
+
+void ReadThread::doGetCapturePosition() {
+ mStatus.retval = StreamIn::getCapturePositionImpl(
+ mStream, &mStatus.reply.capturePosition.frames, &mStatus.reply.capturePosition.time);
+}
+
bool ReadThread::threadLoop() {
// This implementation doesn't return control back to the Thread until it decides to stop,
// as the Thread uses mutexes, and this can lead to priority inversion.
@@ -87,21 +121,23 @@ bool ReadThread::threadLoop() {
if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) {
continue; // Nothing to do.
}
-
- const size_t availToWrite = mDataMQ->availableToWrite();
- ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite);
- Result retval = Result::OK;
- uint64_t read = 0;
- if (readResult >= 0) {
- read = readResult;
- if (!mDataMQ->write(&mBuffer[0], readResult)) {
- ALOGW("data message queue write failed");
- }
- } else {
- retval = Stream::analyzeStatus("read", readResult);
+ if (!mCommandMQ->read(&mParameters)) {
+ continue; // Nothing to do.
}
- IStreamIn::ReadStatus status = { retval, read };
- if (!mStatusMQ->write(&status)) {
+ mStatus.replyTo = mParameters.command;
+ switch (mParameters.command) {
+ case IStreamIn::ReadCommand::READ:
+ doRead();
+ break;
+ case IStreamIn::ReadCommand::GET_CAPTURE_POSITION:
+ doGetCapturePosition();
+ break;
+ default:
+ ALOGE("Unknown read thread command code %d", mParameters.command);
+ mStatus.retval = Result::NOT_SUPPORTED;
+ break;
+ }
+ if (!mStatusMQ->write(&mStatus)) {
ALOGW("status message queue write failed");
}
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
@@ -275,17 +311,19 @@ Return<void> StreamIn::prepareForReading(
if (mDataMQ) {
ALOGE("the client attempts to call prepareForReading twice");
_hidl_cb(Result::INVALID_STATE,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
+ std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1));
std::unique_ptr<DataMQ> tempDataMQ(
new DataMQ(frameSize * framesCount, true /* EventFlag */));
std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
- if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid");
ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
// TODO: Remove event flag management once blocking MQ is implemented. b/33815422
@@ -293,7 +331,7 @@ Return<void> StreamIn::prepareForReading(
if (status != OK || !mEfGroup) {
ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
@@ -301,6 +339,7 @@ Return<void> StreamIn::prepareForReading(
mReadThread = new ReadThread(
&mStopReadThread,
mStream,
+ tempCommandMQ.get(),
tempDataMQ.get(),
tempStatusMQ.get(),
mEfGroup,
@@ -309,13 +348,14 @@ Return<void> StreamIn::prepareForReading(
if (status != OK) {
ALOGW("failed to start reader thread: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
+ mCommandMQ = std::move(tempCommandMQ);
mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ);
- _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
+ _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc());
return Void();
}
@@ -323,22 +363,28 @@ Return<uint32_t> StreamIn::getInputFramesLost() {
return mStream->get_input_frames_lost(mStream);
}
-Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) {
+// static
+Result StreamIn::getCapturePositionImpl(
+ audio_stream_in_t *stream, uint64_t *frames, uint64_t *time) {
Result retval(Result::NOT_SUPPORTED);
- uint64_t frames = 0, time = 0;
- if (mStream->get_capture_position != NULL) {
- int64_t halFrames, halTime;
- retval = Stream::analyzeStatus(
- "get_capture_position",
- mStream->get_capture_position(mStream, &halFrames, &halTime),
- // HAL may have a stub function, always returning ENOSYS, don't
- // spam the log in this case.
- ENOSYS);
- if (retval == Result::OK) {
- frames = halFrames;
- time = halTime;
- }
+ if (stream->get_capture_position != NULL) return retval;
+ int64_t halFrames, halTime;
+ retval = Stream::analyzeStatus(
+ "get_capture_position",
+ stream->get_capture_position(stream, &halFrames, &halTime),
+ // HAL may have a stub function, always returning ENOSYS, don't
+ // spam the log in this case.
+ ENOSYS);
+ if (retval == Result::OK) {
+ *frames = halFrames;
+ *time = halTime;
}
+ return retval;
+};
+
+Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) {
+ uint64_t frames = 0, time = 0;
+ Result retval = getCapturePositionImpl(mStream, &frames, &time);
_hidl_cb(retval, frames, time);
return Void();
}