Move python-adb to development/python-packages.
We want to be able to use this in the NDK without having to pull in
all of system core.
Also, this clarifies the separation of adb and its python interface.
Bug: http://b/22881740
Change-Id: I0b437d9bf621e371d4698d7f8e8828072c7ff347
diff --git a/adb/__init__.py b/adb/__init__.py
deleted file mode 100644
index 6b509c6..0000000
--- a/adb/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Copyright (C) 2015 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
-from .device import * # pylint: disable=wildcard-import
diff --git a/adb/device.py b/adb/device.py
deleted file mode 100644
index 516e880..0000000
--- a/adb/device.py
+++ /dev/null
@@ -1,339 +0,0 @@
-#
-# Copyright (C) 2015 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import logging
-import os
-import re
-import subprocess
-import tempfile
-
-
-class FindDeviceError(RuntimeError):
- pass
-
-
-class DeviceNotFoundError(FindDeviceError):
- def __init__(self, serial):
- self.serial = serial
- super(DeviceNotFoundError, self).__init__(
- 'No device with serial {}'.format(serial))
-
-
-class NoUniqueDeviceError(FindDeviceError):
- def __init__(self):
- super(NoUniqueDeviceError, self).__init__('No unique device')
-
-
-class ShellError(RuntimeError):
- def __init__(self, cmd, stdout, stderr, exit_code):
- super(ShellError, self).__init__(
- '`{0}` exited with code {1}'.format(cmd, exit_code))
- self.cmd = cmd
- self.stdout = stdout
- self.stderr = stderr
- self.exit_code = exit_code
-
-
-def get_devices():
- with open(os.devnull, 'wb') as devnull:
- subprocess.check_call(['adb', 'start-server'], stdout=devnull,
- stderr=devnull)
- out = subprocess.check_output(['adb', 'devices']).splitlines()
-
- # The first line of `adb devices` just says "List of attached devices", so
- # skip that.
- devices = []
- for line in out[1:]:
- if not line.strip():
- continue
- if 'offline' in line:
- continue
-
- serial, _ = re.split(r'\s+', line, maxsplit=1)
- devices.append(serial)
- return devices
-
-
-def _get_unique_device(product=None):
- devices = get_devices()
- if len(devices) != 1:
- raise NoUniqueDeviceError()
- return AndroidDevice(devices[0], product)
-
-
-def _get_device_by_serial(serial, product=None):
- for device in get_devices():
- if device == serial:
- return AndroidDevice(serial, product)
- raise DeviceNotFoundError(serial)
-
-
-def get_device(serial=None, product=None):
- """Get a uniquely identified AndroidDevice if one is available.
-
- Raises:
- DeviceNotFoundError:
- The serial specified by `serial` or $ANDROID_SERIAL is not
- connected.
-
- NoUniqueDeviceError:
- Neither `serial` nor $ANDROID_SERIAL was set, and the number of
- devices connected to the system is not 1. Having 0 connected
- devices will also result in this error.
-
- Returns:
- An AndroidDevice associated with the first non-None identifier in the
- following order of preference:
-
- 1) The `serial` argument.
- 2) The environment variable $ANDROID_SERIAL.
- 3) The single device connnected to the system.
- """
- if serial is not None:
- return _get_device_by_serial(serial, product)
-
- android_serial = os.getenv('ANDROID_SERIAL')
- if android_serial is not None:
- return _get_device_by_serial(android_serial, product)
-
- return _get_unique_device(product)
-
-# Call this instead of subprocess.check_output() to work-around issue in Python
-# 2's subprocess class on Windows where it doesn't support Unicode. This
-# writes the command line to a UTF-8 batch file that is properly interpreted
-# by cmd.exe.
-def _subprocess_check_output(*popenargs, **kwargs):
- # Only do this slow work-around if Unicode is in the cmd line.
- if (os.name == 'nt' and
- any(isinstance(arg, unicode) for arg in popenargs[0])):
- # cmd.exe requires a suffix to know that it is running a batch file
- tf = tempfile.NamedTemporaryFile('wb', suffix='.cmd', delete=False)
- # @ in batch suppresses echo of the current line.
- # Change the codepage to 65001, the UTF-8 codepage.
- tf.write('@chcp 65001 > nul\r\n')
- tf.write('@')
- # Properly quote all the arguments and encode in UTF-8.
- tf.write(subprocess.list2cmdline(popenargs[0]).encode('utf-8'))
- tf.close()
-
- try:
- result = subprocess.check_output(['cmd.exe', '/c', tf.name],
- **kwargs)
- except subprocess.CalledProcessError as e:
- # Show real command line instead of the cmd.exe command line.
- raise subprocess.CalledProcessError(e.returncode, popenargs[0],
- output=e.output)
- finally:
- os.remove(tf.name)
- return result
- else:
- return subprocess.check_output(*popenargs, **kwargs)
-
-class AndroidDevice(object):
- # Delimiter string to indicate the start of the exit code.
- _RETURN_CODE_DELIMITER = 'x'
-
- # Follow any shell command with this string to get the exit
- # status of a program since this isn't propagated by adb.
- #
- # The delimiter is needed because `printf 1; echo $?` would print
- # "10", and we wouldn't be able to distinguish the exit code.
- _RETURN_CODE_PROBE_STRING = 'echo "{0}$?"'.format(_RETURN_CODE_DELIMITER)
-
- # Maximum search distance from the output end to find the delimiter.
- # adb on Windows returns \r\n even if adbd returns \n.
- _RETURN_CODE_SEARCH_LENGTH = len('{0}255\r\n'.format(_RETURN_CODE_DELIMITER))
-
- # Shell protocol feature string.
- SHELL_PROTOCOL_FEATURE = 'shell_2'
-
- def __init__(self, serial, product=None):
- self.serial = serial
- self.product = product
- self.adb_cmd = ['adb']
- if self.serial is not None:
- self.adb_cmd.extend(['-s', serial])
- if self.product is not None:
- self.adb_cmd.extend(['-p', product])
- self._linesep = None
- self._features = None
-
- @property
- def linesep(self):
- if self._linesep is None:
- self._linesep = subprocess.check_output(self.adb_cmd +
- ['shell', 'echo'])
- return self._linesep
-
- @property
- def features(self):
- if self._features is None:
- try:
- self._features = self._simple_call(['features']).splitlines()
- except subprocess.CalledProcessError:
- self._features = []
- return self._features
-
- def _make_shell_cmd(self, user_cmd):
- command = self.adb_cmd + ['shell'] + user_cmd
- if self.SHELL_PROTOCOL_FEATURE not in self.features:
- command.append('; ' + self._RETURN_CODE_PROBE_STRING)
- return command
-
- def _parse_shell_output(self, out):
- """Finds the exit code string from shell output.
-
- Args:
- out: Shell output string.
-
- Returns:
- An (exit_code, output_string) tuple. The output string is
- cleaned of any additional stuff we appended to find the
- exit code.
-
- Raises:
- RuntimeError: Could not find the exit code in |out|.
- """
- search_text = out
- if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
- # We don't want to search over massive amounts of data when we know
- # the part we want is right at the end.
- search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
- partition = search_text.rpartition(self._RETURN_CODE_DELIMITER)
- if partition[1] == '':
- raise RuntimeError('Could not find exit status in shell output.')
- result = int(partition[2])
- # partition[0] won't contain the full text if search_text was truncated,
- # pull from the original string instead.
- out = out[:-len(partition[1]) - len(partition[2])]
- return result, out
-
- def _simple_call(self, cmd):
- logging.info(' '.join(self.adb_cmd + cmd))
- return _subprocess_check_output(
- self.adb_cmd + cmd, stderr=subprocess.STDOUT)
-
- def shell(self, cmd):
- """Calls `adb shell`
-
- Args:
- cmd: string shell command to execute.
-
- Returns:
- A (stdout, stderr) tuple. Stderr may be combined into stdout
- if the device doesn't support separate streams.
-
- Raises:
- ShellError: the exit code was non-zero.
- """
- exit_code, stdout, stderr = self.shell_nocheck(cmd)
- if exit_code != 0:
- raise ShellError(cmd, stdout, stderr, exit_code)
- return stdout, stderr
-
- def shell_nocheck(self, cmd):
- """Calls `adb shell`
-
- Args:
- cmd: string shell command to execute.
-
- Returns:
- An (exit_code, stdout, stderr) tuple. Stderr may be combined
- into stdout if the device doesn't support separate streams.
- """
- cmd = self._make_shell_cmd(cmd)
- logging.info(' '.join(cmd))
- p = subprocess.Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- if self.SHELL_PROTOCOL_FEATURE in self.features:
- exit_code = p.returncode
- else:
- exit_code, stdout = self._parse_shell_output(stdout)
- return exit_code, stdout, stderr
-
- def install(self, filename, replace=False):
- cmd = ['install']
- if replace:
- cmd.append('-r')
- cmd.append(filename)
- return self._simple_call(cmd)
-
- def push(self, local, remote):
- return self._simple_call(['push', local, remote])
-
- def pull(self, remote, local):
- return self._simple_call(['pull', remote, local])
-
- def sync(self, directory=None):
- cmd = ['sync']
- if directory is not None:
- cmd.append(directory)
- return self._simple_call(cmd)
-
- def forward(self, local, remote):
- return self._simple_call(['forward', local, remote])
-
- def tcpip(self, port):
- return self._simple_call(['tcpip', port])
-
- def usb(self):
- return self._simple_call(['usb'])
-
- def reboot(self):
- return self._simple_call(['reboot'])
-
- def root(self):
- return self._simple_call(['root'])
-
- def unroot(self):
- return self._simple_call(['unroot'])
-
- def forward_remove(self, local):
- return self._simple_call(['forward', '--remove', local])
-
- def forward_remove_all(self):
- return self._simple_call(['forward', '--remove-all'])
-
- def connect(self, host):
- return self._simple_call(['connect', host])
-
- def disconnect(self, host):
- return self._simple_call(['disconnect', host])
-
- def reverse(self, remote, local):
- return self._simple_call(['reverse', remote, local])
-
- def reverse_remove_all(self):
- return self._simple_call(['reverse', '--remove-all'])
-
- def reverse_remove(self, remote):
- return self._simple_call(['reverse', '--remove', remote])
-
- def wait(self):
- return self._simple_call(['wait-for-device'])
-
- def get_prop(self, prop_name):
- output = self.shell(['getprop', prop_name])[0].splitlines()
- if len(output) != 1:
- raise RuntimeError('Too many lines in getprop output:\n' +
- '\n'.join(output))
- value = output[0]
- if not value.strip():
- return None
- return value
-
- def set_prop(self, prop_name, value):
- self.shell(['setprop', prop_name, value])
diff --git a/adb/test_device.py b/adb/test_device.py
deleted file mode 100644
index d033a01..0000000
--- a/adb/test_device.py
+++ /dev/null
@@ -1,537 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2015 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import print_function
-
-import hashlib
-import os
-import posixpath
-import random
-import shlex
-import shutil
-import signal
-import subprocess
-import tempfile
-import unittest
-
-import mock
-
-import adb
-
-
-def requires_root(func):
- def wrapper(self, *args):
- if self.device.get_prop('ro.debuggable') != '1':
- raise unittest.SkipTest('requires rootable build')
-
- was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
- if not was_root:
- self.device.root()
- self.device.wait()
-
- try:
- func(self, *args)
- finally:
- if not was_root:
- self.device.unroot()
- self.device.wait()
-
- return wrapper
-
-
-class GetDeviceTest(unittest.TestCase):
- def setUp(self):
- self.android_serial = os.getenv('ANDROID_SERIAL')
- if 'ANDROID_SERIAL' in os.environ:
- del os.environ['ANDROID_SERIAL']
-
- def tearDown(self):
- if self.android_serial is not None:
- os.environ['ANDROID_SERIAL'] = self.android_serial
- else:
- if 'ANDROID_SERIAL' in os.environ:
- del os.environ['ANDROID_SERIAL']
-
- @mock.patch('adb.device.get_devices')
- def test_explicit(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- device = adb.get_device('foo')
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_from_env(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- os.environ['ANDROID_SERIAL'] = 'foo'
- device = adb.get_device()
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_arg_beats_env(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- os.environ['ANDROID_SERIAL'] = 'bar'
- device = adb.get_device('foo')
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_no_such_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz'])
-
- os.environ['ANDROID_SERIAL'] = 'baz'
- self.assertRaises(adb.DeviceNotFoundError, adb.get_device)
-
- @mock.patch('adb.device.get_devices')
- def test_unique_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo']
- device = adb.get_device()
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_no_unique_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
-
-
-class DeviceTest(unittest.TestCase):
- def setUp(self):
- self.device = adb.get_device()
-
-
-class ShellTest(DeviceTest):
- def test_cat(self):
- """Check that we can at least cat a file."""
- out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
- elements = out.split()
- self.assertEqual(len(elements), 2)
-
- uptime, idle = elements
- self.assertGreater(float(uptime), 0.0)
- self.assertGreater(float(idle), 0.0)
-
- def test_throws_on_failure(self):
- self.assertRaises(adb.ShellError, self.device.shell, ['false'])
-
- def test_output_not_stripped(self):
- out = self.device.shell(['echo', 'foo'])[0]
- self.assertEqual(out, 'foo' + self.device.linesep)
-
- def test_shell_nocheck_failure(self):
- rc, out, _ = self.device.shell_nocheck(['false'])
- self.assertNotEqual(rc, 0)
- self.assertEqual(out, '')
-
- def test_shell_nocheck_output_not_stripped(self):
- rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo' + self.device.linesep)
-
- def test_can_distinguish_tricky_results(self):
- # If result checking on ADB shell is naively implemented as
- # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
- # output from the result for a cmd of `echo -n 1`.
- rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
- self.assertEqual(rc, 0)
- self.assertEqual(out, '1')
-
- def test_line_endings(self):
- """Ensure that line ending translation is not happening in the pty.
-
- Bug: http://b/19735063
- """
- output = self.device.shell(['uname'])[0]
- self.assertEqual(output, 'Linux' + self.device.linesep)
-
- def test_pty_logic(self):
- """Verify PTY logic for shells.
-
- Interactive shells should use a PTY, non-interactive should not.
-
- Bug: http://b/21215503
- """
- proc = subprocess.Popen(
- self.device.adb_cmd + ['shell'], stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- # [ -t 0 ] is used (rather than `tty`) to provide portability. This
- # gives an exit code of 0 iff stdin is connected to a terminal.
- #
- # Closing host-side stdin doesn't currently trigger the interactive
- # shell to exit so we need to explicitly add an exit command to
- # close the session from the device side, and append \n to complete
- # the interactive command.
- result = proc.communicate('[ -t 0 ]; echo x$?; exit 0\n')[0]
- partition = result.rpartition('x')
- self.assertEqual(partition[1], 'x')
- self.assertEqual(int(partition[2]), 0)
-
- exit_code = self.device.shell_nocheck(['[ -t 0 ]'])[0]
- self.assertEqual(exit_code, 1)
-
- def test_shell_protocol(self):
- """Tests the shell protocol on the device.
-
- If the device supports shell protocol, this gives us the ability
- to separate stdout/stderr and return the exit code directly.
-
- Bug: http://b/19734861
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('shell protocol unsupported on this device')
- result = self.device.shell_nocheck(
- shlex.split('echo foo; echo bar >&2; exit 17'))
-
- self.assertEqual(17, result[0])
- self.assertEqual('foo' + self.device.linesep, result[1])
- self.assertEqual('bar' + self.device.linesep, result[2])
-
- def test_non_interactive_sigint(self):
- """Tests that SIGINT in a non-interactive shell kills the process.
-
- This requires the shell protocol in order to detect the broken
- pipe; raw data transfer mode will only see the break once the
- subprocess tries to read or write.
-
- Bug: http://b/23825725
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('shell protocol unsupported on this device')
-
- # Start a long-running process.
- sleep_proc = subprocess.Popen(
- self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- remote_pid = sleep_proc.stdout.readline().strip()
- self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
- proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
-
- # Verify that the process is running, send signal, verify it stopped.
- self.device.shell(proc_query)
- os.kill(sleep_proc.pid, signal.SIGINT)
- sleep_proc.communicate()
- self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
- 'subprocess failed to terminate')
-
-
-class ArgumentEscapingTest(DeviceTest):
- def test_shell_escaping(self):
- """Make sure that argument escaping is somewhat sane."""
-
- # http://b/19734868
- # Note that this actually matches ssh(1)'s behavior --- it's
- # converted to `sh -c echo hello; echo world` which sh interprets
- # as `sh -c echo` (with an argument to that shell of "hello"),
- # and then `echo world` back in the first shell.
- result = self.device.shell(
- shlex.split("sh -c 'echo hello; echo world'"))[0]
- result = result.splitlines()
- self.assertEqual(['', 'world'], result)
- # If you really wanted "hello" and "world", here's what you'd do:
- result = self.device.shell(
- shlex.split(r'echo hello\;echo world'))[0].splitlines()
- self.assertEqual(['hello', 'world'], result)
-
- # http://b/15479704
- result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
- self.assertEqual('t', result)
- result = self.device.shell(
- shlex.split("sh -c 'true && echo t'"))[0].strip()
- self.assertEqual('t', result)
-
- # http://b/20564385
- result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
- self.assertEqual('t', result)
- result = self.device.shell(
- shlex.split(r'echo -n 123\;uname'))[0].strip()
- self.assertEqual('123Linux', result)
-
- def test_install_argument_escaping(self):
- """Make sure that install argument escaping works."""
- # http://b/20323053
- tf = tempfile.NamedTemporaryFile('wb', suffix='-text;ls;1.apk',
- delete=False)
- tf.close()
- self.assertIn("-text;ls;1.apk", self.device.install(tf.name))
- os.remove(tf.name)
-
- # http://b/3090932
- tf = tempfile.NamedTemporaryFile('wb', suffix="-Live Hold'em.apk",
- delete=False)
- tf.close()
- self.assertIn("-Live Hold'em.apk", self.device.install(tf.name))
- os.remove(tf.name)
-
-
-class RootUnrootTest(DeviceTest):
- def _test_root(self):
- message = self.device.root()
- if 'adbd cannot run as root in production builds' in message:
- return
- self.device.wait()
- self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
-
- def _test_unroot(self):
- self.device.unroot()
- self.device.wait()
- self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
-
- def test_root_unroot(self):
- """Make sure that adb root and adb unroot work, using id(1)."""
- if self.device.get_prop('ro.debuggable') != '1':
- raise unittest.SkipTest('requires rootable build')
-
- original_user = self.device.shell(['id', '-un'])[0].strip()
- try:
- if original_user == 'root':
- self._test_unroot()
- self._test_root()
- elif original_user == 'shell':
- self._test_root()
- self._test_unroot()
- finally:
- if original_user == 'root':
- self.device.root()
- else:
- self.device.unroot()
- self.device.wait()
-
-
-class TcpIpTest(DeviceTest):
- def test_tcpip_failure_raises(self):
- """adb tcpip requires a port.
-
- Bug: http://b/22636927
- """
- self.assertRaises(
- subprocess.CalledProcessError, self.device.tcpip, '')
- self.assertRaises(
- subprocess.CalledProcessError, self.device.tcpip, 'foo')
-
-
-class SystemPropertiesTest(DeviceTest):
- def test_get_prop(self):
- self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
-
- @requires_root
- def test_set_prop(self):
- prop_name = 'foo.bar'
- self.device.shell(['setprop', prop_name, '""'])
-
- self.device.set_prop(prop_name, 'qux')
- self.assertEqual(
- self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
-
-
-def compute_md5(string):
- hsh = hashlib.md5()
- hsh.update(string)
- return hsh.hexdigest()
-
-
-def get_md5_prog(device):
- """Older platforms (pre-L) had the name md5 rather than md5sum."""
- try:
- device.shell(['md5sum', '/proc/uptime'])
- return 'md5sum'
- except subprocess.CalledProcessError:
- return 'md5'
-
-
-class HostFile(object):
- def __init__(self, handle, checksum):
- self.handle = handle
- self.checksum = checksum
- self.full_path = handle.name
- self.base_name = os.path.basename(self.full_path)
-
-
-class DeviceFile(object):
- def __init__(self, checksum, full_path):
- self.checksum = checksum
- self.full_path = full_path
- self.base_name = posixpath.basename(self.full_path)
-
-
-def make_random_host_files(in_dir, num_files):
- min_size = 1 * (1 << 10)
- max_size = 16 * (1 << 10)
-
- files = []
- for _ in xrange(num_files):
- file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
-
- size = random.randrange(min_size, max_size, 1024)
- rand_str = os.urandom(size)
- file_handle.write(rand_str)
- file_handle.flush()
- file_handle.close()
-
- md5 = compute_md5(rand_str)
- files.append(HostFile(file_handle, md5))
- return files
-
-
-def make_random_device_files(device, in_dir, num_files):
- min_size = 1 * (1 << 10)
- max_size = 16 * (1 << 10)
-
- files = []
- for file_num in xrange(num_files):
- size = random.randrange(min_size, max_size, 1024)
-
- base_name = 'device_tmpfile' + str(file_num)
- full_path = posixpath.join(in_dir, base_name)
-
- device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
- 'bs={}'.format(size), 'count=1'])
- dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
-
- files.append(DeviceFile(dev_md5, full_path))
- return files
-
-
-class FileOperationsTest(DeviceTest):
- SCRATCH_DIR = '/data/local/tmp'
- DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
- DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
-
- def _test_push(self, local_file, checksum):
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
- self.device.push(local=local_file, remote=self.DEVICE_TEMP_FILE)
- dev_md5, _ = self.device.shell([get_md5_prog(self.device),
- self.DEVICE_TEMP_FILE])[0].split()
- self.assertEqual(checksum, dev_md5)
- self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
-
- def test_push(self):
- """Push a randomly generated file to specified device."""
- kbytes = 512
- tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
- rand_str = os.urandom(1024 * kbytes)
- tmp.write(rand_str)
- tmp.close()
- self._test_push(tmp.name, compute_md5(rand_str))
- os.remove(tmp.name)
-
- # TODO: write push directory test.
-
- def _test_pull(self, remote_file, checksum):
- tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
- tmp_write.close()
- self.device.pull(remote=remote_file, local=tmp_write.name)
- with open(tmp_write.name, 'rb') as tmp_read:
- host_contents = tmp_read.read()
- host_md5 = compute_md5(host_contents)
- self.assertEqual(checksum, host_md5)
- os.remove(tmp_write.name)
-
- def test_pull(self):
- """Pull a randomly generated file from specified device."""
- kbytes = 512
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
- cmd = ['dd', 'if=/dev/urandom',
- 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
- 'count={}'.format(kbytes)]
- self.device.shell(cmd)
- dev_md5, _ = self.device.shell(
- [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
- self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
- self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
-
- def test_pull_dir(self):
- """Pull a randomly generated directory of files from the device."""
- host_dir = tempfile.mkdtemp()
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
-
- # Populate device directory with random files.
- temp_files = make_random_device_files(
- self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
-
- self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
-
- for temp_file in temp_files:
- host_path = os.path.join(host_dir, temp_file.base_name)
- with open(host_path, 'rb') as host_file:
- host_md5 = compute_md5(host_file.read())
- self.assertEqual(host_md5, temp_file.checksum)
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
- def test_sync(self):
- """Sync a randomly generated directory of files to specified device."""
- base_dir = tempfile.mkdtemp()
-
- # Create mirror device directory hierarchy within base_dir.
- full_dir_path = base_dir + self.DEVICE_TEMP_DIR
- os.makedirs(full_dir_path)
-
- # Create 32 random files within the host mirror.
- temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
-
- # Clean up any trash on the device.
- device = adb.get_device(product=base_dir)
- device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
-
- device.sync('data')
-
- # Confirm that every file on the device mirrors that on the host.
- for temp_file in temp_files:
- device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
- temp_file.base_name)
- dev_md5, _ = device.shell(
- [get_md5_prog(self.device), device_full_path])[0].split()
- self.assertEqual(temp_file.checksum, dev_md5)
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- if base_dir is not None:
- shutil.rmtree(base_dir)
-
- def test_unicode_paths(self):
- """Ensure that we can support non-ASCII paths, even on Windows."""
- name = u'로보카 폴리'
-
- ## push.
- tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
- tf.close()
- self.device.push(tf.name, u'/data/local/tmp/adb-test-{}'.format(name))
- os.remove(tf.name)
- self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
-
- # pull.
- cmd = ['touch', u'"/data/local/tmp/adb-test-{}"'.format(name)]
- self.device.shell(cmd)
-
- tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
- tf.close()
- self.device.pull(u'/data/local/tmp/adb-test-{}'.format(name), tf.name)
- os.remove(tf.name)
- self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
-
-
-def main():
- random.seed(0)
- if len(adb.get_devices()) > 0:
- suite = unittest.TestLoader().loadTestsFromName(__name__)
- unittest.TextTestRunner(verbosity=3).run(suite)
- else:
- print('Test suite must be run with attached devices')
-
-
-if __name__ == '__main__':
- main()