Core to client fifo on sockets.

Change-Id: I3b84a7d4c3c5fa0d764ad4db22dfd142d5cfa95b
diff --git a/libs/rs/rsThreadIO.cpp b/libs/rs/rsThreadIO.cpp
index 6e959a7..4429556 100644
--- a/libs/rs/rsThreadIO.cpp
+++ b/libs/rs/rsThreadIO.cpp
@@ -23,16 +23,96 @@
 
 ThreadIO::ThreadIO() {
     mToCore.init(16 * 1024);
-    mToClient.init(1024);
 }
 
 ThreadIO::~ThreadIO() {
 }
 
-void ThreadIO::shutdown() {
-    mToCore.shutdown();
+void ThreadIO::init(bool useSocket) {
+    mUsingSocket = useSocket;
+
+    if (mUsingSocket) {
+        mToClientSocket.init();
+        mToCoreSocket.init();
+    } else {
+        mToClient.init(1024);
+    }
 }
 
+void ThreadIO::shutdown() {
+    //LOGE("shutdown 1");
+    mToCore.shutdown();
+    //LOGE("shutdown 2");
+}
+
+void ThreadIO::coreFlush() {
+    //LOGE("coreFlush 1");
+    if (mUsingSocket) {
+    } else {
+        mToCore.flush();
+    }
+    //LOGE("coreFlush 2");
+}
+
+void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
+    //LOGE("coreHeader %i %i", cmdID, dataLen);
+    if (mUsingSocket) {
+        CoreCmdHeader hdr;
+        hdr.bytes = dataLen;
+        hdr.cmdID = cmdID;
+        mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
+    } else {
+        mCoreCommandSize = dataLen;
+        mCoreCommandID = cmdID;
+        mCoreDataPtr = (uint8_t *)mToCore.reserve(dataLen);
+        mCoreDataBasePtr = mCoreDataPtr;
+    }
+    //LOGE("coreHeader ret %p", mCoreDataPtr);
+    return mCoreDataPtr;
+}
+
+void ThreadIO::coreData(const void *data, size_t dataLen) {
+    //LOGE("coreData %p %i", data, dataLen);
+    mToCoreSocket.writeAsync(data, dataLen);
+    //LOGE("coreData ret %p", mCoreDataPtr);
+}
+
+void ThreadIO::coreCommit() {
+    //LOGE("coreCommit %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
+    if (mUsingSocket) {
+    } else {
+        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
+        mToCore.commit(mCoreCommandID, mCoreCommandSize);
+    }
+    //LOGE("coreCommit ret");
+}
+
+void ThreadIO::coreCommitSync() {
+    //LOGE("coreCommitSync %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
+    if (mUsingSocket) {
+    } else {
+        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
+        mToCore.commitSync(mCoreCommandID, mCoreCommandSize);
+    }
+    //LOGE("coreCommitSync ret");
+}
+
+void ThreadIO::clientShutdown() {
+    //LOGE("coreShutdown 1");
+    mToClient.shutdown();
+    //LOGE("coreShutdown 2");
+}
+
+void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
+    rsAssert(dataLen <= sizeof(mToCoreRet));
+    memcpy(&mToCoreRet, data, dataLen);
+}
+
+void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
+    memcpy(data, &mToCoreRet, dataLen);
+}
+
+
 bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand) {
     bool ret = false;
     while (!mToCore.isEmpty() || waitForCommand) {
@@ -64,4 +144,82 @@
     return ret;
 }
 
+RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
+    if (mUsingSocket) {
+        mToClientSocket.read(&mLastClientHeader, sizeof(mLastClientHeader));
+    } else {
+        size_t bytesData = 0;
+        const uint32_t *d = (const uint32_t *)mToClient.get(&mLastClientHeader.cmdID, &bytesData);
+        if (bytesData >= sizeof(uint32_t)) {
+            mLastClientHeader.userID = d[0];
+            mLastClientHeader.bytes = bytesData - sizeof(uint32_t);
+        } else {
+            mLastClientHeader.userID = 0;
+            mLastClientHeader.bytes = 0;
+        }
+    }
+    receiveLen[0] = mLastClientHeader.bytes;
+    usrID[0] = mLastClientHeader.userID;
+    return (RsMessageToClientType)mLastClientHeader.cmdID;
+}
+
+RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
+                                uint32_t *usrID, size_t bufferLen) {
+    receiveLen[0] = mLastClientHeader.bytes;
+    usrID[0] = mLastClientHeader.userID;
+    if (bufferLen < mLastClientHeader.bytes) {
+        return RS_MESSAGE_TO_CLIENT_RESIZE;
+    }
+    if (mUsingSocket) {
+        if (receiveLen[0]) {
+            mToClientSocket.read(data, receiveLen[0]);
+        }
+        return (RsMessageToClientType)mLastClientHeader.cmdID;
+    } else {
+        uint32_t bytesData = 0;
+        uint32_t commandID = 0;
+        const uint32_t *d = (const uint32_t *)mToClient.get(&commandID, &bytesData);
+        //LOGE("getMessageToClient 3    %i  %i", commandID, bytesData);
+        //LOGE("getMessageToClient  %i %i", commandID, *subID);
+        if (bufferLen >= receiveLen[0]) {
+            memcpy(data, d+1, receiveLen[0]);
+            mToClient.next();
+            return (RsMessageToClientType)commandID;
+        }
+    }
+    return RS_MESSAGE_TO_CLIENT_RESIZE;
+}
+
+bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
+                            size_t dataLen, bool waitForSpace) {
+    ClientCmdHeader hdr;
+    hdr.bytes = dataLen;
+    hdr.cmdID = cmdID;
+    hdr.userID = usrID;
+    if (mUsingSocket) {
+        mToClientSocket.writeAsync(&hdr, sizeof(hdr));
+        if (dataLen) {
+            mToClientSocket.writeAsync(data, dataLen);
+        }
+        return true;
+    } else {
+        if (!waitForSpace) {
+            if (!mToClient.makeSpaceNonBlocking(dataLen + sizeof(hdr))) {
+                // Not enough room, and not waiting.
+                return false;
+            }
+        }
+
+        //LOGE("sendMessageToClient 2");
+        uint32_t *p = (uint32_t *)mToClient.reserve(dataLen + sizeof(usrID));
+        p[0] = usrID;
+        if (dataLen > 0) {
+            memcpy(p+1, data, dataLen);
+        }
+        mToClient.commit(cmdID, dataLen + sizeof(usrID));
+        //LOGE("sendMessageToClient 3");
+        return true;
+    }
+    return false;
+}