libsysutils: Add iovec/runOnEachSocket

SocketClient:
* Replace sendDataLocked with sendDataLockedv which takes an iovec.
* Add a version of sendData, sendDatav, which takes an iovec.
* do not preserve iovec content through sendDatav

SocketListener:
* Add runOnEachSocket, which allows to to specify a SocketClientCommand to
  run individually on each socket. This allows you to do broadcast-like
  actions customized for each individual socket.
* Client safe list reference counting for sendBroadcast & runOnEach Socket

(cherry picked from commit a6e965578e44f8ae5f98de822ba5decec381d5fc)

Signed-off-by: Nick Kralevich <nnk@google.com>
Signed-off-by: Mark Salyzyn <salyzyn@google.com>

Change-Id: I716f89c01b4cb7af900045c7e41fac1492defb06
diff --git a/include/sysutils/SocketClient.h b/include/sysutils/SocketClient.h
index 85b58ef..1004f06 100644
--- a/include/sysutils/SocketClient.h
+++ b/include/sysutils/SocketClient.h
@@ -6,22 +6,23 @@
 #include <pthread.h>
 #include <cutils/atomic.h>
 #include <sys/types.h>
+#include <sys/uio.h>
 
 class SocketClient {
     int             mSocket;
     bool            mSocketOwned;
     pthread_mutex_t mWriteMutex;
 
-    /* Peer process ID */
+    // Peer process ID
     pid_t mPid;
 
-    /* Peer user ID */
+    // Peer user ID
     uid_t mUid;
 
-    /* Peer group ID */
+    // Peer group ID
     gid_t mGid;
 
-    /* Reference count (starts at 1) */
+    // Reference count (starts at 1)
     pthread_mutex_t mRefCountMutex;
     int mRefCount;
 
@@ -38,12 +39,15 @@
     pid_t getPid() const { return mPid; }
     uid_t getUid() const { return mUid; }
     gid_t getGid() const { return mGid; }
-    void setCmdNum(int cmdNum) { android_atomic_release_store(cmdNum, &mCmdNum); }
+    void setCmdNum(int cmdNum) {
+        android_atomic_release_store(cmdNum, &mCmdNum);
+    }
     int getCmdNum() { return mCmdNum; }
 
     // Send null-terminated C strings:
     int sendMsg(int code, const char *msg, bool addErrno);
     int sendMsg(int code, const char *msg, bool addErrno, bool useCmdNum);
+    int sendMsg(const char *msg);
 
     // Provides a mechanism to send a response code to the client.
     // Sends the code and a null character.
@@ -56,6 +60,8 @@
 
     // Sending binary data:
     int sendData(const void *data, int len);
+    // iovec contents not preserved through call
+    int sendDatav(struct iovec *iov, int iovcnt);
 
     // Optional reference counting.  Reference count starts at 1.  If
     // it's decremented to 0, it deletes itself.
@@ -64,19 +70,18 @@
     void incRef();
     bool decRef(); // returns true at 0 (but note: SocketClient already deleted)
 
-    // return a new string in quotes with '\\' and '\"' escaped for "my arg" transmissions
+    // return a new string in quotes with '\\' and '\"' escaped for "my arg"
+    // transmissions
     static char *quoteArg(const char *arg);
 
 private:
-    // Send null-terminated C strings
-    int sendMsg(const char *msg);
     void init(int socket, bool owned, bool useCmdNum);
 
-    // Sending binary data. The caller should use make sure this is protected
+    // Sending binary data. The caller should make sure this is protected
     // from multiple threads entering simultaneously.
-    // returns 0 if successful, -1 if there is a 0 byte write and -2 if any other
-    // error occurred (use errno to get the error)
-    int sendDataLocked(const void *data, int len);
+    // returns 0 if successful, -1 if there is a 0 byte write or if any
+    // other error occurred (use errno to get the error)
+    int sendDataLockedv(struct iovec *iov, int iovcnt);
 };
 
 typedef android::sysutils::List<SocketClient *> SocketClientCollection;
diff --git a/include/sysutils/SocketClientCommand.h b/include/sysutils/SocketClientCommand.h
new file mode 100644
index 0000000..746bc25
--- /dev/null
+++ b/include/sysutils/SocketClientCommand.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _SOCKETCLIENTCOMMAND_H
+#define _SOCKETCLIENTCOMMAND_H
+
+#include <sysutils/SocketClient.h>
+
+class SocketClientCommand {
+public:
+    virtual ~SocketClientCommand() { }
+    virtual void runSocketCommand(SocketClient *client) = 0;
+};
+
+#endif
diff --git a/include/sysutils/SocketListener.h b/include/sysutils/SocketListener.h
index 8f56230..649c89a 100644
--- a/include/sysutils/SocketListener.h
+++ b/include/sysutils/SocketListener.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2008 The Android Open Source Project
+ * Copyright (C) 2008-2014 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
 #include <pthread.h>
 
 #include <sysutils/SocketClient.h>
+#include "SocketClientCommand.h"
 
 class SocketListener {
     bool                    mListen;
@@ -41,6 +42,8 @@
 
     void sendBroadcast(int code, const char *msg, bool addErrno);
 
+    void runOnEachSocket(SocketClientCommand *command);
+
 protected:
     virtual bool onDataAvailable(SocketClient *c) = 0;
 
diff --git a/libsysutils/src/SocketClient.cpp b/libsysutils/src/SocketClient.cpp
index ae0e077..3625d93 100644
--- a/libsysutils/src/SocketClient.cpp
+++ b/libsysutils/src/SocketClient.cpp
@@ -71,7 +71,7 @@
             ret = asprintf(&buf, "%d %s", code, msg);
         }
     }
-    /* Send the zero-terminated message */
+    // Send the zero-terminated message
     if (ret != -1) {
         ret = sendMsg(buf);
         free(buf);
@@ -79,22 +79,25 @@
     return ret;
 }
 
-/** send 3-digit code, null, binary-length, binary data */
+// send 3-digit code, null, binary-length, binary data
 int SocketClient::sendBinaryMsg(int code, const void *data, int len) {
 
-    /* 4 bytes for the code & null + 4 bytes for the len */
+    // 4 bytes for the code & null + 4 bytes for the len
     char buf[8];
-    /* Write the code */
+    // Write the code
     snprintf(buf, 4, "%.3d", code);
-    /* Write the len */
+    // Write the len
     uint32_t tmp = htonl(len);
     memcpy(buf + 4, &tmp, sizeof(uint32_t));
 
+    struct iovec vec[2];
+    vec[0].iov_base = (void *) buf;
+    vec[0].iov_len = sizeof(buf);
+    vec[1].iov_base = (void *) data;
+    vec[1].iov_len = len;
+
     pthread_mutex_lock(&mWriteMutex);
-    int result = sendDataLocked(buf, sizeof(buf));
-    if (result == 0 && len > 0) {
-        result = sendDataLocked(data, len);
-    }
+    int result = sendDataLockedv(vec, (len > 0) ? 2 : 1);
     pthread_mutex_unlock(&mWriteMutex);
 
     return result;
@@ -147,33 +150,51 @@
 }
 
 int SocketClient::sendData(const void *data, int len) {
+    struct iovec vec[1];
+    vec[0].iov_base = (void *) data;
+    vec[0].iov_len = len;
 
     pthread_mutex_lock(&mWriteMutex);
-    int rc = sendDataLocked(data, len);
+    int rc = sendDataLockedv(vec, 1);
     pthread_mutex_unlock(&mWriteMutex);
 
     return rc;
 }
 
-int SocketClient::sendDataLocked(const void *data, int len) {
-    int rc = 0;
-    const char *p = (const char*) data;
-    int brtw = len;
+int SocketClient::sendDatav(struct iovec *iov, int iovcnt) {
+    pthread_mutex_lock(&mWriteMutex);
+    int rc = sendDataLockedv(iov, iovcnt);
+    pthread_mutex_unlock(&mWriteMutex);
+
+    return rc;
+}
+
+int SocketClient::sendDataLockedv(struct iovec *iov, int iovcnt) {
 
     if (mSocket < 0) {
         errno = EHOSTUNREACH;
         return -1;
     }
 
-    if (len == 0) {
+    if (iovcnt <= 0) {
         return 0;
     }
 
-    while (brtw > 0) {
-        rc = send(mSocket, p, brtw, MSG_NOSIGNAL);
+    int current = 0;
+
+    for (;;) {
+        ssize_t rc = writev(mSocket, iov + current, iovcnt - current);
         if (rc > 0) {
-            p += rc;
-            brtw -= rc;
+            size_t written = rc;
+            while ((current < iovcnt) && (written >= iov[current].iov_len)) {
+                written -= iov[current].iov_len;
+                current++;
+            }
+            if (current == iovcnt) {
+                break;
+            }
+            iov[current].iov_base = (char *)iov[current].iov_base + written;
+            iov[current].iov_len -= written;
             continue;
         }
 
diff --git a/libsysutils/src/SocketListener.cpp b/libsysutils/src/SocketListener.cpp
index 0296910..1b53867 100644
--- a/libsysutils/src/SocketListener.cpp
+++ b/libsysutils/src/SocketListener.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2008 The Android Open Source Project
+ * Copyright (C) 2008-2014 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -164,6 +164,7 @@
 
         pthread_mutex_lock(&mClientsLock);
         for (it = mClients->begin(); it != mClients->end(); ++it) {
+            // NB: calling out to an other object with mClientsLock held (safe)
             int fd = (*it)->getSocket();
             FD_SET(fd, &read_fds);
             if (fd > max)
@@ -206,9 +207,12 @@
         pendingList->clear();
         pthread_mutex_lock(&mClientsLock);
         for (it = mClients->begin(); it != mClients->end(); ++it) {
-            int fd = (*it)->getSocket();
+            SocketClient* c = *it;
+            // NB: calling out to an other object with mClientsLock held (safe)
+            int fd = c->getSocket();
             if (FD_ISSET(fd, &read_fds)) {
-                pendingList->push_back(*it);
+                pendingList->push_back(c);
+                c->incRef();
             }
         }
         pthread_mutex_unlock(&mClientsLock);
@@ -236,20 +240,61 @@
                 /* Remove our reference to the client */
                 c->decRef();
             }
+            c->decRef();
         }
     }
     delete pendingList;
 }
 
 void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) {
+    SocketClientCollection safeList;
+
+    /* Add all active clients to the safe list first */
+    safeList.clear();
     pthread_mutex_lock(&mClientsLock);
     SocketClientCollection::iterator i;
 
     for (i = mClients->begin(); i != mClients->end(); ++i) {
-        // broadcasts are unsolicited and should not include a cmd number
-        if ((*i)->sendMsg(code, msg, addErrno, false)) {
-            SLOGW("Error sending broadcast (%s)", strerror(errno));
-        }
+        SocketClient* c = *i;
+        c->incRef();
+        safeList.push_back(c);
     }
     pthread_mutex_unlock(&mClientsLock);
+
+    while (!safeList.empty()) {
+        /* Pop the first item from the list */
+        i = safeList.begin();
+        SocketClient* c = *i;
+        safeList.erase(i);
+        // broadcasts are unsolicited and should not include a cmd number
+        if (c->sendMsg(code, msg, addErrno, false)) {
+            SLOGW("Error sending broadcast (%s)", strerror(errno));
+        }
+        c->decRef();
+    }
+}
+
+void SocketListener::runOnEachSocket(SocketClientCommand *command) {
+    SocketClientCollection safeList;
+
+    /* Add all active clients to the safe list first */
+    safeList.clear();
+    pthread_mutex_lock(&mClientsLock);
+    SocketClientCollection::iterator i;
+
+    for (i = mClients->begin(); i != mClients->end(); ++i) {
+        SocketClient* c = *i;
+        c->incRef();
+        safeList.push_back(c);
+    }
+    pthread_mutex_unlock(&mClientsLock);
+
+    while (!safeList.empty()) {
+        /* Pop the first item from the list */
+        i = safeList.begin();
+        SocketClient* c = *i;
+        safeList.erase(i);
+        command->runSocketCommand(c);
+        c->decRef();
+    }
 }