adb: add fdevent callback that passes the fdevent.

This is useful for when we don't want to actually store the fdevent
into a separate struct to be able to destroy it, but instead want to
destroy it immediately from the callback.

Test: adb_test
Change-Id: Ief3dbf8ea6a6bd72dc7e73f9ab9b7429e48fc181
diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp
index e096560..fa3738d 100644
--- a/adb/fdevent.cpp
+++ b/adb/fdevent.cpp
@@ -33,6 +33,8 @@
 #include <list>
 #include <mutex>
 #include <unordered_map>
+#include <utility>
+#include <variant>
 #include <vector>
 
 #include <android-base/chrono_utils.h>
@@ -121,13 +123,8 @@
                                        state.c_str());
 }
 
-void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) {
-    check_main_thread();
-    CHECK_GE(fd, 0);
-    memset(fde, 0, sizeof(fdevent));
-}
-
-fdevent* fdevent_create(int fd, fd_func func, void* arg) {
+template <typename F>
+static fdevent* fdevent_create_impl(int fd, F func, void* arg) {
     check_main_thread();
     CHECK_GE(fd, 0);
 
@@ -150,6 +147,14 @@
     return fde;
 }
 
+fdevent* fdevent_create(int fd, fd_func func, void* arg) {
+    return fdevent_create_impl(fd, func, arg);
+}
+
+fdevent* fdevent_create(int fd, fd_func2 func, void* arg) {
+    return fdevent_create_impl(fd, func, arg);
+}
+
 unique_fd fdevent_release(fdevent* fde) {
     check_main_thread();
     if (!fde) {
@@ -290,13 +295,27 @@
     }
 }
 
+template <class T>
+struct always_false : std::false_type {};
+
 static void fdevent_call_fdfunc(fdevent* fde) {
     unsigned events = fde->events;
     fde->events = 0;
     CHECK(fde->state & FDE_PENDING);
     fde->state &= (~FDE_PENDING);
     D("fdevent_call_fdfunc %s", dump_fde(fde).c_str());
-    fde->func(fde->fd.get(), events, fde->arg);
+    std::visit(
+            [&](auto&& f) {
+                using F = std::decay_t<decltype(f)>;
+                if constexpr (std::is_same_v<fd_func, F>) {
+                    f(fde->fd.get(), events, fde->arg);
+                } else if constexpr (std::is_same_v<fd_func2, F>) {
+                    f(fde, events, fde->arg);
+                } else {
+                    static_assert(always_false<F>::value, "non-exhaustive visitor");
+                }
+            },
+            fde->func);
 }
 
 static void fdevent_run_flush() EXCLUDES(run_queue_mutex) {
diff --git a/adb/fdevent.h b/adb/fdevent.h
index df2339a..70e0a96 100644
--- a/adb/fdevent.h
+++ b/adb/fdevent.h
@@ -21,6 +21,7 @@
 #include <stdint.h>  /* for int64_t */
 
 #include <functional>
+#include <variant>
 
 #include "adb_unique_fd.h"
 
@@ -30,6 +31,7 @@
 #define FDE_ERROR             0x0004
 
 typedef void (*fd_func)(int fd, unsigned events, void *userdata);
+typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata);
 
 struct fdevent {
     uint64_t id;
@@ -40,15 +42,14 @@
     uint16_t state = 0;
     uint16_t events = 0;
 
-    fd_func func = nullptr;
+    std::variant<fd_func, fd_func2> func;
     void* arg = nullptr;
 };
 
-/* Allocate and initialize a new fdevent object
- * Note: use FD_TIMER as 'fd' to create a fd-less object
- * (used to implement timers).
-*/
+// Allocate and initialize a new fdevent object
+// TODO: Switch these to unique_fd.
 fdevent *fdevent_create(int fd, fd_func func, void *arg);
+fdevent* fdevent_create(int fd, fd_func2 func, void* arg);
 
 // Deallocate an fdevent object that was created by fdevent_create.
 void fdevent_destroy(fdevent *fde);
@@ -56,16 +57,14 @@
 // fdevent_destroy, except releasing the file descriptor previously owned by the fdevent.
 unique_fd fdevent_release(fdevent* fde);
 
-/* Change which events should cause notifications
-*/
+// Change which events should cause notifications
 void fdevent_set(fdevent *fde, unsigned events);
 void fdevent_add(fdevent *fde, unsigned events);
 void fdevent_del(fdevent *fde, unsigned events);
 
 void fdevent_set_timeout(fdevent *fde, int64_t  timeout_ms);
 
-/* loop forever, handling events.
-*/
+// Loop forever, handling events.
 void fdevent_loop();
 
 void check_main_thread();
diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp
index 816134f..a9746bb 100644
--- a/adb/fdevent_test.cpp
+++ b/adb/fdevent_test.cpp
@@ -30,10 +30,16 @@
 
 class FdHandler {
   public:
-    FdHandler(int read_fd, int write_fd) : read_fd_(read_fd), write_fd_(write_fd) {
-        read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
+    FdHandler(int read_fd, int write_fd, bool use_new_callback)
+        : read_fd_(read_fd), write_fd_(write_fd) {
+        if (use_new_callback) {
+            read_fde_ = fdevent_create(read_fd_, FdEventNewCallback, this);
+            write_fde_ = fdevent_create(write_fd_, FdEventNewCallback, this);
+        } else {
+            read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
+            write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
+        }
         fdevent_add(read_fde_, FDE_READ);
-        write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
     }
 
     ~FdHandler() {
@@ -64,6 +70,29 @@
         }
     }
 
+    static void FdEventNewCallback(fdevent* fde, unsigned events, void* userdata) {
+        int fd = fde->fd.get();
+        FdHandler* handler = reinterpret_cast<FdHandler*>(userdata);
+        ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events;
+        if (events & FDE_READ) {
+            ASSERT_EQ(fd, handler->read_fd_);
+            char c;
+            ASSERT_EQ(1, adb_read(fd, &c, 1));
+            handler->queue_.push(c);
+            fdevent_add(handler->write_fde_, FDE_WRITE);
+        }
+        if (events & FDE_WRITE) {
+            ASSERT_EQ(fd, handler->write_fd_);
+            ASSERT_FALSE(handler->queue_.empty());
+            char c = handler->queue_.front();
+            handler->queue_.pop();
+            ASSERT_EQ(1, adb_write(fd, &c, 1));
+            if (handler->queue_.empty()) {
+                fdevent_del(handler->write_fde_, FDE_WRITE);
+            }
+        }
+    }
+
   private:
     const int read_fd_;
     const int write_fd_;
@@ -84,56 +113,60 @@
 }
 
 TEST_F(FdeventTest, smoke) {
-    const size_t PIPE_COUNT = 10;
-    const size_t MESSAGE_LOOP_COUNT = 100;
-    const std::string MESSAGE = "fdevent_test";
-    int fd_pair1[2];
-    int fd_pair2[2];
-    ASSERT_EQ(0, adb_socketpair(fd_pair1));
-    ASSERT_EQ(0, adb_socketpair(fd_pair2));
-    ThreadArg thread_arg;
-    thread_arg.first_read_fd = fd_pair1[0];
-    thread_arg.last_write_fd = fd_pair2[1];
-    thread_arg.middle_pipe_count = PIPE_COUNT;
-    int writer = fd_pair1[1];
-    int reader = fd_pair2[0];
+    for (bool use_new_callback : {true, false}) {
+        fdevent_reset();
+        const size_t PIPE_COUNT = 10;
+        const size_t MESSAGE_LOOP_COUNT = 100;
+        const std::string MESSAGE = "fdevent_test";
+        int fd_pair1[2];
+        int fd_pair2[2];
+        ASSERT_EQ(0, adb_socketpair(fd_pair1));
+        ASSERT_EQ(0, adb_socketpair(fd_pair2));
+        ThreadArg thread_arg;
+        thread_arg.first_read_fd = fd_pair1[0];
+        thread_arg.last_write_fd = fd_pair2[1];
+        thread_arg.middle_pipe_count = PIPE_COUNT;
+        int writer = fd_pair1[1];
+        int reader = fd_pair2[0];
 
-    PrepareThread();
+        PrepareThread();
 
-    std::vector<std::unique_ptr<FdHandler>> fd_handlers;
-    fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
-        std::vector<int> read_fds;
-        std::vector<int> write_fds;
+        std::vector<std::unique_ptr<FdHandler>> fd_handlers;
+        fdevent_run_on_main_thread([&thread_arg, &fd_handlers, use_new_callback]() {
+            std::vector<int> read_fds;
+            std::vector<int> write_fds;
 
-        read_fds.push_back(thread_arg.first_read_fd);
-        for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
-            int fds[2];
-            ASSERT_EQ(0, adb_socketpair(fds));
-            read_fds.push_back(fds[0]);
-            write_fds.push_back(fds[1]);
+            read_fds.push_back(thread_arg.first_read_fd);
+            for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
+                int fds[2];
+                ASSERT_EQ(0, adb_socketpair(fds));
+                read_fds.push_back(fds[0]);
+                write_fds.push_back(fds[1]);
+            }
+            write_fds.push_back(thread_arg.last_write_fd);
+
+            for (size_t i = 0; i < read_fds.size(); ++i) {
+                fd_handlers.push_back(
+                        std::make_unique<FdHandler>(read_fds[i], write_fds[i], use_new_callback));
+            }
+        });
+        WaitForFdeventLoop();
+
+        for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
+            std::string read_buffer = MESSAGE;
+            std::string write_buffer(MESSAGE.size(), 'a');
+            ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
+            ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
+            ASSERT_EQ(read_buffer, write_buffer);
         }
-        write_fds.push_back(thread_arg.last_write_fd);
 
-        for (size_t i = 0; i < read_fds.size(); ++i) {
-            fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
-        }
-    });
-    WaitForFdeventLoop();
+        fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
+        WaitForFdeventLoop();
 
-    for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
-        std::string read_buffer = MESSAGE;
-        std::string write_buffer(MESSAGE.size(), 'a');
-        ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
-        ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
-        ASSERT_EQ(read_buffer, write_buffer);
+        TerminateThread();
+        ASSERT_EQ(0, adb_close(writer));
+        ASSERT_EQ(0, adb_close(reader));
     }
-
-    fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
-    WaitForFdeventLoop();
-
-    TerminateThread();
-    ASSERT_EQ(0, adb_close(writer));
-    ASSERT_EQ(0, adb_close(reader));
 }
 
 struct InvalidFdArg {