diff options
Diffstat (limited to 'audio/2.0/default/StreamOut.cpp')
-rw-r--r-- | audio/2.0/default/StreamOut.cpp | 86 |
1 files changed, 63 insertions, 23 deletions
diff --git a/audio/2.0/default/StreamOut.cpp b/audio/2.0/default/StreamOut.cpp index 1948b68e2f..5f187c5ea4 100644 --- a/audio/2.0/default/StreamOut.cpp +++ b/audio/2.0/default/StreamOut.cpp @@ -36,6 +36,7 @@ class WriteThread : public Thread { // WriteThread's lifespan never exceeds StreamOut's lifespan. WriteThread(std::atomic<bool>* stop, audio_stream_out_t* stream, + StreamOut::CommandMQ* commandMQ, StreamOut::DataMQ* dataMQ, StreamOut::StatusMQ* statusMQ, EventFlag* efGroup, @@ -43,6 +44,7 @@ class WriteThread : public Thread { : Thread(false /*canCallJava*/), mStop(stop), mStream(stream), + mCommandMQ(commandMQ), mDataMQ(dataMQ), mStatusMQ(statusMQ), mEfGroup(efGroup), @@ -56,13 +58,19 @@ class WriteThread : public Thread { private: std::atomic<bool>* mStop; audio_stream_out_t* mStream; + StreamOut::CommandMQ* mCommandMQ; StreamOut::DataMQ* mDataMQ; StreamOut::StatusMQ* mStatusMQ; EventFlag* mEfGroup; ThreadPriority mThreadPriority; std::unique_ptr<uint8_t[]> mBuffer; + IStreamOut::WriteStatus mStatus; bool threadLoop() override; + + void doGetLatency(); + void doGetPresentationPosition(); + void doWrite(); }; status_t WriteThread::readyToRun() { @@ -75,6 +83,32 @@ status_t WriteThread::readyToRun() { return OK; } +void WriteThread::doWrite() { + const size_t availToRead = mDataMQ->availableToRead(); + mStatus.retval = Result::OK; + mStatus.reply.written = 0; + if (mDataMQ->read(&mBuffer[0], availToRead)) { + ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead); + if (writeResult >= 0) { + mStatus.reply.written = writeResult; + } else { + mStatus.retval = Stream::analyzeStatus("write", writeResult); + } + } +} + +void WriteThread::doGetPresentationPosition() { + mStatus.retval = StreamOut::getPresentationPositionImpl( + mStream, + &mStatus.reply.presentationPosition.frames, + &mStatus.reply.presentationPosition.timeStamp); +} + +void WriteThread::doGetLatency() { + mStatus.retval = Result::OK; + mStatus.reply.latencyMs = mStream->get_latency(mStream); +} + bool WriteThread::threadLoop() { // This implementation doesn't return control back to the Thread until it decides to stop, // as the Thread uses mutexes, and this can lead to priority inversion. @@ -86,24 +120,26 @@ bool WriteThread::threadLoop() { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) { continue; // Nothing to do. } - - const size_t availToRead = mDataMQ->availableToRead(); - IStreamOut::WriteStatus status; - status.writeRetval = Result::OK; - status.written = 0; - if (mDataMQ->read(&mBuffer[0], availToRead)) { - ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead); - if (writeResult >= 0) { - status.written = writeResult; - } else { - status.writeRetval = Stream::analyzeStatus("write", writeResult); - } + if (!mCommandMQ->read(&mStatus.replyTo)) { + continue; // Nothing to do. + } + switch (mStatus.replyTo) { + case IStreamOut::WriteCommand::WRITE: + doWrite(); + break; + case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION: + doGetPresentationPosition(); + break; + case IStreamOut::WriteCommand::GET_LATENCY: + doGetLatency(); + break; + default: + ALOGE("Unknown write thread command code %d", mStatus.replyTo); + mStatus.retval = Result::NOT_SUPPORTED; + break; } - status.presentationPositionRetval = status.writeRetval == Result::OK ? - StreamOut::getPresentationPositionImpl(mStream, &status.frames, &status.timeStamp) : - Result::OK; - if (!mStatusMQ->write(&status)) { - ALOGW("status message queue write failed"); + if (!mStatusMQ->write(&mStatus)) { + ALOGE("status message queue write failed"); } mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)); } @@ -259,17 +295,19 @@ Return<void> StreamOut::prepareForWriting( if (mDataMQ) { ALOGE("the client attempts to call prepareForWriting twice"); _hidl_cb(Result::INVALID_STATE, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } + std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1)); std::unique_ptr<DataMQ> tempDataMQ( new DataMQ(frameSize * framesCount, true /* EventFlag */)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); - if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { + if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) { + ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); _hidl_cb(Result::INVALID_ARGUMENTS, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } // TODO: Remove event flag management once blocking MQ is implemented. b/33815422 @@ -277,7 +315,7 @@ Return<void> StreamOut::prepareForWriting( if (status != OK || !mEfGroup) { ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } @@ -285,6 +323,7 @@ Return<void> StreamOut::prepareForWriting( mWriteThread = new WriteThread( &mStopWriteThread, mStream, + tempCommandMQ.get(), tempDataMQ.get(), tempStatusMQ.get(), mEfGroup, @@ -293,13 +332,14 @@ Return<void> StreamOut::prepareForWriting( if (status != OK) { ALOGW("failed to start writer thread: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } + mCommandMQ = std::move(tempCommandMQ); mDataMQ = std::move(tempDataMQ); mStatusMQ = std::move(tempStatusMQ); - _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); + _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc()); return Void(); } |