Add a callback thread to ConsumerBase
- Add a message queue and callback thread in ConsumerBase.
- This is used to prevent deadlocks when ConsumerBase calls into
BufferQueueConsumer and that generates a callback.
Bug 27229287
Change-Id: I45c41e5a554555511fcfa5c185a7d60b0d969b7e
diff --git a/include/gui/ConsumerBase.h b/include/gui/ConsumerBase.h
index 9307a26..1b63552 100644
--- a/include/gui/ConsumerBase.h
+++ b/include/gui/ConsumerBase.h
@@ -26,6 +26,8 @@
#include <utils/threads.h>
#include <gui/IConsumerListener.h>
+#include <queue>
+
namespace android {
// ----------------------------------------------------------------------------
@@ -108,18 +110,18 @@
// from the derived class.
virtual void onLastStrongRef(const void* id);
- // Implementation of the IConsumerListener interface. These
- // calls are used to notify the ConsumerBase of asynchronous events in the
- // BufferQueue. The onFrameAvailable, onFrameReplaced, and
- // onBuffersReleased methods should not need to be overridden by derived
- // classes, but if they are overridden the ConsumerBase implementation must
- // be called from the derived class. The ConsumerBase version of
- // onSidebandStreamChanged does nothing and can be overriden by derived
- // classes if they want the notification.
- virtual void onFrameAvailable(const BufferItem& item) override;
- virtual void onFrameReplaced(const BufferItem& item) override;
- virtual void onBuffersReleased() override;
- virtual void onSidebandStreamChanged() override;
+ // Handlers for the IConsumerListener interface, these will be called from
+ // the message queue thread. These calls are used to notify the ConsumerBase
+ // of asynchronous events in the BufferQueue. The onFrameAvailableHandler,
+ // onFrameReplacedHandler, and onBuffersReleasedHandler methods should not
+ // need to be overridden by derived classes, but if they are overridden the
+ // ConsumerBase implementation must be called from the derived class. The
+ // ConsumerBase version of onSidebandStreamChangedHandler does nothing and
+ // can be overriden by derived classes if they want the notification.
+ virtual void onFrameAvailableHandler(const BufferItem& item);
+ virtual void onFrameReplacedHandler(const BufferItem& item);
+ virtual void onBuffersReleasedHandler();
+ virtual void onSidebandStreamChangedHandler();
// freeBufferLocked frees up the given buffer slot. If the slot has been
// initialized this will release the reference to the GraphicBuffer in that
@@ -244,6 +246,35 @@
//
// This mutex is intended to be locked by derived classes.
mutable Mutex mMutex;
+
+ // Implements the ConsumerListener interface
+ virtual void onFrameAvailable(const BufferItem& item) override;
+ virtual void onFrameReplaced(const BufferItem& item) override;
+ virtual void onBuffersReleased() override;
+ virtual void onSidebandStreamChanged() override;
+
+ enum MessageType {
+ ON_FRAME_AVAILABLE,
+ ON_FRAME_REPLACED,
+ ON_BUFFERS_RELEASED,
+ ON_SIDEBAND_STREAM_CHANGED,
+ EXIT,
+ };
+
+ mutable Mutex mMessageQueueLock;
+ Condition mMessageAvailable;
+ std::queue<std::pair<MessageType, BufferItem>> mMessageQueue;
+
+ class MessageThread : public Thread {
+ public:
+ MessageThread(ConsumerBase* consumerBase) :
+ mConsumerBase(consumerBase) {};
+ protected:
+ virtual bool threadLoop() override;
+ ConsumerBase* mConsumerBase;
+ };
+
+ sp<MessageThread> mMessageThread;
};
// ----------------------------------------------------------------------------
diff --git a/libs/gui/ConsumerBase.cpp b/libs/gui/ConsumerBase.cpp
index d01187f..a22b81b 100644
--- a/libs/gui/ConsumerBase.cpp
+++ b/libs/gui/ConsumerBase.cpp
@@ -74,12 +74,26 @@
} else {
mConsumer->setConsumerName(mName);
}
+
+ mMessageThread = new MessageThread(this);
+ mMessageThread->run();
}
ConsumerBase::~ConsumerBase() {
CB_LOGV("~ConsumerBase");
- Mutex::Autolock lock(mMutex);
+ mMessageThread->requestExit();
+ {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(EXIT),
+ std::forward_as_tuple());
+ mMessageAvailable.signal();
+ }
+
+ mMessageThread->join();
+
+ Mutex::Autolock lock(mMutex);
// Verify that abandon() has been called before we get here. This should
// be done by ConsumerBase::onLastStrongRef(), but it's possible for a
// derived class to override that method and not call
@@ -100,6 +114,13 @@
}
void ConsumerBase::onFrameAvailable(const BufferItem& item) {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_FRAME_AVAILABLE),
+ std::forward_as_tuple(item));
+ mMessageAvailable.signal();
+}
+void ConsumerBase::onFrameAvailableHandler(const BufferItem& item) {
CB_LOGV("onFrameAvailable");
sp<FrameAvailableListener> listener;
@@ -115,6 +136,14 @@
}
void ConsumerBase::onFrameReplaced(const BufferItem &item) {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_FRAME_REPLACED),
+ std::forward_as_tuple(item));
+ mMessageAvailable.signal();
+}
+
+void ConsumerBase::onFrameReplacedHandler(const BufferItem &item) {
CB_LOGV("onFrameReplaced");
sp<FrameAvailableListener> listener;
@@ -130,6 +159,14 @@
}
void ConsumerBase::onBuffersReleased() {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_BUFFERS_RELEASED),
+ std::forward_as_tuple());
+ mMessageAvailable.signal();
+}
+
+void ConsumerBase::onBuffersReleasedHandler() {
Mutex::Autolock lock(mMutex);
CB_LOGV("onBuffersReleased");
@@ -149,6 +186,45 @@
}
void ConsumerBase::onSidebandStreamChanged() {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_SIDEBAND_STREAM_CHANGED),
+ std::forward_as_tuple());
+ mMessageAvailable.signal();
+}
+
+void ConsumerBase::onSidebandStreamChangedHandler() {
+}
+
+bool ConsumerBase::MessageThread::threadLoop() {
+ Mutex::Autolock lock(mConsumerBase->mMessageQueueLock);
+
+ if (mConsumerBase->mMessageQueue.empty()) {
+ mConsumerBase->mMessageAvailable.wait(mConsumerBase->mMessageQueueLock);
+ }
+
+ while (!mConsumerBase->mMessageQueue.empty()) {
+ auto nextMessage = mConsumerBase->mMessageQueue.front();
+
+ switch (nextMessage.first) {
+ case ON_FRAME_AVAILABLE:
+ mConsumerBase->onFrameAvailableHandler(nextMessage.second);
+ break;
+ case ON_FRAME_REPLACED:
+ mConsumerBase->onFrameReplacedHandler(nextMessage.second);
+ break;
+ case ON_BUFFERS_RELEASED:
+ mConsumerBase->onBuffersReleasedHandler();
+ break;
+ case ON_SIDEBAND_STREAM_CHANGED:
+ mConsumerBase->onSidebandStreamChangedHandler();
+ break;
+ case EXIT:
+ break;
+ }
+ mConsumerBase->mMessageQueue.pop();
+ }
+ return true;
}
void ConsumerBase::abandon() {
diff --git a/libs/gui/tests/SurfaceTextureGLToGL_test.cpp b/libs/gui/tests/SurfaceTextureGLToGL_test.cpp
index c28b4d1..b8a7a90 100644
--- a/libs/gui/tests/SurfaceTextureGLToGL_test.cpp
+++ b/libs/gui/tests/SurfaceTextureGLToGL_test.cpp
@@ -192,6 +192,10 @@
ASSERT_EQ(EGL_SUCCESS, eglGetError());
mProducerEglSurface = EGL_NO_SURFACE;
+ // sleep for 10ms to allow any asynchronous operations to complete before
+ // checking the reference counts
+ usleep(10000);
+
// This test should have the only reference to buffer 0.
EXPECT_EQ(1, buffers[0]->getStrongCount());