Merge "Implement Queue"
diff --git a/gd/os/Android.bp b/gd/os/Android.bp
index 535f144..dbad2ee 100644
--- a/gd/os/Android.bp
+++ b/gd/os/Android.bp
@@ -5,6 +5,7 @@
         "linux_generic/handler.cc",
         "linux_generic/reactor.cc",
         "linux_generic/repeating_alarm.cc",
+        "linux_generic/reactive_semaphore.cc",
         "linux_generic/thread.cc",
     ]
 }
@@ -14,6 +15,7 @@
     srcs: [
         "linux_generic/alarm_unittest.cc",
         "linux_generic/handler_unittest.cc",
+        "linux_generic/queue_unittest.cc",
         "linux_generic/reactor_unittest.cc",
         "linux_generic/repeating_alarm_unittest.cc",
         "linux_generic/thread_unittest.cc",
diff --git a/gd/os/handler.h b/gd/os/handler.h
index dc6d510..509723e 100644
--- a/gd/os/handler.h
+++ b/gd/os/handler.h
@@ -46,6 +46,9 @@
   // Remove all pending events from the queue of this handler
   void Clear();
 
+  template <typename T>
+  friend class Queue;
+
  private:
   std::queue<Closure> tasks_;
   Thread* thread_;
diff --git a/gd/os/linux_generic/linux.h b/gd/os/linux_generic/linux.h
new file mode 100644
index 0000000..524aa7d
--- /dev/null
+++ b/gd/os/linux_generic/linux.h
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef EFD_SEMAPHORE
+#define EFD_SEMAPHORE 1
+#endif
diff --git a/gd/os/linux_generic/queue.tpp b/gd/os/linux_generic/queue.tpp
new file mode 100644
index 0000000..419c162
--- /dev/null
+++ b/gd/os/linux_generic/queue.tpp
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+template <typename T>
+Queue<T>::Queue(size_t capacity)
+    : enqueue_callback_(nullptr), dequeue_callback_(nullptr), enqueue_(capacity), dequeue_(0){};
+
+template <typename T>
+Queue<T>::~Queue() {
+  ASSERT(enqueue_callback_ == nullptr);
+  ASSERT(dequeue_callback_ == nullptr);
+};
+
+template <typename T>
+void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  ASSERT(enqueue_.handler_ == nullptr);
+  ASSERT(enqueue_callback_ == nullptr);
+  ASSERT(enqueue_.reactable_ == nullptr);
+  enqueue_.handler_ = handler;
+  enqueue_callback_ = callback;
+  enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
+      enqueue_.reactive_semaphore_.GetFd(), [this] { EnqueueCallbackInternal(); }, nullptr);
+}
+
+template <typename T>
+void Queue<T>::UnregisterEnqueue() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  ASSERT(enqueue_.reactable_ != nullptr);
+  enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_);
+  enqueue_.reactable_ = nullptr;
+  enqueue_callback_ = nullptr;
+  enqueue_.handler_ = nullptr;
+}
+
+template <typename T>
+void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  ASSERT(dequeue_.handler_ == nullptr);
+  ASSERT(dequeue_callback_ == nullptr);
+  ASSERT(dequeue_.reactable_ == nullptr);
+  dequeue_.handler_ = handler;
+  dequeue_callback_ = callback;
+  dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(dequeue_.reactive_semaphore_.GetFd(),
+                                                                           [this] { dequeue_callback_(); }, nullptr);
+}
+
+template <typename T>
+void Queue<T>::UnregisterDequeue() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  ASSERT(dequeue_.reactable_ != nullptr);
+  dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_);
+  dequeue_.reactable_ = nullptr;
+  dequeue_callback_ = nullptr;
+  dequeue_.handler_ = nullptr;
+}
+
+template <typename T>
+std::unique_ptr<T> Queue<T>::TryDequeue() {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  if (queue_.empty()) {
+    return nullptr;
+  }
+
+  dequeue_.reactive_semaphore_.Decrease();
+
+  std::unique_ptr<T> data = std::move(queue_.front());
+  queue_.pop();
+
+  enqueue_.reactive_semaphore_.Increase();
+
+  return data;
+}
+
+template <typename T>
+void Queue<T>::EnqueueCallbackInternal() {
+  enqueue_.reactive_semaphore_.Decrease();
+
+  {
+    std::unique_ptr<T> data = enqueue_callback_();
+    ASSERT(data != nullptr);
+    std::lock_guard<std::mutex> lock(mutex_);
+    queue_.push(std::move(data));
+  }
+
+  dequeue_.reactive_semaphore_.Increase();
+}
diff --git a/gd/os/linux_generic/queue_unittest.cc b/gd/os/linux_generic/queue_unittest.cc
new file mode 100644
index 0000000..a4dc6d7
--- /dev/null
+++ b/gd/os/linux_generic/queue_unittest.cc
@@ -0,0 +1,683 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "os/queue.h"
+
+#include <sys/eventfd.h>
+#include <future>
+#include <unordered_map>
+
+#include "gtest/gtest.h"
+#include "os/reactor.h"
+
+namespace bluetooth {
+namespace os {
+namespace {
+
+constexpr int kQueueSize = 10;
+constexpr int kHalfOfQueueSize = kQueueSize / 2;
+constexpr int kDoubleOfQueueSize = kQueueSize * 2;
+constexpr int kQueueSizeOne = 1;
+
+class QueueTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
+    enqueue_handler_ = new Handler(enqueue_thread_);
+    dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
+    dequeue_handler_ = new Handler(dequeue_thread_);
+  }
+  void TearDown() override {
+    delete enqueue_handler_;
+    delete enqueue_thread_;
+    delete dequeue_handler_;
+    delete dequeue_thread_;
+    enqueue_handler_ = nullptr;
+    enqueue_thread_ = nullptr;
+    dequeue_handler_ = nullptr;
+    dequeue_thread_ = nullptr;
+  }
+
+  Thread* enqueue_thread_;
+  Handler* enqueue_handler_;
+  Thread* dequeue_thread_;
+  Handler* dequeue_handler_;
+};
+
+class TestEnqueueEnd {
+ public:
+  explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
+      : count(0), handler_(handler), queue_(queue), delay_(0) {}
+
+  ~TestEnqueueEnd() {
+    LOG_INFO("~TestEnqueueEnd");  // Debug log, will be removed
+  }
+
+  void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
+    LOG_INFO("RegisterEnqueue");  // Debug log, will be removed
+    promise_map_ = promise_map;
+    handler_->Post([this] { queue_->RegisterEnqueue(handler_, [this] { return EnqueueCallbackForTest(); }); });
+  }
+
+  void UnregisterEnqueue() {
+    LOG_INFO("UnregisterEnqueue");  // Debug log, will be removed
+    std::promise<void> promise;
+    auto feature = promise.get_future();
+
+    handler_->Post([this, &promise] {
+      queue_->UnregisterEnqueue();
+      promise.set_value();
+    });
+    feature.wait();
+  }
+
+  std::unique_ptr<std::string> EnqueueCallbackForTest() {
+    if (delay_ != 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
+    }
+
+    count++;
+    std::unique_ptr<std::string> data = std::move(buffer_.front());
+    buffer_.pop();
+    std::string copy = *data;
+    LOG_INFO(": pop %s, size %d", copy.c_str(), (int)buffer_.size());  // Debug log, will be removed
+    if (buffer_.empty()) {
+      queue_->UnregisterEnqueue();
+    }
+
+    auto pair = promise_map_->find(buffer_.size());
+    if (pair != promise_map_->end()) {
+      LOG_INFO("promises : %d", pair->first);  // Debug log, will be removed
+      pair->second.set_value(pair->first);
+      promise_map_->erase(pair->first);
+    }
+    return data;
+  }
+
+  void setDelay(int value) {
+    delay_ = value;
+  }
+
+  std::queue<std::unique_ptr<std::string>> buffer_;
+  int count;
+
+ private:
+  Handler* handler_;
+  Queue<std::string>* queue_;
+  std::unordered_map<int, std::promise<int>>* promise_map_;
+  int delay_;
+};
+
+class TestDequeueEnd {
+ public:
+  explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
+      : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
+
+  ~TestDequeueEnd() {
+    LOG_INFO("~TestDequeueEnd");  // Debug log, will be removed
+  }
+
+  void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
+    LOG_INFO("RegisterDequeue");  // Debug log, will be removed
+    promise_map_ = promise_map;
+    handler_->Post([this] { queue_->RegisterDequeue(handler_, [this] { DequeueCallbackForTest(); }); });
+  }
+
+  void UnregisterDequeue() {
+    LOG_INFO("UnregisterDequeue");  // Debug log, will be removed
+    std::promise<void> promise;
+    auto feature = promise.get_future();
+
+    handler_->Post([this, &promise] {
+      queue_->UnregisterDequeue();
+      promise.set_value();
+    });
+    feature.wait();
+  }
+
+  void DequeueCallbackForTest() {
+    if (delay_ != 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
+    }
+
+    count++;
+    std::unique_ptr<std::string> data = queue_->TryDequeue();
+    std::string copy = *data;  // Debug log, will be removed
+    buffer_.push(std::move(data));
+    LOG_INFO("push %s, size %d", copy.c_str(), (int)buffer_.size());  // Debug log, will be removed
+
+    auto pair = promise_map_->find(buffer_.size());
+    if (pair != promise_map_->end()) {
+      LOG_INFO("promises : %d", pair->first);  // Debug log, will be removed
+      pair->second.set_value(pair->first);
+      promise_map_->erase(pair->first);
+    }
+
+    if (buffer_.size() == capacity_) {
+      queue_->UnregisterDequeue();
+    }
+  }
+
+  void setDelay(int value) {
+    delay_ = value;
+  }
+
+  std::queue<std::unique_ptr<std::string>> buffer_;
+  int count;
+
+ private:
+  Handler* handler_;
+  Queue<std::string>* queue_;
+  std::unordered_map<int, std::promise<int>>* promise_map_;
+  int capacity_;
+  int delay_;
+};
+
+// Enqueue end level : 0 -> queue is full, 1 - >  queue isn't full
+// Dequeue end level : 0 -> queue is empty, 1 - >  queue isn't empty
+
+// Test 1 : Queue is empty
+
+// Enqueue end level : 1
+// Dequeue end level : 0
+// Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
+TEST_F(QueueTest, register_enqueue_with_empty_queue) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+
+  // Push kQueueSize data to enqueue_end buffer
+  for (int i = 0; i < kQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
+
+  // Register enqueue and expect data move to Queue
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+}
+
+// Enqueue end level : 1
+// Dequeue end level : 0
+// Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
+TEST_F(QueueTest, register_dequeue_with_empty_queue) {
+  Queue<std::string> queue(kQueueSize);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
+
+  // Register dequeue, DequeueCallback shouldn't be invoked
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  EXPECT_EQ(test_dequeue_end.count, 0);
+
+  test_dequeue_end.UnregisterDequeue();
+}
+
+// Test 2 : Queue is full
+
+// Enqueue end level : 0
+// Dequeue end level : 1
+// Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
+TEST_F(QueueTest, register_enqueue_with_full_queue) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+
+  // make Queue full
+  for (int i = 0; i < kQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+
+  // push some data to enqueue_end buffer and register enqueue;
+  for (int i = 0; i < kHalfOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+
+  // EnqueueCallback shouldn't be invoked
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
+  EXPECT_EQ(test_enqueue_end.count, kQueueSize);
+
+  test_enqueue_end.UnregisterEnqueue();
+}
+
+// Enqueue end level : 0
+// Dequeue end level : 1
+// Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
+TEST_F(QueueTest, register_dequeue_with_full_queue) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
+
+  // make Queue full
+  for (int i = 0; i < kQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+
+  // Register dequeue and expect data move to dequeue end buffer
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kQueueSize);
+
+  test_dequeue_end.UnregisterDequeue();
+}
+
+// Test 3 : Queue is non-empty and non-full
+
+// Enqueue end level : 1
+// Dequeue end level : 1
+// Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
+TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+
+  // make Queue half empty
+  for (int i = 0; i < kHalfOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+
+  // push some data to enqueue_end buffer and register enqueue;
+  for (int i = 0; i < kHalfOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+
+  // Register enqueue and expect data move to Queue
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+}
+
+// Enqueue end level : 1
+// Dequeue end level : 1
+// Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
+TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
+
+  // make Queue half empty
+  for (int i = 0; i < kHalfOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+
+  // Register dequeue and expect data move to dequeue end buffer
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
+                              std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
+
+  test_dequeue_end.UnregisterDequeue();
+}
+
+// Dynamic level test
+
+// Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
+
+// Enqueue end level : 1 -> 0
+// Dequeue end level : 1
+// Test 4-1 Queue becomes full due to only register EnqueueCallback
+TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+
+  // push double of kQueueSize to enqueue end buffer
+  for (int i = 0; i < kDoubleOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+
+  // Register enqueue and expect kQueueSize data move to Queue
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), kQueueSize);
+
+  // EnqueueCallback shouldn't be invoked and buffer size stay in kQueueSize
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
+  EXPECT_EQ(test_enqueue_end.count, kQueueSize);
+
+  test_enqueue_end.UnregisterEnqueue();
+}
+
+// Enqueue end level : 1 -> 0
+// Dequeue end level : 1
+// Test 4-2 Queue becomes full due to DequeueCallback unregister during test
+TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
+
+  // push double of kQueueSize to enqueue end buffer
+  for (int i = 0; i < kDoubleOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+
+  // Register dequeue
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
+                              std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
+
+  // Register enqueue
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
+                              std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[kHalfOfQueueSize].get_future();
+
+  // Dequeue end will unregister when buffer size is kHalfOfQueueSize
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
+
+  // EnqueueCallback shouldn't be invoked and buffer size stay in kHalfOfQueueSize
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), kHalfOfQueueSize);
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
+  EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize);
+
+  test_enqueue_end.UnregisterEnqueue();
+}
+
+// Enqueue end level : 1 -> 0
+// Dequeue end level : 1
+// Test 4-3 Queue becomes full due to DequeueCallback is slower
+TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
+
+  // push double of kDoubleOfQueueSize to enqueue end buffer
+  for (int i = 0; i < kDoubleOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+
+  // Set 20 ms delay for callback and register dequeue
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  test_dequeue_end.setDelay(20);
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
+
+  // Register enqueue
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+
+  // Wait for enqueue buffer empty and expect queue is full
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+  EXPECT_EQ(test_dequeue_end.buffer_.size(), kQueueSize);
+
+  test_dequeue_end.UnregisterDequeue();
+}
+
+// Enqueue end level : 0 -> 1
+// Dequeue end level : 1 -> 0
+// Test 5 Queue becomes full and non empty at same time.
+TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
+  Queue<std::string> queue(kQueueSizeOne);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
+
+  // push double of kQueueSize to enqueue end buffer
+  for (int i = 0; i < kQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+
+  // Register dequeue
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
+
+  // Register enqueue
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+
+  // Wait for all data move from enqueue end buffer to dequeue end buffer
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kQueueSize);
+
+  test_dequeue_end.UnregisterDequeue();
+}
+
+// Enqueue end level : 1 -> 0
+// Dequeue end level : 1
+// Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
+TEST_F(QueueTest, queue_becomes_non_full_during_test) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);
+
+  // make Queue full
+  for (int i = 0; i < kDoubleOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), kQueueSize);
+
+  // Expect kQueueSize data block in enqueue end buffer
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  EXPECT_EQ(test_enqueue_end.buffer_.size(), kQueueSize);
+
+  // Register dequeue
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+
+  // Expect enqueue end will empty
+  enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+
+  test_dequeue_end.UnregisterDequeue();
+}
+
+// Enqueue end level : 0 -> 1
+// Dequeue end level : 1 -> 0
+// Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
+TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
+  Queue<std::string> queue(kQueueSizeOne);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
+
+  // push double of kQueueSize to enqueue end buffer
+  for (int i = 0; i < kQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+
+  // Register dequeue
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
+
+  // Register enqueue
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+
+  // Wait for all data move from enqueue end buffer to dequeue end buffer
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kQueueSize);
+
+  test_dequeue_end.UnregisterDequeue();
+}
+
+// Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
+
+// Enqueue end level : 1
+// Dequeue end level : 1 -> 0
+// Test 8-1 Queue becomes empty due to only register DequeueCallback
+TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
+
+  // make Queue half empty
+  for (int i = 0; i < kHalfOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+
+  // Register dequeue, expect kHalfOfQueueSize data move to dequeue end buffer
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
+                              std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
+
+  // Expect DequeueCallback should stop to be invoked
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize);
+}
+
+// Enqueue end level : 1
+// Dequeue end level : 1 -> 0
+// Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
+TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
+
+  // make Queue half empty
+  for (int i = 0; i < kHalfOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+  auto enqueue_future = enqueue_promise_map[0].get_future();
+  enqueue_future.wait();
+  EXPECT_EQ(enqueue_future.get(), 0);
+
+  // push kHalfOfQueueSize to enqueue end buffer and register enqueue.
+  for (int i = 0; i < kHalfOfQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+
+  // Register dequeue, expect kQueueSize move to dequeue end buffer
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kQueueSize);
+
+  // Expect DequeueCallback should stop to be invoked
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  EXPECT_EQ(test_dequeue_end.count, kQueueSize);
+}
+
+// Enqueue end level : 1
+// Dequeue end level : 0 -> 1
+// Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
+TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
+  Queue<std::string> queue(kQueueSize);
+  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
+  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
+
+  // Register dequeue
+  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
+  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
+  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
+
+  // push kQueueSize data to enqueue end buffer and register enqueue
+  for (int i = 0; i < kQueueSize; i++) {
+    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
+    test_enqueue_end.buffer_.push(std::move(data));
+  }
+  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
+  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
+
+  // Expect kQueueSize data move to dequeue end buffer
+  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
+  dequeue_future.wait();
+  EXPECT_EQ(dequeue_future.get(), kQueueSize);
+}
+
+}  // namespace
+}  // namespace os
+}  // namespace bluetooth
diff --git a/gd/os/linux_generic/reactive_semaphore.cc b/gd/os/linux_generic/reactive_semaphore.cc
new file mode 100644
index 0000000..4cc0b4c
--- /dev/null
+++ b/gd/os/linux_generic/reactive_semaphore.cc
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "reactive_semaphore.h"
+
+#include <sys/eventfd.h>
+#include <unistd.h>
+#include <functional>
+
+#include "os/linux_generic/linux.h"
+#include "os/log.h"
+
+namespace bluetooth {
+namespace os {
+
+ReactiveSemaphore::ReactiveSemaphore(unsigned int value) : fd_(eventfd(value, EFD_SEMAPHORE | EFD_NONBLOCK)) {
+  ASSERT(fd_ != -1);
+}
+
+ReactiveSemaphore::~ReactiveSemaphore() {
+  int close_status;
+  RUN_NO_INTR(close_status = close(fd_));
+  ASSERT(close_status != -1);
+}
+
+void ReactiveSemaphore::Decrease() {
+  uint64_t val = 0;
+  auto read_result = eventfd_read(fd_, &val);
+  ASSERT(read_result != -1);
+}
+
+void ReactiveSemaphore::Increase() {
+  uint64_t val = 1;
+  auto write_result = eventfd_write(fd_, val);
+  ASSERT(write_result != -1);
+}
+
+int ReactiveSemaphore::GetFd() {
+  return fd_;
+}
+
+}  // namespace os
+}  // namespace bluetooth
diff --git a/gd/os/linux_generic/reactive_semaphore.h b/gd/os/linux_generic/reactive_semaphore.h
new file mode 100644
index 0000000..718666c
--- /dev/null
+++ b/gd/os/linux_generic/reactive_semaphore.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "os/utils.h"
+
+namespace bluetooth {
+namespace os {
+
+// A event_fd work in non-blocking and Semaphore mode
+class ReactiveSemaphore {
+ public:
+  // Creates a new ReactiveSemaphore with an initial value of |value|.
+  explicit ReactiveSemaphore(unsigned int value);
+  ~ReactiveSemaphore();
+  // Decrements the value of |fd_|, this will cause a crash if |fd_| unreadable.
+  void Decrease();
+  // Increase the value of |fd_|, this will cause a crash if |fd_| unwritable.
+  void Increase();
+  int GetFd();
+
+  DISALLOW_COPY_AND_ASSIGN(ReactiveSemaphore);
+
+ private:
+  int fd_;
+};
+
+}  // namespace os
+}  // namespace bluetooth
diff --git a/gd/os/queue.h b/gd/os/queue.h
new file mode 100644
index 0000000..9479306
--- /dev/null
+++ b/gd/os/queue.h
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <unistd.h>
+#include <functional>
+#include <mutex>
+#include <queue>
+
+#include "os/handler.h"
+#ifdef OS_LINUX_GENERIC
+#include "os/linux_generic/reactive_semaphore.h"
+#endif
+#include "os/log.h"
+
+namespace bluetooth {
+namespace os {
+
+template <typename T>
+class Queue {
+ public:
+  // A function moving data from enqueue end buffer to queue, it will be continually be invoked until queue
+  // is full. Enqueue end should make sure buffer isn't empty and UnregisterEnqueue when buffer become empty.
+  using EnqueueCallback = std::function<std::unique_ptr<T>()>;
+  // A function moving data form queue to dequeue end buffer, it will be continually be invoked until queue
+  // is empty. TryDequeue should be use in this function to get data from queue.
+  using DequeueCallback = std::function<void()>;
+  // Create a queue with |capacity| is the maximum number of messages a queue can contain
+  explicit Queue(size_t capacity);
+  ~Queue();
+  // Register |callback| that will be called on |handler| when the queue is able to enqueue one piece of data.
+  // This will cause a crash if handler or callback has already been registered before.
+  void RegisterEnqueue(Handler* handler, EnqueueCallback callback);
+  // Unregister current EnqueueCallback from this queue, this will cause a crash if not registered yet.
+  void UnregisterEnqueue();
+  // Register |callback| that will be called on |handler| when the queue has at least one piece of data ready
+  // for dequeue. This will cause a crash if handler or callback has already been registered before.
+  void RegisterDequeue(Handler* handler, DequeueCallback callback);
+  // Unregister current DequeueCallback from this queue, this will cause a crash if not registered yet.
+  void UnregisterDequeue();
+
+  // Try to dequeue an item from this queue. Return nullptr when there is nothing in the queue.
+  std::unique_ptr<T> TryDequeue();
+
+ private:
+  void EnqueueCallbackInternal();
+  // An internal queue that holds at most |capacity| pieces of data
+  std::queue<std::unique_ptr<T>> queue_;
+  // A mutex that guards data in this queue
+  std::mutex mutex_;
+  // Current dequeue callback
+  EnqueueCallback enqueue_callback_;
+  // Current enqueue callback
+  DequeueCallback dequeue_callback_;
+
+  class QueueEndpoint {
+   public:
+#ifdef OS_LINUX_GENERIC
+    explicit QueueEndpoint(unsigned int initial_value)
+        : reactive_semaphore_(initial_value), handler_(nullptr), reactable_(nullptr) {}
+    ReactiveSemaphore reactive_semaphore_;
+#endif
+    Handler* handler_;
+    Reactor::Reactable* reactable_;
+  };
+
+  QueueEndpoint enqueue_;
+  QueueEndpoint dequeue_;
+};
+
+#ifdef OS_LINUX_GENERIC
+#include "os/linux_generic/queue.tpp"
+#endif
+
+}  // namespace os
+}  // namespace bluetooth
diff --git a/gd/os/utils.h b/gd/os/utils.h
index 26e8f23..89fca66 100644
--- a/gd/os/utils.h
+++ b/gd/os/utils.h
@@ -15,6 +15,7 @@
  */
 
 #pragma once
+#include <errno.h>
 
 // A macro to re-try a syscall when it receives EINTR
 #ifndef RUN_NO_INTR