Implement suspend, resume and cancel for the download.

The DownloadAction can now be suspended and resumed, using the existing
libcurl hooks to pause the download. To cancel an ongoing update, this
patch leverages the existing StopProcessing method, previously used only
in unit tests, with a slight change: stopping the ActionProcessor now
also removes all the pending actions.

The LibcurlHttpFetcher Pause/Unpause methods were improved to support
(i.e. not crash on) being paused when there isn't a current connection,
such as while waiting for the proxy resolver or while trying to
reconnect.

Finally, the value of ongoing_update_ is now properly set in the
UpdateAttempter.

Bug: 27047026
TEST=Tested suspending, resuming and canceling the update on a device.
TEST=Added unittest for the Pause/Unpause logic.

Change-Id: I0df1e1a8cf70a3b736bc9cd4899d37813f381b94
diff --git a/common/action_processor.cc b/common/action_processor.cc
index 2618e4e..3549e08 100644
--- a/common/action_processor.cc
+++ b/common/action_processor.cc
@@ -60,6 +60,10 @@
             << (suspended_ ? " while suspended" : "");
   current_action_ = nullptr;
   suspended_ = false;
+  // Delete all the actions before calling the delegate.
+  for (auto action : actions_)
+    action->SetProcessor(nullptr);
+  actions_.clear();
   if (delegate_)
     delegate_->ProcessingStopped(this);
 }
diff --git a/common/action_processor.h b/common/action_processor.h
index 050eee9..c9c179e 100644
--- a/common/action_processor.h
+++ b/common/action_processor.h
@@ -51,8 +51,9 @@
   virtual void StartProcessing();
 
   // Aborts processing. If an Action is running, it will have
-  // TerminateProcessing() called on it. The Action that was running
-  // will be lost and must be re-enqueued if this Processor is to use it.
+  // TerminateProcessing() called on it. The Action that was running and all the
+  // remaining actions will be lost and must be re-enqueued if this Processor is
+  // to use it.
   void StopProcessing();
 
   // Suspend the processing. If an Action is running, it will have the
@@ -63,7 +64,7 @@
   void SuspendProcessing();
 
   // Resume the suspended processing. If the ActionProcessor is not suspended
-  // or not running on the first place this method performs no action.
+  // or not running in the first place this method performs no action.
   void ResumeProcessing();
 
   // Returns true iff the processing was started but not yet completed nor
diff --git a/common/http_fetcher_unittest.cc b/common/http_fetcher_unittest.cc
index aaa538c..5450958 100644
--- a/common/http_fetcher_unittest.cc
+++ b/common/http_fetcher_unittest.cc
@@ -48,6 +48,7 @@
 #include "update_engine/common/multi_range_http_fetcher.h"
 #include "update_engine/common/test_utils.h"
 #include "update_engine/common/utils.h"
+#include "update_engine/mock_proxy_resolver.h"
 #include "update_engine/proxy_resolver.h"
 
 using brillo::MessageLoop;
@@ -56,6 +57,10 @@
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using testing::DoAll;
+using testing::Return;
+using testing::SaveArg;
+using testing::_;
 
 namespace {
 
@@ -193,14 +198,19 @@
   AnyHttpFetcherTest() {}
   virtual ~AnyHttpFetcherTest() {}
 
-  virtual HttpFetcher* NewLargeFetcher(size_t num_proxies) = 0;
+  virtual HttpFetcher* NewLargeFetcher(ProxyResolver* proxy_resolver) = 0;
+  HttpFetcher* NewLargeFetcher(size_t num_proxies) {
+    proxy_resolver_.set_num_proxies(num_proxies);
+    return NewLargeFetcher(&proxy_resolver_);
+  }
   HttpFetcher* NewLargeFetcher() {
     return NewLargeFetcher(1);
   }
 
-  virtual HttpFetcher* NewSmallFetcher(size_t num_proxies) = 0;
+  virtual HttpFetcher* NewSmallFetcher(ProxyResolver* proxy_resolver) = 0;
   HttpFetcher* NewSmallFetcher() {
-    return NewSmallFetcher(1);
+    proxy_resolver_.set_num_proxies(1);
+    return NewSmallFetcher(&proxy_resolver_);
   }
 
   virtual string BigUrl(in_port_t port) const { return kUnusedUrl; }
@@ -227,25 +237,16 @@
  public:
   // Necessary to unhide the definition in the base class.
   using AnyHttpFetcherTest::NewLargeFetcher;
-  HttpFetcher* NewLargeFetcher(size_t num_proxies) override {
+  HttpFetcher* NewLargeFetcher(ProxyResolver* proxy_resolver) override {
     brillo::Blob big_data(1000000);
-    CHECK_GT(num_proxies, 0u);
-    proxy_resolver_.set_num_proxies(num_proxies);
     return new MockHttpFetcher(
-        big_data.data(),
-        big_data.size(),
-        reinterpret_cast<ProxyResolver*>(&proxy_resolver_));
+        big_data.data(), big_data.size(), proxy_resolver);
   }
 
   // Necessary to unhide the definition in the base class.
   using AnyHttpFetcherTest::NewSmallFetcher;
-  HttpFetcher* NewSmallFetcher(size_t num_proxies) override {
-    CHECK_GT(num_proxies, 0u);
-    proxy_resolver_.set_num_proxies(num_proxies);
-    return new MockHttpFetcher(
-        "x",
-        1,
-        reinterpret_cast<ProxyResolver*>(&proxy_resolver_));
+  HttpFetcher* NewSmallFetcher(ProxyResolver* proxy_resolver) override {
+    return new MockHttpFetcher("x", 1, proxy_resolver);
   }
 
   bool IsMock() const override { return true; }
@@ -260,12 +261,9 @@
  public:
   // Necessary to unhide the definition in the base class.
   using AnyHttpFetcherTest::NewLargeFetcher;
-  HttpFetcher* NewLargeFetcher(size_t num_proxies) override {
-    CHECK_GT(num_proxies, 0u);
-    proxy_resolver_.set_num_proxies(num_proxies);
-    LibcurlHttpFetcher *ret = new
-        LibcurlHttpFetcher(reinterpret_cast<ProxyResolver*>(&proxy_resolver_),
-                           &fake_hardware_);
+  HttpFetcher* NewLargeFetcher(ProxyResolver* proxy_resolver) override {
+    LibcurlHttpFetcher* ret =
+        new LibcurlHttpFetcher(proxy_resolver, &fake_hardware_);
     // Speed up test execution.
     ret->set_idle_seconds(1);
     ret->set_retry_seconds(1);
@@ -275,8 +273,8 @@
 
   // Necessary to unhide the definition in the base class.
   using AnyHttpFetcherTest::NewSmallFetcher;
-  HttpFetcher* NewSmallFetcher(size_t num_proxies) override {
-    return NewLargeFetcher(num_proxies);
+  HttpFetcher* NewSmallFetcher(ProxyResolver* proxy_resolver) override {
+    return NewLargeFetcher(proxy_resolver);
   }
 
   string BigUrl(in_port_t port) const override {
@@ -307,14 +305,9 @@
  public:
   // Necessary to unhide the definition in the base class.
   using AnyHttpFetcherTest::NewLargeFetcher;
-  HttpFetcher* NewLargeFetcher(size_t num_proxies) override {
-    CHECK_GT(num_proxies, 0u);
-    proxy_resolver_.set_num_proxies(num_proxies);
-    ProxyResolver* resolver =
-        reinterpret_cast<ProxyResolver*>(&proxy_resolver_);
-    MultiRangeHttpFetcher *ret =
-        new MultiRangeHttpFetcher(
-            new LibcurlHttpFetcher(resolver, &fake_hardware_));
+  HttpFetcher* NewLargeFetcher(ProxyResolver* proxy_resolver) override {
+    MultiRangeHttpFetcher* ret = new MultiRangeHttpFetcher(
+        new LibcurlHttpFetcher(proxy_resolver, &fake_hardware_));
     ret->ClearRanges();
     ret->AddRange(0);
     // Speed up test execution.
@@ -326,8 +319,8 @@
 
   // Necessary to unhide the definition in the base class.
   using AnyHttpFetcherTest::NewSmallFetcher;
-  HttpFetcher* NewSmallFetcher(size_t num_proxies) override {
-    return NewLargeFetcher(num_proxies);
+  HttpFetcher* NewSmallFetcher(ProxyResolver* proxy_resolver) override {
+    return NewLargeFetcher(proxy_resolver);
   }
 
   bool IsMulti() const override { return true; }
@@ -517,26 +510,51 @@
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, PauseTest) {
-  {
-    PausingHttpFetcherTestDelegate delegate;
-    unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher());
-    delegate.paused_ = false;
-    delegate.fetcher_ = fetcher.get();
-    fetcher->set_delegate(&delegate);
+  PausingHttpFetcherTestDelegate delegate;
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher());
+  delegate.paused_ = false;
+  delegate.fetcher_ = fetcher.get();
+  fetcher->set_delegate(&delegate);
 
-    unique_ptr<HttpServer> server(this->test_.CreateServer());
-    ASSERT_TRUE(server->started_);
+  unique_ptr<HttpServer> server(this->test_.CreateServer());
+  ASSERT_TRUE(server->started_);
 
-    MessageLoop::TaskId callback_id;
-    callback_id = this->loop_.PostDelayedTask(
-        FROM_HERE,
-        base::Bind(&UnpausingTimeoutCallback, &delegate, &callback_id),
-        base::TimeDelta::FromMilliseconds(200));
-    fetcher->BeginTransfer(this->test_.BigUrl(server->GetPort()));
+  MessageLoop::TaskId callback_id;
+  callback_id = this->loop_.PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&UnpausingTimeoutCallback, &delegate, &callback_id),
+      base::TimeDelta::FromMilliseconds(200));
+  fetcher->BeginTransfer(this->test_.BigUrl(server->GetPort()));
 
-    this->loop_.Run();
-    EXPECT_TRUE(this->loop_.CancelTask(callback_id));
-  }
+  this->loop_.Run();
+  EXPECT_TRUE(this->loop_.CancelTask(callback_id));
+}
+
+// This test will pause the fetcher while the download is not yet started
+// because it is waiting for the proxy to be resolved.
+TYPED_TEST(HttpFetcherTest, PauseWhileResolvingProxyTest) {
+  if (this->test_.IsMock())
+    return;
+  MockProxyResolver mock_resolver;
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher(&mock_resolver));
+
+  // Saved arguments from the proxy call.
+  ProxiesResolvedFn proxy_callback = nullptr;
+  void* proxy_data = nullptr;
+
+  EXPECT_CALL(mock_resolver, GetProxiesForUrl("http://fake_url", _, _))
+      .WillOnce(DoAll(
+          SaveArg<1>(&proxy_callback), SaveArg<2>(&proxy_data), Return(true)));
+  fetcher->BeginTransfer("http://fake_url");
+  testing::Mock::VerifyAndClearExpectations(&mock_resolver);
+
+  // Pausing and unpausing while resolving the proxy should not affect anything.
+  fetcher->Pause();
+  fetcher->Unpause();
+  fetcher->Pause();
+  // Proxy resolver comes back after we paused the fetcher.
+  ASSERT_TRUE(proxy_callback);
+  (*proxy_callback)({1, kNoProxy}, proxy_data);
 }
 
 namespace {
diff --git a/common/libcurl_http_fetcher.cc b/common/libcurl_http_fetcher.cc
index 761b74e..789f46e 100644
--- a/common/libcurl_http_fetcher.cc
+++ b/common/libcurl_http_fetcher.cc
@@ -99,6 +99,7 @@
 
   curl_handle_ = curl_easy_init();
   CHECK(curl_handle_);
+  ignore_failure_ = false;
 
   CHECK(HasProxy());
   bool is_direct = (GetCurrentProxy() == kNoProxy);
@@ -291,6 +292,12 @@
   http_response_code_ = 0;
   terminate_requested_ = false;
   sent_byte_ = false;
+
+  // If we are paused, we delay these two operations until Unpause is called.
+  if (transfer_paused_) {
+    restart_transfer_on_unpause_ = true;
+    return;
+  }
   ResumeTransfer(url_);
   CurlPerformOnce();
 }
@@ -325,96 +332,110 @@
       return;
     }
   }
-  if (0 == running_handles) {
-    GetHttpResponseCode();
-    if (http_response_code_) {
-      LOG(INFO) << "HTTP response code: " << http_response_code_;
-      no_network_retry_count_ = 0;
+
+  // If the transfer completes while paused, we should ignore the failure once
+  // the fetcher is unpaused.
+  if (running_handles == 0 && transfer_paused_ && !ignore_failure_) {
+    LOG(INFO) << "Connection closed while paused, ignoring failure.";
+    ignore_failure_ = true;
+  }
+
+  if (running_handles != 0 || transfer_paused_) {
+    // There's either more work to do or we are paused, so we just keep the
+    // file descriptors to watch up to date and exit, until we are done with the
+    // work and we are not paused.
+    SetupMessageLoopSources();
+    return;
+  }
+
+  // At this point, the transfer was completed in some way (error, connection
+  // closed or download finished).
+
+  GetHttpResponseCode();
+  if (http_response_code_) {
+    LOG(INFO) << "HTTP response code: " << http_response_code_;
+    no_network_retry_count_ = 0;
+  } else {
+    LOG(ERROR) << "Unable to get http response code.";
+  }
+
+  // we're done!
+  CleanUp();
+
+  // TODO(petkov): This temporary code tries to deal with the case where the
+  // update engine performs an update check while the network is not ready
+  // (e.g., right after resume). Longer term, we should check if the network
+  // is online/offline and return an appropriate error code.
+  if (!sent_byte_ &&
+      http_response_code_ == 0 &&
+      no_network_retry_count_ < no_network_max_retries_) {
+    no_network_retry_count_++;
+    MessageLoop::current()->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
+                   base::Unretained(this)),
+        TimeDelta::FromSeconds(kNoNetworkRetrySeconds));
+    LOG(INFO) << "No HTTP response, retry " << no_network_retry_count_;
+  } else if ((!sent_byte_ && !IsHttpResponseSuccess()) ||
+             IsHttpResponseError()) {
+    // The transfer completed w/ error and we didn't get any bytes.
+    // If we have another proxy to try, try that.
+    //
+    // TODO(garnold) in fact there are two separate cases here: one case is an
+    // other-than-success return code (including no return code) and no
+    // received bytes, which is necessary due to the way callbacks are
+    // currently processing error conditions;  the second is an explicit HTTP
+    // error code, where some data may have been received (as in the case of a
+    // semi-successful multi-chunk fetch).  This is a confusing behavior and
+    // should be unified into a complete, coherent interface.
+    LOG(INFO) << "Transfer resulted in an error (" << http_response_code_
+              << "), " << bytes_downloaded_ << " bytes downloaded";
+
+    PopProxy();  // Delete the proxy we just gave up on.
+
+    if (HasProxy()) {
+      // We have another proxy. Retry immediately.
+      LOG(INFO) << "Retrying with next proxy setting";
+      MessageLoop::current()->PostTask(
+          FROM_HERE,
+          base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
+                     base::Unretained(this)));
     } else {
-      LOG(ERROR) << "Unable to get http response code.";
+      // Out of proxies. Give up.
+      LOG(INFO) << "No further proxies, indicating transfer complete";
+      if (delegate_)
+        delegate_->TransferComplete(this, false);  // signal fail
     }
+  } else if ((transfer_size_ >= 0) && (bytes_downloaded_ < transfer_size_)) {
+    if (!ignore_failure_)
+      retry_count_++;
+    LOG(INFO) << "Transfer interrupted after downloading "
+              << bytes_downloaded_ << " of " << transfer_size_ << " bytes. "
+              << transfer_size_ - bytes_downloaded_ << " bytes remaining "
+              << "after " << retry_count_ << " attempt(s)";
 
-    // we're done!
-    CleanUp();
-
-    // TODO(petkov): This temporary code tries to deal with the case where the
-    // update engine performs an update check while the network is not ready
-    // (e.g., right after resume). Longer term, we should check if the network
-    // is online/offline and return an appropriate error code.
-    if (!sent_byte_ &&
-        http_response_code_ == 0 &&
-        no_network_retry_count_ < no_network_max_retries_) {
-      no_network_retry_count_++;
+    if (retry_count_ > max_retry_count_) {
+      LOG(INFO) << "Reached max attempts (" << retry_count_ << ")";
+      if (delegate_)
+        delegate_->TransferComplete(this, false);  // signal fail
+    } else {
+      // Need to restart transfer
+      LOG(INFO) << "Restarting transfer to download the remaining bytes";
       MessageLoop::current()->PostDelayedTask(
           FROM_HERE,
           base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
                      base::Unretained(this)),
-          TimeDelta::FromSeconds(kNoNetworkRetrySeconds));
-      LOG(INFO) << "No HTTP response, retry " << no_network_retry_count_;
-      return;
-    }
-
-    if ((!sent_byte_ && !IsHttpResponseSuccess()) || IsHttpResponseError()) {
-      // The transfer completed w/ error and we didn't get any bytes.
-      // If we have another proxy to try, try that.
-      //
-      // TODO(garnold) in fact there are two separate cases here: one case is an
-      // other-than-success return code (including no return code) and no
-      // received bytes, which is necessary due to the way callbacks are
-      // currently processing error conditions;  the second is an explicit HTTP
-      // error code, where some data may have been received (as in the case of a
-      // semi-successful multi-chunk fetch).  This is a confusing behavior and
-      // should be unified into a complete, coherent interface.
-      LOG(INFO) << "Transfer resulted in an error (" << http_response_code_
-                << "), " << bytes_downloaded_ << " bytes downloaded";
-
-      PopProxy();  // Delete the proxy we just gave up on.
-
-      if (HasProxy()) {
-        // We have another proxy. Retry immediately.
-        LOG(INFO) << "Retrying with next proxy setting";
-        MessageLoop::current()->PostTask(
-            FROM_HERE,
-            base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
-                       base::Unretained(this)));
-      } else {
-        // Out of proxies. Give up.
-        LOG(INFO) << "No further proxies, indicating transfer complete";
-        if (delegate_)
-          delegate_->TransferComplete(this, false);  // signal fail
-      }
-    } else if ((transfer_size_ >= 0) && (bytes_downloaded_ < transfer_size_)) {
-      retry_count_++;
-      LOG(INFO) << "Transfer interrupted after downloading "
-                << bytes_downloaded_ << " of " << transfer_size_ << " bytes. "
-                << transfer_size_ - bytes_downloaded_ << " bytes remaining "
-                << "after " << retry_count_ << " attempt(s)";
-
-      if (retry_count_ > max_retry_count_) {
-        LOG(INFO) << "Reached max attempts (" << retry_count_ << ")";
-        if (delegate_)
-          delegate_->TransferComplete(this, false);  // signal fail
-      } else {
-        // Need to restart transfer
-        LOG(INFO) << "Restarting transfer to download the remaining bytes";
-        MessageLoop::current()->PostDelayedTask(
-            FROM_HERE,
-            base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
-                       base::Unretained(this)),
-            TimeDelta::FromSeconds(retry_seconds_));
-      }
-    } else {
-      LOG(INFO) << "Transfer completed (" << http_response_code_
-                << "), " << bytes_downloaded_ << " bytes downloaded";
-      if (delegate_) {
-        bool success = IsHttpResponseSuccess();
-        delegate_->TransferComplete(this, success);
-      }
+          TimeDelta::FromSeconds(retry_seconds_));
     }
   } else {
-    // set up callback
-    SetupMessageLoopSources();
+    LOG(INFO) << "Transfer completed (" << http_response_code_
+              << "), " << bytes_downloaded_ << " bytes downloaded";
+    if (delegate_) {
+      bool success = IsHttpResponseSuccess();
+      delegate_->TransferComplete(this, success);
+    }
   }
+  ignore_failure_ = false;
 }
 
 size_t LibcurlHttpFetcher::LibcurlWrite(void *ptr, size_t size, size_t nmemb) {
@@ -449,15 +470,43 @@
 }
 
 void LibcurlHttpFetcher::Pause() {
+  if (transfer_paused_) {
+    LOG(ERROR) << "Fetcher already paused.";
+    return;
+  }
+  transfer_paused_ = true;
+  if (!transfer_in_progress_) {
+    // If paused before we started a connection, we don't need to notify curl
+    // about that, we will simply not start the connection later.
+    return;
+  }
   CHECK(curl_handle_);
-  CHECK(transfer_in_progress_);
   CHECK_EQ(curl_easy_pause(curl_handle_, CURLPAUSE_ALL), CURLE_OK);
 }
 
 void LibcurlHttpFetcher::Unpause() {
+  if (!transfer_paused_) {
+    LOG(ERROR) << "Resume attempted when fetcher not paused.";
+    return;
+  }
+  transfer_paused_ = false;
+  if (restart_transfer_on_unpause_) {
+    restart_transfer_on_unpause_ = false;
+    ResumeTransfer(url_);
+    CurlPerformOnce();
+    return;
+  }
+  if (!transfer_in_progress_) {
+    // If resumed before starting the connection, there's no need to notify
+    // anybody. We will simply start the connection once it is time.
+    return;
+  }
   CHECK(curl_handle_);
-  CHECK(transfer_in_progress_);
   CHECK_EQ(curl_easy_pause(curl_handle_, CURLPAUSE_CONT), CURLE_OK);
+  // Since the transfer is in progress, we need to dispatch a CurlPerformOnce()
+  // now to let the connection continue, otherwise it would be called by the
+  // TimeoutCallback but with a delay.
+  CurlPerformOnce();
 }
 
 // This method sets up callbacks with the MessageLoop.
@@ -537,7 +586,7 @@
 
   // Set up a timeout callback for libcurl.
   if (timeout_id_ == MessageLoop::kTaskIdNull) {
-    LOG(INFO) << "Setting up timeout source: " << idle_seconds_ << " seconds.";
+    VLOG(1) << "Setting up timeout source: " << idle_seconds_ << " seconds.";
     timeout_id_ = MessageLoop::current()->PostDelayedTask(
         FROM_HERE,
         base::Bind(&LibcurlHttpFetcher::TimeoutCallback,
@@ -547,6 +596,10 @@
 }
 
 void LibcurlHttpFetcher::RetryTimeoutCallback() {
+  if (transfer_paused_) {
+    restart_transfer_on_unpause_ = true;
+    return;
+  }
   ResumeTransfer(url_);
   CurlPerformOnce();
 }
@@ -599,6 +652,8 @@
     curl_multi_handle_ = nullptr;
   }
   transfer_in_progress_ = false;
+  transfer_paused_ = false;
+  restart_transfer_on_unpause_ = false;
 }
 
 void LibcurlHttpFetcher::GetHttpResponseCode() {
diff --git a/common/libcurl_http_fetcher.h b/common/libcurl_http_fetcher.h
index 66dbb18..218e6cb 100644
--- a/common/libcurl_http_fetcher.h
+++ b/common/libcurl_http_fetcher.h
@@ -132,9 +132,9 @@
   // Calls into curl_multi_perform to let libcurl do its work. Returns after
   // curl_multi_perform is finished, which may actually be after more than
   // one call to curl_multi_perform. This method will set up the message
-  // loop with sources for future work that libcurl will do.
+  // loop with sources for future work that libcurl will do, if any, or complete
+  // the transfer and finish the action if no work left to do.
   // This method will not block.
-  // Returns true if we should resume immediately after this call.
   void CurlPerformOnce();
 
   // Sets up message loop sources as needed by libcurl. This is generally
@@ -192,6 +192,16 @@
   brillo::MessageLoop::TaskId timeout_id_{brillo::MessageLoop::kTaskIdNull};
 
   bool transfer_in_progress_{false};
+  bool transfer_paused_{false};
+
+  // Whether it should ignore transfer failures for the purpose of retrying the
+  // connection.
+  bool ignore_failure_{false};
+
+  // Whether we should restart the transfer once Unpause() is called. This can
+  // be caused because either the connection dropped while pause or the proxy
+  // was resolved and we never started the transfer in the first place.
+  bool restart_transfer_on_unpause_{false};
 
   // The transfer size. -1 if not known.
   off_t transfer_size_{0};