adbd: limit the sizes of aio writes.

The kernel does a kmalloc of the write size for each write during
io_submit, which can lead to failure if the kernel's heap is fragmented
such that it can't allocate a contiguous 1MB chunk. Partition writes
into 16 page chunks which are much more likely to succeed.

Bug: http://b/126582877
Test: test_device.py
Change-Id: Ib2d48561594e1d81f1a2f62e34eaa40265ac47c4
diff --git a/adb/daemon/usb.cpp b/adb/daemon/usb.cpp
index f0e2861..83ff221 100644
--- a/adb/daemon/usb.cpp
+++ b/adb/daemon/usb.cpp
@@ -29,6 +29,7 @@
 #include <linux/usb/functionfs.h>
 #include <sys/eventfd.h>
 
+#include <algorithm>
 #include <array>
 #include <future>
 #include <memory>
@@ -60,6 +61,7 @@
 static constexpr size_t kUsbReadSize = 16384;
 
 static constexpr size_t kUsbWriteQueueDepth = 16;
+static constexpr size_t kUsbWriteSize = 16 * PAGE_SIZE;
 
 static const char* to_string(enum usb_functionfs_event_type type) {
     switch (type) {
@@ -115,7 +117,7 @@
 struct IoBlock {
     bool pending;
     struct iocb control;
-    Block payload;
+    std::shared_ptr<Block> payload;
 
     TransferId id() const { return TransferId::from_value(control.aio_data); }
 };
@@ -207,8 +209,20 @@
         std::lock_guard<std::mutex> lock(write_mutex_);
         write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++));
         if (!packet->payload.empty()) {
-            write_requests_.push_back(
-                    CreateWriteBlock(std::move(packet->payload), next_write_id_++));
+            // The kernel attempts to allocate a contiguous block of memory for each write,
+            // which can fail if the write is large and the kernel heap is fragmented.
+            // Split large writes into smaller chunks to avoid this.
+            std::shared_ptr<Block> payload = std::make_shared<Block>(std::move(packet->payload));
+            size_t offset = 0;
+            size_t len = payload->size();
+
+            while (len > 0) {
+                size_t write_size = std::min(kUsbWriteSize, len);
+                write_requests_.push_back(
+                        CreateWriteBlock(payload, offset, write_size, next_write_id_++));
+                len -= write_size;
+                offset += write_size;
+            }
         }
         SubmitWrites();
         return true;
@@ -367,10 +381,10 @@
 
     void PrepareReadBlock(IoBlock* block, uint64_t id) {
         block->pending = false;
-        block->payload.resize(kUsbReadSize);
+        block->payload = std::make_shared<Block>(kUsbReadSize);
         block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
-        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
-        block->control.aio_nbytes = block->payload.size();
+        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data());
+        block->control.aio_nbytes = block->payload->size();
     }
 
     IoBlock CreateReadBlock(uint64_t id) {
@@ -421,7 +435,7 @@
         uint64_t read_idx = id.id % kUsbReadQueueDepth;
         IoBlock* block = &read_requests_[read_idx];
         block->pending = false;
-        block->payload.resize(size);
+        block->payload->resize(size);
 
         // Notification for completed reads can be received out of order.
         if (block->id().id != needed_read_id_) {
@@ -442,16 +456,16 @@
     }
 
     void ProcessRead(IoBlock* block) {
-        if (!block->payload.empty()) {
+        if (!block->payload->empty()) {
             if (!incoming_header_.has_value()) {
-                CHECK_EQ(sizeof(amessage), block->payload.size());
+                CHECK_EQ(sizeof(amessage), block->payload->size());
                 amessage msg;
-                memcpy(&msg, block->payload.data(), sizeof(amessage));
+                memcpy(&msg, block->payload->data(), sizeof(amessage));
                 LOG(DEBUG) << "USB read:" << dump_header(&msg);
                 incoming_header_ = msg;
             } else {
                 size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
-                Block payload = std::move(block->payload);
+                Block payload = std::move(*block->payload);
                 CHECK_LE(payload.size(), bytes_left);
                 incoming_payload_.append(std::make_unique<Block>(std::move(payload)));
             }
@@ -506,7 +520,8 @@
         SubmitWrites();
     }
 
-    std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) {
+    std::unique_ptr<IoBlock> CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset,
+                                              size_t len, uint64_t id) {
         auto block = std::make_unique<IoBlock>();
         block->payload = std::move(payload);
         block->control.aio_data = static_cast<uint64_t>(TransferId::write(id));
@@ -514,14 +529,20 @@
         block->control.aio_lio_opcode = IOCB_CMD_PWRITE;
         block->control.aio_reqprio = 0;
         block->control.aio_fildes = write_fd_.get();
-        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
-        block->control.aio_nbytes = block->payload.size();
+        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data() + offset);
+        block->control.aio_nbytes = len;
         block->control.aio_offset = 0;
         block->control.aio_flags = IOCB_FLAG_RESFD;
         block->control.aio_resfd = worker_event_fd_.get();
         return block;
     }
 
+    std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) {
+        std::shared_ptr<Block> block = std::make_shared<Block>(std::move(payload));
+        size_t len = block->size();
+        return CreateWriteBlock(std::move(block), 0, len, id);
+    }
+
     void SubmitWrites() REQUIRES(write_mutex_) {
         if (writes_submitted_ == kUsbWriteQueueDepth) {
             return;