adb: don't immediately close a socket when write fails.

When we fail to write to a local socket peer, we might still have data
queued up to send to the other side. Defer closing the socket until
we've failed to both read and write.

Bug: http://b/74616284
Test: python test_device.py
Change-Id: Ifc4b8fe95369b4872e475c2ae4ee611dd2d8b9d7
diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp
index 44d3276..6b40056 100644
--- a/adb/socket_test.cpp
+++ b/adb/socket_test.cpp
@@ -209,7 +209,6 @@
     TerminateThread(thread);
 }
 
-#if 0
 // Ensure that if we fail to write output to an fd, we will still flush data coming from it.
 TEST_F(LocalSocketTest, flush_after_shutdown) {
     int head_fd[2];
@@ -248,7 +247,6 @@
     ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
     TerminateThread(thread);
 }
-#endif
 
 #if defined(__linux__)
 
diff --git a/adb/sockets.cpp b/adb/sockets.cpp
index 0007fed..e05a3db 100644
--- a/adb/sockets.cpp
+++ b/adb/sockets.cpp
@@ -106,50 +106,131 @@
     }
 }
 
+enum class SocketFlushResult {
+    Destroyed,
+    TryAgain,
+    Completed,
+};
+
+static SocketFlushResult local_socket_flush_incoming(asocket* s) {
+    while (!s->packet_queue.empty()) {
+        Range& r = s->packet_queue.front();
+
+        int rc = adb_write(s->fd, r.data(), r.size());
+        if (rc == static_cast<int>(r.size())) {
+            s->packet_queue.pop_front();
+        } else if (rc > 0) {
+            r.drop_front(rc);
+            fdevent_add(&s->fde, FDE_WRITE);
+            return SocketFlushResult::TryAgain;
+        } else if (rc == -1 && errno == EAGAIN) {
+            fdevent_add(&s->fde, FDE_WRITE);
+            return SocketFlushResult::TryAgain;
+        }
+
+        // We failed to write, but it's possible that we can still read from the socket.
+        // Give that a try before giving up.
+        s->has_write_error = true;
+        break;
+    }
+
+    // If we sent the last packet of a closing socket, we can now destroy it.
+    if (s->closing) {
+        s->close(s);
+        return SocketFlushResult::Destroyed;
+    }
+
+    fdevent_del(&s->fde, FDE_WRITE);
+    return SocketFlushResult::Completed;
+}
+
+// Returns false if the socket has been closed and destroyed as a side-effect of this function.
+static bool local_socket_flush_outgoing(asocket* s) {
+    const size_t max_payload = s->get_max_payload();
+    std::string data;
+    data.resize(max_payload);
+    char* x = &data[0];
+    size_t avail = max_payload;
+    int r = 0;
+    int is_eof = 0;
+
+    while (avail > 0) {
+        r = adb_read(s->fd, x, avail);
+        D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r,
+          r < 0 ? errno : 0, avail);
+        if (r == -1) {
+            if (errno == EAGAIN) {
+                break;
+            }
+        } else if (r > 0) {
+            avail -= r;
+            x += r;
+            continue;
+        }
+
+        /* r = 0 or unhandled error */
+        is_eof = 1;
+        break;
+    }
+    D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof,
+      s->fde.force_eof);
+
+    if (avail != max_payload && s->peer) {
+        data.resize(max_payload - avail);
+
+        // s->peer->enqueue() may call s->close() and free s,
+        // so save variables for debug printing below.
+        unsigned saved_id = s->id;
+        int saved_fd = s->fd;
+        r = s->peer->enqueue(s->peer, std::move(data));
+        D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r);
+
+        if (r < 0) {
+            // Error return means they closed us as a side-effect and we must
+            // return immediately.
+            //
+            // Note that if we still have buffered packets, the socket will be
+            // placed on the closing socket list. This handler function will be
+            // called again to process FDE_WRITE events.
+            return false;
+        }
+
+        if (r > 0) {
+            /* if the remote cannot accept further events,
+            ** we disable notification of READs.  They'll
+            ** be enabled again when we get a call to ready()
+            */
+            fdevent_del(&s->fde, FDE_READ);
+        }
+    }
+
+    // Don't allow a forced eof if data is still there.
+    if ((s->fde.force_eof && !r) || is_eof) {
+        D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof);
+        s->close(s);
+        return false;
+    }
+
+    return true;
+}
+
 static int local_socket_enqueue(asocket* s, std::string data) {
     D("LS(%d): enqueue %zu", s->id, data.size());
 
     Range r(std::move(data));
-
-    /* if there is already data queue'd, we will receive
-    ** events when it's time to write.  just add this to
-    ** the tail
-    */
-    if (!s->packet_queue.empty()) {
-        goto enqueue;
-    }
-
-    /* write as much as we can, until we
-    ** would block or there is an error/eof
-    */
-    while (!r.empty()) {
-        int rc = adb_write(s->fd, r.data(), r.size());
-        if (rc > 0) {
-            r.drop_front(rc);
-            continue;
-        }
-
-        if (rc == 0 || errno != EAGAIN) {
-            D("LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno));
-            s->has_write_error = true;
-            s->close(s);
-            return 1; /* not ready (error) */
-        } else {
-            // errno == EAGAIN
-            break;
-        }
-    }
-
-    if (r.empty()) {
-        return 0; /* ready for more data */
-    }
-
-enqueue:
-    /* make sure we are notified when we can drain the queue */
     s->packet_queue.push_back(std::move(r));
-    fdevent_add(&s->fde, FDE_WRITE);
+    switch (local_socket_flush_incoming(s)) {
+        case SocketFlushResult::Destroyed:
+            return -1;
 
-    return 1; /* not ready (backlog) */
+        case SocketFlushResult::TryAgain:
+            return 1;
+
+        case SocketFlushResult::Completed:
+            return 0;
+    }
+
+    return !s->packet_queue.empty();
 }
 
 static void local_socket_ready(asocket* s) {
@@ -224,114 +305,21 @@
     ** in order to simplify the code.
     */
     if (ev & FDE_WRITE) {
-        while (!s->packet_queue.empty()) {
-            Range& r = s->packet_queue.front();
-            while (!r.empty()) {
-                int rc = adb_write(fd, r.data(), r.size());
-                if (rc == -1) {
-                    /* returning here is ok because FDE_READ will
-                    ** be processed in the next iteration loop
-                    */
-                    if (errno == EAGAIN) {
-                        return;
-                    }
-                } else if (rc > 0) {
-                    r.drop_front(rc);
-                    continue;
-                }
-
-                D(" closing after write because rc=%d and errno is %d", rc, errno);
-                s->has_write_error = true;
-                s->close(s);
+        switch (local_socket_flush_incoming(s)) {
+            case SocketFlushResult::Destroyed:
                 return;
-            }
 
-            if (r.empty()) {
-                s->packet_queue.pop_front();
-            }
+            case SocketFlushResult::TryAgain:
+                break;
+
+            case SocketFlushResult::Completed:
+                s->peer->ready(s->peer);
+                break;
         }
-
-        /* if we sent the last packet of a closing socket,
-        ** we can now destroy it.
-        */
-        if (s->closing) {
-            D(" closing because 'closing' is set after write");
-            s->close(s);
-            return;
-        }
-
-        /* no more packets queued, so we can ignore
-        ** writable events again and tell our peer
-        ** to resume writing
-        */
-        fdevent_del(&s->fde, FDE_WRITE);
-        s->peer->ready(s->peer);
     }
 
     if (ev & FDE_READ) {
-        const size_t max_payload = s->get_max_payload();
-        std::string data;
-        data.resize(max_payload);
-        char* x = &data[0];
-        size_t avail = max_payload;
-        int r = 0;
-        int is_eof = 0;
-
-        while (avail > 0) {
-            r = adb_read(fd, x, avail);
-            D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r,
-              r < 0 ? errno : 0, avail);
-            if (r == -1) {
-                if (errno == EAGAIN) {
-                    break;
-                }
-            } else if (r > 0) {
-                avail -= r;
-                x += r;
-                continue;
-            }
-
-            /* r = 0 or unhandled error */
-            is_eof = 1;
-            break;
-        }
-        D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof,
-          s->fde.force_eof);
-
-        if (avail != max_payload && s->peer) {
-            data.resize(max_payload - avail);
-
-            // s->peer->enqueue() may call s->close() and free s,
-            // so save variables for debug printing below.
-            unsigned saved_id = s->id;
-            int saved_fd = s->fd;
-            r = s->peer->enqueue(s->peer, std::move(data));
-            D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r);
-
-            if (r < 0) {
-                /* error return means they closed us as a side-effect
-                ** and we must return immediately.
-                **
-                ** note that if we still have buffered packets, the
-                ** socket will be placed on the closing socket list.
-                ** this handler function will be called again
-                ** to process FDE_WRITE events.
-                */
-                return;
-            }
-
-            if (r > 0) {
-                /* if the remote cannot accept further events,
-                ** we disable notification of READs.  They'll
-                ** be enabled again when we get a call to ready()
-                */
-                fdevent_del(&s->fde, FDE_READ);
-            }
-        }
-        /* Don't allow a forced eof if data is still there */
-        if ((s->fde.force_eof && !r) || is_eof) {
-            D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof);
-            s->close(s);
+        if (!local_socket_flush_outgoing(s)) {
             return;
         }
     }