adb: fix NonblockingFdConnection's behavior with large writes.

Large opportunistic writes would perform a write without updating
writable_ or waking up the polling thread, which resulted in the worker
thread never polling with POLLOUT.

Test: adb_benchmark
Change-Id: Ifed3b97a4b647b539dcd2df858572fa7da9a22d0
diff --git a/adb/transport_fd.cpp b/adb/transport_fd.cpp
index ec61279..a93e68a 100644
--- a/adb/transport_fd.cpp
+++ b/adb/transport_fd.cpp
@@ -85,18 +85,9 @@
             if (pfds[0].revents) {
                 if ((pfds[0].revents & POLLOUT)) {
                     std::lock_guard<std::mutex> lock(this->write_mutex_);
-                    WriteResult result = DispatchWrites();
-                    switch (result) {
-                        case WriteResult::Error:
-                            *error = "write failed";
-                            return;
-
-                        case WriteResult::Completed:
-                            writable_ = true;
-                            break;
-
-                        case WriteResult::TryAgain:
-                            break;
+                    if (DispatchWrites() == WriteResult::Error) {
+                        *error = "write failed";
+                        return;
                     }
                 }
 
@@ -179,13 +170,14 @@
 
     WriteResult DispatchWrites() REQUIRES(write_mutex_) {
         CHECK(!write_buffer_.empty());
-        if (!writable_) {
-            return WriteResult::TryAgain;
-        }
-
         auto iovs = write_buffer_.iovecs();
         ssize_t rc = adb_writev(fd_.get(), iovs.data(), iovs.size());
         if (rc == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                writable_ = false;
+                return WriteResult::TryAgain;
+            }
+
             return WriteResult::Error;
         } else if (rc == 0) {
             errno = 0;
@@ -194,6 +186,7 @@
 
         // TODO: Implement a more efficient drop_front?
         write_buffer_.take_front(rc);
+        writable_ = write_buffer_.empty();
         if (write_buffer_.empty()) {
             return WriteResult::Completed;
         }
@@ -211,7 +204,12 @@
         if (!packet->payload.empty()) {
             write_buffer_.append(std::make_unique<IOVector::block_type>(std::move(packet->payload)));
         }
-        return DispatchWrites() != WriteResult::Error;
+
+        WriteResult result = DispatchWrites();
+        if (result == WriteResult::TryAgain) {
+            WakeThread();
+        }
+        return result != WriteResult::Error;
     }
 
     std::thread thread_;