Bluetooth: Watch multiple FDs with AsyncFdWatcher

Test: AsyncFdWatcherSocketTest.WatchTwoFileDescriptors
Change-Id: I2444515541e9be01720341c693012a580b3cb04f
diff --git a/bluetooth/1.0/default/async_fd_watcher.cc b/bluetooth/1.0/default/async_fd_watcher.cc
index 287d007..2f23a69 100644
--- a/bluetooth/1.0/default/async_fd_watcher.cc
+++ b/bluetooth/1.0/default/async_fd_watcher.cc
@@ -19,6 +19,7 @@
 #include <algorithm>
 #include <atomic>
 #include <condition_variable>
+#include <map>
 #include <mutex>
 #include <thread>
 #include <vector>
@@ -26,6 +27,8 @@
 #include "sys/select.h"
 #include "unistd.h"
 
+static const int INVALID_FD = -1;
+
 namespace android {
 namespace hardware {
 namespace bluetooth {
@@ -36,8 +39,7 @@
   // Add file descriptor and callback
   {
     std::unique_lock<std::mutex> guard(internal_mutex_);
-    read_fd_ = file_descriptor;
-    cb_ = on_read_fd_ready_callback;
+    watched_fds_[file_descriptor] = on_read_fd_ready_callback;
   }
 
   // Start the thread if not started yet
@@ -58,7 +60,7 @@
   return 0;
 }
 
-void AsyncFdWatcher::StopWatchingFileDescriptor() { stopThread(); }
+void AsyncFdWatcher::StopWatchingFileDescriptors() { stopThread(); }
 
 AsyncFdWatcher::~AsyncFdWatcher() {}
 
@@ -90,8 +92,7 @@
 
   {
     std::unique_lock<std::mutex> guard(internal_mutex_);
-    cb_ = nullptr;
-    read_fd_ = -1;
+    watched_fds_.clear();
   }
 
   {
@@ -115,7 +116,11 @@
     fd_set read_fds;
     FD_ZERO(&read_fds);
     FD_SET(notification_listen_fd_, &read_fds);
-    FD_SET(read_fd_, &read_fds);
+    int max_read_fd = INVALID_FD;
+    for (auto& it : watched_fds_) {
+      FD_SET(it.first, &read_fds);
+      max_read_fd = std::max(max_read_fd, it.first);
+    }
 
     struct timeval timeout;
     struct timeval* timeout_ptr = NULL;
@@ -126,7 +131,7 @@
     }
 
     // Wait until there is data available to read on some FD.
-    int nfds = std::max(notification_listen_fd_, read_fd_);
+    int nfds = std::max(notification_listen_fd_, max_read_fd);
     int retval = select(nfds + 1, &read_fds, NULL, NULL, timeout_ptr);
 
     // There was some error.
@@ -153,10 +158,21 @@
       continue;
     }
 
-    // Invoke the data ready callback if appropriate.
-    if (FD_ISSET(read_fd_, &read_fds)) {
+    // Invoke the data ready callbacks if appropriate.
+    std::vector<decltype(watched_fds_)::value_type> saved_callbacks;
+    {
       std::unique_lock<std::mutex> guard(internal_mutex_);
-      if (cb_) cb_(read_fd_);
+      for (auto& it : watched_fds_) {
+        if (FD_ISSET(it.first, &read_fds)) {
+          saved_callbacks.push_back(it);
+        }
+      }
+    }
+
+    for (auto& it : saved_callbacks) {
+      if (it.second) {
+        it.second(it.first);
+      }
     }
   }
 }
diff --git a/bluetooth/1.0/default/async_fd_watcher.h b/bluetooth/1.0/default/async_fd_watcher.h
index 3f7ff54..358cbc3 100644
--- a/bluetooth/1.0/default/async_fd_watcher.h
+++ b/bluetooth/1.0/default/async_fd_watcher.h
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <map>
 #include <mutex>
 #include <thread>
 
@@ -36,7 +37,7 @@
                                  const ReadCallback& on_read_fd_ready_callback);
   int ConfigureTimeout(const std::chrono::milliseconds timeout,
                        const TimeoutCallback& on_timeout_callback);
-  void StopWatchingFileDescriptor();
+  void StopWatchingFileDescriptors();
 
  private:
   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
@@ -52,10 +53,9 @@
   std::mutex internal_mutex_;
   std::mutex timeout_mutex_;
 
-  int read_fd_;
+  std::map<int, ReadCallback> watched_fds_;
   int notification_listen_fd_;
   int notification_write_fd_;
-  ReadCallback cb_;
   TimeoutCallback timeout_cb_;
   std::chrono::milliseconds timeout_ms_;
 };
diff --git a/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc b/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc
index a7f5bda..b0c533c 100644
--- a/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc
+++ b/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc
@@ -14,6 +14,8 @@
 // limitations under the License.
 //
 
+#define LOG_TAG "async_fd_watcher_unittest"
+
 #include "async_fd_watcher.h"
 #include <gtest/gtest.h>
 #include <cstdint>
@@ -122,8 +124,8 @@
   }
 
   void CleanUpServer() {
-    async_fd_watcher_.StopWatchingFileDescriptor();
-    conn_watcher_.StopWatchingFileDescriptor();
+    async_fd_watcher_.StopWatchingFileDescriptors();
+    conn_watcher_.StopWatchingFileDescriptors();
     close(socket_fd_);
   }
 
@@ -211,7 +213,7 @@
   });
 
   ConnectClient();
-  conn_watcher.StopWatchingFileDescriptor();
+  conn_watcher.StopWatchingFileDescriptors();
   close(socket_fd);
 }
 
@@ -233,7 +235,7 @@
   EXPECT_FALSE(timed_out);
   sleep(1);
   EXPECT_TRUE(timed_out);
-  conn_watcher.StopWatchingFileDescriptor();
+  conn_watcher.StopWatchingFileDescriptors();
   close(socket_fd);
 }
 
@@ -265,10 +267,64 @@
   sleep(1);
   EXPECT_TRUE(timed_out);
   EXPECT_TRUE(timed_out2);
-  conn_watcher.StopWatchingFileDescriptor();
+  conn_watcher.StopWatchingFileDescriptors();
   close(socket_fd);
 }
 
+// Use a single AsyncFdWatcher to watch two file descriptors.
+TEST_F(AsyncFdWatcherSocketTest, WatchTwoFileDescriptors) {
+  int sockfd[2];
+  socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
+  bool cb1_called = false;
+  bool* cb1_called_ptr = &cb1_called;
+  bool cb2_called = false;
+  bool* cb2_called_ptr = &cb2_called;
+
+  AsyncFdWatcher watcher;
+  watcher.WatchFdForNonBlockingReads(sockfd[0], [cb1_called_ptr](int fd) {
+    char read_buf[1] = {0};
+    int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
+    ASSERT_TRUE(n == sizeof(read_buf));
+    ASSERT_TRUE(read_buf[0] == '1');
+    *cb1_called_ptr = true;
+  });
+
+  watcher.WatchFdForNonBlockingReads(sockfd[1], [cb2_called_ptr](int fd) {
+    char read_buf[1] = {0};
+    int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
+    ASSERT_TRUE(n == sizeof(read_buf));
+    ASSERT_TRUE(read_buf[0] == '2');
+    *cb2_called_ptr = true;
+  });
+
+  // Fail if the test doesn't pass within 3 seconds
+  watcher.ConfigureTimeout(std::chrono::seconds(3), [this]() {
+    bool connection_timeout = true;
+    ASSERT_FALSE(connection_timeout);
+  });
+
+  EXPECT_FALSE(cb1_called);
+  EXPECT_FALSE(cb2_called);
+
+  char one_buf[1] = {'1'};
+  TEMP_FAILURE_RETRY(write(sockfd[1], one_buf, sizeof(one_buf)));
+
+  sleep(1);
+
+  EXPECT_TRUE(cb1_called);
+  EXPECT_FALSE(cb2_called);
+
+  char two_buf[1] = {'2'};
+  TEMP_FAILURE_RETRY(write(sockfd[0], two_buf, sizeof(two_buf)));
+
+  sleep(1);
+
+  EXPECT_TRUE(cb1_called);
+  EXPECT_TRUE(cb2_called);
+
+  watcher.StopWatchingFileDescriptors();
+}
+
 // Use two AsyncFdWatchers to set up a server socket.
 TEST_F(AsyncFdWatcherSocketTest, ClientServer) {
   ConfigureServer();
diff --git a/bluetooth/1.0/default/vendor_interface.cc b/bluetooth/1.0/default/vendor_interface.cc
index 3878129..2c55d1c 100644
--- a/bluetooth/1.0/default/vendor_interface.cc
+++ b/bluetooth/1.0/default/vendor_interface.cc
@@ -274,7 +274,7 @@
 }
 
 void VendorInterface::Close() {
-  fd_watcher_.StopWatchingFileDescriptor();
+  fd_watcher_.StopWatchingFileDescriptors();
 
   if (lib_interface_ != nullptr) {
     bt_vendor_lpm_mode_t mode = BT_VND_LPM_DISABLE;