adb: fdevent: move run queue to fdevent_context.
am: 95eef6b097
Change-Id: I131984a407220281364c016f832aef45e0f9a457
diff --git a/adb/fdevent/fdevent.cpp b/adb/fdevent/fdevent.cpp
index e80bb5a..c858f6b 100644
--- a/adb/fdevent/fdevent.cpp
+++ b/adb/fdevent/fdevent.cpp
@@ -49,6 +49,32 @@
state.c_str());
}
+void fdevent_context::Run(std::function<void()> fn) {
+ {
+ std::lock_guard<std::mutex> lock(run_queue_mutex_);
+ run_queue_.push_back(std::move(fn));
+ }
+ // Wake the loop (lock already released) so FlushRunQueue() picks up the new entry.
+ Interrupt();
+}
+
+void fdevent_context::FlushRunQueue() {
+    // Pop one entry at a time and invoke it with the lock released: a queued function may call
+    // Run() itself, which needs to take run_queue_mutex_ to enqueue.
+    while (true) {
+        std::function<void()> fn;
+        {
+            std::lock_guard<std::mutex> lock(this->run_queue_mutex_);
+            if (this->run_queue_.empty()) {
+                break;
+            }
+            fn = std::move(this->run_queue_.front());
+            this->run_queue_.pop_front();
+        }
+        fn();
+    }
+}
+
static auto& g_ambient_fdevent_context =
*new std::unique_ptr<fdevent_context>(new fdevent_context_poll());
diff --git a/adb/fdevent/fdevent.h b/adb/fdevent/fdevent.h
index b46219c..5a2f2c6 100644
--- a/adb/fdevent/fdevent.h
+++ b/adb/fdevent/fdevent.h
@@ -21,10 +21,14 @@
#include <stdint.h>
#include <chrono>
+#include <deque>
#include <functional>
+#include <mutex>
#include <optional>
#include <variant>
+#include <android-base/thread_annotations.h>
+
#include "adb_unique_fd.h"
// Events that may be observed
@@ -48,6 +52,7 @@
std::string dump_fde(const fdevent* fde);
struct fdevent_context {
+ public:
virtual ~fdevent_context() = default;
// Allocate and initialize a new fdevent object.
@@ -68,17 +73,29 @@
virtual void SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) = 0;
// Loop forever, handling events.
+ // Implementations should call FlushRunQueue on every iteration.
virtual void Loop() = 0;
// Assert that the caller is running on the context's main thread.
virtual void CheckMainThread() = 0;
// Queue an operation to be run on the main thread.
- virtual void Run(std::function<void()> fn) = 0;
+ void Run(std::function<void()> fn);
// Test-only functionality:
virtual void TerminateLoop() = 0;
virtual size_t InstalledCount() = 0;
+
+ protected:
+ // Interrupt the run loop.
+ virtual void Interrupt() = 0;
+
+ // Run all pending functions enqueued via Run().
+ void FlushRunQueue() EXCLUDES(run_queue_mutex_);
+
+ private:
+ std::mutex run_queue_mutex_;
+ std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
};
struct fdevent {
diff --git a/adb/fdevent/fdevent_poll.cpp b/adb/fdevent/fdevent_poll.cpp
index 6e016f6..7615859 100644
--- a/adb/fdevent/fdevent_poll.cpp
+++ b/adb/fdevent/fdevent_poll.cpp
@@ -50,6 +50,35 @@
#include "fdevent.h"
#include "sysdeps/chrono.h"
+static void fdevent_interrupt(int fd, unsigned, void*) {
+ char buf[BUFSIZ];
+ ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf)));
+ if (rc == -1) {
+ PLOG(FATAL) << "failed to read from fdevent interrupt fd";
+ }
+}
+
+fdevent_context_poll::fdevent_context_poll() {
+    int s[2];
+    if (adb_socketpair(s) != 0) {
+        PLOG(FATAL) << "failed to create fdevent interrupt socketpair";
+    }
+    // Both ends are nonblocking; Interrupt() treats EAGAIN from its write as benign.
+    if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
+        PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking";
+    }
+    // Record the fde in interrupt_fde_ so the destructor can Destroy() it.
+    this->interrupt_fd_.reset(s[0]);
+    this->interrupt_fde_ = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr);
+    CHECK(this->interrupt_fde_ != nullptr);
+    this->Add(this->interrupt_fde_, FDE_READ);
+}
+
+fdevent_context_poll::~fdevent_context_poll() {
+ main_thread_valid_ = false;
+ this->Destroy(this->interrupt_fde_);
+}
+
void fdevent_context_poll::CheckMainThread() {
if (main_thread_valid_) {
CHECK_EQ(main_thread_id_, android::base::GetThreadId());
@@ -291,79 +320,6 @@
fde->func);
}
-static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) {
- // We need to be careful around reentrancy here, since a function we call can queue up another
- // function.
- while (true) {
- std::function<void()> fn;
- {
- std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
- if (ctx->run_queue_.empty()) {
- break;
- }
- fn = ctx->run_queue_.front();
- ctx->run_queue_.pop_front();
- }
- fn();
- }
-}
-
-static void fdevent_run_func(int fd, unsigned ev, void* data) {
- CHECK_GE(fd, 0);
- CHECK(ev & FDE_READ);
-
- bool* run_needs_flush = static_cast<bool*>(data);
- char buf[1024];
-
- // Empty the fd.
- if (adb_read(fd, buf, sizeof(buf)) == -1) {
- PLOG(FATAL) << "failed to empty run queue notify fd";
- }
-
- // Mark that we need to flush, and then run it at the end of fdevent_loop.
- *run_needs_flush = true;
-}
-
-static void fdevent_run_setup(fdevent_context_poll* ctx) {
- {
- std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
- CHECK(ctx->run_queue_notify_fd_.get() == -1);
- int s[2];
- if (adb_socketpair(s) != 0) {
- PLOG(FATAL) << "failed to create run queue notify socketpair";
- }
-
- if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
- PLOG(FATAL) << "failed to make run queue notify socket nonblocking";
- }
-
- ctx->run_queue_notify_fd_.reset(s[0]);
- fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_);
- CHECK(fde != nullptr);
- ctx->Add(fde, FDE_READ);
- }
-
- fdevent_run_flush(ctx);
-}
-
-void fdevent_context_poll::Run(std::function<void()> fn) {
- std::lock_guard<std::mutex> lock(run_queue_mutex_);
- run_queue_.push_back(std::move(fn));
-
- // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up.
- // In that case, rely on the setup code to flush the queue without a notification being needed.
- if (run_queue_notify_fd_ != -1) {
- int rc = adb_write(run_queue_notify_fd_.get(), "", 1);
-
- // It's possible that we get EAGAIN here, if lots of notifications came in while handling.
- if (rc == 0) {
- PLOG(FATAL) << "run queue notify fd was closed?";
- } else if (rc == -1 && errno != EAGAIN) {
- PLOG(FATAL) << "failed to write to run queue notify fd";
- }
- }
-}
-
static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
// Check to see if we're spinning because we forgot about an fdevent
// by keeping track of how long fdevents have been continuously pending.
@@ -424,7 +380,6 @@
void fdevent_context_poll::Loop() {
this->main_thread_id_ = android::base::GetThreadId();
this->main_thread_valid_ = true;
- fdevent_run_setup(this);
uint64_t cycle = 0;
while (true) {
@@ -444,17 +399,27 @@
fdevent_call_fdfunc(fde);
}
- if (run_needs_flush_) {
- fdevent_run_flush(this);
- run_needs_flush_ = false;
- }
+ this->FlushRunQueue();
}
}
void fdevent_context_poll::TerminateLoop() {
terminate_loop_ = true;
+ Interrupt();
}
size_t fdevent_context_poll::InstalledCount() {
- return poll_node_map_.size();
+ // We always have an installed fde for interrupt.
+ return poll_node_map_.size() - 1;
+}
+
+void fdevent_context_poll::Interrupt() {
+ int rc = adb_write(this->interrupt_fd_, "", 1);
+
+ // It's possible that we get EAGAIN here, if lots of notifications came in while handling.
+ if (rc == 0) {
+ PLOG(FATAL) << "fdevent interrupt fd was closed?";
+ } else if (rc == -1 && errno != EAGAIN) {
+ PLOG(FATAL) << "failed to write to fdevent interrupt fd";
+ }
}
diff --git a/adb/fdevent/fdevent_poll.h b/adb/fdevent/fdevent_poll.h
index f5720ca..1b505a7 100644
--- a/adb/fdevent/fdevent_poll.h
+++ b/adb/fdevent/fdevent_poll.h
@@ -25,6 +25,7 @@
#include <android-base/thread_annotations.h>
+#include "adb_unique_fd.h"
#include "fdevent.h"
struct PollNode {
@@ -44,7 +45,8 @@
};
struct fdevent_context_poll : public fdevent_context {
- virtual ~fdevent_context_poll() = default;
+ fdevent_context_poll();
+ virtual ~fdevent_context_poll();
virtual fdevent* Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) final;
virtual unique_fd Destroy(fdevent* fde) final;
@@ -58,11 +60,13 @@
virtual void CheckMainThread() final;
- virtual void Run(std::function<void()> fn) final;
-
virtual void TerminateLoop() final;
virtual size_t InstalledCount() final;
+ protected:
+ virtual void Interrupt() final;
+
+ public:
// All operations to fdevent should happen only in the main thread.
// That's why we don't need a lock for fdevent.
std::unordered_map<int, PollNode> poll_node_map_;
@@ -71,10 +75,7 @@
uint64_t main_thread_id_ = 0;
uint64_t fdevent_id_ = 0;
- bool run_needs_flush_ = false;
- unique_fd run_queue_notify_fd_;
- std::mutex run_queue_mutex_;
- std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
-
+ unique_fd interrupt_fd_;
+ fdevent* interrupt_fde_ = nullptr;
std::atomic<bool> terminate_loop_ = false;
};
diff --git a/adb/fdevent/fdevent_test.h b/adb/fdevent/fdevent_test.h
index 24bce59..2139d0f 100644
--- a/adb/fdevent/fdevent_test.h
+++ b/adb/fdevent/fdevent_test.h
@@ -78,8 +78,8 @@
}
size_t GetAdditionalLocalSocketCount() {
- // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket
- return 2;
+ // dummy socket installed in PrepareThread()
+ return 1;
}
void TerminateThread() {