Merge changes from topic 'camera-hidl-hal'

* changes:
  Camera: patching treble camera HAL
  Camera: add camera vts tests
  Camera: Add default camera provider 2.4
  Camera: default implementation of device 3.2
diff --git a/audio/2.0/IStream.hal b/audio/2.0/IStream.hal
index 5c88a69..8de7851 100644
--- a/audio/2.0/IStream.hal
+++ b/audio/2.0/IStream.hal
@@ -279,4 +279,16 @@
      */
     getMmapPosition()
             generates (Result retval, MmapPosition position);
+
+    /*
+     * Called by the framework to deinitialize the stream and free up
+     * all the currently allocated resources. It is recommended to close
+     * the stream on the client side as soon as it is becomes unused.
+     *
+     * @return retval OK in case the success.
+     *                NOT_SUPPORTED if called on IStream instead of input or
+     *                              output stream interface.
+     *                INVALID_STATE if the stream was already closed.
+     */
+    close() generates (Result retval);
 };
diff --git a/audio/2.0/IStreamIn.hal b/audio/2.0/IStreamIn.hal
index 6cf7425..9a96f71 100644
--- a/audio/2.0/IStreamIn.hal
+++ b/audio/2.0/IStreamIn.hal
@@ -41,16 +41,45 @@
     setGain(float gain) generates (Result retval);
 
     /*
-     * Read audio buffer in from driver. If at least one frame was read prior to
-     * the error, 'read' must return that byte count and then return an error
-     * in the subsequent call.
+     * Data structure passed back to the client via status message queue
+     * of 'read' operation.
      *
-     * @param size maximum amount of bytes to read.
-     * @return retval operation completion status.
-     * @return data audio data.
+     * Possible values of 'retval' field:
+     *  - OK, read operation was successful;
+     *  - INVALID_ARGUMENTS, stream was not configured properly;
+     *  - INVALID_STATE, stream is in a state that doesn't allow reads.
      */
-    // TODO(mnaganov): Replace with FMQ version.
-    read(uint64_t size) generates (Result retval, vec<uint8_t> data);
+    struct ReadStatus {
+        Result retval;
+        uint64_t read;
+    };
+
+    /*
+     * Set up required transports for receiving audio buffers from the driver.
+     *
+     * The transport consists of two message queues: one is used for passing
+     * audio data from the driver to the client, another is used for reporting
+     * read operation status (amount of bytes actually read or error code),
+     * see ReadStatus structure definition.
+     *
+     * @param frameSize the size of a single frame, in bytes.
+     * @param framesCount the number of frames in a buffer.
+     * @param threadPriority priority of the thread that performs reads.
+     * @return retval OK if both message queues were created successfully.
+     *                INVALID_STATE if the method was already called.
+     *                INVALID_ARGUMENTS if there was a problem setting up
+     *                                  the queues.
+     * @return dataMQ a message queue used for passing audio data in the format
+     *                specified at the stream opening.
+     * @return statusMQ a message queue used for passing status from the driver
+     *                  using ReadStatus structures.
+     */
+    prepareForReading(
+            uint32_t frameSize, uint32_t framesCount,
+            ThreadPriority threadPriority)
+    generates (
+            Result retval,
+            fmq_sync<uint8_t> dataMQ, fmq_sync<ReadStatus> statusMQ);
 
     /*
      * Return the amount of input frames lost in the audio driver since the last
diff --git a/audio/2.0/IStreamOut.hal b/audio/2.0/IStreamOut.hal
index 4ba3b2f..155e329 100644
--- a/audio/2.0/IStreamOut.hal
+++ b/audio/2.0/IStreamOut.hal
@@ -44,25 +44,48 @@
     setVolume(float left, float right) generates (Result retval);
 
     /*
-     * Write audio buffer to driver. On success, sets 'retval' to 'OK', and
-     * returns number of bytes written. If at least one frame was written
-     * successfully prior to the error, it is suggested that the driver return
-     * that successful (short) byte count and then return an error in the
-     * subsequent call.
+     * Data structure passed back to the client via status message queue
+     * of 'write' operation.
      *
-     * If 'setCallback' has previously been called to enable non-blocking mode
-     * then 'write' is not allowed to block. It must write only the number of
-     * bytes that currently fit in the driver/hardware buffer and then return
-     * this byte count. If this is less than the requested write size the
-     * callback function must be called when more space is available in the
-     * driver/hardware buffer.
-     *
-     * @param data audio data.
-     * @return retval operation completion status.
-     * @return written number of bytes written.
+     * Possible values of 'retval' field:
+     *  - OK, write operation was successful;
+     *  - INVALID_ARGUMENTS, stream was not configured properly;
+     *  - INVALID_STATE, stream is in a state that doesn't allow writes.
      */
-    // TODO(mnaganov): Replace with FMQ version.
-    write(vec<uint8_t> data) generates (Result retval, uint64_t written);
+    struct WriteStatus {
+        Result retval;
+        uint64_t written;
+        uint64_t frames;    // presentation position
+        TimeSpec timeStamp; // presentation position
+    };
+
+    /*
+     * Set up required transports for passing audio buffers to the driver.
+     *
+     * The transport consists of two message queues: one is used for passing
+     * audio data from the client to the driver, another is used for reporting
+     * write operation status (amount of bytes actually written or error code),
+     * and the presentation position immediately after the write, see
+     * WriteStatus structure definition.
+     *
+     * @param frameSize the size of a single frame, in bytes.
+     * @param framesCount the number of frames in a buffer.
+     * @param threadPriority priority of the thread that performs writes.
+     * @return retval OK if both message queues were created successfully.
+     *                INVALID_STATE if the method was already called.
+     *                INVALID_ARGUMENTS if there was a problem setting up
+     *                                  the queues.
+     * @return dataMQ a message queue used for passing audio data in the format
+     *                specified at the stream opening.
+     * @return statusMQ a message queue used for passing status from the driver
+     *                  using WriteStatus structures.
+     */
+    prepareForWriting(
+            uint32_t frameSize, uint32_t framesCount,
+            ThreadPriority threadPriority)
+    generates (
+            Result retval,
+            fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ);
 
     /*
      * Return the number of audio frames written by the audio DSP to DAC since
diff --git a/audio/2.0/default/Android.mk b/audio/2.0/default/Android.mk
index c3cfd69..eeea92c 100644
--- a/audio/2.0/default/Android.mk
+++ b/audio/2.0/default/Android.mk
@@ -30,13 +30,16 @@
     StreamOut.cpp \
 
 LOCAL_SHARED_LIBRARIES := \
+    libbase \
+    libcutils \
+    libfmq \
+    libhardware \
     libhidlbase \
     libhidltransport \
     libhwbinder \
-    libcutils \
-    libutils \
-    libhardware \
     liblog \
+    libmediautils \
+    libutils \
     android.hardware.audio@2.0 \
     android.hardware.audio.common@2.0 \
     android.hardware.audio.common@2.0-util \
diff --git a/audio/2.0/default/Stream.cpp b/audio/2.0/default/Stream.cpp
index f214eed..62b34a3 100644
--- a/audio/2.0/default/Stream.cpp
+++ b/audio/2.0/default/Stream.cpp
@@ -253,6 +253,10 @@
     return Void();
 }
 
+Return<Result> Stream::close()  {
+    return Result::NOT_SUPPORTED;
+}
+
 } // namespace implementation
 }  // namespace V2_0
 }  // namespace audio
diff --git a/audio/2.0/default/Stream.h b/audio/2.0/default/Stream.h
index 819bbf7..0bbd803 100644
--- a/audio/2.0/default/Stream.h
+++ b/audio/2.0/default/Stream.h
@@ -76,6 +76,7 @@
     Return<Result> stop() override;
     Return<void> createMmapBuffer(int32_t minSizeFrames, createMmapBuffer_cb _hidl_cb) override;
     Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override;
+    Return<Result> close()  override;
 
     // Utility methods for extending interfaces.
     static Result analyzeStatus(const char* funcName, int status, int ignoreError = OK);
diff --git a/audio/2.0/default/StreamIn.cpp b/audio/2.0/default/StreamIn.cpp
index 1441e74..51c2cc7 100644
--- a/audio/2.0/default/StreamIn.cpp
+++ b/audio/2.0/default/StreamIn.cpp
@@ -15,26 +15,112 @@
  */
 
 #define LOG_TAG "StreamInHAL"
+//#define LOG_NDEBUG 0
 
-#include <hardware/audio.h>
 #include <android/log.h>
+#include <hardware/audio.h>
+#include <mediautils/SchedulingPolicyService.h>
 
 #include "StreamIn.h"
 
+using ::android::hardware::audio::V2_0::MessageQueueFlagBits;
+
 namespace android {
 namespace hardware {
 namespace audio {
 namespace V2_0 {
 namespace implementation {
 
+namespace {
+
+class ReadThread : public Thread {
+  public:
+    // ReadThread's lifespan never exceeds StreamIn's lifespan.
+    ReadThread(std::atomic<bool>* stop,
+            audio_stream_in_t* stream,
+            StreamIn::DataMQ* dataMQ,
+            StreamIn::StatusMQ* statusMQ,
+            EventFlag* efGroup,
+            ThreadPriority threadPriority)
+            : Thread(false /*canCallJava*/),
+              mStop(stop),
+              mStream(stream),
+              mDataMQ(dataMQ),
+              mStatusMQ(statusMQ),
+              mEfGroup(efGroup),
+              mThreadPriority(threadPriority),
+              mBuffer(new uint8_t[dataMQ->getQuantumCount()]) {
+    }
+    virtual ~ReadThread() {}
+
+    status_t readyToRun() override;
+
+  private:
+    std::atomic<bool>* mStop;
+    audio_stream_in_t* mStream;
+    StreamIn::DataMQ* mDataMQ;
+    StreamIn::StatusMQ* mStatusMQ;
+    EventFlag* mEfGroup;
+    ThreadPriority mThreadPriority;
+    std::unique_ptr<uint8_t[]> mBuffer;
+
+    bool threadLoop() override;
+};
+
+status_t ReadThread::readyToRun() {
+    if (mThreadPriority != ThreadPriority::NORMAL) {
+        int err = requestPriority(
+                getpid(), getTid(), static_cast<int>(mThreadPriority), true /*asynchronous*/);
+        ALOGW_IF(err, "failed to set priority %d for pid %d tid %d; error %d",
+                static_cast<int>(mThreadPriority), getpid(), getTid(), err);
+    }
+    return OK;
+}
+
+bool ReadThread::threadLoop() {
+    // This implementation doesn't return control back to the Thread until it decides to stop,
+    // as the Thread uses mutexes, and this can lead to priority inversion.
+    while(!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
+        // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+        uint32_t efState = 0;
+        mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState, NS_PER_SEC);
+        if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) {
+            continue;  // Nothing to do.
+        }
+
+        const size_t availToWrite = mDataMQ->availableToWrite();
+        ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite);
+        Result retval = Result::OK;
+        uint64_t read = 0;
+        if (readResult >= 0) {
+            read = readResult;
+            if (!mDataMQ->write(&mBuffer[0], readResult)) {
+                ALOGW("data message queue write failed");
+            }
+        } else {
+            retval = Stream::analyzeStatus("read", readResult);
+        }
+        IStreamIn::ReadStatus status = { retval, read };
+        if (!mStatusMQ->write(&status)) {
+            ALOGW("status message queue write failed");
+        }
+        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
+    }
+
+    return false;
+}
+
+}  // namespace
+
 StreamIn::StreamIn(audio_hw_device_t* device, audio_stream_in_t* stream)
-        : mDevice(device), mStream(stream),
+        : mIsClosed(false), mDevice(device), mStream(stream),
           mStreamCommon(new Stream(&stream->common)),
-          mStreamMmap(new StreamMmap<audio_stream_in_t>(stream)) {
+          mStreamMmap(new StreamMmap<audio_stream_in_t>(stream)),
+          mEfGroup(nullptr), mStopReadThread(false) {
 }
 
 StreamIn::~StreamIn() {
-    mDevice->close_input_stream(mDevice, mStream);
+    close();
     mStream = nullptr;
     mDevice = nullptr;
 }
@@ -149,6 +235,22 @@
     return mStreamMmap->getMmapPosition(_hidl_cb);
 }
 
+Return<Result> StreamIn::close()  {
+    if (mIsClosed) return Result::INVALID_STATE;
+    mIsClosed = true;
+    if (mReadThread.get()) {
+        mStopReadThread.store(true, std::memory_order_release);
+        status_t status = mReadThread->requestExitAndWait();
+        ALOGE_IF(status, "read thread exit error: %s", strerror(-status));
+    }
+    if (mEfGroup) {
+        status_t status = EventFlag::deleteEventFlag(&mEfGroup);
+        ALOGE_IF(status, "read MQ event flag deletion error: %s", strerror(-status));
+    }
+    mDevice->close_input_stream(mDevice, mStream);
+    return Result::OK;
+}
+
 // Methods from ::android::hardware::audio::V2_0::IStreamIn follow.
 Return<void> StreamIn::getAudioSource(getAudioSource_cb _hidl_cb)  {
     int halSource;
@@ -165,19 +267,55 @@
     return Stream::analyzeStatus("set_gain", mStream->set_gain(mStream, gain));
 }
 
-Return<void> StreamIn::read(uint64_t size, read_cb _hidl_cb)  {
-    // TODO(mnaganov): Replace with FMQ version.
-    hidl_vec<uint8_t> data;
-    data.resize(size);
-    Result retval(Result::OK);
-    ssize_t readResult = mStream->read(mStream, &data[0], data.size());
-    if (readResult >= 0 && static_cast<size_t>(readResult) != data.size()) {
-        data.resize(readResult);
-    } else if (readResult < 0) {
-        data.resize(0);
-        retval = Stream::analyzeStatus("read", readResult);
+Return<void> StreamIn::prepareForReading(
+        uint32_t frameSize, uint32_t framesCount, ThreadPriority threadPriority,
+        prepareForReading_cb _hidl_cb)  {
+    status_t status;
+    // Create message queues.
+    if (mDataMQ) {
+        ALOGE("the client attempts to call prepareForReading twice");
+        _hidl_cb(Result::INVALID_STATE,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+        return Void();
     }
-    _hidl_cb(retval, data);
+    std::unique_ptr<DataMQ> tempDataMQ(
+            new DataMQ(frameSize * framesCount, true /* EventFlag */));
+    std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
+    if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+        ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
+        ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+        return Void();
+    }
+    // TODO: Remove event flag management once blocking MQ is implemented. b/33815422
+    status = EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+    if (status != OK || !mEfGroup) {
+        ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+        return Void();
+    }
+
+    // Create and launch the thread.
+    mReadThread = new ReadThread(
+            &mStopReadThread,
+            mStream,
+            tempDataMQ.get(),
+            tempStatusMQ.get(),
+            mEfGroup,
+            threadPriority);
+    status = mReadThread->run("reader", PRIORITY_URGENT_AUDIO);
+    if (status != OK) {
+        ALOGW("failed to start reader thread: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+        return Void();
+    }
+
+    mDataMQ = std::move(tempDataMQ);
+    mStatusMQ = std::move(tempStatusMQ);
+    _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
     return Void();
 }
 
diff --git a/audio/2.0/default/StreamIn.h b/audio/2.0/default/StreamIn.h
index 65e94bb..fc813d9 100644
--- a/audio/2.0/default/StreamIn.h
+++ b/audio/2.0/default/StreamIn.h
@@ -17,10 +17,15 @@
 #ifndef ANDROID_HARDWARE_AUDIO_V2_0_STREAMIN_H
 #define ANDROID_HARDWARE_AUDIO_V2_0_STREAMIN_H
 
-#include <android/hardware/audio/2.0/IStreamIn.h>
-#include <hidl/Status.h>
+#include <atomic>
+#include <memory>
 
+#include <android/hardware/audio/2.0/IStreamIn.h>
 #include <hidl/MQDescriptor.h>
+#include <fmq/EventFlag.h>
+#include <fmq/MessageQueue.h>
+#include <hidl/Status.h>
+#include <utils/Thread.h>
 
 #include "Stream.h"
 
@@ -39,6 +44,7 @@
 using ::android::hardware::audio::V2_0::IStreamIn;
 using ::android::hardware::audio::V2_0::ParameterValue;
 using ::android::hardware::audio::V2_0::Result;
+using ::android::hardware::audio::V2_0::ThreadPriority;
 using ::android::hardware::Return;
 using ::android::hardware::Void;
 using ::android::hardware::hidl_vec;
@@ -46,6 +52,9 @@
 using ::android::sp;
 
 struct StreamIn : public IStreamIn {
+    typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ;
+    typedef MessageQueue<ReadStatus, kSynchronizedReadWrite> StatusMQ;
+
     StreamIn(audio_hw_device_t* device, audio_stream_in_t* stream);
 
     // Methods from ::android::hardware::audio::V2_0::IStream follow.
@@ -73,11 +82,14 @@
             const hidl_vec<hidl_string>& keys, getParameters_cb _hidl_cb)  override;
     Return<Result> setParameters(const hidl_vec<ParameterValue>& parameters)  override;
     Return<void> debugDump(const hidl_handle& fd)  override;
+    Return<Result> close()  override;
 
     // Methods from ::android::hardware::audio::V2_0::IStreamIn follow.
     Return<void> getAudioSource(getAudioSource_cb _hidl_cb)  override;
     Return<Result> setGain(float gain)  override;
-    Return<void> read(uint64_t size, read_cb _hidl_cb)  override;
+    Return<void> prepareForReading(
+            uint32_t frameSize, uint32_t framesCount, ThreadPriority threadPriority,
+            prepareForReading_cb _hidl_cb)  override;
     Return<uint32_t> getInputFramesLost()  override;
     Return<void> getCapturePosition(getCapturePosition_cb _hidl_cb)  override;
     Return<Result> start() override;
@@ -86,11 +98,16 @@
     Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override;
 
   private:
+    bool mIsClosed;
     audio_hw_device_t *mDevice;
     audio_stream_in_t *mStream;
     sp<Stream> mStreamCommon;
     sp<StreamMmap<audio_stream_in_t>> mStreamMmap;
-
+    std::unique_ptr<DataMQ> mDataMQ;
+    std::unique_ptr<StatusMQ> mStatusMQ;
+    EventFlag* mEfGroup;
+    std::atomic<bool> mStopReadThread;
+    sp<Thread> mReadThread;
 
     virtual ~StreamIn();
 };
diff --git a/audio/2.0/default/StreamOut.cpp b/audio/2.0/default/StreamOut.cpp
index 3d20d11..4bb2274 100644
--- a/audio/2.0/default/StreamOut.cpp
+++ b/audio/2.0/default/StreamOut.cpp
@@ -17,8 +17,9 @@
 #define LOG_TAG "StreamOutHAL"
 //#define LOG_NDEBUG 0
 
-#include <hardware/audio.h>
 #include <android/log.h>
+#include <hardware/audio.h>
+#include <mediautils/SchedulingPolicyService.h>
 
 #include "StreamOut.h"
 
@@ -28,15 +29,103 @@
 namespace V2_0 {
 namespace implementation {
 
+namespace {
+
+class WriteThread : public Thread {
+  public:
+    // WriteThread's lifespan never exceeds StreamOut's lifespan.
+    WriteThread(std::atomic<bool>* stop,
+            audio_stream_out_t* stream,
+            StreamOut::DataMQ* dataMQ,
+            StreamOut::StatusMQ* statusMQ,
+            EventFlag* efGroup,
+            ThreadPriority threadPriority)
+            : Thread(false /*canCallJava*/),
+              mStop(stop),
+              mStream(stream),
+              mDataMQ(dataMQ),
+              mStatusMQ(statusMQ),
+              mEfGroup(efGroup),
+              mThreadPriority(threadPriority),
+              mBuffer(new uint8_t[dataMQ->getQuantumCount()]) {
+    }
+    virtual ~WriteThread() {}
+
+    status_t readyToRun() override;
+
+  private:
+    std::atomic<bool>* mStop;
+    audio_stream_out_t* mStream;
+    StreamOut::DataMQ* mDataMQ;
+    StreamOut::StatusMQ* mStatusMQ;
+    EventFlag* mEfGroup;
+    ThreadPriority mThreadPriority;
+    std::unique_ptr<uint8_t[]> mBuffer;
+
+    bool threadLoop() override;
+};
+
+status_t WriteThread::readyToRun() {
+    if (mThreadPriority != ThreadPriority::NORMAL) {
+        int err = requestPriority(
+                getpid(), getTid(), static_cast<int>(mThreadPriority), true /*asynchronous*/);
+        ALOGW_IF(err, "failed to set priority %d for pid %d tid %d; error %d",
+                static_cast<int>(mThreadPriority), getpid(), getTid(), err);
+    }
+    return OK;
+}
+
+bool WriteThread::threadLoop() {
+    // This implementation doesn't return control back to the Thread until it decides to stop,
+    // as the Thread uses mutexes, and this can lead to priority inversion.
+    while(!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
+        // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+        uint32_t efState = 0;
+        mEfGroup->wait(
+                static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
+        if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
+            continue;  // Nothing to do.
+        }
+
+        const size_t availToRead = mDataMQ->availableToRead();
+        Result retval = Result::OK;
+        uint64_t written = 0;
+        if (mDataMQ->read(&mBuffer[0], availToRead)) {
+            ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
+            if (writeResult >= 0) {
+                written = writeResult;
+            } else {
+                retval = Stream::analyzeStatus("write", writeResult);
+            }
+        }
+        uint64_t frames = 0;
+        struct timespec halTimeStamp = { 0, 0 };
+        if (retval == Result::OK && mStream->get_presentation_position != NULL) {
+            mStream->get_presentation_position(mStream, &frames, &halTimeStamp);
+        }
+        IStreamOut::WriteStatus status = { retval, written, frames,
+                                           { static_cast<uint64_t>(halTimeStamp.tv_sec),
+                                             static_cast<uint64_t>(halTimeStamp.tv_nsec) } };
+        if (!mStatusMQ->write(&status)) {
+            ALOGW("status message queue write failed");
+        }
+        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+    }
+
+    return false;
+}
+
+}  // namespace
+
 StreamOut::StreamOut(audio_hw_device_t* device, audio_stream_out_t* stream)
-        : mDevice(device), mStream(stream),
+        : mIsClosed(false), mDevice(device), mStream(stream),
           mStreamCommon(new Stream(&stream->common)),
-          mStreamMmap(new StreamMmap<audio_stream_out_t>(stream)) {
+          mStreamMmap(new StreamMmap<audio_stream_out_t>(stream)),
+          mEfGroup(nullptr), mStopWriteThread(false) {
 }
 
 StreamOut::~StreamOut() {
-    mCallback.clear();
-    mDevice->close_output_stream(mDevice, mStream);
+    close();
     mStream = nullptr;
     mDevice = nullptr;
 }
@@ -135,6 +224,23 @@
     return mStreamCommon->debugDump(fd);
 }
 
+Return<Result> StreamOut::close()  {
+    if (mIsClosed) return Result::INVALID_STATE;
+    mIsClosed = true;
+    if (mWriteThread.get()) {
+        mStopWriteThread.store(true, std::memory_order_release);
+        status_t status = mWriteThread->requestExitAndWait();
+        ALOGE_IF(status, "write thread exit error: %s", strerror(-status));
+    }
+    if (mEfGroup) {
+        status_t status = EventFlag::deleteEventFlag(&mEfGroup);
+        ALOGE_IF(status, "write MQ event flag deletion error: %s", strerror(-status));
+    }
+    mCallback.clear();
+    mDevice->close_output_stream(mDevice, mStream);
+    return Result::OK;
+}
+
 // Methods from ::android::hardware::audio::V2_0::IStreamOut follow.
 Return<uint32_t> StreamOut::getLatency()  {
     return mStream->get_latency(mStream);
@@ -149,18 +255,55 @@
     return retval;
 }
 
-Return<void> StreamOut::write(const hidl_vec<uint8_t>& data, write_cb _hidl_cb)  {
-    // TODO(mnaganov): Replace with FMQ version.
-    Result retval(Result::OK);
-    uint64_t written = 0;
-    ssize_t writeResult = mStream->write(mStream, &data[0], data.size());
-    if (writeResult >= 0) {
-        written = writeResult;
-    } else {
-        retval = Stream::analyzeStatus("write", writeResult);
-        written = 0;
+Return<void> StreamOut::prepareForWriting(
+        uint32_t frameSize, uint32_t framesCount, ThreadPriority threadPriority,
+        prepareForWriting_cb _hidl_cb)  {
+    status_t status;
+    // Create message queues.
+    if (mDataMQ) {
+        ALOGE("the client attempts to call prepareForWriting twice");
+        _hidl_cb(Result::INVALID_STATE,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
     }
-    _hidl_cb(retval, written);
+    std::unique_ptr<DataMQ> tempDataMQ(
+            new DataMQ(frameSize * framesCount, true /* EventFlag */));
+    std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
+    if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+        ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
+        ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
+    }
+    // TODO: Remove event flag management once blocking MQ is implemented. b/33815422
+    status = EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+    if (status != OK || !mEfGroup) {
+        ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
+    }
+
+    // Create and launch the thread.
+    mWriteThread = new WriteThread(
+            &mStopWriteThread,
+            mStream,
+            tempDataMQ.get(),
+            tempStatusMQ.get(),
+            mEfGroup,
+            threadPriority);
+    status = mWriteThread->run("writer", PRIORITY_URGENT_AUDIO);
+    if (status != OK) {
+        ALOGW("failed to start writer thread: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
+    }
+
+    mDataMQ = std::move(tempDataMQ);
+    mStatusMQ = std::move(tempStatusMQ);
+    _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
     return Void();
 }
 
diff --git a/audio/2.0/default/StreamOut.h b/audio/2.0/default/StreamOut.h
index 9b7f9f8..83f4447 100644
--- a/audio/2.0/default/StreamOut.h
+++ b/audio/2.0/default/StreamOut.h
@@ -17,10 +17,15 @@
 #ifndef ANDROID_HARDWARE_AUDIO_V2_0_STREAMOUT_H
 #define ANDROID_HARDWARE_AUDIO_V2_0_STREAMOUT_H
 
-#include <android/hardware/audio/2.0/IStreamOut.h>
-#include <hidl/Status.h>
+#include <atomic>
+#include <memory>
 
+#include <android/hardware/audio/2.0/IStreamOut.h>
 #include <hidl/MQDescriptor.h>
+#include <hidl/Status.h>
+#include <fmq/EventFlag.h>
+#include <fmq/MessageQueue.h>
+#include <utils/Thread.h>
 
 #include "Stream.h"
 
@@ -40,6 +45,7 @@
 using ::android::hardware::audio::V2_0::IStreamOutCallback;
 using ::android::hardware::audio::V2_0::ParameterValue;
 using ::android::hardware::audio::V2_0::Result;
+using ::android::hardware::audio::V2_0::ThreadPriority;
 using ::android::hardware::audio::V2_0::TimeSpec;
 using ::android::hardware::Return;
 using ::android::hardware::Void;
@@ -48,6 +54,9 @@
 using ::android::sp;
 
 struct StreamOut : public IStreamOut {
+    typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ;
+    typedef MessageQueue<WriteStatus, kSynchronizedReadWrite> StatusMQ;
+
     StreamOut(audio_hw_device_t* device, audio_stream_out_t* stream);
 
     // Methods from ::android::hardware::audio::V2_0::IStream follow.
@@ -75,11 +84,14 @@
             const hidl_vec<hidl_string>& keys, getParameters_cb _hidl_cb)  override;
     Return<Result> setParameters(const hidl_vec<ParameterValue>& parameters)  override;
     Return<void> debugDump(const hidl_handle& fd)  override;
+    Return<Result> close()  override;
 
     // Methods from ::android::hardware::audio::V2_0::IStreamOut follow.
     Return<uint32_t> getLatency()  override;
     Return<Result> setVolume(float left, float right)  override;
-    Return<void> write(const hidl_vec<uint8_t>& data, write_cb _hidl_cb)  override;
+    Return<void> prepareForWriting(
+            uint32_t frameSize, uint32_t framesCount, ThreadPriority threadPriority,
+            prepareForWriting_cb _hidl_cb)  override;
     Return<void> getRenderPosition(getRenderPosition_cb _hidl_cb)  override;
     Return<void> getNextWriteTimestamp(getNextWriteTimestamp_cb _hidl_cb)  override;
     Return<Result> setCallback(const sp<IStreamOutCallback>& callback)  override;
@@ -97,11 +109,17 @@
     Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override;
 
   private:
+    bool mIsClosed;
     audio_hw_device_t *mDevice;
     audio_stream_out_t *mStream;
     sp<Stream> mStreamCommon;
     sp<StreamMmap<audio_stream_out_t>> mStreamMmap;
     sp<IStreamOutCallback> mCallback;
+    std::unique_ptr<DataMQ> mDataMQ;
+    std::unique_ptr<StatusMQ> mStatusMQ;
+    EventFlag* mEfGroup;
+    std::atomic<bool> mStopWriteThread;
+    sp<Thread> mWriteThread;
 
     virtual ~StreamOut();
 
diff --git a/audio/2.0/types.hal b/audio/2.0/types.hal
index 37c39e4..8fc4314 100644
--- a/audio/2.0/types.hal
+++ b/audio/2.0/types.hal
@@ -89,3 +89,21 @@
     int64_t  timeNanoseconds; // time stamp in ns, CLOCK_MONOTONIC
     int32_t  positionFrames;  // increasing 32 bit frame count reset when IStream.stop() is called
 };
+
+/*
+ * The message queue flags used to synchronize reads and writes from
+ * message queues used by StreamIn and StreamOut.
+ */
+enum MessageQueueFlagBits : uint32_t {
+    NOT_EMPTY = 1 << 0,
+    NOT_FULL = 1 << 1
+};
+
+/*
+ * The priority of threads executing reads and writes of audio data.
+ */
+enum ThreadPriority : int32_t {
+    NORMAL = 0,
+    FAST_CAPTURE = 3,
+    FAST_MIXER = 3
+};
diff --git a/audio/effect/2.0/vts/functional/vts/testcases/hal/audio/effect/hidl/target/AndroidTest.xml b/audio/effect/2.0/vts/functional/vts/testcases/hal/audio/effect/hidl/target/AndroidTest.xml
index f0af67a..60a2cd0 100644
--- a/audio/effect/2.0/vts/functional/vts/testcases/hal/audio/effect/hidl/target/AndroidTest.xml
+++ b/audio/effect/2.0/vts/functional/vts/testcases/hal/audio/effect/hidl/target/AndroidTest.xml
@@ -24,6 +24,7 @@
             _32bit::DATA/nativetest/audio_effect_hidl_hal_test/audio_effect_hidl_hal_test,
             _64bit::DATA/nativetest64/audio_effect_hidl_hal_test/audio_effect_hidl_hal_test,
             "/>
+        <option name="test-config-path" value="vts/testcases/hal/audio/effect/hidl/target/HalAudioEffectHidlTargetBasicTest.config" />
         <option name="binary-test-type" value="gtest" />
         <option name="test-timeout" value="1m" />
     </test>
diff --git a/audio/effect/2.0/vts/functional/vts/testcases/hal/audio/effect/hidl/target/HalAudioEffectHidlTargetBasicTest.config b/audio/effect/2.0/vts/functional/vts/testcases/hal/audio/effect/hidl/target/HalAudioEffectHidlTargetBasicTest.config
new file mode 100644
index 0000000..495fda9
--- /dev/null
+++ b/audio/effect/2.0/vts/functional/vts/testcases/hal/audio/effect/hidl/target/HalAudioEffectHidlTargetBasicTest.config
@@ -0,0 +1,11 @@
+{
+    "use_gae_db": true,
+    "coverage": true,
+    "modules": [{
+                    "module_name": "system/lib64/hw/android.hardware.audio.effect@2.0-impl",
+                    "git_project": {
+                        "name": "platform/hardware/interfaces",
+                        "path": "hardware/interfaces"
+                    }
+                }]
+}