Python unit test for adb.
Change-Id: I1b2b8004f47b708e6a1d6a4ad20816e7beb21a0f
diff --git a/adb/tests/test_adb.py b/adb/tests/test_adb.py
new file mode 100644
index 0000000..b0ae07f
--- /dev/null
+++ b/adb/tests/test_adb.py
@@ -0,0 +1,390 @@
+#!/usr/bin/env python2
+"""Simple conformance test for adb.
+
+This script will use the available adb in path and run simple
+tests that attempt to touch all accessible attached devices.
+"""
+import hashlib
+import os
+import random
+import re
+import subprocess
+import tempfile
+import unittest
+import sys
+import shlex
+
+
+def trace(cmd):
+ """Print debug message if tracing enabled."""
+ if False:
+ print >> sys.stderr, cmd
+
+
+def call(cmd_str):
+ """Run process and return output tuple (stdout, stderr, ret code)."""
+ trace(cmd_str)
+ process = subprocess.Popen(shlex.split(cmd_str),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = process.communicate()
+ return stdout, stderr, process.returncode
+
+
+def call_combined(cmd_str):
+ """Run process and return output tuple (stdout+stderr, ret code)."""
+ trace(cmd_str)
+ process = subprocess.Popen(shlex.split(cmd_str),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ stdout, _ = process.communicate()
+ return stdout, process.returncode
+
+
+def call_checked(cmd_str):
+ """Run process and get stdout+stderr, raise an exception on trouble."""
+ trace(cmd_str)
+ return subprocess.check_output(shlex.split(cmd_str),
+ stderr=subprocess.STDOUT)
+
+
+def call_checked_list(cmd_str):
+ return call_checked(cmd_str).split('\n')
+
+
+def call_checked_list_skip(cmd_str):
+ out_list = call_checked_list(cmd_str)
+
+ def is_init_line(line):
+ if (len(line) >= 3) and (line[0] == "*") and (line[-2] == "*"):
+ return True
+ else:
+ return False
+
+ return [line for line in out_list if not is_init_line(line)]
+
+
+def get_device_list():
+ output = call_checked_list_skip("adb devices")
+ dev_list = []
+ for line in output[1:]:
+ if line.strip() == "":
+ continue
+ device, _ = line.split()
+ dev_list.append(device)
+ return dev_list
+
+
+def get_attached_device_count():
+ return len(get_device_list())
+
+
+def compute_md5(string):
+ hsh = hashlib.md5()
+ hsh.update(string)
+ return hsh.hexdigest()
+
+
+class HostFile(object):
+ def __init__(self, handle, md5):
+ self.handle = handle
+ self.md5 = md5
+ self.full_path = handle.name
+ self.base_name = os.path.basename(self.full_path)
+
+
+class DeviceFile(object):
+ def __init__(self, md5, full_path):
+ self.md5 = md5
+ self.full_path = full_path
+ self.base_name = os.path.basename(self.full_path)
+
+
+def make_random_host_files(in_dir, num_files, rand_size=True):
+ files = {}
+ min_size = 1 * (1 << 10)
+ max_size = 16 * (1 << 10)
+ fixed_size = min_size
+
+ for _ in range(num_files):
+ file_handle = tempfile.NamedTemporaryFile(dir=in_dir)
+
+ if rand_size:
+ size = random.randrange(min_size, max_size, 1024)
+ else:
+ size = fixed_size
+ rand_str = os.urandom(size)
+ file_handle.write(rand_str)
+ file_handle.flush()
+
+ md5 = compute_md5(rand_str)
+ files[file_handle.name] = HostFile(file_handle, md5)
+ return files
+
+
+def make_random_device_files(adb, in_dir, num_files, rand_size=True):
+ files = {}
+ min_size = 1 * (1 << 10)
+ max_size = 16 * (1 << 10)
+ fixed_size = min_size
+
+ for i in range(num_files):
+ if rand_size:
+ size = random.randrange(min_size, max_size, 1024)
+ else:
+ size = fixed_size
+
+ base_name = "device_tmpfile" + str(i)
+ full_path = in_dir + "/" + base_name
+
+ adb.shell("dd if=/dev/urandom of={} bs={} count=1".format(full_path,
+ size))
+ dev_md5, _ = adb.shell("md5sum {}".format(full_path)).split()
+
+ files[full_path] = DeviceFile(dev_md5, full_path)
+ return files
+
+
+class AdbWrapper(object):
+ """Convenience wrapper object for the adb command."""
+ def __init__(self, device=None, out_dir=None):
+ self.device = device
+ self.out_dir = out_dir
+ self.adb_cmd = "adb "
+ if self.device:
+ self.adb_cmd += "-s {} ".format(device)
+ if self.out_dir:
+ self.adb_cmd += "-p {} ".format(out_dir)
+
+ def shell(self, cmd):
+ return call_checked(self.adb_cmd + "shell " + cmd)
+
+ def shell_nocheck(self, cmd):
+ return call_combined(self.adb_cmd + "shell " + cmd)
+
+ def push(self, local, remote):
+ return call_checked(self.adb_cmd + "push {} {}".format(local, remote))
+
+ def pull(self, remote, local):
+ return call_checked(self.adb_cmd + "pull {} {}".format(remote, local))
+
+ def sync(self, directory=""):
+ return call_checked(self.adb_cmd + "sync {}".format(directory))
+
+ def forward(self, local, remote):
+ return call_checked(self.adb_cmd + "forward {} {}".format(local,
+ remote))
+
+ def tcpip(self, port):
+ return call_checked(self.adb_cmd + "tcpip {}".format(port))
+
+ def usb(self):
+ return call_checked(self.adb_cmd + "usb")
+
+ def forward_remove(self, local):
+ return call_checked(self.adb_cmd + "forward --remove {}".format(local))
+
+ def forward_remove_all(self):
+ return call_checked(self.adb_cmd + "forward --remove-all")
+
+ def connect(self, host):
+ return call_checked(self.adb_cmd + "connect {}".format(host))
+
+ def disconnect(self, host):
+ return call_checked(self.adb_cmd + "disconnect {}".format(host))
+
+ def reverse(self, remote, local):
+ return call_checked(self.adb_cmd + "reverse {} {}".format(remote,
+ local))
+
+ def reverse_remove_all(self):
+ return call_checked(self.adb_cmd + "reverse --remove-all")
+
+ def reverse_remove(self, remote):
+ return call_checked(
+ self.adb_cmd + "reverse --remove {}".format(remote))
+
+ def wait(self):
+ return call_checked(self.adb_cmd + "wait-for-device")
+
+
+class AdbBasic(unittest.TestCase):
+ def test_devices(self):
+ """Get uptime for each device plugged in from /proc/uptime."""
+ dev_list = get_device_list()
+ for device in dev_list:
+ out = call_checked(
+ "adb -s {} shell cat /proc/uptime".format(device))
+ self.assertEqual(len(out.split()), 2)
+ self.assertGreater(float(out.split()[0]), 0.0)
+ self.assertGreater(float(out.split()[1]), 0.0)
+
+ def test_help(self):
+ """Make sure we get _something_ out of help."""
+ out = call_checked("adb help")
+ self.assertTrue(len(out) > 0)
+
+ def test_version(self):
+ """Get a version number out of the output of adb."""
+ out = call_checked("adb version").split()
+ version_num = False
+ for item in out:
+ if re.match(r"[\d+\.]*\d", item):
+ version_num = True
+ self.assertTrue(version_num)
+
+
+class AdbFile(unittest.TestCase):
+ SCRATCH_DIR = "/data/local/tmp"
+ DEVICE_TEMP_FILE = SCRATCH_DIR + "/adb_test_file"
+ DEVICE_TEMP_DIR = SCRATCH_DIR + "/adb_test_dir"
+
+ def test_push(self):
+ """Push a file to all attached devices."""
+ dev_list = get_device_list()
+ for device in dev_list:
+ self.push_with_device(device)
+
+ def push_with_device(self, device):
+ """Push a randomly generated file to specified device."""
+ kbytes = 512
+ adb = AdbWrapper(device)
+ with tempfile.NamedTemporaryFile(mode="w") as tmp:
+ rand_str = os.urandom(1024 * kbytes)
+ tmp.write(rand_str)
+ tmp.flush()
+
+ host_md5 = compute_md5(rand_str)
+ adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))
+ try:
+ adb.push(local=tmp.name, remote=AdbFile.DEVICE_TEMP_FILE)
+ dev_md5, _ = adb.shell(
+ "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()
+ self.assertEqual(host_md5, dev_md5)
+ finally:
+ adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))
+
+ # TODO: write push directory test.
+
+ def test_pull(self):
+ """Pull a file from all attached devices."""
+ dev_list = get_device_list()
+ for device in dev_list:
+ self.pull_with_device(device)
+
+ def pull_with_device(self, device):
+ """Pull a randomly generated file from specified device."""
+ kbytes = 512
+ adb = AdbWrapper(device)
+ adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))
+ try:
+ adb.shell("dd if=/dev/urandom of={} bs=1024 count={}".format(
+ AdbFile.DEVICE_TEMP_FILE, kbytes))
+ dev_md5, _ = adb.shell(
+ "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()
+
+ with tempfile.NamedTemporaryFile(mode="w") as tmp_write:
+ adb.pull(remote=AdbFile.DEVICE_TEMP_FILE, local=tmp_write.name)
+ with open(tmp_write.name) as tmp_read:
+ host_contents = tmp_read.read()
+ host_md5 = compute_md5(host_contents)
+ self.assertEqual(dev_md5, host_md5)
+ finally:
+ adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))
+
+ def test_pull_dir(self):
+ """Pull a directory from all attached devices."""
+ dev_list = get_device_list()
+ for device in dev_list:
+ self.pull_dir_with_device(device)
+
+ def pull_dir_with_device(self, device):
+ """Pull a randomly generated directory of files from the device."""
+ adb = AdbWrapper(device)
+ temp_files = {}
+ host_dir = None
+ try:
+ # create temporary host directory
+ host_dir = tempfile.mkdtemp()
+
+ # create temporary dir on device
+ adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
+ adb.shell("mkdir -p {}".format(AdbFile.DEVICE_TEMP_DIR))
+
+ # populate device dir with random files
+ temp_files = make_random_device_files(
+ adb, in_dir=AdbFile.DEVICE_TEMP_DIR, num_files=32)
+
+ adb.pull(remote=AdbFile.DEVICE_TEMP_DIR, local=host_dir)
+
+ for device_full_path in temp_files:
+ host_path = os.path.join(
+ host_dir, temp_files[device_full_path].base_name)
+ with open(host_path) as host_file:
+ host_md5 = compute_md5(host_file.read())
+ self.assertEqual(host_md5,
+ temp_files[device_full_path].md5)
+ finally:
+ for dev_file in temp_files.values():
+ host_path = os.path.join(host_dir, dev_file.base_name)
+ os.remove(host_path)
+ adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
+ if host_dir:
+ os.removedirs(host_dir)
+
+ def test_sync(self):
+ """Sync a directory with all attached devices."""
+ dev_list = get_device_list()
+ for device in dev_list:
+ self.sync_dir_with_device(device)
+
+ def sync_dir_with_device(self, device):
+ """Sync a randomly generated directory of files to specified device."""
+ try:
+ adb = AdbWrapper(device)
+ temp_files = {}
+
+ # create temporary host directory
+ base_dir = tempfile.mkdtemp()
+
+ # create mirror device directory hierarchy within base_dir
+ full_dir_path = base_dir + AdbFile.DEVICE_TEMP_DIR
+ os.makedirs(full_dir_path)
+
+ # create 32 random files within the host mirror
+ temp_files = make_random_host_files(in_dir=full_dir_path,
+ num_files=32)
+
+ # clean up any trash on the device
+ adb = AdbWrapper(device, out_dir=base_dir)
+ adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
+
+ # issue the sync
+ adb.sync("data")
+
+ # confirm that every file on the device mirrors that on the host
+ for host_full_path in temp_files.keys():
+ device_full_path = os.path.join(
+ AdbFile.DEVICE_TEMP_DIR,
+ temp_files[host_full_path].base_name)
+ dev_md5, _ = adb.shell(
+ "md5sum {}".format(device_full_path)).split()
+ self.assertEqual(temp_files[host_full_path].md5, dev_md5)
+
+ finally:
+ adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
+ if temp_files:
+ for tf in temp_files.values():
+ tf.handle.close()
+ if base_dir:
+ os.removedirs(base_dir + AdbFile.DEVICE_TEMP_DIR)
+
+
+if __name__ == '__main__':
+ random.seed(0)
+ dev_count = get_attached_device_count()
+ if dev_count:
+ suite = unittest.TestLoader().loadTestsFromName(__name__)
+ unittest.TextTestRunner(verbosity=3).run(suite)
+ else:
+ print "Test suite must be run with attached devices"