Bluetooth: Watch multiple FDs with AsyncFdWatcher
Test: AsyncFdWatcherSocketTest.WatchTwoFileDescriptors
Change-Id: I2444515541e9be01720341c693012a580b3cb04f
diff --git a/bluetooth/1.0/default/async_fd_watcher.cc b/bluetooth/1.0/default/async_fd_watcher.cc
index 287d007..2f23a69 100644
--- a/bluetooth/1.0/default/async_fd_watcher.cc
+++ b/bluetooth/1.0/default/async_fd_watcher.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <atomic>
#include <condition_variable>
+#include <map>
#include <mutex>
#include <thread>
#include <vector>
@@ -26,6 +27,8 @@
#include "sys/select.h"
#include "unistd.h"
+static const int INVALID_FD = -1;
+
namespace android {
namespace hardware {
namespace bluetooth {
@@ -36,8 +39,7 @@
// Add file descriptor and callback
{
std::unique_lock<std::mutex> guard(internal_mutex_);
- read_fd_ = file_descriptor;
- cb_ = on_read_fd_ready_callback;
+ watched_fds_[file_descriptor] = on_read_fd_ready_callback;
}
// Start the thread if not started yet
@@ -58,7 +60,7 @@
return 0;
}
-void AsyncFdWatcher::StopWatchingFileDescriptor() { stopThread(); }
+void AsyncFdWatcher::StopWatchingFileDescriptors() { stopThread(); }
AsyncFdWatcher::~AsyncFdWatcher() {}
@@ -90,8 +92,7 @@
{
std::unique_lock<std::mutex> guard(internal_mutex_);
- cb_ = nullptr;
- read_fd_ = -1;
+ watched_fds_.clear();
}
{
@@ -115,7 +116,11 @@
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(notification_listen_fd_, &read_fds);
- FD_SET(read_fd_, &read_fds);
+ int max_read_fd = INVALID_FD;
+ for (auto& it : watched_fds_) {
+ FD_SET(it.first, &read_fds);
+ max_read_fd = std::max(max_read_fd, it.first);
+ }
struct timeval timeout;
struct timeval* timeout_ptr = NULL;
@@ -126,7 +131,7 @@
}
// Wait until there is data available to read on some FD.
- int nfds = std::max(notification_listen_fd_, read_fd_);
+ int nfds = std::max(notification_listen_fd_, max_read_fd);
int retval = select(nfds + 1, &read_fds, NULL, NULL, timeout_ptr);
// There was some error.
@@ -153,10 +158,21 @@
continue;
}
- // Invoke the data ready callback if appropriate.
- if (FD_ISSET(read_fd_, &read_fds)) {
+ // Invoke the data ready callbacks if appropriate.
+ std::vector<decltype(watched_fds_)::value_type> saved_callbacks;
+ {
std::unique_lock<std::mutex> guard(internal_mutex_);
- if (cb_) cb_(read_fd_);
+ for (auto& it : watched_fds_) {
+ if (FD_ISSET(it.first, &read_fds)) {
+ saved_callbacks.push_back(it);
+ }
+ }
+ }
+
+ for (auto& it : saved_callbacks) {
+ if (it.second) {
+ it.second(it.first);
+ }
}
}
}
diff --git a/bluetooth/1.0/default/async_fd_watcher.h b/bluetooth/1.0/default/async_fd_watcher.h
index 3f7ff54..358cbc3 100644
--- a/bluetooth/1.0/default/async_fd_watcher.h
+++ b/bluetooth/1.0/default/async_fd_watcher.h
@@ -16,6 +16,7 @@
#pragma once
+#include <map>
#include <mutex>
#include <thread>
@@ -36,7 +37,7 @@
const ReadCallback& on_read_fd_ready_callback);
int ConfigureTimeout(const std::chrono::milliseconds timeout,
const TimeoutCallback& on_timeout_callback);
- void StopWatchingFileDescriptor();
+ void StopWatchingFileDescriptors();
private:
AsyncFdWatcher(const AsyncFdWatcher&) = delete;
@@ -52,10 +53,9 @@
std::mutex internal_mutex_;
std::mutex timeout_mutex_;
- int read_fd_;
+ std::map<int, ReadCallback> watched_fds_;
int notification_listen_fd_;
int notification_write_fd_;
- ReadCallback cb_;
TimeoutCallback timeout_cb_;
std::chrono::milliseconds timeout_ms_;
};
diff --git a/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc b/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc
index a7f5bda..b0c533c 100644
--- a/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc
+++ b/bluetooth/1.0/default/test/async_fd_watcher_unittest.cc
@@ -14,6 +14,8 @@
// limitations under the License.
//
+#define LOG_TAG "async_fd_watcher_unittest"
+
#include "async_fd_watcher.h"
#include <gtest/gtest.h>
#include <cstdint>
@@ -122,8 +124,8 @@
}
void CleanUpServer() {
- async_fd_watcher_.StopWatchingFileDescriptor();
- conn_watcher_.StopWatchingFileDescriptor();
+ async_fd_watcher_.StopWatchingFileDescriptors();
+ conn_watcher_.StopWatchingFileDescriptors();
close(socket_fd_);
}
@@ -211,7 +213,7 @@
});
ConnectClient();
- conn_watcher.StopWatchingFileDescriptor();
+ conn_watcher.StopWatchingFileDescriptors();
close(socket_fd);
}
@@ -233,7 +235,7 @@
EXPECT_FALSE(timed_out);
sleep(1);
EXPECT_TRUE(timed_out);
- conn_watcher.StopWatchingFileDescriptor();
+ conn_watcher.StopWatchingFileDescriptors();
close(socket_fd);
}
@@ -265,10 +267,64 @@
sleep(1);
EXPECT_TRUE(timed_out);
EXPECT_TRUE(timed_out2);
- conn_watcher.StopWatchingFileDescriptor();
+ conn_watcher.StopWatchingFileDescriptors();
close(socket_fd);
}
+// Use a single AsyncFdWatcher to watch two file descriptors.
+TEST_F(AsyncFdWatcherSocketTest, WatchTwoFileDescriptors) {
+ int sockfd[2];
+ socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
+ bool cb1_called = false;
+ bool* cb1_called_ptr = &cb1_called;
+ bool cb2_called = false;
+ bool* cb2_called_ptr = &cb2_called;
+
+ AsyncFdWatcher watcher;
+ watcher.WatchFdForNonBlockingReads(sockfd[0], [cb1_called_ptr](int fd) {
+ char read_buf[1] = {0};
+ int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
+ ASSERT_TRUE(n == sizeof(read_buf));
+ ASSERT_TRUE(read_buf[0] == '1');
+ *cb1_called_ptr = true;
+ });
+
+ watcher.WatchFdForNonBlockingReads(sockfd[1], [cb2_called_ptr](int fd) {
+ char read_buf[1] = {0};
+ int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
+ ASSERT_TRUE(n == sizeof(read_buf));
+ ASSERT_TRUE(read_buf[0] == '2');
+ *cb2_called_ptr = true;
+ });
+
+ // Fail if the test doesn't pass within 3 seconds
+ watcher.ConfigureTimeout(std::chrono::seconds(3), [this]() {
+ bool connection_timeout = true;
+ ASSERT_FALSE(connection_timeout);
+ });
+
+ EXPECT_FALSE(cb1_called);
+ EXPECT_FALSE(cb2_called);
+
+ char one_buf[1] = {'1'};
+ TEMP_FAILURE_RETRY(write(sockfd[1], one_buf, sizeof(one_buf)));
+
+ sleep(1);
+
+ EXPECT_TRUE(cb1_called);
+ EXPECT_FALSE(cb2_called);
+
+ char two_buf[1] = {'2'};
+ TEMP_FAILURE_RETRY(write(sockfd[0], two_buf, sizeof(two_buf)));
+
+ sleep(1);
+
+ EXPECT_TRUE(cb1_called);
+ EXPECT_TRUE(cb2_called);
+
+ watcher.StopWatchingFileDescriptors();
+}
+
// Use two AsyncFdWatchers to set up a server socket.
TEST_F(AsyncFdWatcherSocketTest, ClientServer) {
ConfigureServer();
diff --git a/bluetooth/1.0/default/vendor_interface.cc b/bluetooth/1.0/default/vendor_interface.cc
index 3878129..2c55d1c 100644
--- a/bluetooth/1.0/default/vendor_interface.cc
+++ b/bluetooth/1.0/default/vendor_interface.cc
@@ -274,7 +274,7 @@
}
void VendorInterface::Close() {
- fd_watcher_.StopWatchingFileDescriptor();
+ fd_watcher_.StopWatchingFileDescriptors();
if (lib_interface_ != nullptr) {
bt_vendor_lpm_mode_t mode = BT_VND_LPM_DISABLE;