Merge changes from topic 'adb_thread_safety' am: 9083ff1c24 am: 2ca23fa508 am: 05b7443644
am: 577c2172d1

Change-Id: I1d384220c6eea9cb3066a8cbe86298543bde574a
diff --git a/adb/adb.cpp b/adb/adb.cpp
index 39e71e5..0181daa 100644
--- a/adb/adb.cpp
+++ b/adb/adb.cpp
@@ -31,6 +31,8 @@
 #include <time.h>
 
 #include <chrono>
+#include <condition_variable>
+#include <mutex>
 #include <string>
 #include <thread>
 #include <vector>
@@ -48,6 +50,7 @@
 #include "adb_io.h"
 #include "adb_listeners.h"
 #include "adb_utils.h"
+#include "sysdeps/chrono.h"
 #include "transport.h"
 
 #if !ADB_HOST
@@ -313,19 +316,15 @@
     if (type == "bootloader") {
         D("setting connection_state to kCsBootloader");
         t->SetConnectionState(kCsBootloader);
-        update_transports();
     } else if (type == "device") {
         D("setting connection_state to kCsDevice");
         t->SetConnectionState(kCsDevice);
-        update_transports();
     } else if (type == "recovery") {
         D("setting connection_state to kCsRecovery");
         t->SetConnectionState(kCsRecovery);
-        update_transports();
     } else if (type == "sideload") {
         D("setting connection_state to kCsSideload");
         t->SetConnectionState(kCsSideload);
-        update_transports();
     } else {
         D("setting connection_state to kCsHost");
         t->SetConnectionState(kCsHost);
@@ -353,6 +352,8 @@
         send_auth_request(t);
     }
 #endif
+
+    update_transports();
 }
 
 void handle_packet(apacket *p, atransport *t)
@@ -1229,4 +1230,50 @@
       return ret - 1;
     return -1;
 }
+
+static auto& init_mutex = *new std::mutex();  // Guards the two flags below.
+static auto& init_cv = *new std::condition_variable();  // Signalled once both flags are true.
+static bool device_scan_complete = false;  // Set by adb_notify_device_scan_complete().
+static bool transports_ready = false;  // Latest result computed by update_transport_status().
+
+// Recomputes transports_ready and wakes init_cv waiters once the device scan is also done.
+void update_transport_status() {
+    // Only a USB transport that is not online counts against readiness.
+    const bool all_usb_online = iterate_transports([](const atransport* transport) {
+        return !(transport->type == kTransportUsb && transport->online != 1);
+    });
+
+    D("update_transport_status: transports_ready = %s", all_usb_online ? "true" : "false");
+
+    // Publish the result and sample the combined state under init_mutex.
+    bool should_notify = false;
+    {
+        std::lock_guard<std::mutex> guard(init_mutex);
+        transports_ready = all_usb_online;
+        should_notify = transports_ready && device_scan_complete;
+    }
+
+    if (!should_notify) {
+        return;
+    }
+    D("update_transport_status: notifying");
+    init_cv.notify_all();
+}
+
+// Marks the device scan as finished, then re-evaluates whether initialization is complete.
+void adb_notify_device_scan_complete() {
+    D("device scan complete");
+
+    std::unique_lock<std::mutex> scan_lock(init_mutex);
+    device_scan_complete = true;
+    // Release first: update_transport_status() acquires init_mutex itself.
+    scan_lock.unlock();
+    update_transport_status();
+}
+
+void adb_wait_for_device_initialization() {  // Gives up waiting after 3 seconds.
+    std::unique_lock<std::mutex> init_lock(init_mutex);
+    init_cv.wait_for(init_lock, 3s, [] { return transports_ready && device_scan_complete; });
+}
+
 #endif  // ADB_HOST
diff --git a/adb/adb.h b/adb/adb.h
index e3675d8..d6b2b81 100644
--- a/adb/adb.h
+++ b/adb/adb.h
@@ -228,4 +228,18 @@
 
 void parse_banner(const std::string&, atransport* t);
 
+// On startup, the adb server needs to wait until all of the connected devices are ready.
+// To do this, we need to know when the scan has identified all of the potential new transports, and
+// when each transport becomes ready.
+// TODO: Do this for mDNS as well, instead of just USB?
+
+// We've found all of the transports we potentially care about.
+void adb_notify_device_scan_complete();
+
+// One or more transports have changed status; check whether initialization is now complete.
+void update_transport_status();
+
+// Wait until device scan has completed and every transport is ready, or a timeout elapses.
+void adb_wait_for_device_initialization();
+
 #endif
diff --git a/adb/adb_client.cpp b/adb/adb_client.cpp
index b656887..4f3ff25 100644
--- a/adb/adb_client.cpp
+++ b/adb/adb_client.cpp
@@ -28,12 +28,15 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 
+#include <condition_variable>
+#include <mutex>
 #include <string>
 #include <thread>
 #include <vector>
 
 #include <android-base/stringprintf.h>
 #include <android-base/strings.h>
+#include <android-base/thread_annotations.h>
 #include <cutils/sockets.h>
 
 #include "adb_io.h"
@@ -177,9 +180,8 @@
         } else {
             fprintf(stderr, "* daemon started successfully\n");
         }
-        // Give the server some time to start properly and detect devices.
-        std::this_thread::sleep_for(3s);
-        // fall through to _adb_connect
+        // The server will wait until it detects all of its connected devices before acking.
+        // Fall through to _adb_connect.
     } else {
         // If a server is already running, check its version matches.
         int version = ADB_SERVER_VERSION - 1;
diff --git a/adb/client/main.cpp b/adb/client/main.cpp
index 606203c..fe5099c 100644
--- a/adb/client/main.cpp
+++ b/adb/client/main.cpp
@@ -156,33 +156,38 @@
         }
 #endif
 
-        // Inform our parent that we are up and running.
+        // Wait for the USB scan to complete before notifying the parent that we're up.
+        // We need to perform this in a thread, because we would otherwise block the event loop.
+        std::thread notify_thread([ack_reply_fd]() {
+            adb_wait_for_device_initialization();
 
-        // Any error output written to stderr now goes to adb.log. We could
-        // keep around a copy of the stderr fd and use that to write any errors
-        // encountered by the following code, but that is probably overkill.
+            // Any error output written to stderr now goes to adb.log. We could
+            // keep around a copy of the stderr fd and use that to write any errors
+            // encountered by the following code, but that is probably overkill.
 #if defined(_WIN32)
-        const HANDLE ack_reply_handle = cast_int_to_handle(ack_reply_fd);
-        const CHAR ack[] = "OK\n";
-        const DWORD bytes_to_write = arraysize(ack) - 1;
-        DWORD written = 0;
-        if (!WriteFile(ack_reply_handle, ack, bytes_to_write, &written, NULL)) {
-            fatal("adb: cannot write ACK to handle 0x%p: %s", ack_reply_handle,
-                  android::base::SystemErrorCodeToString(GetLastError()).c_str());
-        }
-        if (written != bytes_to_write) {
-            fatal("adb: cannot write %lu bytes of ACK: only wrote %lu bytes",
-                  bytes_to_write, written);
-        }
-        CloseHandle(ack_reply_handle);
+            const HANDLE ack_reply_handle = cast_int_to_handle(ack_reply_fd);
+            const CHAR ack[] = "OK\n";
+            const DWORD bytes_to_write = arraysize(ack) - 1;
+            DWORD written = 0;
+            if (!WriteFile(ack_reply_handle, ack, bytes_to_write, &written, NULL)) {
+                fatal("adb: cannot write ACK to handle 0x%p: %s", ack_reply_handle,
+                      android::base::SystemErrorCodeToString(GetLastError()).c_str());
+            }
+            if (written != bytes_to_write) {
+                fatal("adb: cannot write %lu bytes of ACK: only wrote %lu bytes", bytes_to_write,
+                      written);
+            }
+            CloseHandle(ack_reply_handle);
 #else
-        // TODO(danalbert): Can't use SendOkay because we're sending "OK\n", not
-        // "OKAY".
-        if (!android::base::WriteStringToFd("OK\n", ack_reply_fd)) {
-            fatal_errno("error writing ACK to fd %d", ack_reply_fd);
-        }
-        unix_close(ack_reply_fd);
+            // TODO(danalbert): Can't use SendOkay because we're sending "OK\n", not
+            // "OKAY".
+            if (!android::base::WriteStringToFd("OK\n", ack_reply_fd)) {
+                fatal_errno("error writing ACK to fd %d", ack_reply_fd);
+            }
+            unix_close(ack_reply_fd);
 #endif
+        });
+        notify_thread.detach();
     }
 
     D("Event loop starting");
diff --git a/adb/client/usb_libusb.cpp b/adb/client/usb_libusb.cpp
index d39884a..20610ee 100644
--- a/adb/client/usb_libusb.cpp
+++ b/adb/client/usb_libusb.cpp
@@ -352,6 +352,8 @@
         }
         libusb_free_device_list(list, 1);
 
+        adb_notify_device_scan_complete();
+
         std::this_thread::sleep_for(500ms);
     }
 }
diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp
index 72c9eef..b28de4b 100644
--- a/adb/fdevent.cpp
+++ b/adb/fdevent.cpp
@@ -26,15 +26,19 @@
 #include <unistd.h>
 
 #include <atomic>
+#include <functional>
 #include <list>
+#include <mutex>
 #include <unordered_map>
 #include <vector>
 
 #include <android-base/logging.h>
 #include <android-base/stringprintf.h>
+#include <android-base/thread_annotations.h>
 
 #include "adb_io.h"
 #include "adb_trace.h"
+#include "adb_unique_fd.h"
 #include "adb_utils.h"
 
 #if !ADB_HOST
@@ -75,6 +79,10 @@
 static bool main_thread_valid;
 static unsigned long main_thread_id;
 
+static auto& run_queue_notify_fd = *new unique_fd();  // Written to wake the fdevent loop.
+static auto& run_queue_mutex = *new std::mutex();  // Guards the fd above and run_queue.
+static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::vector<std::function<void()>>();
+
 void check_main_thread() {
     if (main_thread_valid) {
         CHECK_EQ(main_thread_id, adb_thread_id());
@@ -112,8 +120,7 @@
     return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str());
 }
 
-fdevent *fdevent_create(int fd, fd_func func, void *arg)
-{
+fdevent* fdevent_create(int fd, fd_func func, void* arg) {
     check_main_thread();
     fdevent *fde = (fdevent*) malloc(sizeof(fdevent));
     if(fde == 0) return 0;
@@ -122,8 +129,7 @@
     return fde;
 }
 
-void fdevent_destroy(fdevent *fde)
-{
+void fdevent_destroy(fdevent* fde) {
     check_main_thread();
     if(fde == 0) return;
     if(!(fde->state & FDE_CREATED)) {
@@ -278,8 +284,7 @@
     }
 }
 
-static void fdevent_call_fdfunc(fdevent* fde)
-{
+static void fdevent_call_fdfunc(fdevent* fde) {
     unsigned events = fde->events;
     fde->events = 0;
     CHECK(fde->state & FDE_PENDING);
@@ -292,10 +297,7 @@
 
 #include <sys/ioctl.h>
 
-static void fdevent_subproc_event_func(int fd, unsigned ev,
-                                       void* /* userdata */)
-{
-
+static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) {
     D("subproc handling on fd = %d, ev = %x", fd, ev);
 
     CHECK_GE(fd, 0);
@@ -342,8 +344,7 @@
     }
 }
 
-void fdevent_subproc_setup()
-{
+static void fdevent_subproc_setup() {
     int s[2];
 
     if(adb_socketpair(s)) {
@@ -358,12 +359,63 @@
 }
 #endif // !ADB_HOST
 
-void fdevent_loop()
-{
+// Runs every queued function. The queue is swapped out under run_queue_mutex and drained after
+// the lock is released, so a queued function may safely call fdevent_run_on_main_thread().
+static void fdevent_run_flush() EXCLUDES(run_queue_mutex) {
+    std::vector<std::function<void()>> pending;
+    {
+        std::lock_guard<std::mutex> lock(run_queue_mutex);
+        pending.swap(run_queue);
+    }
+    for (auto& f : pending) {
+        f();
+    }
+}
+
+// fdevent callback for the notify socket: consumes the wake-up bytes, then runs queued work.
+static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) {
+    CHECK_GE(fd, 0);
+    CHECK(ev & FDE_READ);
+    // Empty the fd.
+    char buf[1024];
+    if (adb_read(fd, buf, sizeof(buf)) == -1) {
+        PLOG(FATAL) << "failed to empty run queue notify fd";
+    }
+    fdevent_run_flush();
+}
+
+// Creates the notify socketpair, registers one end for reads, then runs anything already queued.
+static void fdevent_run_setup() {
+    {
+        std::lock_guard<std::mutex> lock(run_queue_mutex);
+        CHECK(run_queue_notify_fd.get() == -1);
+        int s[2];
+        if (adb_socketpair(s) != 0) {
+            PLOG(FATAL) << "failed to create run queue notify socketpair";
+        }
+        run_queue_notify_fd.reset(s[0]);
+        fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr);
+        CHECK(fde != nullptr);
+        fdevent_add(fde, FDE_READ);
+    }
+    fdevent_run_flush();
+}
+
+void fdevent_run_on_main_thread(std::function<void()> fn) {
+    std::lock_guard<std::mutex> lock(run_queue_mutex);
+    run_queue.push_back(std::move(fn));
+    // If the fd is still -1, fdevent is not set up yet; fdevent_run_setup() flushes the queue.
+    if (run_queue_notify_fd != -1 && adb_write(run_queue_notify_fd.get(), "", 1) != 1) {
+        PLOG(FATAL) << "failed to write to run queue notify fd";
+    }
+}
+
+void fdevent_loop() {
     set_main_thread();
 #if !ADB_HOST
     fdevent_subproc_setup();
 #endif // !ADB_HOST
+    fdevent_run_setup();
 
     while (true) {
         if (terminate_loop) {
@@ -393,6 +445,11 @@
 void fdevent_reset() {
     g_poll_node_map.clear();
     g_pending_list.clear();
+
+    std::lock_guard<std::mutex> lock(run_queue_mutex);
+    run_queue_notify_fd.reset();
+    run_queue.clear();
+
     main_thread_valid = false;
     terminate_loop = false;
 }
diff --git a/adb/fdevent.h b/adb/fdevent.h
index e32845a..896400a 100644
--- a/adb/fdevent.h
+++ b/adb/fdevent.h
@@ -20,6 +20,8 @@
 #include <stddef.h>
 #include <stdint.h>  /* for int64_t */
 
+#include <functional>
+
 /* events that may be observed */
 #define FDE_READ              0x0001
 #define FDE_WRITE             0x0002
@@ -78,6 +80,9 @@
 
 void check_main_thread();
 
+// Queue an operation to run on the main thread.
+void fdevent_run_on_main_thread(std::function<void()> fn);
+
 // The following functions are used only for tests.
 void fdevent_terminate_loop();
 size_t fdevent_installed_count();
diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp
index bdb973a..86e0209 100644
--- a/adb/fdevent_test.cpp
+++ b/adb/fdevent_test.cpp
@@ -173,3 +173,24 @@
     std::thread thread(InvalidFdThreadFunc);
     thread.join();
 }
+
+// Work queued from another thread must run on the fdevent thread, in submission order.
+TEST_F(FdeventTest, run_on_main_thread) {
+    std::vector<int> seen;
+    PrepareThread();
+    std::thread loop_thread(fdevent_loop);
+
+    for (int value = 0; value < 100; ++value) {
+        fdevent_run_on_main_thread([value, &seen]() {
+            check_main_thread();
+            seen.push_back(value);
+        });
+    }
+
+    TerminateThread(loop_thread);
+
+    ASSERT_EQ(100u, seen.size());
+    for (size_t idx = 0; idx < seen.size(); ++idx) {
+        ASSERT_EQ(static_cast<int>(idx), seen[idx]);
+    }
+}
diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h
index f4215ae..5ca49ac 100644
--- a/adb/fdevent_test.h
+++ b/adb/fdevent_test.h
@@ -53,11 +53,11 @@
 
     size_t GetAdditionalLocalSocketCount() {
 #if ADB_HOST
-        // dummy socket installed in PrepareThread()
-        return 1;
-#else
-        // dummy socket and one more socket installed in fdevent_subproc_setup()
+        // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket
         return 2;
+#else
+        // dummy socket + fdevent_run_on_main_thread + fdevent_subproc_setup() sockets
+        return 3;
 #endif
     }
 
diff --git a/adb/transport.cpp b/adb/transport.cpp
index cc8c162..20de642 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -400,8 +400,27 @@
     return &tracker->socket;
 }
 
+// Calls fn on every transport in transport_list and then in pending_list. Returns false as soon
+// as fn returns false, otherwise true. fn is invoked with transport_lock (a std::mutex) held,
+// so it must not try to take transport_lock itself.
+bool iterate_transports(std::function<bool(const atransport*)> fn) {
+    std::lock_guard<std::mutex> lock(transport_lock);
+    const auto accepts_all = [&fn](const auto& transports) {
+        for (const auto& t : transports) {
+            if (!fn(t)) {
+                return false;
+            }
+        }
+        return true;
+    };
+    return accepts_all(transport_list) && accepts_all(pending_list);
+}
+
 // Call this function each time the transport list has changed.
 void update_transports() {
+    update_transport_status();
+
+    // Notify `adb track-devices` clients.
     std::string transports = list_transports(false);
 
     device_tracker* tracker = device_tracker_list;
diff --git a/adb/transport.h b/adb/transport.h
index 8c15d66..e129355 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -198,6 +198,10 @@
 void kick_transport(atransport* t);
 void update_transports(void);
 
+// Iterates across all of the current and pending transports.
+// Stops iteration and returns false if fn returns false, otherwise returns true.
+bool iterate_transports(std::function<bool(const atransport*)> fn);
+
 void init_transport_registration(void);
 void init_mdns_transport_discovery(void);
 std::string list_transports(bool long_listing);
diff --git a/adb/transport_mdns.cpp b/adb/transport_mdns.cpp
index e49b1c6..3603f09 100644
--- a/adb/transport_mdns.cpp
+++ b/adb/transport_mdns.cpp
@@ -24,6 +24,8 @@
 #include <arpa/inet.h>
 #endif
 
+#include <thread>
+
 #include <android-base/stringprintf.h>
 #include <dns_sd.h>
 
@@ -262,19 +264,22 @@
     }
 }
 
-void init_mdns_transport_discovery(void) {
-    DNSServiceErrorType errorCode =
-        DNSServiceBrowse(&service_ref, 0, 0, kADBServiceType, nullptr,
-                         register_mdns_transport, nullptr);
+void init_mdns_transport_discovery_thread(void) {
+    DNSServiceErrorType errorCode = DNSServiceBrowse(&service_ref, 0, 0, kADBServiceType, nullptr,
+                                                     register_mdns_transport, nullptr);
 
     if (errorCode != kDNSServiceErr_NoError) {
         D("Got %d initiating mDNS browse.", errorCode);
         return;
     }
 
-    fdevent_install(&service_ref_fde,
-                    adb_DNSServiceRefSockFD(service_ref),
-                    pump_service_ref,
-                    &service_ref);
-    fdevent_set(&service_ref_fde, FDE_READ);
+    fdevent_run_on_main_thread([]() {
+        fdevent_install(&service_ref_fde, adb_DNSServiceRefSockFD(service_ref), pump_service_ref,
+                        &service_ref);
+        fdevent_set(&service_ref_fde, FDE_READ);
+    });
+}
+
+void init_mdns_transport_discovery(void) {
+    std::thread(init_mdns_transport_discovery_thread).detach();
 }