adb: don't immediately close a socket when write fails.
When we fail to write to a local socket peer, we might still have data
queued up to send to the other side. Defer closing the socket until
we've failed to both read and write.
Bug: http://b/74616284
Test: python test_device.py
Change-Id: Ifc4b8fe95369b4872e475c2ae4ee611dd2d8b9d7
(cherry picked from commit 184f4805476af19c2e58dd96c2ce6aa241efaacf)
diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp
index 44d3276..6b40056 100644
--- a/adb/socket_test.cpp
+++ b/adb/socket_test.cpp
@@ -209,7 +209,6 @@
TerminateThread(thread);
}
-#if 0
// Ensure that if we fail to write output to an fd, we will still flush data coming from it.
TEST_F(LocalSocketTest, flush_after_shutdown) {
int head_fd[2];
@@ -248,7 +247,6 @@
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
TerminateThread(thread);
}
-#endif
#if defined(__linux__)
diff --git a/adb/sockets.cpp b/adb/sockets.cpp
index 0007fed..e05a3db 100644
--- a/adb/sockets.cpp
+++ b/adb/sockets.cpp
@@ -106,50 +106,131 @@
}
}
+enum class SocketFlushResult {
+ Destroyed,
+ TryAgain,
+ Completed,
+};
+
+static SocketFlushResult local_socket_flush_incoming(asocket* s) {
+ while (!s->packet_queue.empty()) {
+ Range& r = s->packet_queue.front();
+
+ int rc = adb_write(s->fd, r.data(), r.size());
+ if (rc == static_cast<int>(r.size())) {
+ s->packet_queue.pop_front();
+ } else if (rc > 0) {
+ r.drop_front(rc);
+ fdevent_add(&s->fde, FDE_WRITE);
+ return SocketFlushResult::TryAgain;
+ } else if (rc == -1 && errno == EAGAIN) {
+ fdevent_add(&s->fde, FDE_WRITE);
+ return SocketFlushResult::TryAgain;
+ }
+
+ // We failed to write, but it's possible that we can still read from the socket.
+ // Give that a try before giving up.
+ s->has_write_error = true;
+ break;
+ }
+
+ // If we sent the last packet of a closing socket, we can now destroy it.
+ if (s->closing) {
+ s->close(s);
+ return SocketFlushResult::Destroyed;
+ }
+
+ fdevent_del(&s->fde, FDE_WRITE);
+ return SocketFlushResult::Completed;
+}
+
+// Returns false if the socket has been closed and destroyed as a side-effect of this function.
+static bool local_socket_flush_outgoing(asocket* s) {
+ const size_t max_payload = s->get_max_payload();
+ std::string data;
+ data.resize(max_payload);
+ char* x = &data[0];
+ size_t avail = max_payload;
+ int r = 0;
+ int is_eof = 0;
+
+ while (avail > 0) {
+ r = adb_read(s->fd, x, avail);
+ D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r,
+ r < 0 ? errno : 0, avail);
+ if (r == -1) {
+ if (errno == EAGAIN) {
+ break;
+ }
+ } else if (r > 0) {
+ avail -= r;
+ x += r;
+ continue;
+ }
+
+ /* r = 0 or unhandled error */
+ is_eof = 1;
+ break;
+ }
+ D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof,
+ s->fde.force_eof);
+
+ if (avail != max_payload && s->peer) {
+ data.resize(max_payload - avail);
+
+ // s->peer->enqueue() may call s->close() and free s,
+ // so save variables for debug printing below.
+ unsigned saved_id = s->id;
+ int saved_fd = s->fd;
+ r = s->peer->enqueue(s->peer, std::move(data));
+ D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r);
+
+ if (r < 0) {
+ // Error return means they closed us as a side-effect and we must
+ // return immediately.
+ //
+ // Note that if we still have buffered packets, the socket will be
+ // placed on the closing socket list. This handler function will be
+ // called again to process FDE_WRITE events.
+ return false;
+ }
+
+ if (r > 0) {
+ /* if the remote cannot accept further events,
+ ** we disable notification of READs. They'll
+ ** be enabled again when we get a call to ready()
+ */
+ fdevent_del(&s->fde, FDE_READ);
+ }
+ }
+
+ // Don't allow a forced eof if data is still there.
+ if ((s->fde.force_eof && !r) || is_eof) {
+ D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof);
+ s->close(s);
+ return false;
+ }
+
+ return true;
+}
+
static int local_socket_enqueue(asocket* s, std::string data) {
D("LS(%d): enqueue %zu", s->id, data.size());
Range r(std::move(data));
-
- /* if there is already data queue'd, we will receive
- ** events when it's time to write. just add this to
- ** the tail
- */
- if (!s->packet_queue.empty()) {
- goto enqueue;
- }
-
- /* write as much as we can, until we
- ** would block or there is an error/eof
- */
- while (!r.empty()) {
- int rc = adb_write(s->fd, r.data(), r.size());
- if (rc > 0) {
- r.drop_front(rc);
- continue;
- }
-
- if (rc == 0 || errno != EAGAIN) {
- D("LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno));
- s->has_write_error = true;
- s->close(s);
- return 1; /* not ready (error) */
- } else {
- // errno == EAGAIN
- break;
- }
- }
-
- if (r.empty()) {
- return 0; /* ready for more data */
- }
-
-enqueue:
- /* make sure we are notified when we can drain the queue */
s->packet_queue.push_back(std::move(r));
- fdevent_add(&s->fde, FDE_WRITE);
+ switch (local_socket_flush_incoming(s)) {
+ case SocketFlushResult::Destroyed:
+ return -1;
- return 1; /* not ready (backlog) */
+ case SocketFlushResult::TryAgain:
+ return 1;
+
+ case SocketFlushResult::Completed:
+ return 0;
+ }
+
+ return !s->packet_queue.empty();
}
static void local_socket_ready(asocket* s) {
@@ -224,114 +305,21 @@
** in order to simplify the code.
*/
if (ev & FDE_WRITE) {
- while (!s->packet_queue.empty()) {
- Range& r = s->packet_queue.front();
- while (!r.empty()) {
- int rc = adb_write(fd, r.data(), r.size());
- if (rc == -1) {
- /* returning here is ok because FDE_READ will
- ** be processed in the next iteration loop
- */
- if (errno == EAGAIN) {
- return;
- }
- } else if (rc > 0) {
- r.drop_front(rc);
- continue;
- }
-
- D(" closing after write because rc=%d and errno is %d", rc, errno);
- s->has_write_error = true;
- s->close(s);
+ switch (local_socket_flush_incoming(s)) {
+ case SocketFlushResult::Destroyed:
return;
- }
- if (r.empty()) {
- s->packet_queue.pop_front();
- }
+ case SocketFlushResult::TryAgain:
+ break;
+
+ case SocketFlushResult::Completed:
+ s->peer->ready(s->peer);
+ break;
}
-
- /* if we sent the last packet of a closing socket,
- ** we can now destroy it.
- */
- if (s->closing) {
- D(" closing because 'closing' is set after write");
- s->close(s);
- return;
- }
-
- /* no more packets queued, so we can ignore
- ** writable events again and tell our peer
- ** to resume writing
- */
- fdevent_del(&s->fde, FDE_WRITE);
- s->peer->ready(s->peer);
}
if (ev & FDE_READ) {
- const size_t max_payload = s->get_max_payload();
- std::string data;
- data.resize(max_payload);
- char* x = &data[0];
- size_t avail = max_payload;
- int r = 0;
- int is_eof = 0;
-
- while (avail > 0) {
- r = adb_read(fd, x, avail);
- D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r,
- r < 0 ? errno : 0, avail);
- if (r == -1) {
- if (errno == EAGAIN) {
- break;
- }
- } else if (r > 0) {
- avail -= r;
- x += r;
- continue;
- }
-
- /* r = 0 or unhandled error */
- is_eof = 1;
- break;
- }
- D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof,
- s->fde.force_eof);
-
- if (avail != max_payload && s->peer) {
- data.resize(max_payload - avail);
-
- // s->peer->enqueue() may call s->close() and free s,
- // so save variables for debug printing below.
- unsigned saved_id = s->id;
- int saved_fd = s->fd;
- r = s->peer->enqueue(s->peer, std::move(data));
- D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r);
-
- if (r < 0) {
- /* error return means they closed us as a side-effect
- ** and we must return immediately.
- **
- ** note that if we still have buffered packets, the
- ** socket will be placed on the closing socket list.
- ** this handler function will be called again
- ** to process FDE_WRITE events.
- */
- return;
- }
-
- if (r > 0) {
- /* if the remote cannot accept further events,
- ** we disable notification of READs. They'll
- ** be enabled again when we get a call to ready()
- */
- fdevent_del(&s->fde, FDE_READ);
- }
- }
- /* Don't allow a forced eof if data is still there */
- if ((s->fde.force_eof && !r) || is_eof) {
- D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof);
- s->close(s);
+ if (!local_socket_flush_outgoing(s)) {
return;
}
}