Merge "Added MediaSourceSplitter to split single source to multiple ones."
diff --git a/include/media/stagefright/MediaSourceSplitter.h b/include/media/stagefright/MediaSourceSplitter.h
new file mode 100644
index 0000000..671cdf5
--- /dev/null
+++ b/include/media/stagefright/MediaSourceSplitter.h
@@ -0,0 +1,193 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This class provides a way to split a single media source into multiple sources.
+// The constructor takes in the real mediaSource and createClient() can then be
+// used to create multiple sources served from this real mediaSource.
+//
+// Usage:
+// - Create MediaSourceSplitter by passing in a real mediaSource from which
+// multiple duplicate channels are needed.
+// - Create a client using createClient() and use it as any other mediaSource.
+//
+// Note that multiple clients can be created using createClient() and
+// started/stopped in any order. MediaSourceSplitter stops the real source only
+// when all clients have been stopped.
+//
+// If a new client is created/started after some existing clients have already
+// started, the new client starts receiving buffers from the current read
+// position onwards; buffers handed out earlier are not replayed to it.
+
+#ifndef MEDIA_SOURCE_SPLITTER_H_
+
+#define MEDIA_SOURCE_SPLITTER_H_
+
+#include <media/stagefright/MediaSource.h>
+#include <utils/threads.h>
+#include <utils/Vector.h>
+#include <utils/RefBase.h>
+
+namespace android {
+
+class MediaBuffer;
+class MetaData;
+
+class MediaSourceSplitter : public RefBase {
+public:
+    // Constructor
+    // mediaSource: The real mediaSource. The class keeps a reference to it to
+    // implement the various clients.
+    MediaSourceSplitter(sp<MediaSource> mediaSource);
+
+    ~MediaSourceSplitter();
+
+    // Creates a new client of base type MediaSource. Multiple clients can be
+    // created which get their data through the same real mediaSource. These
+    // clients can then be used like any other MediaSource, all of which provide
+    // data from the same real source.
+    sp<MediaSource> createClient();
+
+private:
+    // Number of clients created so far; also the id given to the next client.
+    int32_t mNumberOfClients;
+
+    // Reference to the real MediaSource passed to the constructor.
+    sp<MediaSource> mSource;
+
+    // Stores pointer to the MediaBuffer read from the real MediaSource.
+    // All clients use this to implement the read() call.
+    MediaBuffer *mLastReadMediaBuffer;
+
+    // Status code for read from the real MediaSource. All clients return
+    // this for their read().
+    status_t mLastReadStatus;
+
+    // Boolean telling whether the real MediaSource has started.
+    bool mSourceStarted;
+
+    // List of booleans, one for each client, storing whether the corresponding
+    // client's start() has been called.
+    Vector<bool> mClientsStarted;
+
+    // Stores the number of clients which are currently started.
+    int32_t mNumberOfClientsStarted;
+
+    // Since different clients call read() asynchronously, we need to keep track
+    // of what data is currently read into the mLastReadMediaBuffer.
+    // mCurrentReadBit stores the bit for the current read buffer. This bit
+    // flips each time a new buffer is read from the source.
+    // mClientsDesiredReadBit stores the bit for the next desired read buffer
+    // for each client. This bit flips each time read() is completed for this
+    // client.
+    bool mCurrentReadBit;
+    Vector<bool> mClientsDesiredReadBit;
+
+    // Number of clients whose current read has been completed.
+    int32_t mNumberOfCurrentReads;
+
+    // Boolean telling whether the last read has been completed for all clients.
+    // The variable is reset to false each time buffer is read from the real
+    // source.
+    bool mLastReadCompleted;
+
+    // A global mutex for access to critical sections.
+    Mutex mLock;
+
+    // Condition variable for waiting on read from source to complete.
+    Condition mReadFromSourceCondition;
+
+    // Condition variable for waiting on all client's last read to complete.
+    Condition mAllReadsCompleteCondition;
+
+    // Functions used by Client to implement the MediaSource interface.
+
+    // If the real source has not been started yet by any client, starts it.
+    status_t start(int client_id, MetaData *params);
+
+    // Stops the real source after all clients have called stop().
+    status_t stop(int client_id);
+
+    // Returns the real source's getFormat().
+    sp<MetaData> getFormat(int client_id);
+
+    // If the client's desired buffer has already been read into
+    // mLastReadMediaBuffer, points the buffer to that. Otherwise if it is the
+    // master client (client_id 0), reads the buffer from source or else waits
+    // for the master client to read the buffer and uses that.
+    status_t read(int client_id,
+            MediaBuffer **buffer, const MediaSource::ReadOptions *options = NULL);
+
+    // Not implemented right now; always returns ERROR_UNSUPPORTED.
+    status_t pause(int client_id);
+
+    // Function which reads a buffer from the real source into
+    // mLastReadMediaBuffer
+    void readFromSource_lock(const MediaSource::ReadOptions *options);
+
+    // Waits until read from the real source has been completed.
+    // _lock means that the function should be called when the thread has already
+    // obtained the lock for the mutex mLock.
+    void waitForReadFromSource_lock(int32_t client_id);
+
+    // Waits until all clients have read the current buffer in
+    // mLastReadMediaBuffer.
+    void waitForAllClientsLastRead_lock(int32_t client_id);
+
+    // Each client calls this after it completes its read(). Once all clients
+    // have called this for the current buffer, the function calls
+    // mAllReadsCompleteCondition.broadcast() to signal the waiting clients.
+    void signalReadComplete_lock(bool readAborted);
+
+    // Make these constructors private.
+    MediaSourceSplitter();
+    MediaSourceSplitter(const MediaSourceSplitter &);
+    MediaSourceSplitter &operator=(const MediaSourceSplitter &);
+
+    // This class implements the MediaSource interface. Each client stores a
+    // reference to the parent MediaSourceSplitter and uses it to complete the
+    // various calls.
+    class Client : public MediaSource {
+    public:
+        // Constructor stores reference to the parent MediaSourceSplitter and
+        // its client id.
+        Client(sp<MediaSourceSplitter> splitter, int32_t client_id);
+
+        // MediaSource interface
+        virtual status_t start(MetaData *params = NULL);
+
+        virtual status_t stop();
+
+        virtual sp<MetaData> getFormat();
+
+        virtual status_t read(
+                MediaBuffer **buffer, const ReadOptions *options = NULL);
+
+        virtual status_t pause();
+
+    private:
+        // Reference to the parent MediaSourceSplitter
+        sp<MediaSourceSplitter> mSplitter;
+
+        // Id of this client.
+        int32_t mClient_id;
+    };
+
+    friend class Client;
+};
+
+}  // namespace android
+
+#endif  // MEDIA_SOURCE_SPLITTER_H_
diff --git a/media/libstagefright/MediaSourceSplitter.cpp b/media/libstagefright/MediaSourceSplitter.cpp
new file mode 100644
index 0000000..8034bf1
--- /dev/null
+++ b/media/libstagefright/MediaSourceSplitter.cpp
@@ -0,0 +1,229 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "MediaSourceSplitter"
+#include <utils/Log.h>
+
+#include <media/stagefright/MediaSourceSplitter.h>
+#include <media/stagefright/MediaDebug.h>
+#include <media/stagefright/MediaBuffer.h>
+#include <media/stagefright/MetaData.h>
+
+namespace android {
+
+// Wraps |mediaSource|. Starts with no clients and the source not started.
+MediaSourceSplitter::MediaSourceSplitter(sp<MediaSource> mediaSource)
+    : mNumberOfClients(0),
+      mSource(mediaSource),
+      mLastReadMediaBuffer(NULL),  // fix: was left uninitialized
+      mLastReadStatus(OK),         // fix: was left uninitialized
+      mSourceStarted(false), mNumberOfClientsStarted(0),
+      mCurrentReadBit(false), mNumberOfCurrentReads(0),
+      mLastReadCompleted(true) {
+}
+
+// Empty body: no explicit cleanup is performed here.
+MediaSourceSplitter::~MediaSourceSplitter() {}
+
+// Creates a Client whose id indexes its new entries in the per-client vectors.
+sp<MediaSource> MediaSourceSplitter::createClient() {
+    Mutex::Autolock lock(mLock);
+    const int32_t newClientId = mNumberOfClients++;
+    mClientsStarted.push(false);
+    mClientsDesiredReadBit.push(0);
+    return new Client(this, newClientId);
+}
+
+// Starts |client_id|, starting the real source first if no client has yet.
+// Returns OK if the client is (already) started, else the source's start error.
+status_t MediaSourceSplitter::start(int client_id, MetaData *params) {
+    Mutex::Autolock autoLock(mLock);
+
+    LOGV("start client (%d)", client_id);
+    // Same bounds check as stop() and read(): the id indexes the vectors below.
+    CHECK(client_id >= 0 && client_id < mNumberOfClients);
+    if (mClientsStarted[client_id]) {
+        return OK;
+    }
+
+    // Which buffer this client waits for first: the next one by default.
+    bool desiredReadBit = !mCurrentReadBit;
+    if (!mSourceStarted) {
+        LOGV("Starting real source from client (%d)", client_id);
+        status_t err = mSource->start(params);
+        if (err != OK) {
+            // Fix: a failed start must not count the client as started, or
+            // the started-client count stays off by one from then on.
+            return err;
+        }
+        mSourceStarted = true;
+    } else if (!mLastReadCompleted) {
+        // A read is still being handed out, so join in on the current buffer.
+        desiredReadBit = mCurrentReadBit;
+    }
+
+    mNumberOfClientsStarted++;
+    mClientsStarted.editItemAt(client_id) = true;
+    mClientsDesiredReadBit.editItemAt(client_id) = desiredReadBit;
+    return OK;
+}
+
+// Stops |client_id|; the last started client to stop also stops the source.
+status_t MediaSourceSplitter::stop(int client_id) {
+    Mutex::Autolock lock(mLock);
+    LOGV("stop client (%d)", client_id);
+    CHECK(client_id >= 0 && client_id < mNumberOfClients);
+    CHECK(mClientsStarted[client_id]);
+
+    --mNumberOfClientsStarted;
+    if (mNumberOfClientsStarted != 0) {
+        mClientsStarted.editItemAt(client_id) = false;
+        // Other clients may be waiting on this one; signal its read as aborted.
+        if (!mLastReadCompleted) {
+            signalReadComplete_lock(true);
+        }
+        return OK;
+    }
+
+    LOGV("Stopping real source from client (%d)", client_id);
+    const status_t sourceStatus = mSource->stop();
+    mSourceStarted = false;
+    mClientsStarted.editItemAt(client_id) = false;
+    return sourceStatus;
+}
+
+// Forwards to the real source's getFormat(); |client_id| is only logged.
+sp<MetaData> MediaSourceSplitter::getFormat(int client_id) {
+    Mutex::Autolock lock(mLock);
+    LOGV("getFormat client (%d)", client_id);
+    return mSource->getFormat();
+}
+
+// Serves one read() for |client_id| from the buffer shared by all clients.
+// Client 0 alone reads from the real source; the others wait for that read and
+// take their own reference. Returns the status of the source read, or OK with
+// a NULL buffer when the client has not been started.
+status_t MediaSourceSplitter::read(int client_id,
+        MediaBuffer **buffer, const MediaSource::ReadOptions *options) {
+    Mutex::Autolock autoLock(mLock);
+
+    CHECK(client_id >= 0 && client_id < mNumberOfClients);
+
+    LOGV("read client (%d)", client_id);
+    *buffer = NULL;
+
+    if (!mClientsStarted[client_id]) {
+        return OK;
+    }
+
+    if (mCurrentReadBit != mClientsDesiredReadBit[client_id]) {
+        // Desired buffer has not been read from source yet.
+        if (client_id == 0) {
+            // Let every client finish with the previous buffer before
+            // mLastReadMediaBuffer is overwritten by the next source read.
+            waitForAllClientsLastRead_lock(client_id);
+            readFromSource_lock(options);
+        } else {
+            // Only client 0 reads from the source; wait for it to finish.
+            waitForReadFromSource_lock(client_id);
+        }
+        CHECK(mCurrentReadBit == mClientsDesiredReadBit[client_id]);
+    } else {
+        // Desired buffer has already been read from source. Use the cached data.
+        CHECK(client_id != 0);
+    }
+
+    *buffer = mLastReadMediaBuffer;
+    // Fix: add_ref() used to run unconditionally for clients other than 0 and
+    // dereferenced NULL whenever the source read produced no buffer.
+    // NOTE(review): client 0 hands out the source's own reference; confirm it
+    // cannot release the buffer before the other clients add_ref() it.
+    if (client_id != 0 && *buffer != NULL) {
+        (*buffer)->add_ref();
+    }
+    mClientsDesiredReadBit.editItemAt(client_id) = !mClientsDesiredReadBit[client_id];
+    signalReadComplete_lock(false);
+    return mLastReadStatus;
+}
+
+void MediaSourceSplitter::readFromSource_lock(const MediaSource::ReadOptions *options) {
+    mLastReadMediaBuffer = NULL;  // fix: a failed read must not leave a stale buffer
+    mLastReadStatus = mSource->read(&mLastReadMediaBuffer, options);
+    mCurrentReadBit = !mCurrentReadBit;    // the new buffer becomes the current one
+    mLastReadCompleted = false;            // no client has consumed it yet
+    mReadFromSourceCondition.broadcast();  // wake clients waiting for this read
+}
+
+void MediaSourceSplitter::waitForReadFromSource_lock(int32_t client_id) {
+    while (mCurrentReadBit != mClientsDesiredReadBit[client_id]) {
+        mReadFromSourceCondition.wait(mLock);  // looped: may wake spuriously
+    }
+}
+
+void MediaSourceSplitter::waitForAllClientsLastRead_lock(int32_t client_id) {
+    while (!mLastReadCompleted) {
+        mAllReadsCompleteCondition.wait(mLock);  // looped: may wake spuriously
+    }
+}
+
+// Counts one finished client read unless |readAborted|, and once every
+// started client is accounted for, marks the buffer done and wakes waiters.
+void MediaSourceSplitter::signalReadComplete_lock(bool readAborted) {
+    mNumberOfCurrentReads += readAborted ? 0 : 1;
+    if (mNumberOfCurrentReads != mNumberOfClientsStarted) {
+        return;
+    }
+    mNumberOfCurrentReads = 0;
+    mLastReadCompleted = true;
+    mAllReadsCompleteCondition.broadcast();
+}
+
+status_t MediaSourceSplitter::pause(int client_id) {
+    return ERROR_UNSUPPORTED;  // pausing is not implemented; client_id is unused
+}
+
+// Client
+
+// Keeps a strong reference to the parent splitter plus this client's id.
+MediaSourceSplitter::Client::Client(
+        sp<MediaSourceSplitter> splitter, int32_t client_id)
+    : mSplitter(splitter),
+      mClient_id(client_id) {
+}
+
+status_t MediaSourceSplitter::Client::start(MetaData *params) {
+    return mSplitter->start(mClient_id, params);  // forwarded with this client's id
+}
+
+status_t MediaSourceSplitter::Client::stop() {
+    return mSplitter->stop(mClient_id);  // forwarded with this client's id
+}
+
+sp<MetaData> MediaSourceSplitter::Client::getFormat() {
+    return mSplitter->getFormat(mClient_id);  // forwarded with this client's id
+}
+
+status_t MediaSourceSplitter::Client::read(
+        MediaBuffer **buffer, const ReadOptions *options) {
+    return mSplitter->read(mClient_id, buffer, options);  // served by the splitter
+}
+
+status_t MediaSourceSplitter::Client::pause() {
+    return mSplitter->pause(mClient_id);  // forwarded with this client's id
+}
+
+}  // namespace android