adb: don't abort when connecting to the same address twice.

When connecting to an address, we construct a transport first, and then
check whether we've already connected to that address. The consequent
destruction of the BlockingConnectionAdapter attempts to join threads
that haven't been started, which aborts.

Make it safe to destruct a BlockingConnectionAdapter without calling
Start on it first, to solve this.

Bug: http://b/69137547
Test: nc -l 12345 & (adb connect localhost:12345; adb connect localhost:12345)
Test: python test_adb.py
Change-Id: I6cb968a62dbac6332907e06575893d764905ee62
diff --git a/adb/test_adb.py b/adb/test_adb.py
index e771106..3bb433d 100644
--- a/adb/test_adb.py
+++ b/adb/test_adb.py
@@ -162,15 +162,14 @@
 
         Bug: https://code.google.com/p/android/issues/detail?id=21021
         """
-        port = 12345
-
         with contextlib.closing(
                 socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener:
             # Use SO_REUSEADDR so subsequent runs of the test can grab the port
             # even if it is in TIME_WAIT.
             listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-            listener.bind(('127.0.0.1', port))
+            listener.bind(('127.0.0.1', 0))
             listener.listen(4)
+            port = listener.getsockname()[1]
 
             # Now that listening has started, start adb emu kill, telling it to
             # connect to our mock emulator.
@@ -233,6 +232,24 @@
                 output.strip(), 'connected to localhost:{}'.format(port))
             s.close()
 
+    def test_already_connected(self):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.bind(('127.0.0.1', 0))
+        s.listen(2)
+
+        port = s.getsockname()[1]
+        output = subprocess.check_output(
+            ['adb', 'connect', 'localhost:{}'.format(port)])
+
+        self.assertEqual(
+            output.strip(), 'connected to localhost:{}'.format(port))
+
+        # b/31250450: this always returns 0 but probably shouldn't.
+        output = subprocess.check_output(
+            ['adb', 'connect', 'localhost:{}'.format(port)])
+
+        self.assertEqual(
+            output.strip(), 'already connected to localhost:{}'.format(port))
 
 def main():
     random.seed(0)
diff --git a/adb/transport.cpp b/adb/transport.cpp
index 37b56e2..2867d38 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -77,7 +77,15 @@
     Stop();
 }
 
+static void AssumeLocked(std::mutex& mutex) ASSERT_CAPABILITY(mutex) {}
+
 void BlockingConnectionAdapter::Start() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (started_) {
+        LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_
+                   << "): started multiple times";
+    }
+
     read_thread_ = std::thread([this]() {
         LOG(INFO) << this->transport_name_ << ": read thread spawning";
         while (true) {
@@ -95,7 +103,11 @@
         LOG(INFO) << this->transport_name_ << ": write thread spawning";
         while (true) {
             std::unique_lock<std::mutex> lock(mutex_);
-            cv_.wait(lock, [this]() { return this->stopped_ || !this->write_queue_.empty(); });
+            cv_.wait(lock, [this]() REQUIRES(mutex_) {
+                return this->stopped_ || !this->write_queue_.empty();
+            });
+
+            AssumeLocked(mutex_);
 
             if (this->stopped_) {
                 return;
@@ -111,25 +123,44 @@
         }
         std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); });
     });
+
+    started_ = true;
 }
 
 void BlockingConnectionAdapter::Stop() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (stopped_) {
-        LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped";
-        return;
-    }
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (!started_) {
+            LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
+            return;
+        }
 
-    stopped_ = true;
-    lock.unlock();
+        if (stopped_) {
+            LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
+                      << "): already stopped";
+            return;
+        }
+
+        stopped_ = true;
+    }
 
     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping";
 
     this->underlying_->Close();
-
     this->cv_.notify_one();
-    read_thread_.join();
-    write_thread_.join();
+
+    // Move the threads out into locals with the lock taken, and then unlock to let them exit.
+    std::thread read_thread;
+    std::thread write_thread;
+
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        read_thread = std::move(read_thread_);
+        write_thread = std::move(write_thread_);
+    }
+
+    read_thread.join();
+    write_thread.join();
 
     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped";
     std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); });
@@ -137,7 +168,7 @@
 
 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
     {
-        std::unique_lock<std::mutex> lock(this->mutex_);
+        std::lock_guard<std::mutex> lock(this->mutex_);
         write_queue_.emplace_back(std::move(packet));
     }
 
diff --git a/adb/transport.h b/adb/transport.h
index 8b71e34..d18c362 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -30,6 +30,7 @@
 #include <thread>
 #include <unordered_set>
 
+#include <android-base/thread_annotations.h>
 #include <openssl/rsa.h>
 
 #include "adb.h"
@@ -121,13 +122,14 @@
     virtual void Start() override final;
     virtual void Stop() override final;
 
-    bool stopped_ = false;
+    bool started_ GUARDED_BY(mutex_) = false;
+    bool stopped_ GUARDED_BY(mutex_) = false;
 
     std::unique_ptr<BlockingConnection> underlying_;
-    std::thread read_thread_;
-    std::thread write_thread_;
+    std::thread read_thread_ GUARDED_BY(mutex_);
+    std::thread write_thread_ GUARDED_BY(mutex_);
 
-    std::deque<std::unique_ptr<apacket>> write_queue_;
+    std::deque<std::unique_ptr<apacket>> write_queue_ GUARDED_BY(mutex_);
     std::mutex mutex_;
     std::condition_variable cv_;