blob: 2e9fe7967af7cfd3e06352470800a08b1e9ec403 [file] [log] [blame]
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#include "src/ipc/host_impl.h"
18
19#include <inttypes.h>
20
21#include <algorithm>
22#include <utility>
23
24#include "perfetto/base/task_runner.h"
25#include "perfetto/base/utils.h"
26#include "perfetto/ipc/service.h"
27#include "perfetto/ipc/service_descriptor.h"
28
29#include "src/ipc/wire_protocol.pb.h"
30
// TODO(primiano): put limits on #connections/uid and req. queue (b/69093705).
32
33namespace perfetto {
34namespace ipc {
35
36// static
37std::unique_ptr<Host> Host::CreateInstance(const char* socket_name,
38 base::TaskRunner* task_runner) {
39 std::unique_ptr<HostImpl> host(new HostImpl(socket_name, task_runner));
40 if (!host->sock()->is_listening())
41 return nullptr;
42 return std::move(host);
43}
44
Primiano Tucci9b5d3362017-12-21 21:42:17 +010045// static
46std::unique_ptr<Host> Host::CreateInstance(base::ScopedFile socket_fd,
47 base::TaskRunner* task_runner) {
48 std::unique_ptr<HostImpl> host(
49 new HostImpl(std::move(socket_fd), task_runner));
50 if (!host->sock()->is_listening())
51 return nullptr;
52 return std::move(host);
53}
54
55HostImpl::HostImpl(base::ScopedFile socket_fd, base::TaskRunner* task_runner)
56 : task_runner_(task_runner), weak_ptr_factory_(this) {
57 GOOGLE_PROTOBUF_VERIFY_VERSION;
58 PERFETTO_DCHECK_THREAD(thread_checker_);
59 sock_ = UnixSocket::Listen(std::move(socket_fd), this, task_runner_);
60}
61
Primiano Tucci4f9b6d72017-12-05 20:59:16 +000062HostImpl::HostImpl(const char* socket_name, base::TaskRunner* task_runner)
63 : task_runner_(task_runner), weak_ptr_factory_(this) {
64 GOOGLE_PROTOBUF_VERIFY_VERSION;
65 PERFETTO_DCHECK_THREAD(thread_checker_);
66 sock_ = UnixSocket::Listen(socket_name, this, task_runner_);
67}
68
69HostImpl::~HostImpl() = default;
70
71bool HostImpl::ExposeService(std::unique_ptr<Service> service) {
72 PERFETTO_DCHECK_THREAD(thread_checker_);
73 const std::string& service_name = service->GetDescriptor().service_name;
74 if (GetServiceByName(service_name)) {
75 PERFETTO_DLOG("Duplicate ExposeService(): %s", service_name.c_str());
76 return false;
77 }
78 ServiceID sid = ++last_service_id_;
79 ExposedService exposed_service(sid, service_name, std::move(service));
80 services_.emplace(sid, std::move(exposed_service));
81 return true;
82}
83
84void HostImpl::OnNewIncomingConnection(UnixSocket*,
85 std::unique_ptr<UnixSocket> new_conn) {
86 PERFETTO_DCHECK_THREAD(thread_checker_);
87 std::unique_ptr<ClientConnection> client(new ClientConnection());
88 ClientID client_id = ++last_client_id_;
89 clients_by_socket_[new_conn.get()] = client.get();
90 client->id = client_id;
91 client->sock = std::move(new_conn);
92 clients_[client_id] = std::move(client);
93}
94
95void HostImpl::OnDataAvailable(UnixSocket* sock) {
96 PERFETTO_DCHECK_THREAD(thread_checker_);
97 auto it = clients_by_socket_.find(sock);
98 if (it == clients_by_socket_.end())
99 return;
100 ClientConnection* client = it->second;
101 BufferedFrameDeserializer& frame_deserializer = client->frame_deserializer;
102
103 size_t rsize;
104 do {
105 auto buf = frame_deserializer.BeginReceive();
Florian Mayerd16508e2018-03-02 17:06:40 +0000106 base::ScopedFile fd;
107 rsize = client->sock->Receive(buf.data, buf.size, &fd);
108 if (fd) {
109 PERFETTO_DCHECK(!client->received_fd);
110 client->received_fd = std::move(fd);
111 }
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000112 if (!frame_deserializer.EndReceive(rsize))
113 return OnDisconnect(client->sock.get());
114 } while (rsize > 0);
115
116 for (;;) {
117 std::unique_ptr<Frame> frame = frame_deserializer.PopNextFrame();
118 if (!frame)
119 break;
120 OnReceivedFrame(client, *frame);
121 }
122}
123
124void HostImpl::OnReceivedFrame(ClientConnection* client,
125 const Frame& req_frame) {
Florian Mayer22e4b392018-03-08 10:20:11 +0000126 if (req_frame.msg_case() == Frame::kMsgBindService)
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000127 return OnBindService(client, req_frame);
Florian Mayer22e4b392018-03-08 10:20:11 +0000128 if (req_frame.msg_case() == Frame::kMsgInvokeMethod)
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000129 return OnInvokeMethod(client, req_frame);
Florian Mayer22e4b392018-03-08 10:20:11 +0000130
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000131 PERFETTO_DLOG("Received invalid RPC frame %u from client %" PRIu64,
132 req_frame.msg_case(), client->id);
133 Frame reply_frame;
134 reply_frame.set_request_id(req_frame.request_id());
135 reply_frame.mutable_msg_request_error()->set_error("unknown request");
136 SendFrame(client, reply_frame);
137}
138
139void HostImpl::OnBindService(ClientConnection* client, const Frame& req_frame) {
140 // Binding a service doesn't do anything major. It just returns back the
141 // service id and its method map.
142 const Frame::BindService& req = req_frame.msg_bind_service();
143 Frame reply_frame;
144 reply_frame.set_request_id(req_frame.request_id());
145 auto* reply = reply_frame.mutable_msg_bind_service_reply();
146 const ExposedService* service = GetServiceByName(req.service_name());
147 if (service) {
148 reply->set_success(true);
149 reply->set_service_id(service->id);
150 uint32_t method_id = 1; // method ids start at index 1.
151 for (const auto& desc_method : service->instance->GetDescriptor().methods) {
152 Frame::BindServiceReply::MethodInfo* method_info = reply->add_methods();
153 method_info->set_name(desc_method.name);
154 method_info->set_id(method_id++);
155 }
156 }
157 SendFrame(client, reply_frame);
158}
159
160void HostImpl::OnInvokeMethod(ClientConnection* client,
161 const Frame& req_frame) {
162 const Frame::InvokeMethod& req = req_frame.msg_invoke_method();
163 Frame reply_frame;
164 RequestID request_id = req_frame.request_id();
165 reply_frame.set_request_id(request_id);
166 reply_frame.mutable_msg_invoke_method_reply()->set_success(false);
167 auto svc_it = services_.find(req.service_id());
168 if (svc_it == services_.end())
169 return SendFrame(client, reply_frame); // |success| == false by default.
170
171 Service* service = svc_it->second.instance.get();
172 const ServiceDescriptor& svc = service->GetDescriptor();
173 const auto& methods = svc.methods;
Primiano Tucci3cbb10a2018-04-10 17:52:40 +0100174 const uint32_t method_id = req.method_id();
175 if (method_id == 0 || method_id > methods.size())
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000176 return SendFrame(client, reply_frame);
177
Primiano Tucci3cbb10a2018-04-10 17:52:40 +0100178 const ServiceDescriptor::Method& method = methods[method_id - 1];
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000179 std::unique_ptr<ProtoMessage> decoded_req_args(
180 method.request_proto_decoder(req.args_proto()));
181 if (!decoded_req_args)
182 return SendFrame(client, reply_frame);
183
184 Deferred<ProtoMessage> deferred_reply;
185 base::WeakPtr<HostImpl> host_weak_ptr = weak_ptr_factory_.GetWeakPtr();
186 ClientID client_id = client->id;
Primiano Tucci2d0b2252018-01-25 13:37:50 +0000187
Primiano Tucci3e69ed92018-03-14 14:52:29 +0000188 if (!req.drop_reply()) {
Primiano Tucci2d0b2252018-01-25 13:37:50 +0000189 deferred_reply.Bind([host_weak_ptr, client_id,
190 request_id](AsyncResult<ProtoMessage> reply) {
191 if (!host_weak_ptr)
192 return; // The reply came too late, the HostImpl has gone.
193 host_weak_ptr->ReplyToMethodInvocation(client_id, request_id,
194 std::move(reply));
195 });
196 }
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000197
198 service->client_info_ = ClientInfo(client->id, client->sock->peer_uid());
Florian Mayerd16508e2018-03-02 17:06:40 +0000199 service->received_fd_ = &client->received_fd;
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000200 method.invoker(service, *decoded_req_args, std::move(deferred_reply));
Florian Mayerd16508e2018-03-02 17:06:40 +0000201 service->received_fd_ = nullptr;
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000202 service->client_info_ = ClientInfo();
203}
204
205void HostImpl::ReplyToMethodInvocation(ClientID client_id,
206 RequestID request_id,
207 AsyncResult<ProtoMessage> reply) {
208 auto client_iter = clients_.find(client_id);
209 if (client_iter == clients_.end())
210 return; // client has disconnected by the time we got the async reply.
211
212 ClientConnection* client = client_iter->second.get();
213 Frame reply_frame;
214 reply_frame.set_request_id(request_id);
215
Florian Mayeraab53552018-01-24 14:13:55 +0000216 // TODO(fmayer): add a test to guarantee that the reply is consumed within the
217 // same call stack and not kept around. ConsumerIPCService::OnTraceData()
218 // relies on this behavior.
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000219 auto* reply_frame_data = reply_frame.mutable_msg_invoke_method_reply();
220 reply_frame_data->set_has_more(reply.has_more());
221 if (reply.success()) {
222 std::string reply_proto;
223 if (reply->SerializeToString(&reply_proto)) {
224 reply_frame_data->set_reply_proto(reply_proto);
225 reply_frame_data->set_success(true);
226 }
227 }
228 SendFrame(client, reply_frame, reply.fd());
229}
230
231// static
232void HostImpl::SendFrame(ClientConnection* client, const Frame& frame, int fd) {
233 std::string buf = BufferedFrameDeserializer::Serialize(frame);
234
Primiano Tucci7a265b62017-12-07 18:37:31 +0000235 // TODO(primiano): this should do non-blocking I/O. But then what if the
236 // socket buffer is full? We might want to either drop the request or throttle
237 // the send and PostTask the reply later? Right now we are making Send()
238 // blocking as a workaround. Propagate bakpressure to the caller instead.
239 bool res = client->sock->Send(buf.data(), buf.size(), fd,
240 UnixSocket::BlockingMode::kBlocking);
241 PERFETTO_CHECK(res || !client->sock->is_connected());
Primiano Tucci4f9b6d72017-12-05 20:59:16 +0000242}
243
244void HostImpl::OnDisconnect(UnixSocket* sock) {
245 PERFETTO_DCHECK_THREAD(thread_checker_);
246 auto it = clients_by_socket_.find(sock);
247 if (it == clients_by_socket_.end())
248 return;
249 ClientID client_id = it->second->id;
250 ClientInfo client_info(client_id, sock->peer_uid());
251 clients_by_socket_.erase(it);
252 PERFETTO_DCHECK(clients_.count(client_id));
253 clients_.erase(client_id);
254
255 for (const auto& service_it : services_) {
256 Service& service = *service_it.second.instance;
257 service.client_info_ = client_info;
258 service.OnClientDisconnected();
259 service.client_info_ = ClientInfo();
260 }
261}
262
263const HostImpl::ExposedService* HostImpl::GetServiceByName(
264 const std::string& name) {
265 // This could be optimized by using another map<name,ServiceID>. However this
266 // is used only by Bind/ExposeService that are quite rare (once per client
267 // connection and once per service instance), not worth it.
268 for (const auto& it : services_) {
269 if (it.second.name == name)
270 return &it.second;
271 }
272 return nullptr;
273}
274
275HostImpl::ExposedService::ExposedService(ServiceID id_,
276 const std::string& name_,
277 std::unique_ptr<Service> instance_)
278 : id(id_), name(name_), instance(std::move(instance_)) {}
279
280HostImpl::ExposedService::ExposedService(ExposedService&&) noexcept = default;
281HostImpl::ExposedService& HostImpl::ExposedService::operator=(
282 HostImpl::ExposedService&&) = default;
283HostImpl::ExposedService::~ExposedService() = default;
284
285HostImpl::ClientConnection::~ClientConnection() = default;
286
287} // namespace ipc
288} // namespace perfetto