adb: fix sync.

adbd's file sync service doesn't handle a full socket gracefully,
immediately terminating the service as soon as it fails to write a
response. This would generally be fine if the socket's buffer were as
large as it claims (212992 by default with a 64-bit kernel), but this
buffer size is a giant lie, as each write has 576 bytes of overhead
that's used up in the send buffer. When setting the send buffer size,
the kernel helpfully doubles the value to attempt to account for the
overhead, but when writing 8 byte responses, only 2% of the buffer
actually gets used for responses, so we run out of buffer after 364
files instead of the 26624 that would be expected.

Fix this by processing the responses as they become available, and
calculate a maximum limit to how many sends we dispatch before we stop
and wait for responses to come in.

Bug: http://b/150827486
Test: manually modified adbd to respond with giant error messages, and
      modified adb to not read responses until we choose to block
Change-Id: Ieb8c935662864211e2fd16c337ffed0992990086
(cherry picked from commit 672cdfeeff8ad64622a529ed0c4d1b6923df7539)
diff --git a/adb/client/file_sync_client.cpp b/adb/client/file_sync_client.cpp
index 94bd8f5..7c341bc 100644
--- a/adb/client/file_sync_client.cpp
+++ b/adb/client/file_sync_client.cpp
@@ -209,7 +209,8 @@
 
 class SyncConnection {
   public:
-    SyncConnection() {
+    SyncConnection() : acknowledgement_buffer_(sizeof(sync_status) + SYNC_DATA_MAX) {
+        acknowledgement_buffer_.resize(0);
         max = SYNC_DATA_MAX; // TODO: decide at runtime.
 
         std::string error;
@@ -507,34 +508,6 @@
         return WriteOrDie(lpath, rpath, &msg.data, sizeof(msg.data));
     }
 
-    bool ReadAcknowledgments() {
-        bool result = true;
-        while (!deferred_acknowledgements_.empty()) {
-            auto [from, to] = std::move(deferred_acknowledgements_.front());
-            deferred_acknowledgements_.pop_front();
-            result &= CopyDone(from, to);
-        }
-        return result;
-    }
-
-    bool CopyDone(const std::string& from, const std::string& to) {
-        syncmsg msg;
-        if (!ReadFdExactly(fd, &msg.status, sizeof(msg.status))) {
-            Error("failed to copy '%s' to '%s': couldn't read from device", from.c_str(),
-                  to.c_str());
-            return false;
-        }
-        if (msg.status.id == ID_OKAY) {
-            return true;
-        }
-        if (msg.status.id != ID_FAIL) {
-            Error("failed to copy '%s' to '%s': unknown reason %d", from.c_str(), to.c_str(),
-                  msg.status.id);
-            return false;
-        }
-        return ReportCopyFailure(from, to, msg);
-    }
-
     bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) {
         std::vector<char> buf(msg.status.msglen + 1);
         if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) {
@@ -547,6 +520,97 @@
         return false;
     }
 
+    void CopyDone() { deferred_acknowledgements_.pop_front(); }
+
+    void ReportDeferredCopyFailure(const std::string& msg) {
+        auto& [from, to] = deferred_acknowledgements_.front();
+        Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), msg.c_str());
+        deferred_acknowledgements_.pop_front();
+    }
+
+    bool ReadAcknowledgements(bool read_all = false) {
+        // We need to read enough such that adbd's intermediate socket's write buffer can't be
+        // full. The default buffer on Linux is 212992 bytes, but there's 576 bytes of bookkeeping
+        // overhead per write. The worst case scenario is a continuous string of failures, since
+        // each logical packet is divided into two writes. If our packet size if conservatively 512
+        // bytes long, this leaves us with space for 128 responses.
+        constexpr size_t max_deferred_acks = 128;
+        auto& buf = acknowledgement_buffer_;
+        adb_pollfd pfd = {.fd = fd.get(), .events = POLLIN};
+        while (!deferred_acknowledgements_.empty()) {
+            bool should_block = read_all || deferred_acknowledgements_.size() >= max_deferred_acks;
+
+            ssize_t rc = adb_poll(&pfd, 1, should_block ? -1 : 0);
+            if (rc == 0) {
+                CHECK(!should_block);
+                return true;
+            }
+
+            if (acknowledgement_buffer_.size() < sizeof(sync_status)) {
+                const ssize_t header_bytes_left = sizeof(sync_status) - buf.size();
+                ssize_t rc = adb_read(fd, buf.end(), header_bytes_left);
+                if (rc <= 0) {
+                    Error("failed to read copy response");
+                    return false;
+                }
+
+                buf.resize(buf.size() + rc);
+                if (rc != header_bytes_left) {
+                    // Early exit if we run out of data in the socket.
+                    return true;
+                }
+
+                if (!should_block) {
+                    // We don't want to read again yet, because the socket might be empty.
+                    continue;
+                }
+            }
+
+            auto* hdr = reinterpret_cast<sync_status*>(buf.data());
+            if (hdr->id == ID_OKAY) {
+                buf.resize(0);
+                if (hdr->msglen != 0) {
+                    Error("received ID_OKAY with msg_len (%" PRIu32 " != 0", hdr->msglen);
+                    return false;
+                }
+                CopyDone();
+                continue;
+            } else if (hdr->id != ID_FAIL) {
+                Error("unexpected response from daemon: id = %#" PRIx32, hdr->id);
+                return false;
+            } else if (hdr->msglen > SYNC_DATA_MAX) {
+                Error("too-long message length from daemon: msglen = %" PRIu32, hdr->msglen);
+                return false;
+            }
+
+            const ssize_t msg_bytes_left = hdr->msglen + sizeof(sync_status) - buf.size();
+            CHECK_GE(msg_bytes_left, 0);
+            if (msg_bytes_left > 0) {
+                ssize_t rc = adb_read(fd, buf.end(), msg_bytes_left);
+                if (rc <= 0) {
+                    Error("failed to read copy failure message");
+                    return false;
+                }
+
+                buf.resize(buf.size() + rc);
+                if (rc != msg_bytes_left) {
+                    if (should_block) {
+                        continue;
+                    } else {
+                        return true;
+                    }
+                }
+
+                std::string msg(buf.begin() + sizeof(sync_status), buf.end());
+                ReportDeferredCopyFailure(msg);
+                buf.resize(0);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     void Printf(const char* fmt, ...) __attribute__((__format__(__printf__, 2, 3))) {
         std::string s;
 
@@ -613,6 +677,7 @@
 
   private:
     std::deque<std::pair<std::string, std::string>> deferred_acknowledgements_;
+    Block acknowledgement_buffer_;
     FeatureSet features_;
     bool have_stat_v2_;
     bool have_ls_v2_;
@@ -721,7 +786,7 @@
         if (!sc.SendSmallFile(rpath, mode, lpath, rpath, mtime, buf, data_length)) {
             return false;
         }
-        return true;
+        return sc.ReadAcknowledgements();
 #endif
     }
 
@@ -744,7 +809,7 @@
             return false;
         }
     }
-    return true;
+    return sc.ReadAcknowledgements();
 }
 
 static bool sync_recv(SyncConnection& sc, const char* rpath, const char* lpath,
@@ -971,8 +1036,9 @@
     }
 
     sc.RecordFilesSkipped(skipped);
+    bool success = sc.ReadAcknowledgements(true);
     sc.ReportTransferRate(lpath, TransferDirection::push);
-    return true;
+    return success;
 }
 
 bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sync) {
@@ -1065,7 +1131,7 @@
         sc.ReportTransferRate(src_path, TransferDirection::push);
     }
 
-    success &= sc.ReadAcknowledgments();
+    success &= sc.ReadAcknowledgements(true);
     sc.ReportOverallTransferRate(TransferDirection::push);
     return success;
 }