blob: bb399b57b8cdb00d68be12585941bde19eecad09 [file] [log] [blame]
Joe Onorato1754d742016-11-21 17:51:35 -08001/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "incidentd"
18
19#include "FdBuffer.h"
Yi Jin99c248f2017-08-25 18:11:58 -070020#include "io_util.h"
Joe Onorato1754d742016-11-21 17:51:35 -080021
22#include <cutils/log.h>
23#include <utils/SystemClock.h>
24
25#include <fcntl.h>
26#include <poll.h>
27#include <unistd.h>
Yi Jin0a3406f2017-06-22 19:23:11 -070028#include <wait.h>
Joe Onorato1754d742016-11-21 17:51:35 -080029
// Size in bytes of every chunk FdBuffer allocates (16 KB).
constexpr ssize_t BUFFER_SIZE = 16 * 1024;
// Maximum number of chunks kept at once: 256 * 16 KB = 4 MB of data.
constexpr ssize_t MAX_BUFFER_COUNT = 256;
32
Joe Onorato1754d742016-11-21 17:51:35 -080033FdBuffer::FdBuffer()
34 :mBuffers(),
35 mStartTime(-1),
36 mFinishTime(-1),
37 mCurrentWritten(-1),
38 mTimedOut(false),
39 mTruncated(false)
40{
41}
42
43FdBuffer::~FdBuffer()
44{
45 const int N = mBuffers.size();
46 for (int i=0; i<N; i++) {
47 uint8_t* buf = mBuffers[i];
48 free(buf);
49 }
50}
51
52status_t
53FdBuffer::read(int fd, int64_t timeout)
54{
55 struct pollfd pfds = {
56 .fd = fd,
57 .events = POLLIN
58 };
59 mStartTime = uptimeMillis();
60
61 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
62
63 uint8_t* buf = NULL;
64 while (true) {
65 if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
66 if (mBuffers.size() == MAX_BUFFER_COUNT) {
67 mTruncated = true;
68 break;
69 }
70 buf = (uint8_t*)malloc(BUFFER_SIZE);
71 if (buf == NULL) {
72 return NO_MEMORY;
73 }
74 mBuffers.push_back(buf);
75 mCurrentWritten = 0;
76 }
77
78 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
79 if (remainingTime <= 0) {
80 mTimedOut = true;
81 break;
82 }
83
84 int count = poll(&pfds, 1, remainingTime);
85 if (count == 0) {
86 mTimedOut = true;
87 break;
88 } else if (count < 0) {
89 return -errno;
90 } else {
91 if ((pfds.revents & POLLERR) != 0) {
92 return errno != 0 ? -errno : UNKNOWN_ERROR;
93 } else {
94 ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
95 if (amt < 0) {
96 if (errno == EAGAIN || errno == EWOULDBLOCK) {
97 continue;
98 } else {
99 return -errno;
100 }
101 } else if (amt == 0) {
102 break;
103 }
104 mCurrentWritten += amt;
105 }
106 }
107 }
108
109 mFinishTime = uptimeMillis();
110 return NO_ERROR;
111}
112
Yi Jin0a3406f2017-06-22 19:23:11 -0700113status_t
114FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs)
115{
116 struct pollfd pfds[] = {
117 { .fd = fd, .events = POLLIN },
118 { .fd = toFd, .events = POLLOUT },
119 { .fd = fromFd, .events = POLLIN },
120 };
121
122 mStartTime = uptimeMillis();
123
124 // mark all fds non blocking
125 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
126 fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK);
127 fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK);
128
129 // A circular buffer holds data read from fd and writes to parsing process
130 uint8_t cirBuf[BUFFER_SIZE];
131 size_t cirSize = 0;
132 int rpos = 0, wpos = 0;
133
134 // This is the buffer used to store processed data
135 uint8_t* buf = NULL;
136 while (true) {
137 if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
138 if (mBuffers.size() == MAX_BUFFER_COUNT) {
139 mTruncated = true;
140 break;
141 }
142 buf = (uint8_t*)malloc(BUFFER_SIZE);
143 if (buf == NULL) {
144 return NO_MEMORY;
145 }
146 mBuffers.push_back(buf);
147 mCurrentWritten = 0;
148 }
149
150 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
151 if (remainingTime <= 0) {
152 mTimedOut = true;
153 break;
154 }
155
156 // wait for any pfds to be ready to perform IO
157 int count = poll(pfds, 3, remainingTime);
158 if (count == 0) {
159 mTimedOut = true;
160 break;
161 } else if (count < 0) {
162 return -errno;
163 }
164
165 // make sure no errors occur on any fds
166 for (int i = 0; i < 3; ++i) {
167 if ((pfds[i].revents & POLLERR) != 0) {
168 return errno != 0 ? -errno : UNKNOWN_ERROR;
169 }
170 }
171
172 // read from fd
173 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
174 ssize_t amt;
175 if (rpos >= wpos) {
176 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
177 } else {
Yi Jin0ed9b682017-08-18 14:51:20 -0700178 amt = ::read(fd, cirBuf + rpos, wpos - rpos);
Yi Jin0a3406f2017-06-22 19:23:11 -0700179 }
180 if (amt < 0) {
181 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
182 return -errno;
183 } // otherwise just continue
184 } else if (amt == 0) { // reach EOF so don't have to poll pfds[0].
185 ::close(pfds[0].fd);
186 pfds[0].fd = -1;
187 } else {
188 rpos += amt;
189 cirSize += amt;
190 }
191 }
192
193 // write to parsing process
194 if (cirSize > 0 && pfds[1].fd != -1) {
195 ssize_t amt;
196 if (rpos > wpos) {
197 amt = ::write(toFd, cirBuf + wpos, rpos - wpos);
198 } else {
199 amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos);
200 }
201 if (amt < 0) {
202 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
203 return -errno;
204 } // otherwise just continue
205 } else {
206 wpos += amt;
207 cirSize -= amt;
208 }
209 }
210
211 // if buffer is empty and fd is closed, close write fd.
212 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
213 ::close(pfds[1].fd);
214 pfds[1].fd = -1;
215 }
216
217 // circular buffer, reset rpos and wpos
218 if (rpos >= BUFFER_SIZE) {
219 rpos = 0;
220 }
221 if (wpos >= BUFFER_SIZE) {
222 wpos = 0;
223 }
224
225 // read from parsing process
226 ssize_t amt = ::read(fromFd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
227 if (amt < 0) {
228 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
229 return -errno;
230 } // otherwise just continue
231 } else if (amt == 0) {
232 break;
233 } else {
234 mCurrentWritten += amt;
235 }
236 }
237
238 mFinishTime = uptimeMillis();
239 return NO_ERROR;
240}
241
Joe Onorato1754d742016-11-21 17:51:35 -0800242size_t
Yi Jin99c248f2017-08-25 18:11:58 -0700243FdBuffer::size() const
Joe Onorato1754d742016-11-21 17:51:35 -0800244{
Yi Jin0ed9b682017-08-18 14:51:20 -0700245 if (mBuffers.empty()) return 0;
Joe Onorato1754d742016-11-21 17:51:35 -0800246 return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten;
247}
248
249status_t
Yi Jin99c248f2017-08-25 18:11:58 -0700250FdBuffer::flush(int fd) const
Joe Onorato1754d742016-11-21 17:51:35 -0800251{
Yi Jin99c248f2017-08-25 18:11:58 -0700252 size_t i=0;
253 status_t err = NO_ERROR;
254 for (i=0; i<mBuffers.size()-1; i++) {
255 err = write_all(fd, mBuffers[i], BUFFER_SIZE);
256 if (err != NO_ERROR) return err;
Joe Onorato1754d742016-11-21 17:51:35 -0800257 }
Yi Jin99c248f2017-08-25 18:11:58 -0700258 return write_all(fd, mBuffers[i], mCurrentWritten);
Joe Onorato1754d742016-11-21 17:51:35 -0800259}
260
Yi Jin0ed9b682017-08-18 14:51:20 -0700261FdBuffer::iterator
Yi Jin0f047162017-09-05 13:44:22 -0700262FdBuffer::begin() const
263{
264 return iterator(*this, 0, 0);
265}
266
267FdBuffer::iterator
Yi Jin99c248f2017-08-25 18:11:58 -0700268FdBuffer::end() const
Yi Jin0ed9b682017-08-18 14:51:20 -0700269{
270 if (mBuffers.empty() || mCurrentWritten < 0) return begin();
271 if (mCurrentWritten == BUFFER_SIZE)
272 // FdBuffer doesn't allocate another buf since no more bytes to read.
273 return FdBuffer::iterator(*this, mBuffers.size(), 0);
274 return FdBuffer::iterator(*this, mBuffers.size() - 1, mCurrentWritten);
275}
Joe Onorato1754d742016-11-21 17:51:35 -0800276
Yi Jin0f047162017-09-05 13:44:22 -0700277// ===============================================================================
278FdBuffer::iterator::iterator(const FdBuffer& buffer, ssize_t index, ssize_t offset)
279 : mFdBuffer(buffer),
280 mIndex(index),
281 mOffset(offset)
282{
283}
284
285FdBuffer::iterator&
286FdBuffer::iterator::operator=(iterator& other) const { return other; }
287
Yi Jin0ed9b682017-08-18 14:51:20 -0700288FdBuffer::iterator&
289FdBuffer::iterator::operator+(size_t offset)
290{
291 size_t newOffset = mOffset + offset;
292 while (newOffset >= BUFFER_SIZE) {
293 mIndex++;
294 newOffset -= BUFFER_SIZE;
295 }
296 mOffset = newOffset;
297 return *this;
298}
299
Yi Jin0f047162017-09-05 13:44:22 -0700300FdBuffer::iterator&
301FdBuffer::iterator::operator+=(size_t offset) { return *this + offset; }
302
303FdBuffer::iterator&
304FdBuffer::iterator::operator++() { return *this + 1; }
305
306FdBuffer::iterator
307FdBuffer::iterator::operator++(int) { return *this + 1; }
308
309bool
310FdBuffer::iterator::operator==(iterator other) const
311{
312 return mIndex == other.mIndex && mOffset == other.mOffset;
313}
314
315bool
316FdBuffer::iterator::operator!=(iterator other) const { return !(*this == other); }
317
318int
319FdBuffer::iterator::operator-(iterator other) const
320{
321 return (int)bytesRead() - (int)other.bytesRead();
322}
323
324FdBuffer::iterator::reference
325FdBuffer::iterator::operator*() const
326{
327 return mFdBuffer.mBuffers[mIndex][mOffset];
328}
329
330FdBuffer::iterator
331FdBuffer::iterator::snapshot() const
332{
333 return FdBuffer::iterator(mFdBuffer, mIndex, mOffset);
334}
335
Yi Jin0ed9b682017-08-18 14:51:20 -0700336size_t
Yi Jin99c248f2017-08-25 18:11:58 -0700337FdBuffer::iterator::bytesRead() const
Yi Jin0ed9b682017-08-18 14:51:20 -0700338{
339 return mIndex * BUFFER_SIZE + mOffset;
340}
Yi Jin0f047162017-09-05 13:44:22 -0700341
342bool
343FdBuffer::iterator::outOfBound() const
344{
345 return bytesRead() > mFdBuffer.size();
346}