summaryrefslogtreecommitdiff
path: root/cmds/incidentd/src/FdBuffer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cmds/incidentd/src/FdBuffer.cpp')
-rw-r--r--cmds/incidentd/src/FdBuffer.cpp186
1 files changed, 151 insertions, 35 deletions
diff --git a/cmds/incidentd/src/FdBuffer.cpp b/cmds/incidentd/src/FdBuffer.cpp
index 527d7eef3a96..0fff4e6dc4a0 100644
--- a/cmds/incidentd/src/FdBuffer.cpp
+++ b/cmds/incidentd/src/FdBuffer.cpp
@@ -24,16 +24,16 @@
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
+#include <wait.h>
-const ssize_t BUFFER_SIZE = 16 * 1024;
+const bool DEBUG = false;
+const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB
const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max
-
FdBuffer::FdBuffer()
- :mBuffers(),
+ :mBuffer(BUFFER_SIZE),
mStartTime(-1),
mFinishTime(-1),
- mCurrentWritten(-1),
mTimedOut(false),
mTruncated(false)
{
@@ -41,11 +41,6 @@ FdBuffer::FdBuffer()
FdBuffer::~FdBuffer()
{
- const int N = mBuffers.size();
- for (int i=0; i<N; i++) {
- uint8_t* buf = mBuffers[i];
- free(buf);
- }
}
status_t
@@ -59,71 +54,192 @@ FdBuffer::read(int fd, int64_t timeout)
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
- uint8_t* buf = NULL;
while (true) {
- if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
- if (mBuffers.size() == MAX_BUFFER_COUNT) {
- mTruncated = true;
- break;
- }
- buf = (uint8_t*)malloc(BUFFER_SIZE);
- if (buf == NULL) {
- return NO_MEMORY;
- }
- mBuffers.push_back(buf);
- mCurrentWritten = 0;
+ if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
+ mTruncated = true;
+ break;
}
+ if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
if (remainingTime <= 0) {
+ if (DEBUG) ALOGD("timed out due to long read");
mTimedOut = true;
break;
}
int count = poll(&pfds, 1, remainingTime);
if (count == 0) {
+ if (DEBUG) ALOGD("timed out due to block calling poll");
mTimedOut = true;
break;
} else if (count < 0) {
+ if (DEBUG) ALOGD("poll failed: %s", strerror(errno));
return -errno;
} else {
if ((pfds.revents & POLLERR) != 0) {
+ if (DEBUG) ALOGD("return event has error %s", strerror(errno));
return errno != 0 ? -errno : UNKNOWN_ERROR;
} else {
- ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
+ ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
if (amt < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
+ if (DEBUG) ALOGD("Fail to read %d: %s", fd, strerror(errno));
return -errno;
}
} else if (amt == 0) {
break;
}
- mCurrentWritten += amt;
+ mBuffer.wp()->move(amt);
}
}
}
-
mFinishTime = uptimeMillis();
return NO_ERROR;
}
-size_t
-FdBuffer::size()
-{
- return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten;
-}
-
status_t
-FdBuffer::write(ReportRequestSet* reporter)
+FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs, const bool isSysfs)
{
- const int N = mBuffers.size() - 1;
- for (int i=0; i<N; i++) {
- reporter->write(mBuffers[i], BUFFER_SIZE);
+ struct pollfd pfds[] = {
+ { .fd = fd, .events = POLLIN },
+ { .fd = toFd, .events = POLLOUT },
+ { .fd = fromFd, .events = POLLIN },
+ };
+
+ mStartTime = uptimeMillis();
+
+ // mark all fds non blocking
+ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
+ fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK);
+ fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK);
+
+ // A circular buffer holds data read from fd and writes to parsing process
+ uint8_t cirBuf[BUFFER_SIZE];
+ size_t cirSize = 0;
+ int rpos = 0, wpos = 0;
+
+ // This is the buffer used to store processed data
+ while (true) {
+ if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
+ mTruncated = true;
+ break;
+ }
+ if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
+
+ int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
+ if (remainingTime <= 0) {
+ if (DEBUG) ALOGD("timed out due to long read");
+ mTimedOut = true;
+ break;
+ }
+
+ // wait for any pfds to be ready to perform IO
+ int count = poll(pfds, 3, remainingTime);
+ if (count == 0) {
+ if (DEBUG) ALOGD("timed out due to block calling poll");
+ mTimedOut = true;
+ break;
+ } else if (count < 0) {
+ if (DEBUG) ALOGD("Fail to poll: %s", strerror(errno));
+ return -errno;
+ }
+
+ // make sure no errors occur on any fds
+ for (int i = 0; i < 3; ++i) {
+ if ((pfds[i].revents & POLLERR) != 0) {
+ if (i == 0 && isSysfs) {
+ if (DEBUG) ALOGD("fd %d is sysfs, ignore its POLLERR return value", fd);
+ continue;
+ }
+ if (DEBUG) ALOGD("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
+ return errno != 0 ? -errno : UNKNOWN_ERROR;
+ }
+ }
+
+ // read from fd
+ if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
+ ssize_t amt;
+ if (rpos >= wpos) {
+ amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
+ } else {
+ amt = ::read(fd, cirBuf + rpos, wpos - rpos);
+ }
+ if (amt < 0) {
+ if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (DEBUG) ALOGD("Fail to read fd %d: %s", fd, strerror(errno));
+ return -errno;
+ } // otherwise just continue
+ } else if (amt == 0) { // reach EOF so don't have to poll pfds[0].
+ ::close(pfds[0].fd);
+ pfds[0].fd = -1;
+ } else {
+ rpos += amt;
+ cirSize += amt;
+ }
+ }
+
+ // write to parsing process
+ if (cirSize > 0 && pfds[1].fd != -1) {
+ ssize_t amt;
+ if (rpos > wpos) {
+ amt = ::write(toFd, cirBuf + wpos, rpos - wpos);
+ } else {
+ amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos);
+ }
+ if (amt < 0) {
+ if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (DEBUG) ALOGD("Fail to write toFd %d: %s", toFd, strerror(errno));
+ return -errno;
+ } // otherwise just continue
+ } else {
+ wpos += amt;
+ cirSize -= amt;
+ }
+ }
+
+ // if buffer is empty and fd is closed, close write fd.
+ if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
+ ::close(pfds[1].fd);
+ pfds[1].fd = -1;
+ }
+
+ // circular buffer, reset rpos and wpos
+ if (rpos >= BUFFER_SIZE) {
+ rpos = 0;
+ }
+ if (wpos >= BUFFER_SIZE) {
+ wpos = 0;
+ }
+
+ // read from parsing process
+ ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
+ if (amt < 0) {
+ if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (DEBUG) ALOGD("Fail to read fromFd %d: %s", fromFd, strerror(errno));
+ return -errno;
+ } // otherwise just continue
+ } else if (amt == 0) {
+ break;
+ } else {
+ mBuffer.wp()->move(amt);
+ }
}
- reporter->write(mBuffers[N], mCurrentWritten);
+
+ mFinishTime = uptimeMillis();
return NO_ERROR;
}
+size_t
+FdBuffer::size() const
+{
+ return mBuffer.size();
+}
+EncodedBuffer::iterator
+FdBuffer::data() const
+{
+ return mBuffer.begin();
+}