Merge "Retry write operation when getting ENOBUFS." into oc-dev
diff --git a/debuggerd/Android.bp b/debuggerd/Android.bp
index 024bc3d..4783d6e 100644
--- a/debuggerd/Android.bp
+++ b/debuggerd/Android.bp
@@ -163,6 +163,7 @@
             srcs: [
                 "client/debuggerd_client_test.cpp",
                 "debuggerd_test.cpp",
+                "tombstoned_client.cpp",
                 "util.cpp"
             ],
         },
@@ -176,7 +177,8 @@
     ],
 
     static_libs: [
-        "libdebuggerd"
+        "libdebuggerd",
+        "libc_logging",
     ],
 
     local_include_dirs: [
diff --git a/debuggerd/client/debuggerd_client.cpp b/debuggerd/client/debuggerd_client.cpp
index b9fb512..224444f 100644
--- a/debuggerd/client/debuggerd_client.cpp
+++ b/debuggerd/client/debuggerd_client.cpp
@@ -65,24 +65,25 @@
   auto time_left = [timeout_ms, &end]() { return end - std::chrono::steady_clock::now(); };
   auto set_timeout = [timeout_ms, &time_left](int sockfd) {
     if (timeout_ms <= 0) {
-      return true;
+      return sockfd;  // No timeout requested; use the socket as-is.
     }
 
     auto remaining = time_left();
     if (remaining < decltype(remaining)::zero()) {
-      return false;
+      LOG(ERROR) << "timeout expired";
+      return -1;  // Deadline passed; the invalid fd makes the caller's syscall fail.
     }
     struct timeval timeout;
     populate_timeval(&timeout, remaining);
 
     if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != 0) {
-      return false;
+      return -1;
     }
     if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) != 0) {
-      return false;
+      return -1;
     }
 
-    return true;
+    return sockfd;  // Timeouts applied; callers pass this fd straight to the syscall.
   };
 
   sockfd.reset(socket(AF_LOCAL, SOCK_SEQPACKET, 0));
@@ -91,12 +92,7 @@
     return false;
   }
 
-  if (!set_timeout(sockfd)) {
-    PLOG(ERROR) << "libdebugger_client: failed to set timeout";
-    return false;
-  }
-
-  if (socket_local_client_connect(sockfd.get(), kTombstonedInterceptSocketName,
+  if (socket_local_client_connect(set_timeout(sockfd.get()), kTombstonedInterceptSocketName,
                                   ANDROID_SOCKET_NAMESPACE_RESERVED, SOCK_SEQPACKET) == -1) {
     PLOG(ERROR) << "libdebuggerd_client: failed to connect to tombstoned";
     return false;
@@ -115,21 +111,35 @@
     return false;
   }
 
-  if (send_fd(sockfd.get(), &req, sizeof(req), std::move(pipe_write)) != sizeof(req)) {
+  if (send_fd(set_timeout(sockfd), &req, sizeof(req), std::move(pipe_write)) != sizeof(req)) {
     PLOG(ERROR) << "libdebuggerd_client: failed to send output fd to tombstoned";
     return false;
   }
 
+  // Check to make sure we've successfully registered.
+  InterceptResponse response;
+  ssize_t rc =
+      TEMP_FAILURE_RETRY(recv(set_timeout(sockfd.get()), &response, sizeof(response), MSG_TRUNC));
+  if (rc == 0) {
+    LOG(ERROR) << "libdebuggerd_client: failed to read response from tombstoned: timeout reached?";
+    return false;
+  } else if (rc != sizeof(response)) {
+    LOG(ERROR)
+        << "libdebuggerd_client: received packet of unexpected length from tombstoned: expected "
+        << sizeof(response) << ", received " << rc;
+    return false;
+  }
+
+  if (response.status != InterceptStatus::kRegistered) {
+    LOG(ERROR) << "libdebuggerd_client: unexpected registration response: "
+               << static_cast<int>(response.status);
+    return false;
+  }
+
   bool backtrace = dump_type == kDebuggerdBacktrace;
   send_signal(pid, backtrace);
 
-  if (!set_timeout(sockfd)) {
-    PLOG(ERROR) << "libdebuggerd_client: failed to set timeout";
-    return false;
-  }
-
-  InterceptResponse response;
-  ssize_t rc = TEMP_FAILURE_RETRY(recv(sockfd.get(), &response, sizeof(response), MSG_TRUNC));
+  rc = TEMP_FAILURE_RETRY(recv(set_timeout(sockfd.get()), &response, sizeof(response), MSG_TRUNC));
   if (rc == 0) {
     LOG(ERROR) << "libdebuggerd_client: failed to read response from tombstoned: timeout reached?";
     return false;
@@ -140,7 +150,7 @@
     return false;
   }
 
-  if (response.success != 1) {
+  if (response.status != InterceptStatus::kStarted) {
     response.error_message[sizeof(response.error_message) - 1] = '\0';
     LOG(ERROR) << "libdebuggerd_client: tombstoned reported failure: " << response.error_message;
     return false;
diff --git a/debuggerd/debuggerd_test.cpp b/debuggerd/debuggerd_test.cpp
index 1a27f3f..1befcb1 100644
--- a/debuggerd/debuggerd_test.cpp
+++ b/debuggerd/debuggerd_test.cpp
@@ -36,6 +36,7 @@
 #include <cutils/sockets.h>
 #include <debuggerd/handler.h>
 #include <debuggerd/protocol.h>
+#include <debuggerd/tombstoned.h>
 #include <debuggerd/util.h>
 #include <gtest/gtest.h>
 
@@ -77,6 +78,54 @@
     }                                                                           \
   } while (0)
 
+static void tombstoned_intercept(pid_t target_pid, unique_fd* intercept_fd, unique_fd* output_fd) {
+  intercept_fd->reset(socket_local_client(kTombstonedInterceptSocketName,
+                                          ANDROID_SOCKET_NAMESPACE_RESERVED, SOCK_SEQPACKET));
+  if (intercept_fd->get() == -1) {
+    FAIL() << "failed to contact tombstoned: " << strerror(errno);
+  }
+  // Build the request for |target_pid| and the pipe whose write end is handed to tombstoned.
+  InterceptRequest req = {.pid = target_pid};
+
+  unique_fd output_pipe_write;
+  if (!Pipe(output_fd, &output_pipe_write)) {
+    FAIL() << "failed to create output pipe: " << strerror(errno);
+  }
+  // Enlarge the pipe to the limit published in /proc/sys/fs/pipe-max-size.
+  std::string pipe_size_str;
+  int pipe_buffer_size;
+  if (!android::base::ReadFileToString("/proc/sys/fs/pipe-max-size", &pipe_size_str)) {
+    FAIL() << "failed to read /proc/sys/fs/pipe-max-size: " << strerror(errno);
+  }
+
+  pipe_size_str = android::base::Trim(pipe_size_str);
+
+  if (!android::base::ParseInt(pipe_size_str.c_str(), &pipe_buffer_size, 0)) {
+    FAIL() << "failed to parse pipe max size";
+  }
+
+  if (fcntl(output_fd->get(), F_SETPIPE_SZ, pipe_buffer_size) != pipe_buffer_size) {
+    FAIL() << "failed to set pipe size: " << strerror(errno);
+  }
+  // Pass the request and the pipe's write end to tombstoned over the intercept socket.
+  if (send_fd(intercept_fd->get(), &req, sizeof(req), std::move(output_pipe_write)) != sizeof(req)) {
+    FAIL() << "failed to send output fd to tombstoned: " << strerror(errno);
+  }
+  // Wait for tombstoned's reply; the test only proceeds on a full-size kRegistered response.
+  InterceptResponse response;
+  ssize_t rc = TEMP_FAILURE_RETRY(read(intercept_fd->get(), &response, sizeof(response)));
+  if (rc == -1) {
+    FAIL() << "failed to read response from tombstoned: " << strerror(errno);
+  } else if (rc == 0) {
+    FAIL() << "failed to read response from tombstoned (EOF)";
+  } else if (rc != sizeof(response)) {
+    FAIL() << "received packet of unexpected length from tombstoned: expected " << sizeof(response)
+           << ", received " << rc;
+  }
+
+  ASSERT_EQ(InterceptStatus::kRegistered, response.status);
+}
+
 class CrasherTest : public ::testing::Test {
  public:
   pid_t crasher_pid = -1;
@@ -118,38 +167,7 @@
     FAIL() << "crasher hasn't been started";
   }
 
-  intercept_fd.reset(socket_local_client(kTombstonedInterceptSocketName,
-                                         ANDROID_SOCKET_NAMESPACE_RESERVED, SOCK_SEQPACKET));
-  if (intercept_fd == -1) {
-    FAIL() << "failed to contact tombstoned: " << strerror(errno);
-  }
-
-  InterceptRequest req = {.pid = crasher_pid };
-
-  unique_fd output_pipe_write;
-  if (!Pipe(output_fd, &output_pipe_write)) {
-    FAIL() << "failed to create output pipe: " << strerror(errno);
-  }
-
-  std::string pipe_size_str;
-  int pipe_buffer_size;
-  if (!android::base::ReadFileToString("/proc/sys/fs/pipe-max-size", &pipe_size_str)) {
-    FAIL() << "failed to read /proc/sys/fs/pipe-max-size: " << strerror(errno);
-  }
-
-  pipe_size_str = android::base::Trim(pipe_size_str);
-
-  if (!android::base::ParseInt(pipe_size_str.c_str(), &pipe_buffer_size, 0)) {
-    FAIL() << "failed to parse pipe max size";
-  }
-
-  if (fcntl(output_fd->get(), F_SETPIPE_SZ, pipe_buffer_size) != pipe_buffer_size) {
-    FAIL() << "failed to set pipe size: " << strerror(errno);
-  }
-
-  if (send_fd(intercept_fd.get(), &req, sizeof(req), std::move(output_pipe_write)) != sizeof(req)) {
-    FAIL() << "failed to send output fd to tombstoned: " << strerror(errno);
-  }
+  tombstoned_intercept(crasher_pid, &this->intercept_fd, output_fd);
 }
 
 void CrasherTest::FinishIntercept(int* result) {
@@ -165,7 +183,7 @@
     FAIL() << "received packet of unexpected length from tombstoned: expected " << sizeof(response)
            << ", received " << rc;
   } else {
-    *result = response.success;
+    *result = response.status == InterceptStatus::kStarted ? 1 : 0;
   }
 }
 
@@ -508,3 +526,97 @@
     ASSERT_EQ(0, WEXITSTATUS(status));
   }
 }
+
+TEST(tombstoned, no_notify) {
+  // Repeat a few times, each round with a distinct fake pid.
+  for (int i = 0; i < 3; ++i) {
+    pid_t pid = 123'456'789 + i;
+
+    unique_fd intercept_fd, output_fd;
+    tombstoned_intercept(pid, &intercept_fd, &output_fd);
+    // Connect via tombstoned_connect, write the pid, then drop both fds without notifying.
+    {
+      unique_fd tombstoned_socket, input_fd;
+      ASSERT_TRUE(tombstoned_connect(pid, &tombstoned_socket, &input_fd));
+      ASSERT_TRUE(android::base::WriteFully(input_fd.get(), &pid, sizeof(pid)));
+    }
+    // The bytes written above must still arrive on the intercept's output pipe.
+    pid_t read_pid;
+    ASSERT_TRUE(android::base::ReadFully(output_fd.get(), &read_pid, sizeof(read_pid)));
+    ASSERT_EQ(read_pid, pid);
+  }
+}
+
+TEST(tombstoned, stress) {
+  // Spawn threads to simultaneously do a bunch of failing dumps and a bunch of successful dumps.
+  static constexpr int kDumpCount = 100;
+  // Both threads spin on |start| so that they begin issuing dumps at the same time.
+  std::atomic<bool> start(false);
+  std::vector<std::thread> threads;
+  threads.emplace_back([&start]() {
+    while (!start) {
+      continue;
+    }
+
+    // Use a way out of range pid, to avoid stomping on an actual process.
+    pid_t pid_base = 1'000'000;
+
+    for (int dump = 0; dump < kDumpCount; ++dump) {
+      pid_t pid = pid_base + dump;
+
+      unique_fd intercept_fd, output_fd;
+      tombstoned_intercept(pid, &intercept_fd, &output_fd);
+
+      // Pretend to crash, and then immediately close the socket.
+      unique_fd sockfd(socket_local_client(kTombstonedCrashSocketName,
+                                           ANDROID_SOCKET_NAMESPACE_RESERVED, SOCK_SEQPACKET));
+      if (sockfd == -1) {
+        FAIL() << "failed to connect to tombstoned: " << strerror(errno);
+      }
+      TombstonedCrashPacket packet = {};
+      packet.packet_type = CrashPacketType::kDumpRequest;
+      packet.packet.dump_request.pid = pid;
+      if (TEMP_FAILURE_RETRY(write(sockfd, &packet, sizeof(packet))) != sizeof(packet)) {
+        FAIL() << "failed to write to tombstoned: " << strerror(errno);
+      }
+      // Leaving scope closes sockfd and the intercept fds without completing the dump.
+      continue;
+    }
+  });
+  // Second thread: dumps that connect, write their pid, and notify completion.
+  threads.emplace_back([&start]() {
+    while (!start) {
+      continue;
+    }
+
+    // Use a way out of range pid, to avoid stomping on an actual process.
+    pid_t pid_base = 2'000'000;
+
+    for (int dump = 0; dump < kDumpCount; ++dump) {
+      pid_t pid = pid_base + dump;
+
+      unique_fd intercept_fd, output_fd;
+      tombstoned_intercept(pid, &intercept_fd, &output_fd);
+
+      {
+        unique_fd tombstoned_socket, input_fd;
+        ASSERT_TRUE(tombstoned_connect(pid, &tombstoned_socket, &input_fd));
+        ASSERT_TRUE(android::base::WriteFully(input_fd.get(), &pid, sizeof(pid)));
+        tombstoned_notify_completion(tombstoned_socket.get());
+      }
+
+      // TODO: Fix the race that requires this sleep.
+      std::this_thread::sleep_for(50ms);
+
+      pid_t read_pid;
+      ASSERT_TRUE(android::base::ReadFully(output_fd.get(), &read_pid, sizeof(read_pid)));
+      ASSERT_EQ(read_pid, pid);
+    }
+  });
+
+  start = true;
+
+  for (std::thread& thread : threads) {
+    thread.join();
+  }
+}
diff --git a/debuggerd/include/debuggerd/protocol.h b/debuggerd/include/debuggerd/protocol.h
index bb2ab0d..0756876 100644
--- a/debuggerd/include/debuggerd/protocol.h
+++ b/debuggerd/include/debuggerd/protocol.h
@@ -56,8 +56,14 @@
   int32_t pid;
 };
 
+enum class InterceptStatus : uint8_t {
+  kFailed,      // 0: request rejected; error_message explains why.
+  kStarted,     // 1: a dump for the pid has claimed the intercept.
+  kRegistered,  // 2: tombstoned accepted the intercept registration.
+};
+
 // Sent either immediately upon failure, or when the intercept has been used.
 struct InterceptResponse {
-  uint8_t success;          // 0 or 1
+  InterceptStatus status;
   char error_message[127];  // always null-terminated
 };
diff --git a/debuggerd/tombstoned/intercept_manager.cpp b/debuggerd/tombstoned/intercept_manager.cpp
index 789260d..dff942c 100644
--- a/debuggerd/tombstoned/intercept_manager.cpp
+++ b/debuggerd/tombstoned/intercept_manager.cpp
@@ -105,6 +105,7 @@
     // We trust the other side, so only do minimal validity checking.
     if (intercept_request.pid <= 0 || intercept_request.pid > std::numeric_limits<pid_t>::max()) {
       InterceptResponse response = {};
+      response.status = InterceptStatus::kFailed;
       snprintf(response.error_message, sizeof(response.error_message), "invalid pid %" PRId32,
                intercept_request.pid);
       TEMP_FAILURE_RETRY(write(sockfd, &response, sizeof(response)));
@@ -113,9 +114,10 @@
 
     intercept->intercept_pid = intercept_request.pid;
 
-    // Register the intercept with the InterceptManager.
+    // Check if it's already registered.
     if (intercept_manager->intercepts.count(intercept_request.pid) > 0) {
       InterceptResponse response = {};
+      response.status = InterceptStatus::kFailed;
       snprintf(response.error_message, sizeof(response.error_message),
                "pid %" PRId32 " already intercepted", intercept_request.pid);
       TEMP_FAILURE_RETRY(write(sockfd, &response, sizeof(response)));
@@ -123,6 +125,15 @@
       goto fail;
     }
 
+    // Let the other side know that the intercept has been registered, now that we know we can't
+    // fail. tombstoned is single threaded, so this isn't racy.
+    InterceptResponse response = {};
+    response.status = InterceptStatus::kRegistered;
+    if (TEMP_FAILURE_RETRY(write(sockfd, &response, sizeof(response))) == -1) {
+      PLOG(WARNING) << "failed to notify interceptor of registration";
+      goto fail;
+    }
+
     intercept->output_fd = std::move(rcv_fd);
     intercept_manager->intercepts[intercept_request.pid] = std::unique_ptr<Intercept>(intercept);
     intercept->registered = true;
@@ -174,7 +185,7 @@
 
   LOG(INFO) << "found intercept fd " << intercept->output_fd.get() << " for pid " << pid;
   InterceptResponse response = {};
-  response.success = 1;
+  response.status = InterceptStatus::kStarted;
   TEMP_FAILURE_RETRY(write(intercept->sockfd, &response, sizeof(response)));
   *out_fd = std::move(intercept->output_fd);
 
diff --git a/debuggerd/tombstoned/tombstoned.cpp b/debuggerd/tombstoned/tombstoned.cpp
index 6754508..2248a21 100644
--- a/debuggerd/tombstoned/tombstoned.cpp
+++ b/debuggerd/tombstoned/tombstoned.cpp
@@ -126,9 +126,7 @@
   return result;
 }
 
-static void dequeue_request(Crash* crash) {
-  ++num_concurrent_dumps;
-
+static void perform_request(Crash* crash) {
   unique_fd output_fd;
   if (!intercept_manager->GetIntercept(crash->crash_pid, &output_fd)) {
     output_fd = get_tombstone_fd();
@@ -153,12 +151,22 @@
                  crash_completed_cb, crash);
     event_add(crash->crash_event, &timeout);
   }
+
+  ++num_concurrent_dumps;
   return;
 
 fail:
   delete crash;
 }
 
+static void dequeue_requests() {  // Drain the queue while below kMaxConcurrentDumps.
+  while (!queued_requests.empty() && num_concurrent_dumps < kMaxConcurrentDumps) {
+    Crash* next_crash = queued_requests.front();
+    queued_requests.pop_front();
+    perform_request(next_crash);  // May fail without raising num_concurrent_dumps.
+  }
+}
+
 static void crash_accept_cb(evconnlistener* listener, evutil_socket_t sockfd, sockaddr*, int,
                             void*) {
   event_base* base = evconnlistener_get_base(listener);
@@ -207,7 +215,7 @@
     LOG(INFO) << "enqueueing crash request for pid " << crash->crash_pid;
     queued_requests.push_back(crash);
   } else {
-    dequeue_request(crash);
+    perform_request(crash);
   }
 
   return;
@@ -247,11 +255,7 @@
   delete crash;
 
   // If there's something queued up, let them proceed.
-  if (!queued_requests.empty()) {
-    Crash* next_crash = queued_requests.front();
-    queued_requests.pop_front();
-    dequeue_request(next_crash);
-  }
+  dequeue_requests();
 }
 
 int main(int, char* []) {