blob: 9a99cb51760a2d1be0d3e18ac5d95849c0766b1d [file] [log] [blame]
Primiano Tucci4f9b6d72017-12-05 20:59:16 +00001/*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "perfetto/base/unix_task_runner.h"
18
19#include "perfetto/base/build_config.h"
20
Primiano Tucci5aab7582017-12-07 12:22:03 +000021#include <errno.h>
Primiano Tucci4f9b6d72017-12-05 20:59:16 +000022#include <fcntl.h>
23#include <stdlib.h>
24#include <unistd.h>
25
Primiano Tucci8934c6c2018-03-15 11:39:27 +000026#include <limits>
27
Primiano Tucci4f9b6d72017-12-05 20:59:16 +000028namespace perfetto {
29namespace base {
30
31UnixTaskRunner::UnixTaskRunner() {
32 // Create a self-pipe which is used to wake up the main thread from inside
33 // poll(2).
34 int pipe_fds[2];
35 PERFETTO_CHECK(pipe(pipe_fds) == 0);
36
37 // Make the pipe non-blocking so that we never block the waking thread (either
38 // the main thread or another one) when scheduling a wake-up.
39 for (auto fd : pipe_fds) {
40 int flags = fcntl(fd, F_GETFL, 0);
41 PERFETTO_CHECK(flags != -1);
42 PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
43 PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
44 }
45 control_read_.reset(pipe_fds[0]);
46 control_write_.reset(pipe_fds[1]);
47
Oystein Eftevaagff729592018-02-12 14:24:06 -080048#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX)
Primiano Tucci4f9b6d72017-12-05 20:59:16 +000049 // We are never expecting to have more than a few bytes in the wake-up pipe.
50 // Reduce the buffer size on Linux. Note that this gets rounded up to the page
51 // size.
52 PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0);
53#endif
54
55 AddFileDescriptorWatch(control_read_.get(), [] {
56 // Not reached -- see PostFileDescriptorWatches().
57 PERFETTO_DCHECK(false);
58 });
59}
60
61UnixTaskRunner::~UnixTaskRunner() = default;
62
Primiano Tucci4f9b6d72017-12-05 20:59:16 +000063void UnixTaskRunner::WakeUp() {
64 const char dummy = 'P';
65 if (write(control_write_.get(), &dummy, 1) <= 0 && errno != EAGAIN)
66 PERFETTO_DPLOG("write()");
67}
68
69void UnixTaskRunner::Run() {
70 PERFETTO_DCHECK_THREAD(thread_checker_);
71 quit_ = false;
72 while (true) {
73 int poll_timeout_ms;
74 {
75 std::lock_guard<std::mutex> lock(lock_);
76 if (quit_)
77 return;
Primiano Tucci8934c6c2018-03-15 11:39:27 +000078 poll_timeout_ms = GetDelayMsToNextTaskLocked();
Primiano Tucci4f9b6d72017-12-05 20:59:16 +000079 UpdateWatchTasksLocked();
80 }
81 int ret = PERFETTO_EINTR(poll(
82 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
83 PERFETTO_CHECK(ret >= 0);
84
85 // To avoid starvation we always interleave all types of tasks -- immediate,
86 // delayed and file descriptor watches.
87 PostFileDescriptorWatches();
88 RunImmediateAndDelayedTask();
89 }
90}
91
92void UnixTaskRunner::Quit() {
93 {
94 std::lock_guard<std::mutex> lock(lock_);
95 quit_ = true;
96 }
97 WakeUp();
98}
99
100bool UnixTaskRunner::IsIdleForTesting() {
101 std::lock_guard<std::mutex> lock(lock_);
102 return immediate_tasks_.empty();
103}
104
105void UnixTaskRunner::UpdateWatchTasksLocked() {
106 PERFETTO_DCHECK_THREAD(thread_checker_);
107 if (!watch_tasks_changed_)
108 return;
109 watch_tasks_changed_ = false;
110 poll_fds_.clear();
111 for (auto& it : watch_tasks_) {
112 it.second.poll_fd_index = poll_fds_.size();
113 poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
114 }
115}
116
117void UnixTaskRunner::RunImmediateAndDelayedTask() {
118 // TODO(skyostil): Add a separate work queue in case in case locking overhead
119 // becomes an issue.
120 std::function<void()> immediate_task;
121 std::function<void()> delayed_task;
Primiano Tucci8934c6c2018-03-15 11:39:27 +0000122 TimeMillis now = GetWallTimeMs();
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000123 {
124 std::lock_guard<std::mutex> lock(lock_);
125 if (!immediate_tasks_.empty()) {
126 immediate_task = std::move(immediate_tasks_.front());
127 immediate_tasks_.pop_front();
128 }
129 if (!delayed_tasks_.empty()) {
130 auto it = delayed_tasks_.begin();
131 if (now >= it->first) {
132 delayed_task = std::move(it->second);
133 delayed_tasks_.erase(it);
134 }
135 }
136 }
Primiano Tucci5aab7582017-12-07 12:22:03 +0000137
138 errno = 0;
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000139 if (immediate_task)
Florian Mayer26ecb392018-02-01 17:25:31 +0000140 RunTask(immediate_task);
Primiano Tucci5aab7582017-12-07 12:22:03 +0000141 errno = 0;
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000142 if (delayed_task)
Florian Mayer26ecb392018-02-01 17:25:31 +0000143 RunTask(delayed_task);
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000144}
145
146void UnixTaskRunner::PostFileDescriptorWatches() {
147 PERFETTO_DCHECK_THREAD(thread_checker_);
148 for (size_t i = 0; i < poll_fds_.size(); i++) {
149 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
150 continue;
151 poll_fds_[i].revents = 0;
152
153 // The wake-up event is handled inline to avoid an infinite recursion of
154 // posted tasks.
155 if (poll_fds_[i].fd == control_read_.get()) {
156 // Drain the byte(s) written to the wake-up pipe. We can potentially read
157 // more than one byte if several wake-ups have been scheduled.
158 char buffer[16];
159 if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 &&
160 errno != EAGAIN) {
161 PERFETTO_DPLOG("read()");
162 }
163 continue;
164 }
165
166 // Binding to |this| is safe since we are the only object executing the
167 // task.
168 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
169 poll_fds_[i].fd));
170
171 // Make the fd negative while a posted task is pending. This makes poll(2)
172 // ignore the fd.
173 PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
174 poll_fds_[i].fd = -poll_fds_[i].fd;
175 }
176}
177
178void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
179 std::function<void()> task;
180 {
181 std::lock_guard<std::mutex> lock(lock_);
182 auto it = watch_tasks_.find(fd);
183 if (it == watch_tasks_.end())
184 return;
185 // Make poll(2) pay attention to the fd again. Since another thread may have
186 // updated this watch we need to refresh the set first.
187 UpdateWatchTasksLocked();
188 size_t fd_index = it->second.poll_fd_index;
189 PERFETTO_DCHECK(fd_index < poll_fds_.size());
190 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
191 poll_fds_[fd_index].fd = fd;
192 task = it->second.callback;
193 }
Primiano Tucci5aab7582017-12-07 12:22:03 +0000194 errno = 0;
Florian Mayer26ecb392018-02-01 17:25:31 +0000195 RunTask(task);
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000196}
197
Primiano Tucci8934c6c2018-03-15 11:39:27 +0000198int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000199 PERFETTO_DCHECK_THREAD(thread_checker_);
200 if (!immediate_tasks_.empty())
Primiano Tucci8934c6c2018-03-15 11:39:27 +0000201 return 0;
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000202 if (!delayed_tasks_.empty()) {
Primiano Tucci8934c6c2018-03-15 11:39:27 +0000203 TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
204 return std::max(0, static_cast<int>(diff.count()));
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000205 }
Primiano Tucci8934c6c2018-03-15 11:39:27 +0000206 return -1;
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000207}
208
209void UnixTaskRunner::PostTask(std::function<void()> task) {
210 bool was_empty;
211 {
212 std::lock_guard<std::mutex> lock(lock_);
213 was_empty = immediate_tasks_.empty();
214 immediate_tasks_.push_back(std::move(task));
215 }
216 if (was_empty)
217 WakeUp();
218}
219
Primiano Tucci3cbb10a2018-04-10 17:52:40 +0100220void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
221 uint32_t delay_ms) {
Primiano Tucci8934c6c2018-03-15 11:39:27 +0000222 TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000223 {
224 std::lock_guard<std::mutex> lock(lock_);
225 delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
226 }
227 WakeUp();
228}
229
230void UnixTaskRunner::AddFileDescriptorWatch(int fd,
231 std::function<void()> task) {
232 PERFETTO_DCHECK(fd >= 0);
233 {
234 std::lock_guard<std::mutex> lock(lock_);
235 PERFETTO_DCHECK(!watch_tasks_.count(fd));
236 watch_tasks_[fd] = {std::move(task), SIZE_MAX};
237 watch_tasks_changed_ = true;
238 }
239 WakeUp();
240}
241
242void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
243 PERFETTO_DCHECK(fd >= 0);
244 {
245 std::lock_guard<std::mutex> lock(lock_);
246 PERFETTO_DCHECK(watch_tasks_.count(fd));
247 watch_tasks_.erase(fd);
248 watch_tasks_changed_ = true;
249 }
250 // No need to schedule a wake-up for this.
251}
252
253} // namespace base
254} // namespace perfetto