[IPC 1] Add class to handle UNIX sockets and shared memory
Adds a foundational class to listen / connect / send / receive
data over a UNIX socket, either named or abstract.
The UnixSocket class also allows to move file descriptors across
process boundaries using POSIX SCM_RIGHTS.
Bug: 68854111
Test: ipc_unittests --gtest_filter=UnixSocketTest.*
Change-Id: I073ccdc29a327395943129a861620071b85d18af
diff --git a/BUILD.gn b/BUILD.gn
index fec540a..3211ebd 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -31,6 +31,7 @@
"//base:base_unittests",
"//ftrace_reader:ftrace_reader_integrationtests",
"//ftrace_reader:ftrace_reader_unittests",
+ "//ipc:ipc_unittests",
"//protozero:protozero_unittests",
"//tools/sanitizers_unittests",
"//tracing:tracing_benchmarks",
diff --git a/base/test/test_task_runner.cc b/base/test/test_task_runner.cc
index 7efd300..2b5c92b 100644
--- a/base/test/test_task_runner.cc
+++ b/base/test/test_task_runner.cc
@@ -19,6 +19,8 @@
#include <stdio.h>
#include <unistd.h>
+#include <chrono>
+
#include "base/logging.h"
// TODO: the current implementation quite hacky as it keeps waking up every 1ms.
@@ -26,54 +28,92 @@
namespace perfetto {
namespace base {
+namespace {
+constexpr int kFileDescriptorWatchTimeoutMs = 100;
+} // namespace
+
TestTaskRunner::TestTaskRunner() = default;
TestTaskRunner::~TestTaskRunner() = default;
void TestTaskRunner::Run() {
- while (RunUntilIdle()) {
+ for (;;)
+ RunUntilIdle();
+}
+
+void TestTaskRunner::RunUntilIdle() {
+ do {
+ QueueFileDescriptorWatches(/* blocking = */ task_queue_.empty());
+ } while (RunOneTask());
+}
+
+void TestTaskRunner::RunUntilCheckpoint(const std::string& checkpoint,
+ int timeout_ms) {
+ PERFETTO_DCHECK(checkpoints_.count(checkpoint) == 1);
+ auto tstart = std::chrono::system_clock::now();
+ auto deadline = tstart + std::chrono::milliseconds(timeout_ms);
+ while (!checkpoints_[checkpoint]) {
+ QueueFileDescriptorWatches(/* blocking = */ task_queue_.empty());
+ RunOneTask();
+ if (std::chrono::system_clock::now() > deadline) {
+ fprintf(stderr, "[TestTaskRunner] Failed to reach checkpoint \"%s\"\n",
+ checkpoint.c_str());
+ abort();
+ }
}
}
-bool TestTaskRunner::RunUntilIdle() {
- while (!task_queue_.empty()) {
- std::function<void()> closure = std::move(task_queue_.front());
- task_queue_.pop_front();
- closure();
- }
-
- int res = RunFileDescriptorWatches(100);
- if (res < 0)
+bool TestTaskRunner::RunOneTask() {
+ if (task_queue_.empty())
return false;
+ std::function<void()> closure = std::move(task_queue_.front());
+ task_queue_.pop_front();
+ closure();
return true;
}
-bool TestTaskRunner::RunFileDescriptorWatches(int timeout_ms) {
+std::function<void()> TestTaskRunner::CreateCheckpoint(
+ const std::string& checkpoint) {
+ PERFETTO_DCHECK(checkpoints_.count(checkpoint) == 0);
+ auto checkpoint_iter = checkpoints_.emplace(checkpoint, false);
+ return [checkpoint_iter] { checkpoint_iter.first->second = true; };
+}
+
+void TestTaskRunner::QueueFileDescriptorWatches(bool blocking) {
+ uint32_t timeout_ms = blocking ? kFileDescriptorWatchTimeoutMs : 0;
struct timeval timeout;
timeout.tv_usec = (timeout_ms % 1000) * 1000L;
timeout.tv_sec = static_cast<time_t>(timeout_ms / 1000);
int max_fd = 0;
- fd_set fds = {};
+ fd_set fds_in = {};
+ fd_set fds_err = {};
for (const auto& it : watched_fds_) {
- FD_SET(it.first, &fds);
+ FD_SET(it.first, &fds_in);
+ FD_SET(it.first, &fds_err);
max_fd = std::max(max_fd, it.first);
}
- int res = select(max_fd + 1, &fds, nullptr, nullptr, &timeout);
-
+ int res = select(max_fd + 1, &fds_in, nullptr, &fds_err, &timeout);
if (res < 0) {
perror("select() failed");
- return false;
+ abort();
}
if (res == 0)
- return true; // timeout
+ return; // timeout
for (int fd = 0; fd <= max_fd; ++fd) {
- if (!FD_ISSET(fd, &fds))
+ if (!FD_ISSET(fd, &fds_in) && !FD_ISSET(fd, &fds_err)) {
continue;
+ }
auto fd_and_callback = watched_fds_.find(fd);
PERFETTO_DCHECK(fd_and_callback != watched_fds_.end());
- fd_and_callback->second();
+ if (fd_watch_task_queued_[fd])
+ continue;
+ auto callback = fd_and_callback->second;
+ task_queue_.emplace_back([this, callback, fd]() {
+ fd_watch_task_queued_[fd] = false;
+ callback();
+ });
+ fd_watch_task_queued_[fd] = true;
}
- return true;
}
// TaskRunner implementation.
@@ -86,12 +126,14 @@
PERFETTO_DCHECK(fd >= 0);
PERFETTO_DCHECK(watched_fds_.count(fd) == 0);
watched_fds_.emplace(fd, std::move(callback));
+ fd_watch_task_queued_[fd] = false;
}
void TestTaskRunner::RemoveFileDescriptorWatch(int fd) {
PERFETTO_DCHECK(fd >= 0);
PERFETTO_DCHECK(watched_fds_.count(fd) == 1);
watched_fds_.erase(fd);
+ fd_watch_task_queued_.erase(fd);
}
} // namespace base
diff --git a/base/test/test_task_runner.h b/base/test/test_task_runner.h
index 33b37d2..92f9197 100644
--- a/base/test/test_task_runner.h
+++ b/base/test/test_task_runner.h
@@ -22,6 +22,7 @@
#include <functional>
#include <list>
#include <map>
+#include <string>
#include "base/task_runner.h"
@@ -33,10 +34,11 @@
TestTaskRunner();
~TestTaskRunner() override;
- void Run();
+ void RunUntilIdle();
+ void __attribute__((__noreturn__)) Run();
- // Returns false in case of errors.
- bool RunUntilIdle();
+ std::function<void()> CreateCheckpoint(const std::string& checkpoint);
+ void RunUntilCheckpoint(const std::string& checkpoint, int timeout_ms = 5000);
// TaskRunner implementation.
void PostTask(std::function<void()> closure) override;
@@ -47,11 +49,13 @@
TestTaskRunner(const TestTaskRunner&) = delete;
TestTaskRunner& operator=(const TestTaskRunner&) = delete;
- // Returns false in case of errors.
- bool RunFileDescriptorWatches(int timeout_ms);
+ bool RunOneTask();
+ void QueueFileDescriptorWatches(bool blocking);
std::list<std::function<void()>> task_queue_;
std::map<int, std::function<void()>> watched_fds_;
+ std::map<int, bool> fd_watch_task_queued_;
+ std::map<std::string, bool> checkpoints_;
};
} // namespace base
diff --git a/ipc/BUILD.gn b/ipc/BUILD.gn
new file mode 100644
index 0000000..da81b42
--- /dev/null
+++ b/ipc/BUILD.gn
@@ -0,0 +1,35 @@
+# Copyright (C) 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source_set("ipc") {
+ deps += [ "//base" ]
+ sources = [
+ "src/unix_socket.cc",
+ ]
+}
+
+executable("ipc_unittests") {
+ testonly = true
+ deps += [
+ ":ipc",
+ "//base",
+ "//base:test_support",
+ "//buildtools:gmock",
+ "//buildtools:gtest",
+ "//buildtools:gtest_main",
+ ]
+ sources = [
+ "src/unix_socket_unittest.cc",
+ ]
+}
diff --git a/ipc/src/unix_socket.cc b/ipc/src/unix_socket.cc
new file mode 100644
index 0000000..a9e5f13
--- /dev/null
+++ b/ipc/src/unix_socket.cc
@@ -0,0 +1,428 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ipc/src/unix_socket.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <memory>
+
+#include "base/build_config.h"
+#include "base/logging.h"
+#include "base/task_runner.h"
+#include "base/utils.h"
+
+namespace perfetto {
+namespace ipc {
+
+// TODO(primiano): Add ThreadChecker to methods of this class.
+
+namespace {
+
+// MSG_NOSIGNAL is not supported on Mac OS X, but in that case the socket is
+// created with SO_NOSIGPIPE (See InitializeSocket()).
+#if BUILDFLAG(OS_MACOSX)
+constexpr int kNoSigPipe = 0;
+#else
+constexpr int kNoSigPipe = MSG_NOSIGNAL;
+#endif
+
+// Android takes an int instead of socklen_t for the control buffer size.
+#if BUILDFLAG(OS_ANDROID)
+using CBufLenType = size_t;
+#else
+using CBufLenType = socklen_t;
+#endif
+
+bool MakeSockAddr(const std::string& socket_name,
+ sockaddr_un* addr,
+ socklen_t* addr_size) {
+ memset(addr, 0, sizeof(*addr));
+ const size_t name_len = socket_name.size();
+ if (name_len >= sizeof(addr->sun_path)) {
+ errno = ENAMETOOLONG;
+ return false;
+ }
+ memcpy(addr->sun_path, socket_name.data(), name_len);
+ if (addr->sun_path[0] == '@')
+ addr->sun_path[0] = '\0';
+ addr->sun_family = AF_UNIX;
+ *addr_size = static_cast<socklen_t>(
+ __builtin_offsetof(sockaddr_un, sun_path) + name_len + 1);
+ return true;
+}
+
+} // namespace
+
+// static
+std::unique_ptr<UnixSocket> UnixSocket::Listen(const std::string& socket_name,
+ EventListener* event_listener,
+ base::TaskRunner* task_runner) {
+ std::unique_ptr<UnixSocket> sock(new UnixSocket(event_listener, task_runner));
+ sock->DoListen(socket_name);
+ return sock;
+}
+
+// static
+std::unique_ptr<UnixSocket> UnixSocket::Connect(const std::string& socket_name,
+ EventListener* event_listener,
+ base::TaskRunner* task_runner) {
+ std::unique_ptr<UnixSocket> sock(new UnixSocket(event_listener, task_runner));
+ sock->DoConnect(socket_name);
+ return sock;
+}
+
+UnixSocket::UnixSocket(EventListener* event_listener,
+ base::TaskRunner* task_runner)
+ : UnixSocket(event_listener, task_runner, base::ScopedFile()) {}
+
+UnixSocket::UnixSocket(EventListener* event_listener,
+ base::TaskRunner* task_runner,
+ base::ScopedFile adopt_fd)
+ : event_listener_(event_listener),
+ task_runner_(task_runner),
+ weak_ref_(new WeakRef(this)) {
+ if (adopt_fd) {
+ // Only in the case of OnNewIncomingConnection().
+ fd_ = std::move(adopt_fd);
+ state_ = State::kConnected;
+ } else {
+ fd_.reset(socket(AF_UNIX, SOCK_STREAM, 0));
+ }
+ if (!fd_) {
+ last_error_ = errno;
+ return;
+ }
+
+#if BUILDFLAG(OS_MACOSX)
+ const int no_sigpipe = 1;
+ setsockopt(*fd_, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(no_sigpipe));
+#endif
+ // There is no reason why a socket should outlive the process in case of
+ // exec() by default, this is just working around a broken unix design.
+ int fcntl_res = fcntl(*fd_, FD_CLOEXEC);
+ PERFETTO_DCHECK(fcntl_res == 0);
+
+ // Set non-blocking mode.
+ int flags = fcntl(*fd_, F_GETFL, 0);
+ flags |= O_NONBLOCK;
+ fcntl_res = fcntl(fd(), F_SETFL, flags);
+ PERFETTO_CHECK(fcntl_res == 0);
+
+ std::shared_ptr<WeakRef> weak_ref = weak_ref_;
+ task_runner_->AddFileDescriptorWatch(*fd_, [weak_ref]() {
+ if (weak_ref->sock)
+ weak_ref->sock->OnEvent();
+ });
+}
+
+UnixSocket::~UnixSocket() {
+ weak_ref_->sock = nullptr; // This will no-op any future callback.
+ Shutdown();
+}
+
+// Called only by the Listen() static constructor.
+void UnixSocket::DoListen(const std::string& socket_name) {
+ PERFETTO_DCHECK(state_ == State::kDisconnected);
+ if (!fd_)
+ return; // This is the only thing that can gracefully fail in the ctor.
+
+ sockaddr_un addr;
+ socklen_t addr_size;
+ if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+ last_error_ = errno;
+ return;
+ }
+
+// Android takes an int as 3rd argument of bind() instead of socklen_t.
+#if BUILDFLAG(OS_ANDROID)
+ const int bind_size = static_cast<int>(addr_size);
+#else
+ const socklen_t bind_size = addr_size;
+#endif
+
+ if (bind(*fd_, reinterpret_cast<sockaddr*>(&addr), bind_size)) {
+ last_error_ = errno;
+ PERFETTO_DPLOG("bind()");
+ return;
+ }
+ if (listen(*fd_, SOMAXCONN)) {
+ last_error_ = errno;
+ PERFETTO_DPLOG("listen()");
+ return;
+ }
+
+ last_error_ = 0;
+ state_ = State::kListening;
+}
+
+// Called only by the Connect() static constructor.
+void UnixSocket::DoConnect(const std::string& socket_name) {
+ PERFETTO_DCHECK(state_ == State::kDisconnected);
+
+ // This is the only thing that can gracefully fail in the ctor.
+ if (!fd_)
+ return NotifyConnectionState(false);
+
+ sockaddr_un addr;
+ socklen_t addr_size;
+ if (!MakeSockAddr(socket_name, &addr, &addr_size)) {
+ last_error_ = errno;
+ return NotifyConnectionState(false);
+ }
+
+ int res = PERFETTO_EINTR(
+ connect(*fd_, reinterpret_cast<sockaddr*>(&addr), addr_size));
+ if (res && errno != EINPROGRESS) {
+ last_error_ = errno;
+ return NotifyConnectionState(false);
+ }
+
+ // At this point either |res| == 0 (the connect() succeeded) or started
+ // asynchronously (EINPROGRESS).
+ last_error_ = 0;
+ state_ = State::kConnecting;
+
+ // Even if the socket is non-blocking, connecting to a UNIX socket can be
+ // acknowledged straight away rather than returning EINPROGRESS. In this case
+ // just trigger an OnEvent without waiting for the FD watch. That will poll
+ // the SO_ERROR and evolve the state into either kConnected or kDisconnected.
+ if (res == 0) {
+ std::shared_ptr<WeakRef> weak_ref = weak_ref_;
+ task_runner_->PostTask([weak_ref]() {
+ if (weak_ref->sock)
+ weak_ref->sock->OnEvent();
+ });
+ }
+}
+
+void UnixSocket::OnEvent() {
+ if (state_ == State::kDisconnected)
+ return; // Some spurious event, typically queued just before Shutdown().
+
+ if (state_ == State::kConnected)
+ return event_listener_->OnDataAvailable(this);
+
+ if (state_ == State::kConnecting) {
+ PERFETTO_DCHECK(fd_);
+ int sock_err = EINVAL;
+ socklen_t err_len = sizeof(sock_err);
+ int res = getsockopt(*fd_, SOL_SOCKET, SO_ERROR, &sock_err, &err_len);
+ if (res == 0 && sock_err == EINPROGRESS)
+ return; // Not connected yet, just a spurious FD watch wakeup.
+ if (res == 0 && sock_err == 0) {
+ state_ = State::kConnected;
+ return event_listener_->OnConnect(this, true /* connected */);
+ }
+ last_error_ = sock_err;
+ return event_listener_->OnConnect(this, false /* connected */);
+ }
+
+ // New incoming connection.
+ if (state_ == State::kListening) {
+ // There could be more than one incoming connection behind each FD watch
+ // notification. Drain'em all.
+ for (;;) {
+ sockaddr_un cli_addr = {};
+ socklen_t size = sizeof(cli_addr);
+ base::ScopedFile new_fd(PERFETTO_EINTR(
+ accept(*fd_, reinterpret_cast<sockaddr*>(&cli_addr), &size)));
+ if (!new_fd)
+ return;
+ std::unique_ptr<UnixSocket> new_sock(
+ new UnixSocket(event_listener_, task_runner_, std::move(new_fd)));
+ event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
+ }
+ }
+}
+
+bool UnixSocket::Send(const std::string& msg) {
+ return Send(msg.c_str(), msg.size() + 1);
+}
+
+bool UnixSocket::Send(const void* msg, size_t len, int send_fd) {
+ if (state_ != State::kConnected) {
+ last_error_ = ENOTCONN;
+ return false;
+ }
+
+ msghdr msg_hdr = {};
+ iovec iov = {const_cast<void*>(msg), len};
+ msg_hdr.msg_iov = &iov;
+ msg_hdr.msg_iovlen = 1;
+ alignas(cmsghdr) char control_buf[256];
+
+ if (send_fd > -1) {
+ const CBufLenType control_buf_len =
+ static_cast<CBufLenType>(CMSG_SPACE(sizeof(int)));
+ PERFETTO_CHECK(control_buf_len <= sizeof(control_buf));
+ memset(control_buf, 0, sizeof(control_buf));
+ msg_hdr.msg_control = control_buf;
+ msg_hdr.msg_controllen = control_buf_len;
+ struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ memcpy(CMSG_DATA(cmsg), &send_fd, sizeof(int));
+ msg_hdr.msg_controllen = cmsg->cmsg_len;
+ }
+
+ const ssize_t sz = PERFETTO_EINTR(sendmsg(*fd_, &msg_hdr, kNoSigPipe));
+ if (sz >= 0) {
+ // There should be no way a non-blocking socket returns < |len|.
+ // If the queueing fails, sendmsg() must return -1 + errno = EWOULDBLOCK.
+ PERFETTO_CHECK(static_cast<size_t>(sz) == len);
+ last_error_ = 0;
+ return true;
+ }
+
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // A genuine out-of-buffer. The client should retry or give up.
+ // Man pages specify that EAGAIN and EWOULDBLOCK have the same semantic here
+ // and clients should check for both.
+ last_error_ = EAGAIN;
+ return false;
+ }
+
+ // Either the the other endpoint disconnect (ECONNRESET) or some other error
+ // happened.
+ last_error_ = errno;
+ PERFETTO_DPLOG("sendmsg() failed");
+ Shutdown();
+ return false;
+}
+
+void UnixSocket::Shutdown() {
+ std::shared_ptr<WeakRef>& weak_ref = weak_ref_;
+ if (state_ == State::kConnected) {
+ task_runner_->PostTask([weak_ref]() {
+ if (weak_ref->sock)
+ weak_ref->sock->event_listener_->OnDisconnect(weak_ref->sock);
+ });
+ } else if (state_ == State::kConnecting) {
+ task_runner_->PostTask([weak_ref]() {
+ if (weak_ref->sock)
+ weak_ref->sock->event_listener_->OnConnect(weak_ref->sock, false);
+ });
+ }
+ if (fd_) {
+ shutdown(*fd_, SHUT_RDWR);
+ task_runner_->RemoveFileDescriptorWatch(*fd_);
+ fd_.reset();
+ }
+ state_ = State::kDisconnected;
+}
+
+size_t UnixSocket::Receive(void* msg, size_t len, base::ScopedFile* recv_fd) {
+ if (state_ != State::kConnected) {
+ last_error_ = ENOTCONN;
+ return 0;
+ }
+
+ msghdr msg_hdr = {};
+ iovec iov = {msg, len};
+ msg_hdr.msg_iov = &iov;
+ msg_hdr.msg_iovlen = 1;
+ alignas(cmsghdr) char control_buf[256];
+
+ if (recv_fd) {
+ msg_hdr.msg_control = control_buf;
+ msg_hdr.msg_controllen = static_cast<CBufLenType>(CMSG_SPACE(sizeof(int)));
+ PERFETTO_CHECK(msg_hdr.msg_controllen <= sizeof(control_buf));
+ }
+ const ssize_t sz = PERFETTO_EINTR(recvmsg(*fd_, &msg_hdr, kNoSigPipe));
+ if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ last_error_ = EAGAIN;
+ return 0;
+ }
+ if (sz == 0) {
+ last_error_ = errno;
+ Shutdown();
+ return 0;
+ }
+ PERFETTO_CHECK(static_cast<size_t>(sz) <= len);
+
+ int* fds = nullptr;
+ uint32_t fds_len = 0;
+
+ if (msg_hdr.msg_controllen > 0) {
+ for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg_hdr); cmsg;
+ cmsg = CMSG_NXTHDR(&msg_hdr, cmsg)) {
+ const size_t payload_len = cmsg->cmsg_len - CMSG_LEN(0);
+ if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+ PERFETTO_DCHECK(payload_len % sizeof(int) == 0u);
+ PERFETTO_DCHECK(fds == nullptr);
+ fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
+ fds_len = static_cast<uint32_t>(payload_len / sizeof(int));
+ }
+ }
+ }
+
+ if (msg_hdr.msg_flags & MSG_TRUNC || msg_hdr.msg_flags & MSG_CTRUNC) {
+ for (size_t i = 0; fds && i < fds_len; ++i)
+ close(fds[i]);
+ last_error_ = EMSGSIZE;
+ Shutdown();
+ return 0;
+ }
+
+ for (size_t i = 0; fds && i < fds_len; ++i) {
+ if (recv_fd && i == 0) {
+ recv_fd->reset(fds[i]);
+ } else {
+ close(fds[i]);
+ }
+ }
+
+ last_error_ = 0;
+ return static_cast<size_t>(sz);
+}
+
+std::string UnixSocket::ReceiveString(size_t max_length) {
+ std::unique_ptr<char[]> buf(new char[max_length + 1]);
+ size_t rsize = Receive(buf.get(), max_length);
+ PERFETTO_CHECK(static_cast<size_t>(rsize) <= max_length);
+ buf[static_cast<size_t>(rsize)] = '\0';
+ return std::string(buf.get());
+}
+
+void UnixSocket::NotifyConnectionState(bool success) {
+ std::shared_ptr<WeakRef> weak_ref = weak_ref_;
+ task_runner_->PostTask([weak_ref, success]() {
+ if (weak_ref->sock)
+ weak_ref->sock->event_listener_->OnConnect(weak_ref->sock, success);
+ });
+}
+
+UnixSocket::EventListener::~EventListener() {}
+void UnixSocket::EventListener::OnNewIncomingConnection(
+ UnixSocket*,
+ std::unique_ptr<UnixSocket>) {}
+void UnixSocket::EventListener::OnConnect(UnixSocket*, bool) {}
+void UnixSocket::EventListener::OnDisconnect(UnixSocket*) {}
+void UnixSocket::EventListener::OnDataAvailable(UnixSocket*) {}
+
+} // namespace ipc
+} // namespace perfetto
diff --git a/ipc/src/unix_socket.h b/ipc/src/unix_socket.h
new file mode 100644
index 0000000..b91db29
--- /dev/null
+++ b/ipc/src/unix_socket.h
@@ -0,0 +1,200 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef IPC_SRC_UNIX_SOCKET_H_
+#define IPC_SRC_UNIX_SOCKET_H_
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#include <memory>
+#include <string>
+
+#include "base/logging.h"
+#include "base/scoped_file.h"
+
+namespace perfetto {
+
+namespace base {
+class TaskRunner;
+} // namespace base.
+
+namespace ipc {
+
+// A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
+// transfer file descriptors. None of the methods in this class are blocking.
+// The main design goal is API simplicity and strong guarantees on the
+// EventListener callbacks, in order to avoid ending in some undefined state.
+// In case of any error it will aggressively just shut down the socket and
+// notify the failure with OnConnect(false) or OnDisconnect() depending on the
+// state of the socket (see below).
+// EventListener callbacks stop happening as soon as the instance is destroyed.
+//
+// Lifecycle of a client socket:
+//
+// Connect()
+// |
+// +------------------+------------------+
+// | (success) | (failure or Shutdown())
+// V V
+// OnConnect(true) OnConnect(false)
+// |
+// V
+// OnDataAvailable()
+// |
+// V
+// OnDisconnect() (failure or shutdown)
+//
+//
+// Lifecycle of a server socket:
+//
+// Listen() --> returns false in case of errors.
+// |
+// V
+// OnNewIncomingConnection(new_socket)
+//
+// (|new_socket| inherits the same EventListener)
+// |
+// V
+// OnDataAvailable()
+// | (failure or Shutdown())
+// V
+// OnDisconnect()
+class UnixSocket {
+ public:
+ class EventListener {
+ public:
+ virtual ~EventListener();
+
+ // After Listen().
+ virtual void OnNewIncomingConnection(
+ UnixSocket* self,
+ std::unique_ptr<UnixSocket> new_connection);
+
+ // After Connect(), whether successful or not.
+ virtual void OnConnect(UnixSocket* self, bool connected);
+
+ // After a successful Connect() or OnNewIncomingConnection(). Either the
+ // other endpoint did disconnect or some other error happened.
+ virtual void OnDisconnect(UnixSocket* self);
+
+ // Whenever there is data available to Receive(). Note that spurious FD
+ // watch events are possible, so it is possible that Receive() soon after
+ // OnDataAvailable() returns 0 (just ignore those).
+ virtual void OnDataAvailable(UnixSocket* self);
+ };
+
+ enum class State {
+ kDisconnected = 0, // Failed connection, peer disconnection or Shutdown().
+ kConnecting, // Soon after Connect(), before it either succeeds or fails.
+ kConnected, // After a successful Connect().
+ kListening // After Listen(), until Shutdown().
+ };
+
+ // Creates a Unix domain socket and starts listening. If |socket_name|
+ // starts with a '@', an abstract socket will be created (Linux/Android only).
+ // Returns always an instance. In case of failure (e.g., another socket
+ // with the same name is already listening) the returned socket will have
+ // is_listening() == false and last_error() will contain the failure reason.
+ static std::unique_ptr<UnixSocket> Listen(const std::string& socket_name,
+ EventListener*,
+ base::TaskRunner*);
+
+ // Creates a Unix domain socket and connects to the listening endpoint.
+ // Returns always an instance. EventListener::OnConnect(bool success) will
+ // be called always, whether the connection succeeded or not.
+ static std::unique_ptr<UnixSocket> Connect(const std::string& socket_name,
+ EventListener*,
+ base::TaskRunner*);
+
+ // This class gives the hard guarantee that no callback is called on the
+ // passed EventListener immediately after the object has been destroyed.
+ // Any queued callback will be silently dropped.
+ ~UnixSocket();
+
+ // Shuts down the current connection, if any. If the socket was Listen()-ing,
+ // stops listening. The socket goes back to kNotInitialized state, so it can
+ // be reused with Listen() or Connect().
+ void Shutdown();
+
+ // Returns true is the message was queued, false if there was no space in the
+ // output buffer, in which case the client should retry or give up.
+ // If any other error happens the socket will be shutdown and
+ // EventListener::OnDisconnect() will be called.
+ // If the socket is not connected, Send() will just return false.
+ // Does not append a null string terminator to msg in any case.
+ bool Send(const void* msg, size_t len, int send_fd = -1);
+ bool Send(const std::string& msg);
+
+ // Returns the number of bytes (<= |len|) written in |msg| or 0 if there
+ // is no data in the buffer to read or an error occurs (in which case a
+ // EventListener::OnDisconnect() will follow).
+ // If the ScopedFile pointer is not null and a FD is received, it moves the
+ // received FD into that. If a FD is received but the ScopedFile pointer is
+ // null, the FD will be automatically closed.
+ size_t Receive(void* msg, size_t len, base::ScopedFile* = nullptr);
+
+ // Only for tests. This is slower than Receive() as it requires a heap
+ // allocation and a copy for the std::string. Guarantees that the returned
+ // string is null terminated even if the underlying message sent by the peer
+ // is not.
+ std::string ReceiveString(size_t max_length = 1024);
+
+ bool is_connected() const { return state_ == State::kConnected; }
+ bool is_listening() const { return state_ == State::kListening; }
+ int fd() const { return fd_.get(); }
+ int last_error() const { return last_error_; }
+
+ private:
+ // Used to decouple the lifetime of the UnixSocket from the callbacks
+ // registered on the TaskRunner, which might happen after UnixSocket has been
+ // destroyed. This is essentially a single-instance weak_ptr<UnixSocket>.
+ // Unfortunately C++11's weak_ptr would require UnixSocket to be a shared_ptr,
+ // which is undesirable here. The |sock| pointer is invalidated by the dtor
+ // of UnixSocket.
+ struct WeakRef {
+ explicit WeakRef(UnixSocket* s) : sock(s) {}
+ ~WeakRef() = default;
+ WeakRef(const WeakRef&) = delete;
+ WeakRef& operator=(const WeakRef&) = delete;
+
+ UnixSocket* sock;
+ };
+
+ UnixSocket(EventListener*, base::TaskRunner*);
+ UnixSocket(EventListener*, base::TaskRunner*, base::ScopedFile);
+ UnixSocket(const UnixSocket&) = delete;
+ UnixSocket& operator=(const UnixSocket&) = delete;
+
+ // Called once by the corresponding public static factory methods.
+ void DoConnect(const std::string& socket_name);
+ void DoListen(const std::string& socket_name);
+
+ void OnEvent();
+ void NotifyConnectionState(bool success);
+
+ base::ScopedFile fd_;
+ State state_ = State::kDisconnected;
+ int last_error_ = 0;
+ EventListener* event_listener_;
+ base::TaskRunner* task_runner_;
+ std::shared_ptr<WeakRef> weak_ref_;
+};
+
+} // namespace ipc
+} // namespace perfetto
+
+#endif // IPC_SRC_UNIX_SOCKET_H_
diff --git a/ipc/src/unix_socket_unittest.cc b/ipc/src/unix_socket_unittest.cc
new file mode 100644
index 0000000..555f2f5
--- /dev/null
+++ b/ipc/src/unix_socket_unittest.cc
@@ -0,0 +1,386 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ipc/src/unix_socket.h"
+
+#include <sys/mman.h>
+
+#include <list>
+
+#include "base/build_config.h"
+#include "base/logging.h"
+#include "base/test/test_task_runner.h"
+#include "base/utils.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+namespace ipc {
+namespace {
+
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Mock;
+
+// Mac OS X doesn't support abstract (i.e. unnamed) sockets.
+#if BUILDFLAG(OS_MACOSX)
+static const char kSocketName[] = "/tmp/test_socket";
+void UnlinkSocket() {
+ unlink(kSocketName);
+}
+#else
+static const char kSocketName[] = "@test_socket";
+void UnlinkSocket() {}
+#endif
+
+class MockEventListener : public UnixSocket::EventListener {
+ public:
+ MOCK_METHOD2(OnNewIncomingConnection, void(UnixSocket*, UnixSocket*));
+ MOCK_METHOD2(OnConnect, void(UnixSocket*, bool));
+ MOCK_METHOD1(OnDisconnect, void(UnixSocket*));
+ MOCK_METHOD1(OnDataAvailable, void(UnixSocket*));
+
+ // GMock doesn't support mocking methods with non-copiable args.
+ void OnNewIncomingConnection(
+ UnixSocket* self,
+ std::unique_ptr<UnixSocket> new_connection) override {
+ incoming_connections_.emplace_back(std::move(new_connection));
+ OnNewIncomingConnection(self, incoming_connections_.back().get());
+ }
+
+ std::unique_ptr<UnixSocket> GetIncomingConnection() {
+ if (incoming_connections_.empty())
+ return nullptr;
+ std::unique_ptr<UnixSocket> sock = std::move(incoming_connections_.front());
+ incoming_connections_.pop_front();
+ return sock;
+ }
+
+ private:
+ std::list<std::unique_ptr<UnixSocket>> incoming_connections_;
+};
+
+class UnixSocketTest : public ::testing::Test {
+ protected:
+ void SetUp() override { UnlinkSocket(); }
+ void TearDown() override { UnlinkSocket(); }
+
+ base::TestTaskRunner task_runner_;
+ MockEventListener event_listener_;
+};
+
+TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ ASSERT_FALSE(cli->is_connected());
+ auto checkpoint = task_runner_.CreateCheckpoint("failure");
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
+ .WillOnce(Invoke([checkpoint](UnixSocket*, bool) { checkpoint(); }));
+ task_runner_.RunUntilCheckpoint("failure");
+}
+
+// Both server and client should see an OnDisconnect() if the server drops
+// incoming connections immediately as they are created.
+TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ ASSERT_TRUE(srv->is_listening());
+
+ // The server will immediately shutdown the connection upon
+ // OnNewIncomingConnection().
+ auto srv_did_shutdown = task_runner_.CreateCheckpoint("srv_did_shutdown");
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(
+ Invoke([this, srv_did_shutdown](UnixSocket*, UnixSocket* new_conn) {
+ EXPECT_CALL(event_listener_, OnDisconnect(new_conn));
+ new_conn->Shutdown();
+ srv_did_shutdown();
+ }));
+
+ auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+ .WillOnce(Invoke([checkpoint](UnixSocket*, bool) { checkpoint(); }));
+ task_runner_.RunUntilCheckpoint("cli_connected");
+ task_runner_.RunUntilCheckpoint("srv_did_shutdown");
+
+ // Trying to send something will trigger the disconnection notification.
+ auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+ EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
+ .WillOnce(
+ Invoke([cli_disconnected](UnixSocket*) { cli_disconnected(); }));
+ EXPECT_FALSE(cli->Send("whatever"));
+ task_runner_.RunUntilCheckpoint("cli_disconnected");
+}
+
+TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ ASSERT_TRUE(srv->is_listening());
+
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+ auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+ auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke([this, cli_connected, srv_disconnected](
+ UnixSocket*, UnixSocket* srv_conn) {
+ EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
+ .WillOnce(Invoke(
+ [srv_disconnected](UnixSocket*) { srv_disconnected(); }));
+ cli_connected();
+ }));
+ task_runner_.RunUntilCheckpoint("cli_connected");
+
+ auto srv_conn = event_listener_.GetIncomingConnection();
+ ASSERT_TRUE(srv_conn);
+ ASSERT_TRUE(cli->is_connected());
+
+ auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
+ EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
+ .WillOnce(Invoke([cli_did_recv](UnixSocket* s) {
+ ASSERT_EQ("srv>cli", s->ReceiveString());
+ cli_did_recv();
+ }));
+
+ auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
+ EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
+ .WillOnce(Invoke([srv_did_recv](UnixSocket* s) {
+ ASSERT_EQ("cli>srv", s->ReceiveString());
+ srv_did_recv();
+ }));
+ ASSERT_TRUE(cli->Send("cli>srv"));
+ ASSERT_TRUE(srv_conn->Send("srv>cli"));
+ task_runner_.RunUntilCheckpoint("cli_did_recv");
+ task_runner_.RunUntilCheckpoint("srv_did_recv");
+
+ // Check that Send/Receive() fails gracefully once the socket is closed.
+ auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
+ EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
+ .WillOnce(
+ Invoke([cli_disconnected](UnixSocket*) { cli_disconnected(); }));
+ cli->Shutdown();
+ char msg[4];
+ ASSERT_EQ(0u, cli->Receive(&msg, sizeof(msg)));
+ ASSERT_EQ("", cli->ReceiveString());
+ ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
+ ASSERT_EQ("", srv_conn->ReceiveString());
+ ASSERT_FALSE(cli->Send("foo"));
+ ASSERT_FALSE(srv_conn->Send("bar"));
+ srv->Shutdown();
+ task_runner_.RunUntilCheckpoint("cli_disconnected");
+ task_runner_.RunUntilCheckpoint("srv_disconnected");
+}
+
+// Mostly a stress tests. Connects kNumClients clients to the same server and
+// tests that all can exchange data and can see the expected sequence of events.
+TEST_F(UnixSocketTest, SeveralClients) {
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ ASSERT_TRUE(srv->is_listening());
+ constexpr size_t kNumClients = 32;
+ std::unique_ptr<UnixSocket> cli[kNumClients];
+
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .Times(kNumClients)
+ .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
+ EXPECT_CALL(event_listener_, OnDataAvailable(s))
+ .WillOnce(Invoke([](UnixSocket* t) {
+ ASSERT_EQ("PING", t->ReceiveString());
+ ASSERT_TRUE(t->Send("PONG"));
+ }));
+ }));
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
+ .WillOnce(Invoke([](UnixSocket* s, bool success) {
+ ASSERT_TRUE(success);
+ ASSERT_TRUE(s->Send("PING"));
+ }));
+
+ auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
+ EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
+ .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+ ASSERT_EQ("PONG", s->ReceiveString());
+ checkpoint();
+ }));
+ }
+
+ for (size_t i = 0; i < kNumClients; i++) {
+ task_runner_.RunUntilCheckpoint(std::to_string(i));
+ ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
+ }
+}
+
+// Creates two processes. The server process creates a file and passes it over
+// the socket to the client. Both processes mmap the file in shared mode and
+// check that they see the same contents.
+TEST_F(UnixSocketTest, SharedMemory) {
+ int pipes[2];
+ ASSERT_EQ(0, pipe(pipes));
+
+ pid_t pid = fork();
+ ASSERT_GE(pid, 0);
+ constexpr size_t kTmpSize = 4096;
+
+ if (pid == 0) {
+ // Child process.
+ FILE* tmp = tmpfile();
+ ASSERT_NE(nullptr, tmp);
+ int tmp_fd = fileno(tmp);
+ ASSERT_FALSE(ftruncate(tmp_fd, kTmpSize));
+ char* mem = reinterpret_cast<char*>(
+ mmap(nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, tmp_fd, 0));
+ ASSERT_NE(nullptr, mem);
+ memcpy(mem, "shm rocks", 10);
+
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ ASSERT_TRUE(srv->is_listening());
+ // Signal the other process that it can connect.
+ ASSERT_EQ(1, PERFETTO_EINTR(write(pipes[1], ".", 1)));
+ auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_server");
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke(
+ [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
+ ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+ // Wait for the client to change this again.
+ EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
+ .WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
+ ASSERT_EQ("change notify", s->ReceiveString());
+ ASSERT_STREQ("rock more", mem);
+ checkpoint();
+ }));
+ }));
+ task_runner_.RunUntilCheckpoint("change_seen_by_server");
+ ASSERT_TRUE(Mock::VerifyAndClearExpectations(&event_listener_));
+ _exit(0);
+ } else {
+ char sync_cmd = '\0';
+ ASSERT_EQ(1, PERFETTO_EINTR(read(pipes[0], &sync_cmd, 1)));
+ ASSERT_EQ('.', sync_cmd);
+ auto cli =
+ UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
+ auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
+ EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
+ .WillOnce(Invoke([checkpoint](UnixSocket* s) {
+ char msg[32];
+ base::ScopedFile fd;
+ ASSERT_EQ(5u, s->Receive(msg, sizeof(msg), &fd));
+ ASSERT_STREQ("txfd", msg);
+ ASSERT_TRUE(fd);
+ char* mem = reinterpret_cast<char*>(mmap(
+ nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, *fd, 0));
+ ASSERT_NE(nullptr, mem);
+ mem[9] = '\0'; // Just to get a clean error in case of test failure.
+ ASSERT_STREQ("shm rocks", mem);
+
+ // Now change the shared memory and ping the other process.
+ memcpy(mem, "rock more", 10);
+ ASSERT_TRUE(s->Send("change notify"));
+ checkpoint();
+ }));
+ task_runner_.RunUntilCheckpoint("change_seen_by_client");
+ int st = 0;
+ PERFETTO_EINTR(waitpid(pid, &st, 0));
+ ASSERT_FALSE(WIFSIGNALED(st)) << "Server died with signal " << WTERMSIG(st);
+ EXPECT_TRUE(WIFEXITED(st));
+ ASSERT_EQ(0, WEXITSTATUS(st));
+ }
+}
+
+constexpr size_t kAtomicWrites_FrameSize = 1123;
+bool AtomicWrites_SendAttempt(UnixSocket* s,
+ base::TaskRunner* task_runner,
+ int num_frame) {
+ char buf[kAtomicWrites_FrameSize];
+ memset(buf, static_cast<char>(num_frame), sizeof(buf));
+ if (s->Send(buf, sizeof(buf)))
+ return true;
+ task_runner->PostTask(
+ std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
+ return false;
+}
+
+// Creates a client-server pair. The client sends continuously data to the
+// server. Upon each Send() attempt, the client sends a buffer which is memset()
+// with a unique number (0 to kNumFrames). We are deliberately trying to fill
+// the socket output buffer, so we expect some of these Send()s to fail.
+// The client is extremely aggressive and, when a Send() fails, just keeps
+// re-posting it with the same unique number. The server verifies that we
+// receive one and exactly one of each buffers, without any gaps or truncation.
+TEST_F(UnixSocketTest, SendIsAtomic) {
+ static constexpr int kNumFrames = 127;
+
+ auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
+ ASSERT_TRUE(srv->is_listening());
+
+ auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
+
+ auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
+ std::set<int> received_iterations;
+ EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
+ .WillOnce(Invoke([this, &received_iterations, all_frames_done](
+ UnixSocket*, UnixSocket* srv_conn) {
+ EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
+ .WillRepeatedly(
+ Invoke([&received_iterations, all_frames_done](UnixSocket* s) {
+ char buf[kAtomicWrites_FrameSize];
+ size_t res = s->Receive(buf, sizeof(buf));
+ if (res == 0)
+ return; // Spurious select(), could happen.
+ ASSERT_EQ(kAtomicWrites_FrameSize, res);
+ // Check that we didn't get two truncated frames.
+ for (size_t i = 0; i < sizeof(buf); i++)
+ ASSERT_EQ(buf[0], buf[i]);
+ ASSERT_EQ(0u, received_iterations.count(buf[0]));
+ received_iterations.insert(buf[0]);
+ if (received_iterations.size() == kNumFrames)
+ all_frames_done();
+ }));
+ }));
+
+ auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
+ EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
+ .WillOnce(
+ Invoke([cli_connected](UnixSocket*, bool) { cli_connected(); }));
+ task_runner_.RunUntilCheckpoint("cli_connected");
+ ASSERT_TRUE(cli->is_connected());
+
+ bool did_requeue = false;
+ for (int i = 0; i < kNumFrames; i++)
+ did_requeue |= !AtomicWrites_SendAttempt(cli.get(), &task_runner_, i);
+
+ // We expect that at least one of the kNumFrames didn't fit in the socket
+ // buffer and was re-posted, otherwise this entire test would be pointless.
+ ASSERT_TRUE(did_requeue);
+
+ task_runner_.RunUntilCheckpoint("all_frames_done");
+}
+
+// TODO(primiano): add a test to check that in the case of a peer sending a fd
+// and the other end just doing a recv (without taking it), the fd is closed and
+// not left around.
+
+// TODO(primiano); add a test to check that a socket can be reused after
+// Shutdown(),
+
+// TODO(primiano): add a test to check that OnDisconnect() is called in all
+// possible cases.
+
+// TODO(primiano): add tests that destroy the socket in all possible stages and
+// verify that no spurious EventListener callback is received.
+
+} // namespace
+} // namespace ipc
+} // namespace perfetto