Merge changes Iff1ec14b,I32123c51,I517dbcba,Ic215c7fe

* changes:
  adb: try harder to fill our test sockets.
  adb: don't set has_write_error on success.
  adb: move ownership of the fdevent thread into FdeventTest.
  adb: guarantee that fdevent_run_on_main_thread happens last.
diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp
index 9776c1b..cf441cf 100644
--- a/adb/fdevent.cpp
+++ b/adb/fdevent.cpp
@@ -75,6 +75,7 @@
 static bool main_thread_valid;
 static uint64_t main_thread_id;
 
+static bool run_needs_flush = false;
 static auto& run_queue_notify_fd = *new unique_fd();
 static auto& run_queue_mutex = *new std::mutex();
 static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque<std::function<void()>>();
@@ -317,7 +318,8 @@
         PLOG(FATAL) << "failed to empty run queue notify fd";
     }
 
-    fdevent_run_flush();
+    // Mark that we need to flush, and then run it at the end of fdevent_loop.
+    run_needs_flush = true;
 }
 
 static void fdevent_run_setup() {
@@ -378,6 +380,11 @@
             g_pending_list.pop_front();
             fdevent_call_fdfunc(fde);
         }
+
+        if (run_needs_flush) {
+            fdevent_run_flush();
+            run_needs_flush = false;
+        }
     }
 }
 
diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp
index e3d5a35..2f0ff18 100644
--- a/adb/fdevent_test.cpp
+++ b/adb/fdevent_test.cpp
@@ -80,30 +80,7 @@
 
 TEST_F(FdeventTest, fdevent_terminate) {
     PrepareThread();
-
-    std::thread thread(fdevent_loop);
-    TerminateThread(thread);
-}
-
-static void FdEventThreadFunc(ThreadArg* arg) {
-    std::vector<int> read_fds;
-    std::vector<int> write_fds;
-
-    read_fds.push_back(arg->first_read_fd);
-    for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
-        int fds[2];
-        ASSERT_EQ(0, adb_socketpair(fds));
-        read_fds.push_back(fds[0]);
-        write_fds.push_back(fds[1]);
-    }
-    write_fds.push_back(arg->last_write_fd);
-
-    std::vector<std::unique_ptr<FdHandler>> fd_handlers;
-    for (size_t i = 0; i < read_fds.size(); ++i) {
-        fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
-    }
-
-    fdevent_loop();
+    TerminateThread();
 }
 
 TEST_F(FdeventTest, smoke) {
@@ -122,7 +99,26 @@
     int reader = fd_pair2[0];
 
     PrepareThread();
-    std::thread thread(FdEventThreadFunc, &thread_arg);
+
+    std::vector<std::unique_ptr<FdHandler>> fd_handlers;
+    fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
+        std::vector<int> read_fds;
+        std::vector<int> write_fds;
+
+        read_fds.push_back(thread_arg.first_read_fd);
+        for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
+            int fds[2];
+            ASSERT_EQ(0, adb_socketpair(fds));
+            read_fds.push_back(fds[0]);
+            write_fds.push_back(fds[1]);
+        }
+        write_fds.push_back(thread_arg.last_write_fd);
+
+        for (size_t i = 0; i < read_fds.size(); ++i) {
+            fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
+        }
+    });
+    WaitForFdeventLoop();
 
     for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
         std::string read_buffer = MESSAGE;
@@ -132,7 +128,10 @@
         ASSERT_EQ(read_buffer, write_buffer);
     }
 
-    TerminateThread(thread);
+    fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
+    WaitForFdeventLoop();
+
+    TerminateThread();
     ASSERT_EQ(0, adb_close(writer));
     ASSERT_EQ(0, adb_close(reader));
 }
@@ -143,7 +142,7 @@
     size_t* happened_event_count;
 };
 
-static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) {
+static void InvalidFdEventCallback(int, unsigned events, void* userdata) {
     InvalidFdArg* arg = reinterpret_cast<InvalidFdArg*>(userdata);
     ASSERT_EQ(arg->expected_events, events);
     fdevent_remove(&arg->fde);
@@ -179,7 +178,6 @@
     std::vector<int> vec;
 
     PrepareThread();
-    std::thread thread(fdevent_loop);
 
     // Block the main thread for a long time while we queue our callbacks.
     fdevent_run_on_main_thread([]() {
@@ -194,7 +192,7 @@
         });
     }
 
-    TerminateThread(thread);
+    TerminateThread();
 
     ASSERT_EQ(1000000u, vec.size());
     for (int i = 0; i < 1000000; ++i) {
@@ -218,11 +216,8 @@
     std::vector<int> vec;
 
     PrepareThread();
-    std::thread thread(fdevent_loop);
-
     fdevent_run_on_main_thread(make_appender(&vec, 0));
-
-    TerminateThread(thread);
+    TerminateThread();
 
     ASSERT_EQ(100u, vec.size());
     for (int i = 0; i < 100; ++i) {
diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h
index 1a2d41c..5a417e0 100644
--- a/adb/fdevent_test.h
+++ b/adb/fdevent_test.h
@@ -16,10 +16,31 @@
 
 #include <gtest/gtest.h>
 
+#include <condition_variable>
+#include <mutex>
 #include <thread>
 
 #include "socket.h"
 #include "sysdeps.h"
+#include "sysdeps/chrono.h"
+
+static void WaitForFdeventLoop() {
+    // Sleep for a bit to make sure that network events have propagated.
+    std::this_thread::sleep_for(100ms);
+
+    // fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after
+    // socket events, so as soon as our function is called, we know that we've processed all
+    // previous events.
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::unique_lock<std::mutex> lock(mutex);
+    fdevent_run_on_main_thread([&]() {
+        mutex.lock();
+        mutex.unlock();
+        cv.notify_one();
+    });
+    cv.wait(lock);
+}
 
 class FdeventTest : public ::testing::Test {
   protected:
@@ -49,6 +70,9 @@
         }
         dummy_socket->ready(dummy_socket);
         dummy = dummy_fds[0];
+
+        thread_ = std::thread([]() { fdevent_loop(); });
+        WaitForFdeventLoop();
     }
 
     size_t GetAdditionalLocalSocketCount() {
@@ -56,10 +80,12 @@
         return 2;
     }
 
-    void TerminateThread(std::thread& thread) {
+    void TerminateThread() {
         fdevent_terminate_loop();
         ASSERT_TRUE(WriteFdExactly(dummy, "", 1));
-        thread.join();
+        thread_.join();
         ASSERT_EQ(0, adb_close(dummy));
     }
+
+    std::thread thread_;
 };
diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp
index f587fdb..6c4a8b2 100644
--- a/adb/socket_test.cpp
+++ b/adb/socket_test.cpp
@@ -42,10 +42,6 @@
 
 class LocalSocketTest : public FdeventTest {};
 
-static void WaitForFdeventLoop() {
-    std::this_thread::sleep_for(100ms);
-}
-
 TEST_F(LocalSocketTest, smoke) {
     // Join two socketpairs with a chain of intermediate socketpairs.
     int first[2];
@@ -86,7 +82,6 @@
     connect(prev_tail, end);
 
     PrepareThread();
-    std::thread thread(fdevent_loop);
 
     for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
         std::string read_buffer = MESSAGE;
@@ -102,7 +97,7 @@
     // Wait until the local sockets are closed.
     WaitForFdeventLoop();
     ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
-    TerminateThread(thread);
+    TerminateThread();
 }
 
 struct CloseWithPacketArg {
@@ -111,24 +106,39 @@
     int cause_close_fd;
 };
 
-static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) {
-    asocket* s = create_local_socket(arg->socket_fd);
-    ASSERT_TRUE(s != nullptr);
-    arg->bytes_written = 0;
+static void CreateCloser(CloseWithPacketArg* arg) {
+    fdevent_run_on_main_thread([arg]() {
+        asocket* s = create_local_socket(arg->socket_fd);
+        ASSERT_TRUE(s != nullptr);
+        arg->bytes_written = 0;
 
-    std::string data;
-    data.resize(MAX_PAYLOAD);
-    arg->bytes_written += data.size();
-    int ret = s->enqueue(s, std::move(data));
-    ASSERT_EQ(1, ret);
+        // On platforms that implement sockets via underlying sockets (e.g. Wine),
+        // a socket can appear to be full, and then become available for writes
+        // again without read being called on the other end. Loop and sleep after
+        // each write to give the underlying implementation time to flush.
+        bool socket_filled = false;
+        for (int i = 0; i < 128; ++i) {
+            std::string data;
+            data.resize(MAX_PAYLOAD);
+            arg->bytes_written += data.size();
+            int ret = s->enqueue(s, std::move(data));
+            if (ret == 1) {
+                socket_filled = true;
+                break;
+            }
+            ASSERT_NE(-1, ret);
 
-    asocket* cause_close_s = create_local_socket(arg->cause_close_fd);
-    ASSERT_TRUE(cause_close_s != nullptr);
-    cause_close_s->peer = s;
-    s->peer = cause_close_s;
-    cause_close_s->ready(cause_close_s);
+            std::this_thread::sleep_for(250ms);
+        }
+        ASSERT_TRUE(socket_filled);
 
-    fdevent_loop();
+        asocket* cause_close_s = create_local_socket(arg->cause_close_fd);
+        ASSERT_TRUE(cause_close_s != nullptr);
+        cause_close_s->peer = s;
+        s->peer = cause_close_s;
+        cause_close_s->ready(cause_close_s);
+    });
+    WaitForFdeventLoop();
 }
 
 // This test checks if we can close local socket in the following situation:
@@ -145,9 +155,8 @@
     arg.cause_close_fd = cause_close_fd[1];
 
     PrepareThread();
-    std::thread thread(CloseWithPacketThreadFunc, &arg);
+    CreateCloser(&arg);
 
-    WaitForFdeventLoop();
     ASSERT_EQ(0, adb_close(cause_close_fd[0]));
 
     WaitForFdeventLoop();
@@ -156,7 +165,7 @@
 
     WaitForFdeventLoop();
     ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
-    TerminateThread(thread);
+    TerminateThread();
 }
 
 // This test checks if we can read packets from a closing local socket.
@@ -170,7 +179,7 @@
     arg.cause_close_fd = cause_close_fd[1];
 
     PrepareThread();
-    std::thread thread(CloseWithPacketThreadFunc, &arg);
+    CreateCloser(&arg);
 
     WaitForFdeventLoop();
     ASSERT_EQ(0, adb_close(cause_close_fd[0]));
@@ -186,7 +195,7 @@
 
     WaitForFdeventLoop();
     ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
-    TerminateThread(thread);
+    TerminateThread();
 }
 
 // This test checks if we can close local socket in the following situation:
@@ -203,7 +212,7 @@
     arg.cause_close_fd = cause_close_fd[1];
 
     PrepareThread();
-    std::thread thread(CloseWithPacketThreadFunc, &arg);
+    CreateCloser(&arg);
 
     WaitForFdeventLoop();
     EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -211,7 +220,7 @@
 
     WaitForFdeventLoop();
     ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
-    TerminateThread(thread);
+    TerminateThread();
 }
 
 // Ensure that if we fail to write output to an fd, we will still flush data coming from it.
@@ -231,7 +240,6 @@
     tail->ready(tail);
 
     PrepareThread();
-    std::thread thread(fdevent_loop);
 
     EXPECT_TRUE(WriteFdExactly(head_fd[0], "foo", 3));
 
@@ -249,7 +257,7 @@
 
     WaitForFdeventLoop();
     ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
-    TerminateThread(thread);
+    TerminateThread();
 }
 
 #if defined(__linux__)
@@ -258,21 +266,10 @@
     std::string error;
     int fd = network_loopback_client(5038, SOCK_STREAM, &error);
     ASSERT_GE(fd, 0) << error;
-    std::this_thread::sleep_for(200ms);
+    std::this_thread::sleep_for(1s);
     ASSERT_EQ(0, adb_close(fd));
 }
 
-struct CloseRdHupSocketArg {
-    int socket_fd;
-};
-
-static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
-    asocket* s = create_local_socket(arg->socket_fd);
-    ASSERT_TRUE(s != nullptr);
-
-    fdevent_loop();
-}
-
 // This test checks if we can close sockets in CLOSE_WAIT state.
 TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
     std::string error;
@@ -283,11 +280,13 @@
 
     int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr);
     ASSERT_GE(accept_fd, 0);
-    CloseRdHupSocketArg arg;
-    arg.socket_fd = accept_fd;
 
     PrepareThread();
-    std::thread thread(CloseRdHupSocketThreadFunc, &arg);
+
+    fdevent_run_on_main_thread([accept_fd]() {
+        asocket* s = create_local_socket(accept_fd);
+        ASSERT_TRUE(s != nullptr);
+    });
 
     WaitForFdeventLoop();
     EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -297,7 +296,7 @@
 
     WaitForFdeventLoop();
     ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
-    TerminateThread(thread);
+    TerminateThread();
 }
 
 #endif  // defined(__linux__)
diff --git a/adb/sockets.cpp b/adb/sockets.cpp
index e05a3db..0887e6f 100644
--- a/adb/sockets.cpp
+++ b/adb/sockets.cpp
@@ -126,12 +126,12 @@
         } else if (rc == -1 && errno == EAGAIN) {
             fdevent_add(&s->fde, FDE_WRITE);
             return SocketFlushResult::TryAgain;
+        } else {
+            // We failed to write, but it's possible that we can still read from the socket.
+            // Give that a try before giving up.
+            s->has_write_error = true;
+            break;
         }
-
-        // We failed to write, but it's possible that we can still read from the socket.
-        // Give that a try before giving up.
-        s->has_write_error = true;
-        break;
     }
 
     // If we sent the last packet of a closing socket, we can now destroy it.