diff options
Diffstat (limited to 'cmds/incidentd/src/FdBuffer.cpp')
-rw-r--r-- | cmds/incidentd/src/FdBuffer.cpp | 182 |
1 files changed, 147 insertions, 35 deletions
diff --git a/cmds/incidentd/src/FdBuffer.cpp b/cmds/incidentd/src/FdBuffer.cpp index 527d7eef3a96..30dd339a629b 100644 --- a/cmds/incidentd/src/FdBuffer.cpp +++ b/cmds/incidentd/src/FdBuffer.cpp @@ -24,16 +24,16 @@ #include <fcntl.h> #include <poll.h> #include <unistd.h> +#include <wait.h> -const ssize_t BUFFER_SIZE = 16 * 1024; +const bool DEBUG = false; +const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max - FdBuffer::FdBuffer() - :mBuffers(), + :mBuffer(BUFFER_SIZE), mStartTime(-1), mFinishTime(-1), - mCurrentWritten(-1), mTimedOut(false), mTruncated(false) { @@ -41,11 +41,6 @@ FdBuffer::FdBuffer() FdBuffer::~FdBuffer() { - const int N = mBuffers.size(); - for (int i=0; i<N; i++) { - uint8_t* buf = mBuffers[i]; - free(buf); - } } status_t @@ -59,20 +54,12 @@ FdBuffer::read(int fd, int64_t timeout) fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); - uint8_t* buf = NULL; while (true) { - if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) { - if (mBuffers.size() == MAX_BUFFER_COUNT) { - mTruncated = true; - break; - } - buf = (uint8_t*)malloc(BUFFER_SIZE); - if (buf == NULL) { - return NO_MEMORY; - } - mBuffers.push_back(buf); - mCurrentWritten = 0; + if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { + mTruncated = true; + break; } + if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; int64_t remainingTime = (mStartTime + timeout) - uptimeMillis(); if (remainingTime <= 0) { @@ -85,45 +72,170 @@ FdBuffer::read(int fd, int64_t timeout) mTimedOut = true; break; } else if (count < 0) { + if (DEBUG) ALOGD("poll failed: %s", strerror(errno)); return -errno; } else { if ((pfds.revents & POLLERR) != 0) { + if (DEBUG) ALOGD("return event has error %s", strerror(errno)); return errno != 0 ? -errno : UNKNOWN_ERROR; } else { - ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten); + ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); if (amt < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { continue; } else { + if (DEBUG) ALOGD("Fail to read %d: %s", fd, strerror(errno)); return -errno; } } else if (amt == 0) { break; } - mCurrentWritten += amt; + mBuffer.wp()->move(amt); } } } - mFinishTime = uptimeMillis(); return NO_ERROR; } -size_t -FdBuffer::size() -{ - return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten; -} - status_t -FdBuffer::write(ReportRequestSet* reporter) +FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs, const bool isSysfs) { - const int N = mBuffers.size() - 1; - for (int i=0; i<N; i++) { - reporter->write(mBuffers[i], BUFFER_SIZE); + struct pollfd pfds[] = { + { .fd = fd, .events = POLLIN }, + { .fd = toFd, .events = POLLOUT }, + { .fd = fromFd, .events = POLLIN }, + }; + + mStartTime = uptimeMillis(); + + // mark all fds non blocking + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); + fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK); + fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK); + + // A circular buffer holds data read from fd and writes to parsing process + uint8_t cirBuf[BUFFER_SIZE]; + size_t cirSize = 0; + int rpos = 0, wpos = 0; + + // This is the buffer used to store processed data + while (true) { + if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { + mTruncated = true; + break; + } + if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; + + int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis(); + if (remainingTime <= 0) { + mTimedOut = true; + break; + } + + // wait for any pfds to be ready to perform IO + int count = poll(pfds, 3, remainingTime); + if (count == 0) { + mTimedOut = true; + break; + } else if (count < 0) { + if (DEBUG) ALOGD("Fail to poll: %s", strerror(errno)); + return -errno; + } + + // make sure no errors occur on any fds + for (int i = 0; i < 3; ++i) { + if ((pfds[i].revents & POLLERR) != 0) { + if (i == 0 && isSysfs) { + if (DEBUG) ALOGD("fd %d is sysfs, ignore its POLLERR return value", fd); + continue; + } + if (DEBUG) ALOGD("fd[%d]=%d returns error events: %s", i, fd, strerror(errno)); + return errno != 0 ? -errno : UNKNOWN_ERROR; + } + } + + // read from fd + if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { + ssize_t amt; + if (rpos >= wpos) { + amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos); + } else { + amt = ::read(fd, cirBuf + rpos, wpos - rpos); + } + if (amt < 0) { + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { + if (DEBUG) ALOGD("Fail to read fd %d: %s", fd, strerror(errno)); + return -errno; + } // otherwise just continue + } else if (amt == 0) { // reach EOF so don't have to poll pfds[0]. + ::close(pfds[0].fd); + pfds[0].fd = -1; + } else { + rpos += amt; + cirSize += amt; + } + } + + // write to parsing process + if (cirSize > 0 && pfds[1].fd != -1) { + ssize_t amt; + if (rpos > wpos) { + amt = ::write(toFd, cirBuf + wpos, rpos - wpos); + } else { + amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos); + } + if (amt < 0) { + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { + if (DEBUG) ALOGD("Fail to write toFd %d: %s", toFd, strerror(errno)); + return -errno; + } // otherwise just continue + } else { + wpos += amt; + cirSize -= amt; + } + } + + // if buffer is empty and fd is closed, close write fd. + if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { + ::close(pfds[1].fd); + pfds[1].fd = -1; + } + + // circular buffer, reset rpos and wpos + if (rpos >= BUFFER_SIZE) { + rpos = 0; + } + if (wpos >= BUFFER_SIZE) { + wpos = 0; + } + + // read from parsing process + ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); + if (amt < 0) { + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { + if (DEBUG) ALOGD("Fail to read fromFd %d: %s", fromFd, strerror(errno)); + return -errno; + } // otherwise just continue + } else if (amt == 0) { + break; + } else { + mBuffer.wp()->move(amt); + } } - reporter->write(mBuffers[N], mCurrentWritten); + + mFinishTime = uptimeMillis(); return NO_ERROR; } +size_t +FdBuffer::size() const +{ + return mBuffer.size(); +} +EncodedBuffer::iterator +FdBuffer::data() const +{ + return mBuffer.begin(); +} |