Merge changes Iff1ec14b,I32123c51,I517dbcba,Ic215c7fe
* changes:
adb: try harder to fill our test sockets.
adb: don't set has_write_error on success.
adb: move ownership of the fdevent thread into FdeventTest.
adb: guarantee that fdevent_run_on_main_thread happens last.
diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp
index 9776c1b..cf441cf 100644
--- a/adb/fdevent.cpp
+++ b/adb/fdevent.cpp
@@ -75,6 +75,7 @@
static bool main_thread_valid;
static uint64_t main_thread_id;
+static bool run_needs_flush = false;
static auto& run_queue_notify_fd = *new unique_fd();
static auto& run_queue_mutex = *new std::mutex();
static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque<std::function<void()>>();
@@ -317,7 +318,8 @@
PLOG(FATAL) << "failed to empty run queue notify fd";
}
- fdevent_run_flush();
+ // Mark that we need to flush, and then run it at the end of fdevent_loop.
+ run_needs_flush = true;
}
static void fdevent_run_setup() {
@@ -378,6 +380,11 @@
g_pending_list.pop_front();
fdevent_call_fdfunc(fde);
}
+
+ if (run_needs_flush) {
+ fdevent_run_flush();
+ run_needs_flush = false;
+ }
}
}
diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp
index e3d5a35..2f0ff18 100644
--- a/adb/fdevent_test.cpp
+++ b/adb/fdevent_test.cpp
@@ -80,30 +80,7 @@
TEST_F(FdeventTest, fdevent_terminate) {
PrepareThread();
-
- std::thread thread(fdevent_loop);
- TerminateThread(thread);
-}
-
-static void FdEventThreadFunc(ThreadArg* arg) {
- std::vector<int> read_fds;
- std::vector<int> write_fds;
-
- read_fds.push_back(arg->first_read_fd);
- for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
- int fds[2];
- ASSERT_EQ(0, adb_socketpair(fds));
- read_fds.push_back(fds[0]);
- write_fds.push_back(fds[1]);
- }
- write_fds.push_back(arg->last_write_fd);
-
- std::vector<std::unique_ptr<FdHandler>> fd_handlers;
- for (size_t i = 0; i < read_fds.size(); ++i) {
- fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
- }
-
- fdevent_loop();
+ TerminateThread();
}
TEST_F(FdeventTest, smoke) {
@@ -122,7 +99,26 @@
int reader = fd_pair2[0];
PrepareThread();
- std::thread thread(FdEventThreadFunc, &thread_arg);
+
+ std::vector<std::unique_ptr<FdHandler>> fd_handlers;
+ fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
+ std::vector<int> read_fds;
+ std::vector<int> write_fds;
+
+ read_fds.push_back(thread_arg.first_read_fd);
+ for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
+ int fds[2];
+ ASSERT_EQ(0, adb_socketpair(fds));
+ read_fds.push_back(fds[0]);
+ write_fds.push_back(fds[1]);
+ }
+ write_fds.push_back(thread_arg.last_write_fd);
+
+ for (size_t i = 0; i < read_fds.size(); ++i) {
+ fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
+ }
+ });
+ WaitForFdeventLoop();
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
@@ -132,7 +128,10 @@
ASSERT_EQ(read_buffer, write_buffer);
}
- TerminateThread(thread);
+ fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
+ WaitForFdeventLoop();
+
+ TerminateThread();
ASSERT_EQ(0, adb_close(writer));
ASSERT_EQ(0, adb_close(reader));
}
@@ -143,7 +142,7 @@
size_t* happened_event_count;
};
-static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) {
+static void InvalidFdEventCallback(int, unsigned events, void* userdata) {
InvalidFdArg* arg = reinterpret_cast<InvalidFdArg*>(userdata);
ASSERT_EQ(arg->expected_events, events);
fdevent_remove(&arg->fde);
@@ -179,7 +178,6 @@
std::vector<int> vec;
PrepareThread();
- std::thread thread(fdevent_loop);
// Block the main thread for a long time while we queue our callbacks.
fdevent_run_on_main_thread([]() {
@@ -194,7 +192,7 @@
});
}
- TerminateThread(thread);
+ TerminateThread();
ASSERT_EQ(1000000u, vec.size());
for (int i = 0; i < 1000000; ++i) {
@@ -218,11 +216,8 @@
std::vector<int> vec;
PrepareThread();
- std::thread thread(fdevent_loop);
-
fdevent_run_on_main_thread(make_appender(&vec, 0));
-
- TerminateThread(thread);
+ TerminateThread();
ASSERT_EQ(100u, vec.size());
for (int i = 0; i < 100; ++i) {
diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h
index 1a2d41c..5a417e0 100644
--- a/adb/fdevent_test.h
+++ b/adb/fdevent_test.h
@@ -16,10 +16,31 @@
#include <gtest/gtest.h>
+#include <condition_variable>
+#include <mutex>
#include <thread>
#include "socket.h"
#include "sysdeps.h"
+#include "sysdeps/chrono.h"
+
+static void WaitForFdeventLoop() {
+ // Sleep for a bit to make sure that network events have propagated.
+ std::this_thread::sleep_for(100ms);
+
+ // fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after
+ // socket events, so as soon as our function is called, we know that we've processed all
+ // previous events.
+ std::mutex mutex;
+ std::condition_variable cv;
+ std::unique_lock<std::mutex> lock(mutex);
+ fdevent_run_on_main_thread([&]() {
+ mutex.lock();
+ mutex.unlock();
+ cv.notify_one();
+ });
+ cv.wait(lock);
+}
class FdeventTest : public ::testing::Test {
protected:
@@ -49,6 +70,9 @@
}
dummy_socket->ready(dummy_socket);
dummy = dummy_fds[0];
+
+ thread_ = std::thread([]() { fdevent_loop(); });
+ WaitForFdeventLoop();
}
size_t GetAdditionalLocalSocketCount() {
@@ -56,10 +80,12 @@
return 2;
}
- void TerminateThread(std::thread& thread) {
+ void TerminateThread() {
fdevent_terminate_loop();
ASSERT_TRUE(WriteFdExactly(dummy, "", 1));
- thread.join();
+ thread_.join();
ASSERT_EQ(0, adb_close(dummy));
}
+
+ std::thread thread_;
};
diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp
index f587fdb..6c4a8b2 100644
--- a/adb/socket_test.cpp
+++ b/adb/socket_test.cpp
@@ -42,10 +42,6 @@
class LocalSocketTest : public FdeventTest {};
-static void WaitForFdeventLoop() {
- std::this_thread::sleep_for(100ms);
-}
-
TEST_F(LocalSocketTest, smoke) {
// Join two socketpairs with a chain of intermediate socketpairs.
int first[2];
@@ -86,7 +82,6 @@
connect(prev_tail, end);
PrepareThread();
- std::thread thread(fdevent_loop);
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
@@ -102,7 +97,7 @@
// Wait until the local sockets are closed.
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
- TerminateThread(thread);
+ TerminateThread();
}
struct CloseWithPacketArg {
@@ -111,24 +106,39 @@
int cause_close_fd;
};
-static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) {
- asocket* s = create_local_socket(arg->socket_fd);
- ASSERT_TRUE(s != nullptr);
- arg->bytes_written = 0;
+static void CreateCloser(CloseWithPacketArg* arg) {
+ fdevent_run_on_main_thread([arg]() {
+ asocket* s = create_local_socket(arg->socket_fd);
+ ASSERT_TRUE(s != nullptr);
+ arg->bytes_written = 0;
- std::string data;
- data.resize(MAX_PAYLOAD);
- arg->bytes_written += data.size();
- int ret = s->enqueue(s, std::move(data));
- ASSERT_EQ(1, ret);
+ // On platforms that implement sockets via underlying sockets (e.g. Wine),
+ // a socket can appear to be full, and then become available for writes
+ // again without read being called on the other end. Loop and sleep after
+ // each write to give the underlying implementation time to flush.
+ bool socket_filled = false;
+ for (int i = 0; i < 128; ++i) {
+ std::string data;
+ data.resize(MAX_PAYLOAD);
+ arg->bytes_written += data.size();
+ int ret = s->enqueue(s, std::move(data));
+ if (ret == 1) {
+ socket_filled = true;
+ break;
+ }
+ ASSERT_NE(-1, ret);
- asocket* cause_close_s = create_local_socket(arg->cause_close_fd);
- ASSERT_TRUE(cause_close_s != nullptr);
- cause_close_s->peer = s;
- s->peer = cause_close_s;
- cause_close_s->ready(cause_close_s);
+ std::this_thread::sleep_for(250ms);
+ }
+ ASSERT_TRUE(socket_filled);
- fdevent_loop();
+ asocket* cause_close_s = create_local_socket(arg->cause_close_fd);
+ ASSERT_TRUE(cause_close_s != nullptr);
+ cause_close_s->peer = s;
+ s->peer = cause_close_s;
+ cause_close_s->ready(cause_close_s);
+ });
+ WaitForFdeventLoop();
}
// This test checks if we can close local socket in the following situation:
@@ -145,9 +155,8 @@
arg.cause_close_fd = cause_close_fd[1];
PrepareThread();
- std::thread thread(CloseWithPacketThreadFunc, &arg);
+ CreateCloser(&arg);
- WaitForFdeventLoop();
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
WaitForFdeventLoop();
@@ -156,7 +165,7 @@
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
- TerminateThread(thread);
+ TerminateThread();
}
// This test checks if we can read packets from a closing local socket.
@@ -170,7 +179,7 @@
arg.cause_close_fd = cause_close_fd[1];
PrepareThread();
- std::thread thread(CloseWithPacketThreadFunc, &arg);
+ CreateCloser(&arg);
WaitForFdeventLoop();
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
@@ -186,7 +195,7 @@
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
- TerminateThread(thread);
+ TerminateThread();
}
// This test checks if we can close local socket in the following situation:
@@ -203,7 +212,7 @@
arg.cause_close_fd = cause_close_fd[1];
PrepareThread();
- std::thread thread(CloseWithPacketThreadFunc, &arg);
+ CreateCloser(&arg);
WaitForFdeventLoop();
EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -211,7 +220,7 @@
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
- TerminateThread(thread);
+ TerminateThread();
}
// Ensure that if we fail to write output to an fd, we will still flush data coming from it.
@@ -231,7 +240,6 @@
tail->ready(tail);
PrepareThread();
- std::thread thread(fdevent_loop);
EXPECT_TRUE(WriteFdExactly(head_fd[0], "foo", 3));
@@ -249,7 +257,7 @@
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
- TerminateThread(thread);
+ TerminateThread();
}
#if defined(__linux__)
@@ -258,21 +266,10 @@
std::string error;
int fd = network_loopback_client(5038, SOCK_STREAM, &error);
ASSERT_GE(fd, 0) << error;
- std::this_thread::sleep_for(200ms);
+ std::this_thread::sleep_for(1s);
ASSERT_EQ(0, adb_close(fd));
}
-struct CloseRdHupSocketArg {
- int socket_fd;
-};
-
-static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
- asocket* s = create_local_socket(arg->socket_fd);
- ASSERT_TRUE(s != nullptr);
-
- fdevent_loop();
-}
-
// This test checks if we can close sockets in CLOSE_WAIT state.
TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
std::string error;
@@ -283,11 +280,13 @@
int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr);
ASSERT_GE(accept_fd, 0);
- CloseRdHupSocketArg arg;
- arg.socket_fd = accept_fd;
PrepareThread();
- std::thread thread(CloseRdHupSocketThreadFunc, &arg);
+
+ fdevent_run_on_main_thread([accept_fd]() {
+ asocket* s = create_local_socket(accept_fd);
+ ASSERT_TRUE(s != nullptr);
+ });
WaitForFdeventLoop();
EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -297,7 +296,7 @@
WaitForFdeventLoop();
ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
- TerminateThread(thread);
+ TerminateThread();
}
#endif // defined(__linux__)
diff --git a/adb/sockets.cpp b/adb/sockets.cpp
index e05a3db..0887e6f 100644
--- a/adb/sockets.cpp
+++ b/adb/sockets.cpp
@@ -126,12 +126,12 @@
} else if (rc == -1 && errno == EAGAIN) {
fdevent_add(&s->fde, FDE_WRITE);
return SocketFlushResult::TryAgain;
+ } else {
+ // We failed to write, but it's possible that we can still read from the socket.
+ // Give that a try before giving up.
+ s->has_write_error = true;
+ break;
}
-
- // We failed to write, but it's possible that we can still read from the socket.
- // Give that a try before giving up.
- s->has_write_error = true;
- break;
}
// If we sent the last packet of a closing socket, we can now destroy it.