ART: Destroy unprocessed tasks in TaskProcessor.

In the past I've seen Valgrind complain about leaked tasks
in the TaskProcessor, so finalize any tasks that are still
queued when the TaskProcessor is destroyed.
Also hold the mutex and condition variable directly in the
object instead of allocating separate objects on the heap.

Test: m test-art-host-gtest
Test: testrunner.py --host --optimizing
Change-Id: If0d4ac39bf3d8e2aa9b465186d4fa7c3cb746718
diff --git a/runtime/gc/task_processor.cc b/runtime/gc/task_processor.cc
index e928644..64e8322 100644
--- a/runtime/gc/task_processor.cc
+++ b/runtime/gc/task_processor.cc
@@ -23,32 +23,37 @@
 namespace gc {
 
 TaskProcessor::TaskProcessor()
-    : lock_(new Mutex("Task processor lock", kReferenceProcessorLock)), is_running_(false),
+    : lock_("Task processor lock", kReferenceProcessorLock),
+      cond_("Task processor condition", lock_),
+      is_running_(false),
       running_thread_(nullptr) {
-  // Piggyback off the reference processor lock level.
-  cond_.reset(new ConditionVariable("Task processor condition", *lock_));
 }
 
 TaskProcessor::~TaskProcessor() {
-  delete lock_;
+  if (!tasks_.empty()) {
+    LOG(WARNING) << "TaskProcessor: Finalizing " << tasks_.size() << " unprocessed tasks.";
+    for (HeapTask* task : tasks_) {
+      task->Finalize();
+    }
+  }
 }
 
 void TaskProcessor::AddTask(Thread* self, HeapTask* task) {
   ScopedThreadStateChange tsc(self, kWaitingForTaskProcessor);
-  MutexLock mu(self, *lock_);
+  MutexLock mu(self, lock_);
   tasks_.insert(task);
-  cond_->Signal(self);
+  cond_.Signal(self);
 }
 
 HeapTask* TaskProcessor::GetTask(Thread* self) {
   ScopedThreadStateChange tsc(self, kWaitingForTaskProcessor);
-  MutexLock mu(self, *lock_);
+  MutexLock mu(self, lock_);
   while (true) {
     if (tasks_.empty()) {
       if (!is_running_) {
         return nullptr;
       }
-      cond_->Wait(self);  // Empty queue, wait until we are signalled.
+      cond_.Wait(self);  // Empty queue, wait until we are signalled.
     } else {
       // Non empty queue, look at the top element and see if we are ready to run it.
       const uint64_t current_time = NanoTime();
@@ -61,18 +66,18 @@
         return task;
       }
       DCHECK_GT(target_time, current_time);
-      // Wait untl we hit the target run time.
+      // Wait until we hit the target run time.
       const uint64_t delta_time = target_time - current_time;
       const uint64_t ms_delta = NsToMs(delta_time);
       const uint64_t ns_delta = delta_time - MsToNs(ms_delta);
-      cond_->TimedWait(self, static_cast<int64_t>(ms_delta), static_cast<int32_t>(ns_delta));
+      cond_.TimedWait(self, static_cast<int64_t>(ms_delta), static_cast<int32_t>(ns_delta));
     }
   }
   UNREACHABLE();
 }
 
 void TaskProcessor::UpdateTargetRunTime(Thread* self, HeapTask* task, uint64_t new_target_time) {
-  MutexLock mu(self, *lock_);
+  MutexLock mu(self, lock_);
   // Find the task.
   auto range = tasks_.equal_range(task);
   for (auto it = range.first; it != range.second; ++it) {
@@ -85,7 +90,7 @@
         // If we became the first task then we may need to signal since we changed the task that we
         // are sleeping on.
         if (*tasks_.begin() == task) {
-          cond_->Signal(self);
+          cond_.Signal(self);
         }
         return;
       }
@@ -94,24 +99,24 @@
 }
 
 bool TaskProcessor::IsRunning() const {
-  MutexLock mu(Thread::Current(), *lock_);
+  MutexLock mu(Thread::Current(), lock_);
   return is_running_;
 }
 
 Thread* TaskProcessor::GetRunningThread() const {
-  MutexLock mu(Thread::Current(), *lock_);
+  MutexLock mu(Thread::Current(), lock_);
   return running_thread_;
 }
 
 void TaskProcessor::Stop(Thread* self) {
-  MutexLock mu(self, *lock_);
+  MutexLock mu(self, lock_);
   is_running_ = false;
   running_thread_ = nullptr;
-  cond_->Broadcast(self);
+  cond_.Broadcast(self);
 }
 
 void TaskProcessor::Start(Thread* self) {
-  MutexLock mu(self, *lock_);
+  MutexLock mu(self, lock_);
   is_running_ = true;
   running_thread_ = self;
 }