Merge "Make the memory store operations serial." am: 1f397201e3 am: dbca5369f5 am: c8697f7246
am: c4b93e74b4

Change-Id: I6ccdc801e3888a61b22272c8ce9480f45fa26df2
diff --git a/packages/NetworkStack/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreService.java b/packages/NetworkStack/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreService.java
index ad2bae89..55ab8d4 100644
--- a/packages/NetworkStack/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreService.java
+++ b/packages/NetworkStack/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreService.java
@@ -60,7 +60,6 @@
  */
 public class IpMemoryStoreService extends IIpMemoryStore.Stub {
     private static final String TAG = IpMemoryStoreService.class.getSimpleName();
-    private static final int MAX_CONCURRENT_THREADS = 4;
     private static final int DATABASE_SIZE_THRESHOLD = 10 * 1024 * 1024; //10MB
     private static final int MAX_DROP_RECORD_TIMES = 500;
     private static final int MIN_DELETE_NUM = 5;
@@ -107,23 +106,17 @@
             db = null;
         }
         mDb = db;
-        // The work-stealing thread pool executor will spawn threads as needed up to
-        // the max only when there is no free thread available. This generally behaves
-        // exactly like one would expect it intuitively :
-        // - When work arrives, it will spawn a new thread iff there are no available threads
-        // - When there is no work to do it will shutdown threads after a while (the while
-        //   being equal to 2 seconds (not configurable) when max threads are spun up and
-        //   twice as much for every one less thread)
-        // - When all threads are busy the work is enqueued and waits for any worker
-        //   to become available.
-        // Because the stealing pool is made for very heavily parallel execution of
-        // small tasks that spawn others, it creates a queue per thread that in this
-        // case is overhead. However, the three behaviors above make it a superior
-        // choice to cached or fixedThreadPoolExecutor, neither of which can actually
-        // enqueue a task waiting for a thread to be free. This can probably be solved
-        // with judicious subclassing of ThreadPoolExecutor, but that's a lot of dangerous
-        // complexity for little benefit in this case.
-        mExecutor = Executors.newWorkStealingPool(MAX_CONCURRENT_THREADS);
+        // The single thread executor guarantees that all work is executed sequentially on the
+        // same thread, and no two tasks can be active at the same time. This is required to
+        // ensure operations from multiple clients don't interfere with each other (in particular,
+        // operations involving a transaction must not run concurrently with other operations
+        // as the other operations might be taken as part of the transaction). By default, the
+        // single thread executor runs off an unbounded queue.
+        // TODO : investigate replacing this scheme with a scheme where each thread has its own
+        // instance of the database, as it may be faster. It is likely however that IpMemoryStore
+        // operations are mostly IO-bound anyway, and additional contention is unlikely to bring
+        // benefits. Alternatively, a read-write lock might increase throughput.
+        mExecutor = Executors.newSingleThreadExecutor();
         RegularMaintenanceJobService.schedule(mContext, this);
     }
 
diff --git a/packages/NetworkStack/tests/unit/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreServiceTest.java b/packages/NetworkStack/tests/unit/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreServiceTest.java
index c1d6a05..64fe3a6 100644
--- a/packages/NetworkStack/tests/unit/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreServiceTest.java
+++ b/packages/NetworkStack/tests/unit/src/com/android/server/connectivity/ipmemorystore/IpMemoryStoreServiceTest.java
@@ -732,4 +732,34 @@
                             latch.countDown();
                         })));
     }
+
+    /**
+     * Checks that tasks are executed serially: the callback of the second request must only
+     * run once the deliberately slow callback of the first request has finished sleeping.
+     */
+    // NOTE(review): there is no @Test annotation here; confirm the runner used by this class
+    // picks up methods by their "test" name prefix, otherwise this test is never executed.
+    public void testTasksAreSerial() {
+        final long sleepTimeMs = 1000;
+        final long startTime = System.currentTimeMillis();
+        mService.retrieveNetworkAttributes("somekey", onNetworkAttributesRetrieved(
+                (status, key, attr) -> {
+                    assertTrue("Unexpected status : " + status.resultCode, status.isSuccess());
+                    try {
+                        Thread.sleep(sleepTimeMs);
+                    } catch (InterruptedException e) {
+                        fail("InterruptedException");
+                    }
+                }));
+        doLatched("Serial tasks timing out", latch ->
+                mService.retrieveNetworkAttributes("somekey", onNetworkAttributesRetrieved(
+                        (status, key, attr) -> {
+                            assertTrue("Unexpected status : " + status.resultCode,
+                                    status.isSuccess());
+                            assertTrue(System.currentTimeMillis() >= startTime + sleepTimeMs);
+                            // Signal completion, as the other latched tests do: nothing else
+                            // in this callback releases the latch.
+                            latch.countDown();
+                        })), DEFAULT_TIMEOUT_MS);
+    }
 }