Add a callback thread to ConsumerBase

- Add a message queue and callback thread in ConsumerBase.
- This is used to prevent deadlocks when ConsumerBase calls into
  BufferQueueConsumer and that generates a callback.

Bug 27229287

Change-Id: I45c41e5a554555511fcfa5c185a7d60b0d969b7e
diff --git a/include/gui/ConsumerBase.h b/include/gui/ConsumerBase.h
index 9307a26..1b63552 100644
--- a/include/gui/ConsumerBase.h
+++ b/include/gui/ConsumerBase.h
@@ -26,6 +26,8 @@
 #include <utils/threads.h>
 #include <gui/IConsumerListener.h>
 
+#include <queue>
+
 namespace android {
 // ----------------------------------------------------------------------------
 
@@ -108,18 +110,18 @@
     // from the derived class.
     virtual void onLastStrongRef(const void* id);
 
-    // Implementation of the IConsumerListener interface.  These
-    // calls are used to notify the ConsumerBase of asynchronous events in the
-    // BufferQueue.  The onFrameAvailable, onFrameReplaced, and
-    // onBuffersReleased methods should not need to be overridden by derived
-    // classes, but if they are overridden the ConsumerBase implementation must
-    // be called from the derived class. The ConsumerBase version of
-    // onSidebandStreamChanged does nothing and can be overriden by derived
-    // classes if they want the notification.
-    virtual void onFrameAvailable(const BufferItem& item) override;
-    virtual void onFrameReplaced(const BufferItem& item) override;
-    virtual void onBuffersReleased() override;
-    virtual void onSidebandStreamChanged() override;
+    // Handlers for the IConsumerListener interface, these will be called from
+    // the message queue thread. These calls are used to notify the ConsumerBase
+    // of asynchronous events in the BufferQueue.  The onFrameAvailableHandler,
+    // onFrameReplacedHandler, and onBuffersReleasedHandler methods should not
+    // need to be overridden by derived classes, but if they are overridden the
+    // ConsumerBase implementation must be called from the derived class. The
+    // ConsumerBase version of onSidebandStreamChangedHandler does nothing and
+    // can be overriden by derived classes if they want the notification.
+    virtual void onFrameAvailableHandler(const BufferItem& item);
+    virtual void onFrameReplacedHandler(const BufferItem& item);
+    virtual void onBuffersReleasedHandler();
+    virtual void onSidebandStreamChangedHandler();
 
     // freeBufferLocked frees up the given buffer slot.  If the slot has been
     // initialized this will release the reference to the GraphicBuffer in that
@@ -244,6 +246,35 @@
     //
     // This mutex is intended to be locked by derived classes.
     mutable Mutex mMutex;
+
+    // Implements the ConsumerListener interface
+    virtual void onFrameAvailable(const BufferItem& item) override;
+    virtual void onFrameReplaced(const BufferItem& item) override;
+    virtual void onBuffersReleased() override;
+    virtual void onSidebandStreamChanged() override;
+
+    enum MessageType {
+        ON_FRAME_AVAILABLE,
+        ON_FRAME_REPLACED,
+        ON_BUFFERS_RELEASED,
+        ON_SIDEBAND_STREAM_CHANGED,
+        EXIT,
+    };
+
+    mutable Mutex mMessageQueueLock;
+    Condition mMessageAvailable;
+    std::queue<std::pair<MessageType, BufferItem>> mMessageQueue;
+
+    class MessageThread : public Thread {
+    public:
+        MessageThread(ConsumerBase* consumerBase) :
+            mConsumerBase(consumerBase) {};
+    protected:
+        virtual bool threadLoop() override;
+        ConsumerBase* mConsumerBase;
+    };
+
+    sp<MessageThread> mMessageThread;
 };
 
 // ----------------------------------------------------------------------------
diff --git a/libs/gui/ConsumerBase.cpp b/libs/gui/ConsumerBase.cpp
index d01187f..a22b81b 100644
--- a/libs/gui/ConsumerBase.cpp
+++ b/libs/gui/ConsumerBase.cpp
@@ -74,12 +74,26 @@
     } else {
         mConsumer->setConsumerName(mName);
     }
+
+    mMessageThread = new MessageThread(this);
+    mMessageThread->run();
 }
 
 ConsumerBase::~ConsumerBase() {
     CB_LOGV("~ConsumerBase");
-    Mutex::Autolock lock(mMutex);
 
+    mMessageThread->requestExit();
+    {
+        Mutex::Autolock lock(mMessageQueueLock);
+        mMessageQueue.emplace(std::piecewise_construct,
+                std::forward_as_tuple(EXIT),
+                std::forward_as_tuple());
+        mMessageAvailable.signal();
+    }
+
+    mMessageThread->join();
+
+    Mutex::Autolock lock(mMutex);
     // Verify that abandon() has been called before we get here.  This should
     // be done by ConsumerBase::onLastStrongRef(), but it's possible for a
     // derived class to override that method and not call
@@ -100,6 +114,13 @@
 }
 
 void ConsumerBase::onFrameAvailable(const BufferItem& item) {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_FRAME_AVAILABLE),
+            std::forward_as_tuple(item));
+    mMessageAvailable.signal();
+}
+void ConsumerBase::onFrameAvailableHandler(const BufferItem& item) {
     CB_LOGV("onFrameAvailable");
 
     sp<FrameAvailableListener> listener;
@@ -115,6 +136,14 @@
 }
 
 void ConsumerBase::onFrameReplaced(const BufferItem &item) {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_FRAME_REPLACED),
+            std::forward_as_tuple(item));
+    mMessageAvailable.signal();
+}
+
+void ConsumerBase::onFrameReplacedHandler(const BufferItem &item) {
     CB_LOGV("onFrameReplaced");
 
     sp<FrameAvailableListener> listener;
@@ -130,6 +159,14 @@
 }
 
 void ConsumerBase::onBuffersReleased() {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_BUFFERS_RELEASED),
+            std::forward_as_tuple());
+    mMessageAvailable.signal();
+}
+
+void ConsumerBase::onBuffersReleasedHandler() {
     Mutex::Autolock lock(mMutex);
 
     CB_LOGV("onBuffersReleased");
@@ -149,6 +186,45 @@
 }
 
 void ConsumerBase::onSidebandStreamChanged() {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_SIDEBAND_STREAM_CHANGED),
+            std::forward_as_tuple());
+    mMessageAvailable.signal();
+}
+
+void ConsumerBase::onSidebandStreamChangedHandler() {
+}
+
+bool ConsumerBase::MessageThread::threadLoop() {
+    Mutex::Autolock lock(mConsumerBase->mMessageQueueLock);
+
+    if (mConsumerBase->mMessageQueue.empty()) {
+        mConsumerBase->mMessageAvailable.wait(mConsumerBase->mMessageQueueLock);
+    }
+
+    while (!mConsumerBase->mMessageQueue.empty()) {
+        auto nextMessage = mConsumerBase->mMessageQueue.front();
+
+        switch (nextMessage.first) {
+            case ON_FRAME_AVAILABLE:
+                mConsumerBase->onFrameAvailableHandler(nextMessage.second);
+                break;
+            case ON_FRAME_REPLACED:
+                mConsumerBase->onFrameReplacedHandler(nextMessage.second);
+                break;
+            case ON_BUFFERS_RELEASED:
+                mConsumerBase->onBuffersReleasedHandler();
+                break;
+            case ON_SIDEBAND_STREAM_CHANGED:
+                mConsumerBase->onSidebandStreamChangedHandler();
+                break;
+            case EXIT:
+                break;
+        }
+        mConsumerBase->mMessageQueue.pop();
+    }
+    return true;
 }
 
 void ConsumerBase::abandon() {
diff --git a/libs/gui/tests/SurfaceTextureGLToGL_test.cpp b/libs/gui/tests/SurfaceTextureGLToGL_test.cpp
index c28b4d1..b8a7a90 100644
--- a/libs/gui/tests/SurfaceTextureGLToGL_test.cpp
+++ b/libs/gui/tests/SurfaceTextureGLToGL_test.cpp
@@ -192,6 +192,10 @@
     ASSERT_EQ(EGL_SUCCESS, eglGetError());
     mProducerEglSurface = EGL_NO_SURFACE;
 
+    // sleep for 10ms to allow any asynchronous operations to complete before
+    // checking the reference counts
+    usleep(10000);
+
     // This test should have the only reference to buffer 0.
     EXPECT_EQ(1, buffers[0]->getStrongCount());