Rewrite input transport using sockets.
Since we will not longer be modifying events in place, we don't need
to use an ashmem region for input. Simplified the code to instead
use a socket of type SOCK_SEQPACKET.
This is part of a series of changes to improve input system pipelining.
Bug: 5963420
Change-Id: I05909075ed8b61b93900913e44c6db84857340d8
diff --git a/services/input/InputDispatcher.cpp b/services/input/InputDispatcher.cpp
index 1953aa8..87cf0dea 100644
--- a/services/input/InputDispatcher.cpp
+++ b/services/input/InputDispatcher.cpp
@@ -196,8 +196,8 @@
drainInboundQueueLocked();
}
- while (mConnectionsByReceiveFd.size() != 0) {
- unregisterInputChannel(mConnectionsByReceiveFd.valueAt(0)->inputChannel);
+ while (mConnectionsByFd.size() != 0) {
+ unregisterInputChannel(mConnectionsByFd.valueAt(0)->inputChannel);
}
}
@@ -888,7 +888,7 @@
ssize_t connectionIndex = getConnectionIndexLocked(inputTarget.inputChannel);
if (connectionIndex >= 0) {
- sp<Connection> connection = mConnectionsByReceiveFd.valueAt(connectionIndex);
+ sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
prepareDispatchCycleLocked(currentTime, connection, eventEntry, &inputTarget);
} else {
#if DEBUG_FOCUS
@@ -994,7 +994,7 @@
if (inputChannel.get()) {
ssize_t connectionIndex = getConnectionIndexLocked(inputChannel);
if (connectionIndex >= 0) {
- sp<Connection> connection = mConnectionsByReceiveFd.valueAt(connectionIndex);
+ sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
if (connection->status == Connection::STATUS_NORMAL) {
CancelationOptions options(CancelationOptions::CANCEL_ALL_EVENTS,
"application not responding");
@@ -1643,7 +1643,7 @@
const sp<InputWindowHandle>& windowHandle) {
ssize_t connectionIndex = getConnectionIndexLocked(windowHandle->getInputChannel());
if (connectionIndex >= 0) {
- sp<Connection> connection = mConnectionsByReceiveFd.valueAt(connectionIndex);
+ sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
return connection->outboundQueue.isEmpty();
} else {
return true;
@@ -1957,15 +1957,6 @@
}
}
- // Send the dispatch signal.
- status = connection->inputPublisher.sendDispatchSignal();
- if (status) {
- ALOGE("channel '%s' ~ Could not send dispatch signal, status=%d",
- connection->getInputChannelName(), status);
- abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
- return;
- }
-
// Record information about the newly started dispatch cycle.
connection->lastEventTime = eventEntry->eventTime;
connection->lastDispatchTime = currentTime;
@@ -1990,17 +1981,6 @@
return;
}
- // Reset the publisher since the event has been consumed.
- // We do this now so that the publisher can release some of its internal resources
- // while waiting for the next dispatch cycle to begin.
- status_t status = connection->inputPublisher.reset();
- if (status) {
- ALOGE("channel '%s' ~ Could not reset publisher, status=%d",
- connection->getInputChannelName(), status);
- abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
- return;
- }
-
// Notify other system components and prepare to start the next dispatch cycle.
onDispatchCycleFinishedLocked(currentTime, connection, handled);
}
@@ -2064,21 +2044,21 @@
deactivateConnectionLocked(connection);
}
-int InputDispatcher::handleReceiveCallback(int receiveFd, int events, void* data) {
+int InputDispatcher::handleReceiveCallback(int fd, int events, void* data) {
InputDispatcher* d = static_cast<InputDispatcher*>(data);
{ // acquire lock
AutoMutex _l(d->mLock);
- ssize_t connectionIndex = d->mConnectionsByReceiveFd.indexOfKey(receiveFd);
+ ssize_t connectionIndex = d->mConnectionsByFd.indexOfKey(fd);
if (connectionIndex < 0) {
ALOGE("Received spurious receive callback for unknown input channel. "
- "fd=%d, events=0x%x", receiveFd, events);
+ "fd=%d, events=0x%x", fd, events);
return 0; // remove the callback
}
bool notify;
- sp<Connection> connection = d->mConnectionsByReceiveFd.valueAt(connectionIndex);
+ sp<Connection> connection = d->mConnectionsByFd.valueAt(connectionIndex);
if (!(events & (ALOOPER_EVENT_ERROR | ALOOPER_EVENT_HANGUP))) {
if (!(events & ALOOPER_EVENT_INPUT)) {
ALOGW("channel '%s' ~ Received spurious callback for unhandled poll event. "
@@ -2117,9 +2097,9 @@
void InputDispatcher::synthesizeCancelationEventsForAllConnectionsLocked(
const CancelationOptions& options) {
- for (size_t i = 0; i < mConnectionsByReceiveFd.size(); i++) {
+ for (size_t i = 0; i < mConnectionsByFd.size(); i++) {
synthesizeCancelationEventsForConnectionLocked(
- mConnectionsByReceiveFd.valueAt(i), options);
+ mConnectionsByFd.valueAt(i), options);
}
}
@@ -2128,7 +2108,7 @@
ssize_t index = getConnectionIndexLocked(channel);
if (index >= 0) {
synthesizeCancelationEventsForConnectionLocked(
- mConnectionsByReceiveFd.valueAt(index), options);
+ mConnectionsByFd.valueAt(index), options);
}
}
@@ -2968,8 +2948,8 @@
ssize_t fromConnectionIndex = getConnectionIndexLocked(fromChannel);
ssize_t toConnectionIndex = getConnectionIndexLocked(toChannel);
if (fromConnectionIndex >= 0 && toConnectionIndex >= 0) {
- sp<Connection> fromConnection = mConnectionsByReceiveFd.valueAt(fromConnectionIndex);
- sp<Connection> toConnection = mConnectionsByReceiveFd.valueAt(toConnectionIndex);
+ sp<Connection> fromConnection = mConnectionsByFd.valueAt(fromConnectionIndex);
+ sp<Connection> toConnection = mConnectionsByFd.valueAt(toConnectionIndex);
fromConnection->inputState.copyPointerStateTo(toConnection->inputState);
CancelationOptions options(CancelationOptions::CANCEL_POINTER_EVENTS,
@@ -3134,21 +3114,15 @@
}
sp<Connection> connection = new Connection(inputChannel, inputWindowHandle, monitor);
- status_t status = connection->initialize();
- if (status) {
- ALOGE("Failed to initialize input publisher for input channel '%s', status=%d",
- inputChannel->getName().string(), status);
- return status;
- }
- int32_t receiveFd = inputChannel->getReceivePipeFd();
- mConnectionsByReceiveFd.add(receiveFd, connection);
+ int32_t fd = inputChannel->getFd();
+ mConnectionsByFd.add(fd, connection);
if (monitor) {
mMonitoringChannels.push(inputChannel);
}
- mLooper->addFd(receiveFd, 0, ALOOPER_EVENT_INPUT, handleReceiveCallback, this);
+ mLooper->addFd(fd, 0, ALOOPER_EVENT_INPUT, handleReceiveCallback, this);
runCommandsLockedInterruptible();
} // release lock
@@ -3184,14 +3158,14 @@
return BAD_VALUE;
}
- sp<Connection> connection = mConnectionsByReceiveFd.valueAt(connectionIndex);
- mConnectionsByReceiveFd.removeItemsAt(connectionIndex);
+ sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
+ mConnectionsByFd.removeItemsAt(connectionIndex);
if (connection->monitor) {
removeMonitorChannelLocked(inputChannel);
}
- mLooper->removeFd(inputChannel->getReceivePipeFd());
+ mLooper->removeFd(inputChannel->getFd());
nsecs_t currentTime = now();
abortBrokenDispatchCycleLocked(currentTime, connection, notify);
@@ -3212,9 +3186,9 @@
}
ssize_t InputDispatcher::getConnectionIndexLocked(const sp<InputChannel>& inputChannel) {
- ssize_t connectionIndex = mConnectionsByReceiveFd.indexOfKey(inputChannel->getReceivePipeFd());
+ ssize_t connectionIndex = mConnectionsByFd.indexOfKey(inputChannel->getFd());
if (connectionIndex >= 0) {
- sp<Connection> connection = mConnectionsByReceiveFd.valueAt(connectionIndex);
+ sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
if (connection->inputChannel.get() == inputChannel.get()) {
return connectionIndex;
}
@@ -4052,10 +4026,6 @@
InputDispatcher::Connection::~Connection() {
}
-status_t InputDispatcher::Connection::initialize() {
- return inputPublisher.initialize();
-}
-
const char* InputDispatcher::Connection::getStatusLabel() const {
switch (status) {
case STATUS_NORMAL: