diff options
author | Jason Sams <rjsams@android.com> | 2012-02-07 15:32:08 -0800 |
---|---|---|
committer | Jason Sams <rjsams@android.com> | 2012-02-07 15:32:08 -0800 |
commit | 4c2e4c80ce519e09e5b00fd7533e64a834d70639 (patch) | |
tree | ce0180103a149102bd5bfa314be209d053b17d6e /libs/rs/rsThreadIO.cpp | |
parent | d36ad9b1ff99675dd0eca6a3fda1f52353f451a4 (diff) |
Implement RS VSync on new vsync infrastructure.
Change-Id: I662159a086a56e28732dd64a3a3cb30f8d4b72b1
Replace lockless fifo from server to client with sockets.
Change-Id: I99a4ab4f18496c0fbac96ee7b8099797af4712ea
Diffstat (limited to 'libs/rs/rsThreadIO.cpp')
-rw-r--r-- | libs/rs/rsThreadIO.cpp | 264 |
1 files changed, 113 insertions, 151 deletions
diff --git a/libs/rs/rsThreadIO.cpp b/libs/rs/rsThreadIO.cpp index 191777428712..8e4b9883a353 100644 --- a/libs/rs/rsThreadIO.cpp +++ b/libs/rs/rsThreadIO.cpp @@ -18,227 +18,189 @@ #include "rsThreadIO.h" +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> + +#include <fcntl.h> +#include <poll.h> + + using namespace android; using namespace android::renderscript; -ThreadIO::ThreadIO() : mUsingSocket(false) { +ThreadIO::ThreadIO() { + mRunning = true; } ThreadIO::~ThreadIO() { } -void ThreadIO::init(bool useSocket) { - mUsingSocket = useSocket; - mToCore.init(16 * 1024); - - if (mUsingSocket) { - mToClientSocket.init(); - mToCoreSocket.init(); - } else { - mToClient.init(1024); - } +void ThreadIO::init() { + mToClient.init(); + mToCore.init(); } void ThreadIO::shutdown() { - //ALOGE("shutdown 1"); + mRunning = false; mToCore.shutdown(); - //ALOGE("shutdown 2"); -} - -void ThreadIO::coreFlush() { - //ALOGE("coreFlush 1"); - if (mUsingSocket) { - } else { - mToCore.flush(); - } - //ALOGE("coreFlush 2"); } void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) { //ALOGE("coreHeader %i %i", cmdID, dataLen); - if (mUsingSocket) { - CoreCmdHeader hdr; - hdr.bytes = dataLen; - hdr.cmdID = cmdID; - mToCoreSocket.writeAsync(&hdr, sizeof(hdr)); - } else { - mCoreCommandSize = dataLen; - mCoreCommandID = cmdID; - mCoreDataPtr = (uint8_t *)mToCore.reserve(dataLen); - mCoreDataBasePtr = mCoreDataPtr; - } - //ALOGE("coreHeader ret %p", mCoreDataPtr); - return mCoreDataPtr; -} - -void ThreadIO::coreData(const void *data, size_t dataLen) { - //ALOGE("coreData %p %i", data, dataLen); - mToCoreSocket.writeAsync(data, dataLen); - //ALOGE("coreData ret %p", mCoreDataPtr); + CoreCmdHeader *hdr = (CoreCmdHeader *)&mSendBuffer[0]; + hdr->bytes = dataLen; + hdr->cmdID = cmdID; + mSendLen = dataLen + sizeof(CoreCmdHeader); + //mToCoreSocket.writeAsync(&hdr, sizeof(hdr)); + //ALOGE("coreHeader ret "); + return &mSendBuffer[sizeof(CoreCmdHeader)]; } void ThreadIO::coreCommit() { - //ALOGE("coreCommit %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize); - if (mUsingSocket) { - } else { - rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize); - mToCore.commit(mCoreCommandID, mCoreCommandSize); - } - //ALOGE("coreCommit ret"); -} - -void ThreadIO::coreCommitSync() { - //ALOGE("coreCommitSync %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize); - if (mUsingSocket) { - } else { - rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize); - mToCore.commitSync(mCoreCommandID, mCoreCommandSize); - } - //ALOGE("coreCommitSync ret"); + mToCore.writeAsync(&mSendBuffer, mSendLen); } void ThreadIO::clientShutdown() { - //ALOGE("coreShutdown 1"); mToClient.shutdown(); - //ALOGE("coreShutdown 2"); } void ThreadIO::coreSetReturn(const void *data, size_t dataLen) { - rsAssert(dataLen <= sizeof(mToCoreRet)); - memcpy(&mToCoreRet, data, dataLen); + uint32_t buf; + if (data == NULL) { + data = &buf; + dataLen = sizeof(buf); + } + + mToCore.readReturn(data, dataLen); } void ThreadIO::coreGetReturn(void *data, size_t dataLen) { - memcpy(data, &mToCoreRet, dataLen); -} + uint32_t buf; + if (data == NULL) { + data = &buf; + dataLen = sizeof(buf); + } -void ThreadIO::setTimoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) { - mToCore.setTimoutCallback(cb, dat, timeout); + mToCore.writeWaitReturn(data, dataLen); } +void ThreadIO::setTimeoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) { + //mToCore.setTimeoutCallback(cb, dat, timeout); +} -bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, uint64_t timeToWait) { +bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, int waitFd) { bool ret = false; - uint64_t startTime = con->getTime(); - while (!mToCore.isEmpty() || waitForCommand) { - uint32_t cmdID = 0; - uint32_t cmdSize = 0; - if (con->props.mLogTimes) { - con->timerSet(Context::RS_TIMER_IDLE); - } + uint8_t buf[2 * 1024]; + const CoreCmdHeader *cmd = (const CoreCmdHeader *)&buf[0]; + const void * data = (const void *)&buf[sizeof(CoreCmdHeader)]; + + struct pollfd p[2]; + p[0].fd = mToCore.getReadFd(); + p[0].events = POLLIN; + p[0].revents = 0; + p[1].fd = waitFd; + p[1].events = POLLIN; + p[1].revents = 0; + int pollCount = 1; + if (waitFd >= 0) { + pollCount = 2; + } - uint64_t delay = 0; - if (waitForCommand) { - delay = timeToWait - (con->getTime() - startTime); - if (delay > timeToWait) { - delay = 0; - } - } + if (con->props.mLogTimes) { + con->timerSet(Context::RS_TIMER_IDLE); + } - if (delay == 0 && timeToWait != 0 && mToCore.isEmpty()) { + int waitTime = -1; + while (mRunning) { + int pr = poll(p, pollCount, waitTime); + if (pr <= 0) { break; } - const void * data = mToCore.get(&cmdID, &cmdSize, delay); - if (!cmdSize) { - // exception or timeout occurred. - break; - } - ret = true; - if (con->props.mLogTimes) { - con->timerSet(Context::RS_TIMER_INTERNAL); + if (p[0].revents) { + size_t r = mToCore.read(&buf[0], sizeof(CoreCmdHeader)); + mToCore.read(&buf[sizeof(CoreCmdHeader)], cmd->bytes); + + if (r != sizeof(CoreCmdHeader)) { + // exception or timeout occurred. + break; + } + + ret = true; + if (con->props.mLogTimes) { + con->timerSet(Context::RS_TIMER_INTERNAL); + } + waitForCommand = false; + //ALOGV("playCoreCommands 3 %i %i", cmd->cmdID, cmd->bytes); + + if (cmd->cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) { + rsAssert(cmd->cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *))); + ALOGE("playCoreCommands error con %p, cmd %i", con, cmd->cmdID); + } + gPlaybackFuncs[cmd->cmdID](con, data, cmd->bytes); + + if (con->props.mLogTimes) { + con->timerSet(Context::RS_TIMER_IDLE); + } + + if (waitFd < 0) { + // If we don't have a secondary wait object we should stop blocking now + // that at least one command has been processed. + waitTime = 0; + } } - waitForCommand = false; - //ALOGV("playCoreCommands 3 %i %i", cmdID, cmdSize); - if (cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) { - rsAssert(cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *))); - ALOGE("playCoreCommands error con %p, cmd %i", con, cmdID); - mToCore.printDebugData(); + if (p[1].revents && !p[0].revents) { + // We want to finish processing fifo events before processing the vsync. + // Otherwise we can end up falling behind and having tremendous lag. + break; } - gPlaybackFuncs[cmdID](con, data, cmdSize << 2); - mToCore.next(); } return ret; } RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) { - if (mUsingSocket) { - mToClientSocket.read(&mLastClientHeader, sizeof(mLastClientHeader)); - } else { - size_t bytesData = 0; - const uint32_t *d = (const uint32_t *)mToClient.get(&mLastClientHeader.cmdID, (uint32_t*)&bytesData); - if (bytesData >= sizeof(uint32_t)) { - mLastClientHeader.userID = d[0]; - mLastClientHeader.bytes = bytesData - sizeof(uint32_t); - } else { - mLastClientHeader.userID = 0; - mLastClientHeader.bytes = 0; - } - } + //ALOGE("getClientHeader"); + mToClient.read(&mLastClientHeader, sizeof(mLastClientHeader)); + receiveLen[0] = mLastClientHeader.bytes; usrID[0] = mLastClientHeader.userID; + //ALOGE("getClientHeader %i %i %i", mLastClientHeader.cmdID, usrID[0], receiveLen[0]); return (RsMessageToClientType)mLastClientHeader.cmdID; } RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen, uint32_t *usrID, size_t bufferLen) { + //ALOGE("getClientPayload"); receiveLen[0] = mLastClientHeader.bytes; usrID[0] = mLastClientHeader.userID; if (bufferLen < mLastClientHeader.bytes) { return RS_MESSAGE_TO_CLIENT_RESIZE; } - if (mUsingSocket) { - if (receiveLen[0]) { - mToClientSocket.read(data, receiveLen[0]); - } - return (RsMessageToClientType)mLastClientHeader.cmdID; - } else { - uint32_t bytesData = 0; - uint32_t commandID = 0; - const uint32_t *d = (const uint32_t *)mToClient.get(&commandID, &bytesData); - //ALOGE("getMessageToClient 3 %i %i", commandID, bytesData); - //ALOGE("getMessageToClient %i %i", commandID, *subID); - if (bufferLen >= receiveLen[0]) { - memcpy(data, d+1, receiveLen[0]); - mToClient.next(); - return (RsMessageToClientType)commandID; - } + if (receiveLen[0]) { + mToClient.read(data, receiveLen[0]); } - return RS_MESSAGE_TO_CLIENT_RESIZE; + //ALOGE("getClientPayload x"); + return (RsMessageToClientType)mLastClientHeader.cmdID; } bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data, size_t dataLen, bool waitForSpace) { + + //ALOGE("sendToClient %i %i %i", cmdID, usrID, (int)dataLen); ClientCmdHeader hdr; hdr.bytes = dataLen; hdr.cmdID = cmdID; hdr.userID = usrID; - if (mUsingSocket) { - mToClientSocket.writeAsync(&hdr, sizeof(hdr)); - if (dataLen) { - mToClientSocket.writeAsync(data, dataLen); - } - return true; - } else { - if (!waitForSpace) { - if (!mToClient.makeSpaceNonBlocking(dataLen + sizeof(hdr))) { - // Not enough room, and not waiting. - return false; - } - } - //ALOGE("sendMessageToClient 2"); - uint32_t *p = (uint32_t *)mToClient.reserve(dataLen + sizeof(usrID)); - p[0] = usrID; - if (dataLen > 0) { - memcpy(p+1, data, dataLen); - } - mToClient.commit(cmdID, dataLen + sizeof(usrID)); - //ALOGE("sendMessageToClient 3"); - return true; + mToClient.writeAsync(&hdr, sizeof(hdr)); + if (dataLen) { + mToClient.writeAsync(data, dataLen); } - return false; + + //ALOGE("sendToClient x"); + return true; } |