Merge changes I9f36cc26,I06561ad0,I42c2a8d0 am: 3a38310476
am: e5e5bccd40

Change-Id: Ia1f891df6366514565dcfcd82521024b71722015
diff --git a/adb/Android.bp b/adb/Android.bp
index 99de54e..1f41e4f 100644
--- a/adb/Android.bp
+++ b/adb/Android.bp
@@ -122,6 +122,7 @@
     "sysdeps_test.cpp",
     "sysdeps/stat_test.cpp",
     "transport_test.cpp",
+    "types_test.cpp",
 ]
 
 cc_library_host_static {
diff --git a/adb/adb_unique_fd.h b/adb/adb_unique_fd.h
index 9c02cbe..7d2354d 100644
--- a/adb/adb_unique_fd.h
+++ b/adb/adb_unique_fd.h
@@ -28,11 +28,38 @@
 using unique_fd = android::base::unique_fd_impl<AdbCloser>;
 
 #if !defined(_WIN32)
-inline bool Pipe(unique_fd* read, unique_fd* write) {
+inline bool Pipe(unique_fd* read, unique_fd* write, int flags = 0) {
     int pipefd[2];
+#if !defined(__APPLE__)
+    if (pipe2(pipefd, flags) != 0) {
+        return false;
+    }
+#else
+    // Darwin doesn't have pipe2. Implement it ourselves.
+    if (flags != 0 && (flags & ~(O_CLOEXEC | O_NONBLOCK)) != 0) {
+        errno = EINVAL;
+        return false;
+    }
+
     if (pipe(pipefd) != 0) {
         return false;
     }
+
+    if (flags & O_CLOEXEC) {
+        if (fcntl(pipefd[0], F_SETFD, FD_CLOEXEC) != 0 ||
+            fcntl(pipefd[1], F_SETFD, FD_CLOEXEC) != 0) {
+            PLOG(FATAL) << "failed to set FD_CLOEXEC on newly created pipe";
+        }
+    }
+
+    if (flags & O_NONBLOCK) {
+        if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) != 0 ||
+            fcntl(pipefd[1], F_SETFL, O_NONBLOCK) != 0) {
+            PLOG(FATAL) << "failed to set O_NONBLOCK  on newly created pipe";
+        }
+    }
+#endif
+
     read->reset(pipefd[0]);
     write->reset(pipefd[1]);
     return true;
diff --git a/adb/benchmark_device.py b/adb/benchmark_device.py
new file mode 100755
index 0000000..00c2315
--- /dev/null
+++ b/adb/benchmark_device.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2018 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Measure adb push, pull and shell throughput against a connected device."""
+
+import os
+import statistics
+import time
+
+import adb
+
+def lock_min(device):
+    """Pin every CPU frequency policy on the device to its minimum frequency."""
+    device.shell_nocheck(["""
+        for x in /sys/devices/system/cpu/cpu?/cpufreq; do
+            echo userspace > $x/scaling_governor
+            cat $x/scaling_min_freq > $x/scaling_setspeed
+        done
+    """])
+
+def lock_max(device):
+    """Pin every CPU frequency policy on the device to its maximum frequency."""
+    device.shell_nocheck(["""
+        for x in /sys/devices/system/cpu/cpu?/cpufreq; do
+            echo userspace > $x/scaling_governor
+            cat $x/scaling_max_freq > $x/scaling_setspeed
+        done
+    """])
+
+def unlock(device):
+    """Undo lock_min()/lock_max() by selecting a dynamic frequency governor.
+
+    Writes ondemand, sched and schedutil to every policy in turn.
+    NOTE(review): presumably a kernel rejects the names it does not offer, so
+    the last supported governor wins -- confirm on the target devices.
+    """
+    device.shell_nocheck(["""
+        for x in /sys/devices/system/cpu/cpu?/cpufreq; do
+            echo ondemand > $x/scaling_governor
+            echo sched > $x/scaling_governor
+            echo schedutil > $x/scaling_governor
+        done
+    """])
+
+def harmonic_mean(xs):
+    """Return the harmonic mean of xs.
+
+    For speeds measured over equal-sized transfers this equals the total data
+    moved divided by the total time taken.
+    """
+    return 1.0 / statistics.mean([1.0 / x for x in xs])
+
+def analyze(name, speeds):
+    """Print the median, harmonic mean and sample standard deviation of speeds.
+
+    name labels the output line; speeds is a sequence of MiB/s figures and must
+    hold at least two values, as statistics.stdev requires.
+    """
+    median = statistics.median(speeds)
+    mean = harmonic_mean(speeds)
+    stddev = statistics.stdev(speeds)
+    msg = "%s: %d runs: median %.2f MiB/s, mean %.2f MiB/s, stddev: %.2f MiB/s"
+    print(msg % (name, len(speeds), median, mean, stddev))
+
+def _measure(transfer, file_size_mb, runs=5):
+    """Call transfer() runs times and return the MiB/s achieved by each call."""
+    speeds = list()
+    for _ in range(runs):
+        # perf_counter() is monotonic, so wall-clock adjustments cannot skew a run.
+        begin = time.perf_counter()
+        transfer()
+        end = time.perf_counter()
+        speeds.append(file_size_mb / (end - begin))
+    return speeds
+
+def _remove_local(path):
+    """Delete path on the host; a file that was never created is not an error."""
+    try:
+        os.remove(path)
+    except FileNotFoundError:
+        pass
+
+def benchmark_push(device=None, file_size_mb=100):
+    """Time five pushes of a file_size_mb MiB file to /dev/null on the device.
+
+    device defaults to adb.get_device(). The CPU governor is released and the
+    local scratch file deleted even when a transfer raises.
+    """
+    if device is None:
+        device = adb.get_device()
+
+    remote_path = "/dev/null"
+    local_path = "/tmp/adb_benchmark_temp"
+
+    lock_max(device)
+    try:
+        # truncate() grows the new, empty file to the requested size.
+        with open(local_path, "wb") as f:
+            f.truncate(file_size_mb * 1024 * 1024)
+
+        speeds = _measure(
+            lambda: device.push(local=local_path, remote=remote_path),
+            file_size_mb)
+    finally:
+        _remove_local(local_path)
+        unlock(device)
+
+    analyze("push %dMiB" % file_size_mb, speeds)
+
+def benchmark_pull(device=None, file_size_mb=100):
+    """Time five pulls of a file_size_mb MiB file of zeros from the device.
+
+    device defaults to adb.get_device(). The CPU governor is released and both
+    scratch files (on the device and on the host) are deleted even when a
+    transfer raises.
+    """
+    if device is None:
+        device = adb.get_device()
+
+    remote_path = "/data/local/tmp/adb_benchmark_temp"
+    local_path = "/tmp/adb_benchmark_temp"
+
+    lock_max(device)
+    try:
+        device.shell(["dd", "if=/dev/zero", "of=" + remote_path, "bs=1m",
+                      "count=" + str(file_size_mb)])
+        speeds = _measure(
+            lambda: device.pull(remote=remote_path, local=local_path),
+            file_size_mb)
+    finally:
+        _remove_local(local_path)
+        device.shell_nocheck(["rm", "-f", remote_path])
+        unlock(device)
+
+    analyze("pull %dMiB" % file_size_mb, speeds)
+
+def benchmark_shell(device=None, file_size_mb=100):
+    """Time five runs of a device-side dd that emits file_size_mb MiB of zeros.
+
+    device defaults to adb.get_device(). The CPU governor is released even when
+    a run raises.
+    """
+    if device is None:
+        device = adb.get_device()
+
+    lock_max(device)
+    try:
+        speeds = _measure(
+            lambda: device.shell(["dd", "if=/dev/zero", "bs=1m",
+                                  "count=" + str(file_size_mb)]),
+            file_size_mb)
+    finally:
+        unlock(device)
+
+    analyze("shell %dMiB" % file_size_mb, speeds)
+
+def main():
+    """Run the pull benchmark against the default device."""
+    benchmark_pull()
+
+if __name__ == "__main__":
+    main()
diff --git a/adb/socket.h b/adb/socket.h
index 27e5b05..0905aab 100644
--- a/adb/socket.h
+++ b/adb/socket.h
@@ -62,7 +62,7 @@
     int fd = -1;
 
     // queue of data waiting to be written
-    std::deque<Range> packet_queue;
+    IOVector packet_queue;
 
     std::string smart_socket_data;
 
diff --git a/adb/sockets.cpp b/adb/sockets.cpp
index 9a6dcbe..de3215d 100644
--- a/adb/sockets.cpp
+++ b/adb/sockets.cpp
@@ -113,14 +113,14 @@
 };
 
 static SocketFlushResult local_socket_flush_incoming(asocket* s) {
-    while (!s->packet_queue.empty()) {
-        Range& r = s->packet_queue.front();
-
-        int rc = adb_write(s->fd, r.data(), r.size());
-        if (rc == static_cast<int>(r.size())) {
-            s->packet_queue.pop_front();
+    if (!s->packet_queue.empty()) {
+        std::vector<adb_iovec> iov = s->packet_queue.iovecs();
+        ssize_t rc = adb_writev(s->fd, iov.data(), iov.size());
+        if (rc > 0 && static_cast<size_t>(rc) == s->packet_queue.size()) {
+            s->packet_queue.clear();
         } else if (rc > 0) {
-            r.drop_front(rc);
+            // TODO: Implement a faster drop_front?
+            s->packet_queue.take_front(rc);
             fdevent_add(s->fde, FDE_WRITE);
             return SocketFlushResult::TryAgain;
         } else if (rc == -1 && errno == EAGAIN) {
@@ -130,7 +130,6 @@
             // We failed to write, but it's possible that we can still read from the socket.
             // Give that a try before giving up.
             s->has_write_error = true;
-            break;
         }
     }
 
@@ -217,8 +216,7 @@
 static int local_socket_enqueue(asocket* s, apacket::payload_type data) {
     D("LS(%d): enqueue %zu", s->id, data.size());
 
-    Range r(std::move(data));
-    s->packet_queue.push_back(std::move(r));
+    s->packet_queue.append(std::move(data));
     switch (local_socket_flush_incoming(s)) {
         case SocketFlushResult::Destroyed:
             return -1;
@@ -622,7 +620,7 @@
     D("SS(%d): enqueue %zu", s->id, data.size());
 
     if (s->smart_socket_data.empty()) {
-        // TODO: Make this a BlockChain?
+        // TODO: Make this an IOVector?
         s->smart_socket_data.assign(data.begin(), data.end());
     } else {
         std::copy(data.begin(), data.end(), std::back_inserter(s->smart_socket_data));
diff --git a/adb/types.h b/adb/types.h
index dd3e063..c6b3f07 100644
--- a/adb/types.h
+++ b/adb/types.h
@@ -17,11 +17,15 @@
 #pragma once
 
 #include <algorithm>
+#include <deque>
+#include <type_traits>
 #include <utility>
+#include <vector>
 
 #include <android-base/logging.h>
 
 #include "sysdeps/memory.h"
+#include "sysdeps/uio.h"
 
 // Essentially std::vector<char>, except without zero initialization or reallocation.
 struct Block {
@@ -130,34 +134,238 @@
     payload_type payload;
 };
 
-struct Range {
-    explicit Range(apacket::payload_type data) : data_(std::move(data)) {}
+// A byte queue stored as a chain of immutable, reference-counted blocks.
+//
+// The logical contents are the bytes of every block in chain_, in order, excluding the first
+// begin_offset_ bytes of the front block and the last end_offset_ bytes of the back block.
+// take_front() shares blocks between the two chains instead of copying them.
+// Movable but not copyable.
+struct IOVector {
+    using value_type = char;
+    using block_type = Block;
+    using size_type = size_t;
 
-    Range(const Range& copy) = delete;
-    Range& operator=(const Range& copy) = delete;
+    IOVector() {}
 
-    Range(Range&& move) = default;
-    Range& operator=(Range&& move) = default;
+    // Construct a chain holding a single block.
+    explicit IOVector(std::unique_ptr<block_type> block) {
+        append(std::move(block));
+    }
 
-    size_t size() const { return data_.size() - begin_offset_ - end_offset_; };
+    IOVector(const IOVector& copy) = delete;
+    IOVector(IOVector&& move) : IOVector() {
+        *this = std::move(move);
+    }
+
+    IOVector& operator=(const IOVector& copy) = delete;
+
+    // Steal |move|'s blocks and offsets, leaving |move| empty.
+    IOVector& operator=(IOVector&& move) {
+        // Self-move must be a no-op: without this guard the resets below would wipe the
+        // state that was just "moved" into this object.
+        if (this == &move) {
+            return *this;
+        }
+
+        chain_ = std::move(move.chain_);
+        chain_length_ = move.chain_length_;
+        begin_offset_ = move.begin_offset_;
+        end_offset_ = move.end_offset_;
+
+        move.chain_.clear();
+        move.chain_length_ = 0;
+        move.begin_offset_ = 0;
+        move.end_offset_ = 0;
+
+        return *this;
+    }
+
+    // Number of bytes logically held: the blocks' total length minus both trimmed ends.
+    size_type size() const { return chain_length_ - begin_offset_ - end_offset_; }
     bool empty() const { return size() == 0; }
 
-    void drop_front(size_t n) {
-        CHECK_GE(size(), n);
-        begin_offset_ += n;
+    // Drop every block and reset both offsets.
+    void clear() {
+        chain_length_ = 0;
+        begin_offset_ = 0;
+        end_offset_ = 0;
+        chain_.clear();
     }
 
-    void drop_end(size_t n) {
-        CHECK_GE(size(), n);
-        end_offset_ += n;
+    // Split the first |len| bytes out of this chain into its own.
+    // The returned chain shares the underlying blocks with this one; nothing is copied.
+    // Aborts (CHECK) if |len| exceeds size().
+    IOVector take_front(size_type len) {
+        IOVector head;
+
+        if (len == 0) {
+            return head;
+        }
+        CHECK_GE(size(), len);
+
+        std::shared_ptr<const block_type> first_block = chain_.front();
+        CHECK_GE(first_block->size(), begin_offset_);
+        head.append_shared(std::move(first_block));
+        head.begin_offset_ = begin_offset_;
+
+        // Hand whole blocks over to head until it covers at least |len| bytes. The block at
+        // chain_.front() is always the last one head has taken (shared by both chains).
+        while (head.size() < len) {
+            pop_front_block();
+            CHECK(!chain_.empty());
+
+            head.append_shared(chain_.front());
+        }
+
+        if (head.size() == len) {
+            // Head takes full ownership of the last block it took.
+            head.end_offset_ = 0;
+            begin_offset_ = 0;
+            pop_front_block();
+        } else {
+            // Head takes partial ownership of the last block it took.
+            size_t bytes_left = head.size() - len;
+            head.end_offset_ = bytes_left;
+            CHECK_GE(chain_.front()->size(), bytes_left);
+            begin_offset_ = chain_.front()->size() - bytes_left;
+        }
+
+        return head;
     }
 
-    char* data() { return &data_[0] + begin_offset_; }
+    // Add a nonempty block to the chain.
+    // The end of the chain must be a complete block (i.e. end_offset_ == 0).
+    void append(std::unique_ptr<const block_type> block) {
+        CHECK_NE(0ULL, block->size());
+        CHECK_EQ(0ULL, end_offset_);
+        chain_length_ += block->size();
+        chain_.emplace_back(std::move(block));
+    }
 
-    apacket::payload_type::iterator begin() { return data_.begin() + begin_offset_; }
-    apacket::payload_type::iterator end() { return data_.end() - end_offset_; }
+    // Convenience overload: move |block| onto the heap and append it.
+    void append(block_type&& block) { append(std::make_unique<block_type>(std::move(block))); }
 
-    apacket::payload_type data_;
+    // If the front block is partially consumed, replace it with a freshly allocated copy of
+    // the bytes from begin_offset_ onward and reset begin_offset_ to 0.
+    void trim_front() {
+        if (begin_offset_ == 0) {
+            return;
+        }
+
+        const block_type* first_block = chain_.front().get();
+        auto copy = std::make_unique<block_type>(first_block->size() - begin_offset_);
+        memcpy(copy->data(), first_block->data() + begin_offset_, copy->size());
+        chain_.front() = std::move(copy);
+
+        chain_length_ -= begin_offset_;
+        begin_offset_ = 0;
+    }
+
+  private:
+    // append, except takes a shared_ptr.
+    // Private to prevent exterior mutation of blocks.
+    void append_shared(std::shared_ptr<const block_type> block) {
+        CHECK_NE(0ULL, block->size());
+        CHECK_EQ(0ULL, end_offset_);
+        chain_length_ += block->size();
+        chain_.emplace_back(std::move(block));
+    }
+
+    // Drop the front block from the chain, and update chain_length_ appropriately.
+    void pop_front_block() {
+        chain_length_ -= chain_.front()->size();
+        begin_offset_ = 0;
+        chain_.pop_front();
+    }
+
+    // Iterate over the blocks with a callback with an operator()(const char*, size_t).
+    // Each call receives the visible part of one block, i.e. with begin_offset_ and
+    // end_offset_ already applied to the first and last block respectively.
+    template <typename Fn>
+    void iterate_blocks(Fn&& callback) const {
+        if (chain_.size() == 0) {
+            return;
+        }
+
+        for (size_t i = 0; i < chain_.size(); ++i) {
+            const std::shared_ptr<const block_type>& block = chain_.at(i);
+            const char* begin = block->data();
+            size_t length = block->size();
+
+            // Note that both of these conditions can be true if there's only one block.
+            if (i == 0) {
+                CHECK_GE(block->size(), begin_offset_);
+                begin += begin_offset_;
+                length -= begin_offset_;
+            }
+
+            if (i == chain_.size() - 1) {
+                CHECK_GE(length, end_offset_);
+                length -= end_offset_;
+            }
+
+            callback(begin, length);
+        }
+    }
+
+  public:
+    // Copy all of the blocks into a single block.
+    template <typename CollectionType = block_type>
+    CollectionType coalesce() const {
+        CollectionType result;
+        if (size() == 0) {
+            return result;
+        }
+
+        result.resize(size());
+
+        size_t offset = 0;
+        iterate_blocks([&offset, &result](const char* data, size_t len) {
+            memcpy(&result[offset], data, len);
+            offset += len;
+        });
+
+        return result;
+    }
+
+    // Invoke f(data, length) on the contents as one contiguous buffer and return its result.
+    // The buffer is only valid for the duration of the call.
+    template <typename FunctionType>
+    auto coalesced(FunctionType&& f) const ->
+        typename std::result_of<FunctionType(const char*, size_t)>::type {
+        if (chain_.size() == 1) {
+            // If we only have one block, we can use it directly.
+            return f(chain_.front()->data() + begin_offset_, size());
+        } else {
+            // Otherwise, copy to a single block.
+            auto data = coalesce();
+            return f(data.data(), data.size());
+        }
+    }
+
+    // Get a list of iovecs that can be used to write out all of the blocks.
+    std::vector<adb_iovec> iovecs() const {
+        std::vector<adb_iovec> result;
+        // One iovec per block, so size the vector once instead of growing it repeatedly.
+        result.reserve(chain_.size());
+        iterate_blocks([&result](const char* data, size_t len) {
+            adb_iovec iov;
+            iov.iov_base = const_cast<char*>(data);
+            iov.iov_len = len;
+            result.emplace_back(iov);
+        });
+
+        return result;
+    }
+
+  private:
+    // Total length of all of the blocks in the chain.
+    size_t chain_length_ = 0;
+
+    // Bytes at the start of the front block that are not part of this chain.
     size_t begin_offset_ = 0;
+    // Bytes at the end of the back block that are not part of this chain.
     size_t end_offset_ = 0;
+    // The blocks themselves, front first.
+    std::deque<std::shared_ptr<const block_type>> chain_;
 };
diff --git a/adb/types_test.cpp b/adb/types_test.cpp
new file mode 100644
index 0000000..31ab90a
--- /dev/null
+++ b/adb/types_test.cpp
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <string.h>
+
+#include <memory>
+#include <string>
+
+#include "sysdeps/memory.h"
+#include "types.h"
+
+// Build a block holding exactly the characters of |string| (no terminating NUL).
+static std::unique_ptr<IOVector::block_type> create_block(const std::string& string) {
+    return std::make_unique<IOVector::block_type>(string.begin(), string.end());
+}
+
+// Build a block of |len| bytes, each set to |value|.
+static std::unique_ptr<IOVector::block_type> create_block(char value, size_t len) {
+    auto block = std::make_unique<IOVector::block_type>();
+    block->resize(len);
+    memset(&(*block)[0], value, len);
+    return block;
+}
+
+// Deep-copy |block| (anything pointer-like to a block) so the original stays usable after the
+// copy has been handed to an IOVector.
+template <typename T>
+static std::unique_ptr<IOVector::block_type> copy_block(T&& block) {
+    auto copy = std::make_unique<IOVector::block_type>();
+    copy->assign(block->begin(), block->end());
+    return copy;
+}
+
+TEST(IOVector, empty) {
+    // Empty IOVector.
+    IOVector bc;
+    ASSERT_EQ(0ULL, bc.coalesce().size());
+}
+
+TEST(IOVector, single_block) {
+    // A single block.
+    auto block = create_block('x', 100);
+    IOVector bc;
+    bc.append(copy_block(block));
+    ASSERT_EQ(100ULL, bc.size());
+    auto coalesced = bc.coalesce();
+    ASSERT_EQ(*block, coalesced);
+}
+
+TEST(IOVector, single_block_split) {
+    // One block split.
+    IOVector bc;
+    bc.append(create_block("foobar"));
+    IOVector foo = bc.take_front(3);
+    ASSERT_EQ(3ULL, foo.size());
+    ASSERT_EQ(3ULL, bc.size());
+    ASSERT_EQ(*create_block("foo"), foo.coalesce());
+    ASSERT_EQ(*create_block("bar"), bc.coalesce());
+}
+
+TEST(IOVector, aligned_split) {
+    IOVector bc;
+    bc.append(create_block("foo"));
+    bc.append(create_block("bar"));
+    bc.append(create_block("baz"));
+    ASSERT_EQ(9ULL, bc.size());
+
+    IOVector foo = bc.take_front(3);
+    ASSERT_EQ(3ULL, foo.size());
+    ASSERT_EQ(*create_block("foo"), foo.coalesce());
+
+    IOVector bar = bc.take_front(3);
+    ASSERT_EQ(3ULL, bar.size());
+    ASSERT_EQ(*create_block("bar"), bar.coalesce());
+
+    IOVector baz = bc.take_front(3);
+    ASSERT_EQ(3ULL, baz.size());
+    ASSERT_EQ(*create_block("baz"), baz.coalesce());
+
+    ASSERT_EQ(0ULL, bc.size());
+}
+
+TEST(IOVector, misaligned_split) {
+    IOVector bc;
+    bc.append(create_block("foo"));
+    bc.append(create_block("bar"));
+    bc.append(create_block("baz"));
+    bc.append(create_block("qux"));
+    bc.append(create_block("quux"));
+
+    // Aligned left, misaligned right, across multiple blocks.
+    IOVector foob = bc.take_front(4);
+    ASSERT_EQ(4ULL, foob.size());
+    ASSERT_EQ(*create_block("foob"), foob.coalesce());
+
+    // Misaligned left, misaligned right, in one block.
+    IOVector a = bc.take_front(1);
+    ASSERT_EQ(1ULL, a.size());
+    ASSERT_EQ(*create_block("a"), a.coalesce());
+
+    // Misaligned left, misaligned right, across two blocks.
+    IOVector rba = bc.take_front(3);
+    ASSERT_EQ(3ULL, rba.size());
+    ASSERT_EQ(*create_block("rba"), rba.coalesce());
+
+    // Misaligned left, misaligned right, across three blocks.
+    IOVector zquxquu = bc.take_front(7);
+    ASSERT_EQ(7ULL, zquxquu.size());
+    ASSERT_EQ(*create_block("zquxquu"), zquxquu.coalesce());
+
+    ASSERT_EQ(1ULL, bc.size());
+    ASSERT_EQ(*create_block("x"), bc.coalesce());
+}