Implement C++ native lib for streaming proto, part 1
Extract the protobuf class and create an EncodedBuffer class
which holds protobuf data.
The next step is to create a ProtoOutputStream and let incident helper
adapt to the change as well.
Please see frameworks/base/core/java/android/util/proto.
Bug: 65641021
Test: unit tested
Change-Id: I0dd343b2e62d60f091c8f857fae3452ec8da6b96
diff --git a/cmds/incidentd/src/FdBuffer.cpp b/cmds/incidentd/src/FdBuffer.cpp
index bb399b5..b7633a4 100644
--- a/cmds/incidentd/src/FdBuffer.cpp
+++ b/cmds/incidentd/src/FdBuffer.cpp
@@ -17,7 +17,6 @@
#define LOG_TAG "incidentd"
#include "FdBuffer.h"
-#include "io_util.h"
#include <cutils/log.h>
#include <utils/SystemClock.h>
@@ -31,10 +30,9 @@
const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max
FdBuffer::FdBuffer()
- :mBuffers(),
+ :mBuffer(BUFFER_SIZE),
mStartTime(-1),
mFinishTime(-1),
- mCurrentWritten(-1),
mTimedOut(false),
mTruncated(false)
{
@@ -42,11 +40,6 @@
FdBuffer::~FdBuffer()
{
- const int N = mBuffers.size();
- for (int i=0; i<N; i++) {
- uint8_t* buf = mBuffers[i];
- free(buf);
- }
}
status_t
@@ -60,20 +53,12 @@
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
- uint8_t* buf = NULL;
while (true) {
- if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
- if (mBuffers.size() == MAX_BUFFER_COUNT) {
- mTruncated = true;
- break;
- }
- buf = (uint8_t*)malloc(BUFFER_SIZE);
- if (buf == NULL) {
- return NO_MEMORY;
- }
- mBuffers.push_back(buf);
- mCurrentWritten = 0;
+ if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
+ mTruncated = true;
+ break;
}
+ if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
if (remainingTime <= 0) {
@@ -91,7 +76,7 @@
if ((pfds.revents & POLLERR) != 0) {
return errno != 0 ? -errno : UNKNOWN_ERROR;
} else {
- ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
+ ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
if (amt < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
@@ -101,11 +86,10 @@
} else if (amt == 0) {
break;
}
- mCurrentWritten += amt;
+ mBuffer.wp()->move(amt);
}
}
}
-
mFinishTime = uptimeMillis();
return NO_ERROR;
}
@@ -132,20 +116,12 @@
int rpos = 0, wpos = 0;
// This is the buffer used to store processed data
- uint8_t* buf = NULL;
while (true) {
- if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
- if (mBuffers.size() == MAX_BUFFER_COUNT) {
- mTruncated = true;
- break;
- }
- buf = (uint8_t*)malloc(BUFFER_SIZE);
- if (buf == NULL) {
- return NO_MEMORY;
- }
- mBuffers.push_back(buf);
- mCurrentWritten = 0;
+ if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
+ mTruncated = true;
+ break;
}
+ if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
if (remainingTime <= 0) {
@@ -223,7 +199,7 @@
}
// read from parsing process
- ssize_t amt = ::read(fromFd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
+ ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
if (amt < 0) {
if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
return -errno;
@@ -231,7 +207,7 @@
} else if (amt == 0) {
break;
} else {
- mCurrentWritten += amt;
+ mBuffer.wp()->move(amt);
}
}
@@ -242,105 +218,11 @@
size_t
FdBuffer::size() const
{
- if (mBuffers.empty()) return 0;
- return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten;
+ return mBuffer.size();
}
-status_t
-FdBuffer::flush(int fd) const
+EncodedBuffer::iterator
+FdBuffer::data() const
{
- size_t i=0;
- status_t err = NO_ERROR;
- for (i=0; i<mBuffers.size()-1; i++) {
- err = write_all(fd, mBuffers[i], BUFFER_SIZE);
- if (err != NO_ERROR) return err;
- }
- return write_all(fd, mBuffers[i], mCurrentWritten);
-}
-
-FdBuffer::iterator
-FdBuffer::begin() const
-{
- return iterator(*this, 0, 0);
-}
-
-FdBuffer::iterator
-FdBuffer::end() const
-{
- if (mBuffers.empty() || mCurrentWritten < 0) return begin();
- if (mCurrentWritten == BUFFER_SIZE)
- // FdBuffer doesn't allocate another buf since no more bytes to read.
- return FdBuffer::iterator(*this, mBuffers.size(), 0);
- return FdBuffer::iterator(*this, mBuffers.size() - 1, mCurrentWritten);
-}
-
-// ===============================================================================
-FdBuffer::iterator::iterator(const FdBuffer& buffer, ssize_t index, ssize_t offset)
- : mFdBuffer(buffer),
- mIndex(index),
- mOffset(offset)
-{
-}
-
-FdBuffer::iterator&
-FdBuffer::iterator::operator=(iterator& other) const { return other; }
-
-FdBuffer::iterator&
-FdBuffer::iterator::operator+(size_t offset)
-{
- size_t newOffset = mOffset + offset;
- while (newOffset >= BUFFER_SIZE) {
- mIndex++;
- newOffset -= BUFFER_SIZE;
- }
- mOffset = newOffset;
- return *this;
-}
-
-FdBuffer::iterator&
-FdBuffer::iterator::operator+=(size_t offset) { return *this + offset; }
-
-FdBuffer::iterator&
-FdBuffer::iterator::operator++() { return *this + 1; }
-
-FdBuffer::iterator
-FdBuffer::iterator::operator++(int) { return *this + 1; }
-
-bool
-FdBuffer::iterator::operator==(iterator other) const
-{
- return mIndex == other.mIndex && mOffset == other.mOffset;
-}
-
-bool
-FdBuffer::iterator::operator!=(iterator other) const { return !(*this == other); }
-
-int
-FdBuffer::iterator::operator-(iterator other) const
-{
- return (int)bytesRead() - (int)other.bytesRead();
-}
-
-FdBuffer::iterator::reference
-FdBuffer::iterator::operator*() const
-{
- return mFdBuffer.mBuffers[mIndex][mOffset];
-}
-
-FdBuffer::iterator
-FdBuffer::iterator::snapshot() const
-{
- return FdBuffer::iterator(mFdBuffer, mIndex, mOffset);
-}
-
-size_t
-FdBuffer::iterator::bytesRead() const
-{
- return mIndex * BUFFER_SIZE + mOffset;
-}
-
-bool
-FdBuffer::iterator::outOfBound() const
-{
- return bytesRead() > mFdBuffer.size();
+ return mBuffer.begin();
}