Merge "adb: fix sync." into rvc-dev
diff --git a/adb/client/file_sync_client.cpp b/adb/client/file_sync_client.cpp
index 94bd8f5..7c341bc 100644
--- a/adb/client/file_sync_client.cpp
+++ b/adb/client/file_sync_client.cpp
@@ -209,7 +209,8 @@
class SyncConnection {
public:
- SyncConnection() {
+ SyncConnection() : acknowledgement_buffer_(sizeof(sync_status) + SYNC_DATA_MAX) {
+ acknowledgement_buffer_.resize(0);
max = SYNC_DATA_MAX; // TODO: decide at runtime.
std::string error;
@@ -507,34 +508,6 @@
return WriteOrDie(lpath, rpath, &msg.data, sizeof(msg.data));
}
- bool ReadAcknowledgments() {
- bool result = true;
- while (!deferred_acknowledgements_.empty()) {
- auto [from, to] = std::move(deferred_acknowledgements_.front());
- deferred_acknowledgements_.pop_front();
- result &= CopyDone(from, to);
- }
- return result;
- }
-
- bool CopyDone(const std::string& from, const std::string& to) {
- syncmsg msg;
- if (!ReadFdExactly(fd, &msg.status, sizeof(msg.status))) {
- Error("failed to copy '%s' to '%s': couldn't read from device", from.c_str(),
- to.c_str());
- return false;
- }
- if (msg.status.id == ID_OKAY) {
- return true;
- }
- if (msg.status.id != ID_FAIL) {
- Error("failed to copy '%s' to '%s': unknown reason %d", from.c_str(), to.c_str(),
- msg.status.id);
- return false;
- }
- return ReportCopyFailure(from, to, msg);
- }
-
bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) {
std::vector<char> buf(msg.status.msglen + 1);
if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) {
@@ -547,6 +520,97 @@
return false;
}
+ void CopyDone() { deferred_acknowledgements_.pop_front(); }
+
+ void ReportDeferredCopyFailure(const std::string& msg) {
+ auto& [from, to] = deferred_acknowledgements_.front();
+ Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), msg.c_str());
+ deferred_acknowledgements_.pop_front();
+ }
+
+ bool ReadAcknowledgements(bool read_all = false) {
+ // We need to read enough such that adbd's intermediate socket's write buffer can't be
+ // full. The default buffer on Linux is 212992 bytes, but there's 576 bytes of bookkeeping
+ // overhead per write. The worst case scenario is a continuous string of failures, since
+ // each logical packet is divided into two writes. If our packet size if conservatively 512
+ // bytes long, this leaves us with space for 128 responses.
+ constexpr size_t max_deferred_acks = 128;
+ auto& buf = acknowledgement_buffer_;
+ adb_pollfd pfd = {.fd = fd.get(), .events = POLLIN};
+ while (!deferred_acknowledgements_.empty()) {
+ bool should_block = read_all || deferred_acknowledgements_.size() >= max_deferred_acks;
+
+ ssize_t rc = adb_poll(&pfd, 1, should_block ? -1 : 0);
+ if (rc == 0) {
+ CHECK(!should_block);
+ return true;
+ }
+
+ if (acknowledgement_buffer_.size() < sizeof(sync_status)) {
+ const ssize_t header_bytes_left = sizeof(sync_status) - buf.size();
+ ssize_t rc = adb_read(fd, buf.end(), header_bytes_left);
+ if (rc <= 0) {
+ Error("failed to read copy response");
+ return false;
+ }
+
+ buf.resize(buf.size() + rc);
+ if (rc != header_bytes_left) {
+ // Early exit if we run out of data in the socket.
+ return true;
+ }
+
+ if (!should_block) {
+ // We don't want to read again yet, because the socket might be empty.
+ continue;
+ }
+ }
+
+ auto* hdr = reinterpret_cast<sync_status*>(buf.data());
+ if (hdr->id == ID_OKAY) {
+ buf.resize(0);
+ if (hdr->msglen != 0) {
+ Error("received ID_OKAY with msg_len (%" PRIu32 " != 0", hdr->msglen);
+ return false;
+ }
+ CopyDone();
+ continue;
+ } else if (hdr->id != ID_FAIL) {
+ Error("unexpected response from daemon: id = %#" PRIx32, hdr->id);
+ return false;
+ } else if (hdr->msglen > SYNC_DATA_MAX) {
+ Error("too-long message length from daemon: msglen = %" PRIu32, hdr->msglen);
+ return false;
+ }
+
+ const ssize_t msg_bytes_left = hdr->msglen + sizeof(sync_status) - buf.size();
+ CHECK_GE(msg_bytes_left, 0);
+ if (msg_bytes_left > 0) {
+ ssize_t rc = adb_read(fd, buf.end(), msg_bytes_left);
+ if (rc <= 0) {
+ Error("failed to read copy failure message");
+ return false;
+ }
+
+ buf.resize(buf.size() + rc);
+ if (rc != msg_bytes_left) {
+ if (should_block) {
+ continue;
+ } else {
+ return true;
+ }
+ }
+
+ std::string msg(buf.begin() + sizeof(sync_status), buf.end());
+ ReportDeferredCopyFailure(msg);
+ buf.resize(0);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
void Printf(const char* fmt, ...) __attribute__((__format__(__printf__, 2, 3))) {
std::string s;
@@ -613,6 +677,7 @@
private:
std::deque<std::pair<std::string, std::string>> deferred_acknowledgements_;
+ Block acknowledgement_buffer_;
FeatureSet features_;
bool have_stat_v2_;
bool have_ls_v2_;
@@ -721,7 +786,7 @@
if (!sc.SendSmallFile(rpath, mode, lpath, rpath, mtime, buf, data_length)) {
return false;
}
- return true;
+ return sc.ReadAcknowledgements();
#endif
}
@@ -744,7 +809,7 @@
return false;
}
}
- return true;
+ return sc.ReadAcknowledgements();
}
static bool sync_recv(SyncConnection& sc, const char* rpath, const char* lpath,
@@ -971,8 +1036,9 @@
}
sc.RecordFilesSkipped(skipped);
+ bool success = sc.ReadAcknowledgements(true);
sc.ReportTransferRate(lpath, TransferDirection::push);
- return true;
+ return success;
}
bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sync) {
@@ -1065,7 +1131,7 @@
sc.ReportTransferRate(src_path, TransferDirection::push);
}
- success &= sc.ReadAcknowledgments();
+ success &= sc.ReadAcknowledgements(true);
sc.ReportOverallTransferRate(TransferDirection::push);
return success;
}