#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
| 16 | """Provides functionality to interact with a device via `fastboot`.""" |
| 17 | |
| 18 | import os |
| 19 | import re |
| 20 | import subprocess |
| 21 | |
| 22 | |
class FastbootError(Exception):
    """Raised when an interaction with fastboot fails."""
| 25 | |
| 26 | |
class FastbootDevice(object):
    """Class to interact with a fastboot device.

    Every method shells out to the executable stored in `self.path`. No
    serial number is ever passed on the command line.
    """

    # Prefix for INFO-type messages when printed by fastboot. If we want
    # to parse the output from an INFO message we need to strip this off.
    INFO_PREFIX = '(bootloader) '

    # Start of the banner line fastboot prints while it waits for a device,
    # e.g. "< waiting for any device >".
    _WAITING_PREFIX = '< waiting for '

    def __init__(self, path='fastboot'):
        """Initialization.

        Args:
            path: path to the fastboot executable to test with.

        Raises:
            FastbootError: the executable could not be run, or the device
                check in _check_single_device() failed.
        """
        self.path = path

        # Make sure the fastboot executable is available.
        try:
            _subprocess_check_output([self.path, '--version'])
        except OSError:
            raise FastbootError('Could not execute `{}`'.format(self.path))

        # Make sure exactly 1 fastboot device is available if <specific device>
        # was not given as an argument. Do not try to find an adb device and
        # put it in fastboot mode, it would be too easy to accidentally
        # download to the wrong device.
        if not self._check_single_device():
            raise FastbootError('Requires exactly 1 device in fastboot mode')

    @staticmethod
    def _to_text(output):
        """Returns subprocess output as a native `str`.

        subprocess.check_output() returns bytes on Python 3, which can
        neither be compared with nor regex-matched against str patterns.
        On Python 2 the output already is a `str` and is returned untouched.

        Args:
            output: raw value returned by the subprocess call.

        Returns:
            `output` itself if it is a `str`, otherwise its UTF-8 decoding
            with undecodable bytes replaced.
        """
        if isinstance(output, str):
            return output
        return output.decode('utf-8', 'replace')

    def _check_single_device(self):
        """Returns True if there is exactly one fastboot device attached.

        When ANDROID_SERIAL is set it checks that the device is available
        by querying its `product` variable instead.

        Returns:
            True if the check passed, False otherwise.
        """
        if 'ANDROID_SERIAL' in os.environ:
            # getvar() swallows CalledProcessError and reports failure as
            # None, so the return value is what has to be inspected.
            return self.getvar('product') is not None

        output = _subprocess_check_output([self.path, 'devices'])
        devices = [line for line in self._to_text(output).splitlines()
                   if line.strip()]
        if len(devices) != 1:
            return False
        # Expect "<serial> fastboot"; a line with fewer fields is not a
        # usable device rather than a reason to crash.
        fields = devices[0].split()
        return len(fields) >= 2 and fields[1] == 'fastboot'

    def getvar(self, name):
        """Calls `fastboot getvar`.

        To query all variables (fastboot getvar all) use getvar_all()
        instead.

        Args:
            name: variable name to access.

        Returns:
            String value of variable |name| or None if not found.
        """
        try:
            output = _subprocess_check_output([self.path, 'getvar', name],
                                              stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            return None

        # Drop "< waiting for ... >" banner lines so that the first
        # remaining line is the one carrying the value.
        lines = [line for line in self._to_text(output).splitlines()
                 if not line.startswith(self._WAITING_PREFIX)]
        if not lines:
            return None

        # Output format is <name>:<whitespace><value>. The name is escaped
        # so that characters such as '.' are matched literally.
        result = re.search(r'{}:\s*(.*)'.format(re.escape(name)), lines[0])
        if result:
            return result.group(1)
        return None

    def getvar_all(self):
        """Calls `fastboot getvar all`.

        Returns:
            A {name, value} dictionary of variables.

        Raises:
            subprocess.CalledProcessError: the fastboot command failed.
        """
        output = _subprocess_check_output([self.path, 'getvar', 'all'],
                                          stderr=subprocess.STDOUT)
        # Greedy name group: variable names may themselves contain ':'
        # so the split happens at the last colon on the line.
        line_re = re.compile(r'(.*):\s*(.*)')
        all_vars = {}
        for line in self._to_text(output).splitlines():
            result = line_re.search(line)
            if not result:
                continue
            var_name = result.group(1)

            # `getvar all` works by sending one INFO message per variable
            # so we need to strip out the info prefix string.
            if var_name.startswith(self.INFO_PREFIX):
                var_name = var_name[len(self.INFO_PREFIX):]

            # In addition to returning all variables the bootloader may
            # also think it's supposed to query and return a variable named
            # "all", so ignore this line if so. Fastboot also prints a
            # summary line that we want to ignore; compare in lower case
            # so that any capitalization of "total time" is caught.
            if var_name == 'all' or 'total time' in var_name.lower():
                continue
            all_vars[var_name] = result.group(2)
        return all_vars

    def flashall(self, wipe_user=True, slot=None, skip_secondary=False,
                 quiet=True):
        """Calls `fastboot [-w] flashall`.

        Args:
            wipe_user: whether to set the -w flag or not.
            slot: slot to flash if device supports A/B, otherwise default
                will be used.
            skip_secondary: on A/B devices, flashes only the primary images
                if true.
            quiet: True to hide output, false to send it to stdout.

        Raises:
            subprocess.CalledProcessError: the fastboot command failed.
        """
        func = (_subprocess_check_output if quiet else subprocess.check_call)
        command = [self.path, 'flashall']
        if slot:
            command.extend(['--slot', slot])
        if skip_secondary:
            command.append("--skip-secondary")
        if wipe_user:
            command.append('-w')
        func(command, stderr=subprocess.STDOUT)

    def flash(self, partition='cache', img=None, slot=None, quiet=True,
              skip_secondary=False):
        """Calls `fastboot flash`.

        Args:
            partition: which partition to flash.
            img: path to .img file, otherwise the default will be used.
            slot: slot to flash if device supports A/B, otherwise default
                will be used.
            quiet: True to hide output, false to send it to stdout.
            skip_secondary: True to append `--skip-secondary` to the
                command line. Added last so existing positional callers
                are unaffected.

        Raises:
            subprocess.CalledProcessError: the fastboot command failed.
        """
        func = (_subprocess_check_output if quiet else subprocess.check_call)
        command = [self.path, 'flash', partition]
        if img:
            command.append(img)
        if slot:
            command.extend(['--slot', slot])
        if skip_secondary:
            command.append("--skip-secondary")
        func(command, stderr=subprocess.STDOUT)

    def reboot(self, bootloader=False):
        """Calls `fastboot reboot [bootloader]`.

        Args:
            bootloader: True to reboot back to the bootloader.

        Raises:
            subprocess.CalledProcessError: the fastboot command failed.
        """
        command = [self.path, 'reboot']
        if bootloader:
            command.append('bootloader')
        _subprocess_check_output(command, stderr=subprocess.STDOUT)

    def set_active(self, slot):
        """Calls `fastboot set_active <slot>`.

        Args:
            slot: The slot to set as the current slot.

        Raises:
            subprocess.CalledProcessError: the fastboot command failed.
        """
        command = [self.path, 'set_active', slot]
        _subprocess_check_output(command, stderr=subprocess.STDOUT)
| 182 | |
| 183 | # If necessary, modifies subprocess.check_output() or subprocess.Popen() args |
| 184 | # to run the subprocess via Windows PowerShell to work-around an issue in |
| 185 | # Python 2's subprocess class on Windows where it doesn't support Unicode. |
| 186 | def _get_subprocess_args(args): |
| 187 | # Only do this slow work-around if Unicode is in the cmd line on Windows. |
| 188 | # PowerShell takes 600-700ms to startup on a 2013-2014 machine, which is |
| 189 | # very slow. |
| 190 | if os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args[0]): |
| 191 | return args |
| 192 | |
| 193 | def escape_arg(arg): |
| 194 | # Escape for the parsing that the C Runtime does in Windows apps. In |
| 195 | # particular, this will take care of double-quotes. |
| 196 | arg = subprocess.list2cmdline([arg]) |
| 197 | # Escape single-quote with another single-quote because we're about |
| 198 | # to... |
| 199 | arg = arg.replace(u"'", u"''") |
| 200 | # ...put the arg in a single-quoted string for PowerShell to parse. |
| 201 | arg = u"'" + arg + u"'" |
| 202 | return arg |
| 203 | |
| 204 | # Escape command line args. |
| 205 | argv = map(escape_arg, args[0]) |
| 206 | # Cause script errors (such as adb not found) to stop script immediately |
| 207 | # with an error. |
| 208 | ps_code = u'$ErrorActionPreference = "Stop"\r\n' |
| 209 | # Add current directory to the PATH var, to match cmd.exe/CreateProcess() |
| 210 | # behavior. |
| 211 | ps_code += u'$env:Path = ".;" + $env:Path\r\n' |
| 212 | # Precede by &, the PowerShell call operator, and separate args by space. |
| 213 | ps_code += u'& ' + u' '.join(argv) |
| 214 | # Make the PowerShell exit code the exit code of the subprocess. |
| 215 | ps_code += u'\r\nExit $LastExitCode' |
| 216 | # Encode as UTF-16LE (without Byte-Order-Mark) which Windows natively |
| 217 | # understands. |
| 218 | ps_code = ps_code.encode('utf-16le') |
| 219 | |
| 220 | # Encode the PowerShell command as base64 and use the special |
| 221 | # -EncodedCommand option that base64 decodes. Base64 is just plain ASCII, |
| 222 | # so it should have no problem passing through Win32 CreateProcessA() |
| 223 | # (which python erroneously calls instead of CreateProcessW()). |
| 224 | return (['powershell.exe', '-NoProfile', '-NonInteractive', |
| 225 | '-EncodedCommand', base64.b64encode(ps_code)],) + args[1:] |
| 226 | |
def _subprocess_check_output(*args, **kwargs):
    """Drop-in replacement for subprocess.check_output().

    Call this instead of subprocess.check_output() to work-around issue in
    Python 2's subprocess class on Windows where it doesn't support Unicode.

    Args:
        *args: positional arguments for subprocess.check_output(); args[0]
            is the command line.
        **kwargs: keyword arguments forwarded to subprocess.check_output().

    Returns:
        Whatever subprocess.check_output() returns for the command.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero
            status; the error carries the command as the caller gave it.
    """
    effective_args = _get_subprocess_args(args)
    try:
        return subprocess.check_output(*effective_args, **kwargs)
    except subprocess.CalledProcessError as err:
        # Show real command line instead of the powershell.exe command line.
        raise subprocess.CalledProcessError(err.returncode, args[0],
                                            output=err.output)