diff options
Diffstat (limited to 'audio/2.0/default/StreamIn.cpp')
-rw-r--r-- | audio/2.0/default/StreamIn.cpp | 114 |
1 files changed, 80 insertions, 34 deletions
diff --git a/audio/2.0/default/StreamIn.cpp b/audio/2.0/default/StreamIn.cpp index ad18986401..1cde4ac1b9 100644 --- a/audio/2.0/default/StreamIn.cpp +++ b/audio/2.0/default/StreamIn.cpp @@ -38,6 +38,7 @@ class ReadThread : public Thread { // ReadThread's lifespan never exceeds StreamIn's lifespan. ReadThread(std::atomic<bool>* stop, audio_stream_in_t* stream, + StreamIn::CommandMQ* commandMQ, StreamIn::DataMQ* dataMQ, StreamIn::StatusMQ* statusMQ, EventFlag* efGroup, @@ -45,6 +46,7 @@ class ReadThread : public Thread { : Thread(false /*canCallJava*/), mStop(stop), mStream(stream), + mCommandMQ(commandMQ), mDataMQ(dataMQ), mStatusMQ(statusMQ), mEfGroup(efGroup), @@ -58,13 +60,19 @@ class ReadThread : public Thread { private: std::atomic<bool>* mStop; audio_stream_in_t* mStream; + StreamIn::CommandMQ* mCommandMQ; StreamIn::DataMQ* mDataMQ; StreamIn::StatusMQ* mStatusMQ; EventFlag* mEfGroup; ThreadPriority mThreadPriority; std::unique_ptr<uint8_t[]> mBuffer; + IStreamIn::ReadParameters mParameters; + IStreamIn::ReadStatus mStatus; bool threadLoop() override; + + void doGetCapturePosition(); + void doRead(); }; status_t ReadThread::readyToRun() { @@ -77,6 +85,32 @@ status_t ReadThread::readyToRun() { return OK; } +void ReadThread::doRead() { + size_t availableToWrite = mDataMQ->availableToWrite(); + size_t requestedToRead = mParameters.params.read; + if (requestedToRead > availableToWrite) { + ALOGW("truncating read data from %d to %d due to insufficient data queue space", + (int32_t)requestedToRead, (int32_t)availableToWrite); + requestedToRead = availableToWrite; + } + ssize_t readResult = mStream->read(mStream, &mBuffer[0], requestedToRead); + mStatus.retval = Result::OK; + uint64_t read = 0; + if (readResult >= 0) { + mStatus.reply.read = readResult; + if (!mDataMQ->write(&mBuffer[0], readResult)) { + ALOGW("data message queue write failed"); + } + } else { + mStatus.retval = Stream::analyzeStatus("read", readResult); + } +} + +void ReadThread::doGetCapturePosition() { + mStatus.retval = StreamIn::getCapturePositionImpl( + mStream, &mStatus.reply.capturePosition.frames, &mStatus.reply.capturePosition.time); +} + bool ReadThread::threadLoop() { // This implementation doesn't return control back to the Thread until it decides to stop, // as the Thread uses mutexes, and this can lead to priority inversion. @@ -87,21 +121,23 @@ bool ReadThread::threadLoop() { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) { continue; // Nothing to do. } - - const size_t availToWrite = mDataMQ->availableToWrite(); - ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite); - Result retval = Result::OK; - uint64_t read = 0; - if (readResult >= 0) { - read = readResult; - if (!mDataMQ->write(&mBuffer[0], readResult)) { - ALOGW("data message queue write failed"); - } - } else { - retval = Stream::analyzeStatus("read", readResult); + if (!mCommandMQ->read(&mParameters)) { + continue; // Nothing to do. } - IStreamIn::ReadStatus status = { retval, read }; - if (!mStatusMQ->write(&status)) { + mStatus.replyTo = mParameters.command; + switch (mParameters.command) { + case IStreamIn::ReadCommand::READ: + doRead(); + break; + case IStreamIn::ReadCommand::GET_CAPTURE_POSITION: + doGetCapturePosition(); + break; + default: + ALOGE("Unknown read thread command code %d", mParameters.command); + mStatus.retval = Result::NOT_SUPPORTED; + break; + } + if (!mStatusMQ->write(&mStatus)) { ALOGW("status message queue write failed"); } mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)); @@ -275,17 +311,19 @@ Return<void> StreamIn::prepareForReading( if (mDataMQ) { ALOGE("the client attempts to call prepareForReading twice"); _hidl_cb(Result::INVALID_STATE, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } + std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1)); std::unique_ptr<DataMQ> tempDataMQ( new DataMQ(frameSize * framesCount, true /* EventFlag */)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); - if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { + if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) { + ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); _hidl_cb(Result::INVALID_ARGUMENTS, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } // TODO: Remove event flag management once blocking MQ is implemented. b/33815422 @@ -293,7 +331,7 @@ Return<void> StreamIn::prepareForReading( if (status != OK || !mEfGroup) { ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } @@ -301,6 +339,7 @@ Return<void> StreamIn::prepareForReading( mReadThread = new ReadThread( &mStopReadThread, mStream, + tempCommandMQ.get(), tempDataMQ.get(), tempStatusMQ.get(), mEfGroup, @@ -309,13 +348,14 @@ Return<void> StreamIn::prepareForReading( if (status != OK) { ALOGW("failed to start reader thread: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, - DataMQ::Descriptor(), StatusMQ::Descriptor()); + CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } + mCommandMQ = std::move(tempCommandMQ); mDataMQ = std::move(tempDataMQ); mStatusMQ = std::move(tempStatusMQ); - _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); + _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc()); return Void(); } @@ -323,22 +363,28 @@ Return<uint32_t> StreamIn::getInputFramesLost() { return mStream->get_input_frames_lost(mStream); } -Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) { +// static +Result StreamIn::getCapturePositionImpl( + audio_stream_in_t *stream, uint64_t *frames, uint64_t *time) { Result retval(Result::NOT_SUPPORTED); - uint64_t frames = 0, time = 0; - if (mStream->get_capture_position != NULL) { - int64_t halFrames, halTime; - retval = Stream::analyzeStatus( - "get_capture_position", - mStream->get_capture_position(mStream, &halFrames, &halTime), - // HAL may have a stub function, always returning ENOSYS, don't - // spam the log in this case. - ENOSYS); - if (retval == Result::OK) { - frames = halFrames; - time = halTime; - } + if (stream->get_capture_position != NULL) return retval; + int64_t halFrames, halTime; + retval = Stream::analyzeStatus( + "get_capture_position", + stream->get_capture_position(stream, &halFrames, &halTime), + // HAL may have a stub function, always returning ENOSYS, don't + // spam the log in this case. + ENOSYS); + if (retval == Result::OK) { + *frames = halFrames; + *time = halTime; } + return retval; +}; + +Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) { + uint64_t frames = 0, time = 0; + Result retval = getCapturePositionImpl(mStream, &frames, &time); _hidl_cb(retval, frames, time); return Void(); } |