am 82b9e941: am 1bcb4677: am 71f878f8: Merge "Move python-adb to development/python-packages."
* commit '82b9e941731bb6e2c37ac17ac2c8ff23201e117c':
Move python-adb to development/python-packages.
diff --git a/adb/__init__.py b/adb/__init__.py
deleted file mode 100644
index 6b509c6..0000000
--- a/adb/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Copyright (C) 2015 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
-from .device import * # pylint: disable=wildcard-import
diff --git a/adb/device.py b/adb/device.py
deleted file mode 100644
index 516e880..0000000
--- a/adb/device.py
+++ /dev/null
@@ -1,339 +0,0 @@
-#
-# Copyright (C) 2015 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import logging
-import os
-import re
-import subprocess
-import tempfile
-
-
-class FindDeviceError(RuntimeError):
- pass
-
-
-class DeviceNotFoundError(FindDeviceError):
- def __init__(self, serial):
- self.serial = serial
- super(DeviceNotFoundError, self).__init__(
- 'No device with serial {}'.format(serial))
-
-
-class NoUniqueDeviceError(FindDeviceError):
- def __init__(self):
- super(NoUniqueDeviceError, self).__init__('No unique device')
-
-
-class ShellError(RuntimeError):
- def __init__(self, cmd, stdout, stderr, exit_code):
- super(ShellError, self).__init__(
- '`{0}` exited with code {1}'.format(cmd, exit_code))
- self.cmd = cmd
- self.stdout = stdout
- self.stderr = stderr
- self.exit_code = exit_code
-
-
-def get_devices():
- with open(os.devnull, 'wb') as devnull:
- subprocess.check_call(['adb', 'start-server'], stdout=devnull,
- stderr=devnull)
- out = subprocess.check_output(['adb', 'devices']).splitlines()
-
- # The first line of `adb devices` just says "List of attached devices", so
- # skip that.
- devices = []
- for line in out[1:]:
- if not line.strip():
- continue
- if 'offline' in line:
- continue
-
- serial, _ = re.split(r'\s+', line, maxsplit=1)
- devices.append(serial)
- return devices
-
-
-def _get_unique_device(product=None):
- devices = get_devices()
- if len(devices) != 1:
- raise NoUniqueDeviceError()
- return AndroidDevice(devices[0], product)
-
-
-def _get_device_by_serial(serial, product=None):
- for device in get_devices():
- if device == serial:
- return AndroidDevice(serial, product)
- raise DeviceNotFoundError(serial)
-
-
-def get_device(serial=None, product=None):
- """Get a uniquely identified AndroidDevice if one is available.
-
- Raises:
- DeviceNotFoundError:
- The serial specified by `serial` or $ANDROID_SERIAL is not
- connected.
-
- NoUniqueDeviceError:
- Neither `serial` nor $ANDROID_SERIAL was set, and the number of
- devices connected to the system is not 1. Having 0 connected
- devices will also result in this error.
-
- Returns:
- An AndroidDevice associated with the first non-None identifier in the
- following order of preference:
-
- 1) The `serial` argument.
- 2) The environment variable $ANDROID_SERIAL.
- 3) The single device connnected to the system.
- """
- if serial is not None:
- return _get_device_by_serial(serial, product)
-
- android_serial = os.getenv('ANDROID_SERIAL')
- if android_serial is not None:
- return _get_device_by_serial(android_serial, product)
-
- return _get_unique_device(product)
-
-# Call this instead of subprocess.check_output() to work-around issue in Python
-# 2's subprocess class on Windows where it doesn't support Unicode. This
-# writes the command line to a UTF-8 batch file that is properly interpreted
-# by cmd.exe.
-def _subprocess_check_output(*popenargs, **kwargs):
- # Only do this slow work-around if Unicode is in the cmd line.
- if (os.name == 'nt' and
- any(isinstance(arg, unicode) for arg in popenargs[0])):
- # cmd.exe requires a suffix to know that it is running a batch file
- tf = tempfile.NamedTemporaryFile('wb', suffix='.cmd', delete=False)
- # @ in batch suppresses echo of the current line.
- # Change the codepage to 65001, the UTF-8 codepage.
- tf.write('@chcp 65001 > nul\r\n')
- tf.write('@')
- # Properly quote all the arguments and encode in UTF-8.
- tf.write(subprocess.list2cmdline(popenargs[0]).encode('utf-8'))
- tf.close()
-
- try:
- result = subprocess.check_output(['cmd.exe', '/c', tf.name],
- **kwargs)
- except subprocess.CalledProcessError as e:
- # Show real command line instead of the cmd.exe command line.
- raise subprocess.CalledProcessError(e.returncode, popenargs[0],
- output=e.output)
- finally:
- os.remove(tf.name)
- return result
- else:
- return subprocess.check_output(*popenargs, **kwargs)
-
-class AndroidDevice(object):
- # Delimiter string to indicate the start of the exit code.
- _RETURN_CODE_DELIMITER = 'x'
-
- # Follow any shell command with this string to get the exit
- # status of a program since this isn't propagated by adb.
- #
- # The delimiter is needed because `printf 1; echo $?` would print
- # "10", and we wouldn't be able to distinguish the exit code.
- _RETURN_CODE_PROBE_STRING = 'echo "{0}$?"'.format(_RETURN_CODE_DELIMITER)
-
- # Maximum search distance from the output end to find the delimiter.
- # adb on Windows returns \r\n even if adbd returns \n.
- _RETURN_CODE_SEARCH_LENGTH = len('{0}255\r\n'.format(_RETURN_CODE_DELIMITER))
-
- # Shell protocol feature string.
- SHELL_PROTOCOL_FEATURE = 'shell_2'
-
- def __init__(self, serial, product=None):
- self.serial = serial
- self.product = product
- self.adb_cmd = ['adb']
- if self.serial is not None:
- self.adb_cmd.extend(['-s', serial])
- if self.product is not None:
- self.adb_cmd.extend(['-p', product])
- self._linesep = None
- self._features = None
-
- @property
- def linesep(self):
- if self._linesep is None:
- self._linesep = subprocess.check_output(self.adb_cmd +
- ['shell', 'echo'])
- return self._linesep
-
- @property
- def features(self):
- if self._features is None:
- try:
- self._features = self._simple_call(['features']).splitlines()
- except subprocess.CalledProcessError:
- self._features = []
- return self._features
-
- def _make_shell_cmd(self, user_cmd):
- command = self.adb_cmd + ['shell'] + user_cmd
- if self.SHELL_PROTOCOL_FEATURE not in self.features:
- command.append('; ' + self._RETURN_CODE_PROBE_STRING)
- return command
-
- def _parse_shell_output(self, out):
- """Finds the exit code string from shell output.
-
- Args:
- out: Shell output string.
-
- Returns:
- An (exit_code, output_string) tuple. The output string is
- cleaned of any additional stuff we appended to find the
- exit code.
-
- Raises:
- RuntimeError: Could not find the exit code in |out|.
- """
- search_text = out
- if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
- # We don't want to search over massive amounts of data when we know
- # the part we want is right at the end.
- search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
- partition = search_text.rpartition(self._RETURN_CODE_DELIMITER)
- if partition[1] == '':
- raise RuntimeError('Could not find exit status in shell output.')
- result = int(partition[2])
- # partition[0] won't contain the full text if search_text was truncated,
- # pull from the original string instead.
- out = out[:-len(partition[1]) - len(partition[2])]
- return result, out
-
- def _simple_call(self, cmd):
- logging.info(' '.join(self.adb_cmd + cmd))
- return _subprocess_check_output(
- self.adb_cmd + cmd, stderr=subprocess.STDOUT)
-
- def shell(self, cmd):
- """Calls `adb shell`
-
- Args:
- cmd: string shell command to execute.
-
- Returns:
- A (stdout, stderr) tuple. Stderr may be combined into stdout
- if the device doesn't support separate streams.
-
- Raises:
- ShellError: the exit code was non-zero.
- """
- exit_code, stdout, stderr = self.shell_nocheck(cmd)
- if exit_code != 0:
- raise ShellError(cmd, stdout, stderr, exit_code)
- return stdout, stderr
-
- def shell_nocheck(self, cmd):
- """Calls `adb shell`
-
- Args:
- cmd: string shell command to execute.
-
- Returns:
- An (exit_code, stdout, stderr) tuple. Stderr may be combined
- into stdout if the device doesn't support separate streams.
- """
- cmd = self._make_shell_cmd(cmd)
- logging.info(' '.join(cmd))
- p = subprocess.Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- if self.SHELL_PROTOCOL_FEATURE in self.features:
- exit_code = p.returncode
- else:
- exit_code, stdout = self._parse_shell_output(stdout)
- return exit_code, stdout, stderr
-
- def install(self, filename, replace=False):
- cmd = ['install']
- if replace:
- cmd.append('-r')
- cmd.append(filename)
- return self._simple_call(cmd)
-
- def push(self, local, remote):
- return self._simple_call(['push', local, remote])
-
- def pull(self, remote, local):
- return self._simple_call(['pull', remote, local])
-
- def sync(self, directory=None):
- cmd = ['sync']
- if directory is not None:
- cmd.append(directory)
- return self._simple_call(cmd)
-
- def forward(self, local, remote):
- return self._simple_call(['forward', local, remote])
-
- def tcpip(self, port):
- return self._simple_call(['tcpip', port])
-
- def usb(self):
- return self._simple_call(['usb'])
-
- def reboot(self):
- return self._simple_call(['reboot'])
-
- def root(self):
- return self._simple_call(['root'])
-
- def unroot(self):
- return self._simple_call(['unroot'])
-
- def forward_remove(self, local):
- return self._simple_call(['forward', '--remove', local])
-
- def forward_remove_all(self):
- return self._simple_call(['forward', '--remove-all'])
-
- def connect(self, host):
- return self._simple_call(['connect', host])
-
- def disconnect(self, host):
- return self._simple_call(['disconnect', host])
-
- def reverse(self, remote, local):
- return self._simple_call(['reverse', remote, local])
-
- def reverse_remove_all(self):
- return self._simple_call(['reverse', '--remove-all'])
-
- def reverse_remove(self, remote):
- return self._simple_call(['reverse', '--remove', remote])
-
- def wait(self):
- return self._simple_call(['wait-for-device'])
-
- def get_prop(self, prop_name):
- output = self.shell(['getprop', prop_name])[0].splitlines()
- if len(output) != 1:
- raise RuntimeError('Too many lines in getprop output:\n' +
- '\n'.join(output))
- value = output[0]
- if not value.strip():
- return None
- return value
-
- def set_prop(self, prop_name, value):
- self.shell(['setprop', prop_name, value])
diff --git a/adb/test_device.py b/adb/test_device.py
deleted file mode 100644
index d033a01..0000000
--- a/adb/test_device.py
+++ /dev/null
@@ -1,537 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2015 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import print_function
-
-import hashlib
-import os
-import posixpath
-import random
-import shlex
-import shutil
-import signal
-import subprocess
-import tempfile
-import unittest
-
-import mock
-
-import adb
-
-
-def requires_root(func):
- def wrapper(self, *args):
- if self.device.get_prop('ro.debuggable') != '1':
- raise unittest.SkipTest('requires rootable build')
-
- was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
- if not was_root:
- self.device.root()
- self.device.wait()
-
- try:
- func(self, *args)
- finally:
- if not was_root:
- self.device.unroot()
- self.device.wait()
-
- return wrapper
-
-
-class GetDeviceTest(unittest.TestCase):
- def setUp(self):
- self.android_serial = os.getenv('ANDROID_SERIAL')
- if 'ANDROID_SERIAL' in os.environ:
- del os.environ['ANDROID_SERIAL']
-
- def tearDown(self):
- if self.android_serial is not None:
- os.environ['ANDROID_SERIAL'] = self.android_serial
- else:
- if 'ANDROID_SERIAL' in os.environ:
- del os.environ['ANDROID_SERIAL']
-
- @mock.patch('adb.device.get_devices')
- def test_explicit(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- device = adb.get_device('foo')
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_from_env(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- os.environ['ANDROID_SERIAL'] = 'foo'
- device = adb.get_device()
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_arg_beats_env(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- os.environ['ANDROID_SERIAL'] = 'bar'
- device = adb.get_device('foo')
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_no_such_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz'])
-
- os.environ['ANDROID_SERIAL'] = 'baz'
- self.assertRaises(adb.DeviceNotFoundError, adb.get_device)
-
- @mock.patch('adb.device.get_devices')
- def test_unique_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo']
- device = adb.get_device()
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_no_unique_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
-
-
-class DeviceTest(unittest.TestCase):
- def setUp(self):
- self.device = adb.get_device()
-
-
-class ShellTest(DeviceTest):
- def test_cat(self):
- """Check that we can at least cat a file."""
- out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
- elements = out.split()
- self.assertEqual(len(elements), 2)
-
- uptime, idle = elements
- self.assertGreater(float(uptime), 0.0)
- self.assertGreater(float(idle), 0.0)
-
- def test_throws_on_failure(self):
- self.assertRaises(adb.ShellError, self.device.shell, ['false'])
-
- def test_output_not_stripped(self):
- out = self.device.shell(['echo', 'foo'])[0]
- self.assertEqual(out, 'foo' + self.device.linesep)
-
- def test_shell_nocheck_failure(self):
- rc, out, _ = self.device.shell_nocheck(['false'])
- self.assertNotEqual(rc, 0)
- self.assertEqual(out, '')
-
- def test_shell_nocheck_output_not_stripped(self):
- rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo' + self.device.linesep)
-
- def test_can_distinguish_tricky_results(self):
- # If result checking on ADB shell is naively implemented as
- # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
- # output from the result for a cmd of `echo -n 1`.
- rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
- self.assertEqual(rc, 0)
- self.assertEqual(out, '1')
-
- def test_line_endings(self):
- """Ensure that line ending translation is not happening in the pty.
-
- Bug: http://b/19735063
- """
- output = self.device.shell(['uname'])[0]
- self.assertEqual(output, 'Linux' + self.device.linesep)
-
- def test_pty_logic(self):
- """Verify PTY logic for shells.
-
- Interactive shells should use a PTY, non-interactive should not.
-
- Bug: http://b/21215503
- """
- proc = subprocess.Popen(
- self.device.adb_cmd + ['shell'], stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- # [ -t 0 ] is used (rather than `tty`) to provide portability. This
- # gives an exit code of 0 iff stdin is connected to a terminal.
- #
- # Closing host-side stdin doesn't currently trigger the interactive
- # shell to exit so we need to explicitly add an exit command to
- # close the session from the device side, and append \n to complete
- # the interactive command.
- result = proc.communicate('[ -t 0 ]; echo x$?; exit 0\n')[0]
- partition = result.rpartition('x')
- self.assertEqual(partition[1], 'x')
- self.assertEqual(int(partition[2]), 0)
-
- exit_code = self.device.shell_nocheck(['[ -t 0 ]'])[0]
- self.assertEqual(exit_code, 1)
-
- def test_shell_protocol(self):
- """Tests the shell protocol on the device.
-
- If the device supports shell protocol, this gives us the ability
- to separate stdout/stderr and return the exit code directly.
-
- Bug: http://b/19734861
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('shell protocol unsupported on this device')
- result = self.device.shell_nocheck(
- shlex.split('echo foo; echo bar >&2; exit 17'))
-
- self.assertEqual(17, result[0])
- self.assertEqual('foo' + self.device.linesep, result[1])
- self.assertEqual('bar' + self.device.linesep, result[2])
-
- def test_non_interactive_sigint(self):
- """Tests that SIGINT in a non-interactive shell kills the process.
-
- This requires the shell protocol in order to detect the broken
- pipe; raw data transfer mode will only see the break once the
- subprocess tries to read or write.
-
- Bug: http://b/23825725
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('shell protocol unsupported on this device')
-
- # Start a long-running process.
- sleep_proc = subprocess.Popen(
- self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- remote_pid = sleep_proc.stdout.readline().strip()
- self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
- proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
-
- # Verify that the process is running, send signal, verify it stopped.
- self.device.shell(proc_query)
- os.kill(sleep_proc.pid, signal.SIGINT)
- sleep_proc.communicate()
- self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
- 'subprocess failed to terminate')
-
-
-class ArgumentEscapingTest(DeviceTest):
- def test_shell_escaping(self):
- """Make sure that argument escaping is somewhat sane."""
-
- # http://b/19734868
- # Note that this actually matches ssh(1)'s behavior --- it's
- # converted to `sh -c echo hello; echo world` which sh interprets
- # as `sh -c echo` (with an argument to that shell of "hello"),
- # and then `echo world` back in the first shell.
- result = self.device.shell(
- shlex.split("sh -c 'echo hello; echo world'"))[0]
- result = result.splitlines()
- self.assertEqual(['', 'world'], result)
- # If you really wanted "hello" and "world", here's what you'd do:
- result = self.device.shell(
- shlex.split(r'echo hello\;echo world'))[0].splitlines()
- self.assertEqual(['hello', 'world'], result)
-
- # http://b/15479704
- result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
- self.assertEqual('t', result)
- result = self.device.shell(
- shlex.split("sh -c 'true && echo t'"))[0].strip()
- self.assertEqual('t', result)
-
- # http://b/20564385
- result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
- self.assertEqual('t', result)
- result = self.device.shell(
- shlex.split(r'echo -n 123\;uname'))[0].strip()
- self.assertEqual('123Linux', result)
-
- def test_install_argument_escaping(self):
- """Make sure that install argument escaping works."""
- # http://b/20323053
- tf = tempfile.NamedTemporaryFile('wb', suffix='-text;ls;1.apk',
- delete=False)
- tf.close()
- self.assertIn("-text;ls;1.apk", self.device.install(tf.name))
- os.remove(tf.name)
-
- # http://b/3090932
- tf = tempfile.NamedTemporaryFile('wb', suffix="-Live Hold'em.apk",
- delete=False)
- tf.close()
- self.assertIn("-Live Hold'em.apk", self.device.install(tf.name))
- os.remove(tf.name)
-
-
-class RootUnrootTest(DeviceTest):
- def _test_root(self):
- message = self.device.root()
- if 'adbd cannot run as root in production builds' in message:
- return
- self.device.wait()
- self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
-
- def _test_unroot(self):
- self.device.unroot()
- self.device.wait()
- self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
-
- def test_root_unroot(self):
- """Make sure that adb root and adb unroot work, using id(1)."""
- if self.device.get_prop('ro.debuggable') != '1':
- raise unittest.SkipTest('requires rootable build')
-
- original_user = self.device.shell(['id', '-un'])[0].strip()
- try:
- if original_user == 'root':
- self._test_unroot()
- self._test_root()
- elif original_user == 'shell':
- self._test_root()
- self._test_unroot()
- finally:
- if original_user == 'root':
- self.device.root()
- else:
- self.device.unroot()
- self.device.wait()
-
-
-class TcpIpTest(DeviceTest):
- def test_tcpip_failure_raises(self):
- """adb tcpip requires a port.
-
- Bug: http://b/22636927
- """
- self.assertRaises(
- subprocess.CalledProcessError, self.device.tcpip, '')
- self.assertRaises(
- subprocess.CalledProcessError, self.device.tcpip, 'foo')
-
-
-class SystemPropertiesTest(DeviceTest):
- def test_get_prop(self):
- self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
-
- @requires_root
- def test_set_prop(self):
- prop_name = 'foo.bar'
- self.device.shell(['setprop', prop_name, '""'])
-
- self.device.set_prop(prop_name, 'qux')
- self.assertEqual(
- self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
-
-
-def compute_md5(string):
- hsh = hashlib.md5()
- hsh.update(string)
- return hsh.hexdigest()
-
-
-def get_md5_prog(device):
- """Older platforms (pre-L) had the name md5 rather than md5sum."""
- try:
- device.shell(['md5sum', '/proc/uptime'])
- return 'md5sum'
- except subprocess.CalledProcessError:
- return 'md5'
-
-
-class HostFile(object):
- def __init__(self, handle, checksum):
- self.handle = handle
- self.checksum = checksum
- self.full_path = handle.name
- self.base_name = os.path.basename(self.full_path)
-
-
-class DeviceFile(object):
- def __init__(self, checksum, full_path):
- self.checksum = checksum
- self.full_path = full_path
- self.base_name = posixpath.basename(self.full_path)
-
-
-def make_random_host_files(in_dir, num_files):
- min_size = 1 * (1 << 10)
- max_size = 16 * (1 << 10)
-
- files = []
- for _ in xrange(num_files):
- file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
-
- size = random.randrange(min_size, max_size, 1024)
- rand_str = os.urandom(size)
- file_handle.write(rand_str)
- file_handle.flush()
- file_handle.close()
-
- md5 = compute_md5(rand_str)
- files.append(HostFile(file_handle, md5))
- return files
-
-
-def make_random_device_files(device, in_dir, num_files):
- min_size = 1 * (1 << 10)
- max_size = 16 * (1 << 10)
-
- files = []
- for file_num in xrange(num_files):
- size = random.randrange(min_size, max_size, 1024)
-
- base_name = 'device_tmpfile' + str(file_num)
- full_path = posixpath.join(in_dir, base_name)
-
- device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
- 'bs={}'.format(size), 'count=1'])
- dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
-
- files.append(DeviceFile(dev_md5, full_path))
- return files
-
-
-class FileOperationsTest(DeviceTest):
- SCRATCH_DIR = '/data/local/tmp'
- DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
- DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
-
- def _test_push(self, local_file, checksum):
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
- self.device.push(local=local_file, remote=self.DEVICE_TEMP_FILE)
- dev_md5, _ = self.device.shell([get_md5_prog(self.device),
- self.DEVICE_TEMP_FILE])[0].split()
- self.assertEqual(checksum, dev_md5)
- self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
-
- def test_push(self):
- """Push a randomly generated file to specified device."""
- kbytes = 512
- tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
- rand_str = os.urandom(1024 * kbytes)
- tmp.write(rand_str)
- tmp.close()
- self._test_push(tmp.name, compute_md5(rand_str))
- os.remove(tmp.name)
-
- # TODO: write push directory test.
-
- def _test_pull(self, remote_file, checksum):
- tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
- tmp_write.close()
- self.device.pull(remote=remote_file, local=tmp_write.name)
- with open(tmp_write.name, 'rb') as tmp_read:
- host_contents = tmp_read.read()
- host_md5 = compute_md5(host_contents)
- self.assertEqual(checksum, host_md5)
- os.remove(tmp_write.name)
-
- def test_pull(self):
- """Pull a randomly generated file from specified device."""
- kbytes = 512
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
- cmd = ['dd', 'if=/dev/urandom',
- 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
- 'count={}'.format(kbytes)]
- self.device.shell(cmd)
- dev_md5, _ = self.device.shell(
- [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
- self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
- self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
-
- def test_pull_dir(self):
- """Pull a randomly generated directory of files from the device."""
- host_dir = tempfile.mkdtemp()
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
-
- # Populate device directory with random files.
- temp_files = make_random_device_files(
- self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
-
- self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
-
- for temp_file in temp_files:
- host_path = os.path.join(host_dir, temp_file.base_name)
- with open(host_path, 'rb') as host_file:
- host_md5 = compute_md5(host_file.read())
- self.assertEqual(host_md5, temp_file.checksum)
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
- def test_sync(self):
- """Sync a randomly generated directory of files to specified device."""
- base_dir = tempfile.mkdtemp()
-
- # Create mirror device directory hierarchy within base_dir.
- full_dir_path = base_dir + self.DEVICE_TEMP_DIR
- os.makedirs(full_dir_path)
-
- # Create 32 random files within the host mirror.
- temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
-
- # Clean up any trash on the device.
- device = adb.get_device(product=base_dir)
- device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
-
- device.sync('data')
-
- # Confirm that every file on the device mirrors that on the host.
- for temp_file in temp_files:
- device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
- temp_file.base_name)
- dev_md5, _ = device.shell(
- [get_md5_prog(self.device), device_full_path])[0].split()
- self.assertEqual(temp_file.checksum, dev_md5)
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- if base_dir is not None:
- shutil.rmtree(base_dir)
-
- def test_unicode_paths(self):
- """Ensure that we can support non-ASCII paths, even on Windows."""
- name = u'로보카 폴리'
-
- ## push.
- tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
- tf.close()
- self.device.push(tf.name, u'/data/local/tmp/adb-test-{}'.format(name))
- os.remove(tf.name)
- self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
-
- # pull.
- cmd = ['touch', u'"/data/local/tmp/adb-test-{}"'.format(name)]
- self.device.shell(cmd)
-
- tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
- tf.close()
- self.device.pull(u'/data/local/tmp/adb-test-{}'.format(name), tf.name)
- os.remove(tf.name)
- self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
-
-
-def main():
- random.seed(0)
- if len(adb.get_devices()) > 0:
- suite = unittest.TestLoader().loadTestsFromName(__name__)
- unittest.TextTestRunner(verbosity=3).run(suite)
- else:
- print('Test suite must be run with attached devices')
-
-
-if __name__ == '__main__':
- main()