_extraction.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. # -*- coding: UTF-8 -*-
  2. # Copyright (c) 2019 The ungoogled-chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """
  6. Archive extraction utilities
  7. """
  8. import os
  9. import shutil
  10. import subprocess
  11. import tarfile
  12. from pathlib import Path, PurePosixPath
  13. from _common import (USE_REGISTRY, PlatformEnum, ExtractorEnum, get_logger, get_running_platform)
  14. from prune_binaries import CONTINGENT_PATHS
  15. DEFAULT_EXTRACTORS = {
  16. ExtractorEnum.SEVENZIP: USE_REGISTRY,
  17. ExtractorEnum.TAR: 'tar',
  18. ExtractorEnum.WINRAR: USE_REGISTRY,
  19. }
  20. class ExtractionError(BaseException):
  21. """Exceptions thrown in this module's methods"""
  22. def _find_7z_by_registry():
  23. """
  24. Return a string to 7-zip's 7z.exe from the Windows Registry.
  25. Raises ExtractionError if it fails.
  26. """
  27. import winreg #pylint: disable=import-error
  28. sub_key_7zfm = 'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\7zFM.exe'
  29. try:
  30. with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, sub_key_7zfm) as key_handle:
  31. sevenzipfm_dir = winreg.QueryValueEx(key_handle, 'Path')[0]
  32. except OSError:
  33. get_logger().exception('Unable to locate 7-zip from the Windows Registry')
  34. raise ExtractionError()
  35. sevenzip_path = Path(sevenzipfm_dir, '7z.exe')
  36. if not sevenzip_path.is_file():
  37. get_logger().error('7z.exe not found at path from registry: %s', sevenzip_path)
  38. return sevenzip_path
  39. def _find_winrar_by_registry():
  40. """
  41. Return a string to WinRAR's WinRAR.exe from the Windows Registry.
  42. Raises ExtractionError if it fails.
  43. """
  44. import winreg #pylint: disable=import-error
  45. sub_key_winrar = 'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\WinRAR.exe'
  46. try:
  47. with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, sub_key_winrar) as key_handle:
  48. winrar_dir = winreg.QueryValueEx(key_handle, 'Path')[0]
  49. except OSError:
  50. get_logger().exception('Unable to locale WinRAR from the Windows Registry')
  51. raise ExtractionError()
  52. winrar_path = Path(winrar_dir, 'WinRAR.exe')
  53. if not winrar_path.is_file():
  54. get_logger().error('WinRAR.exe not found at path from registry: %s', winrar_path)
  55. return winrar_path
  56. def _find_extractor_by_cmd(extractor_cmd):
  57. """Returns a string path to the binary; None if it couldn't be found"""
  58. if not extractor_cmd:
  59. return None
  60. if Path(extractor_cmd).is_file():
  61. return extractor_cmd
  62. return shutil.which(extractor_cmd)
  63. def _process_relative_to(unpack_root, relative_to):
  64. """
  65. For an extractor that doesn't support an automatic transform, move the extracted
  66. contents from the relative_to/ directory to the unpack_root
  67. If relative_to is None, nothing is done.
  68. """
  69. if relative_to is None:
  70. return
  71. relative_root = unpack_root / relative_to
  72. if not relative_root.is_dir():
  73. get_logger().error('Could not find relative_to directory in extracted files: %s',
  74. relative_to)
  75. raise ExtractionError()
  76. for src_path in relative_root.iterdir():
  77. dest_path = unpack_root / src_path.name
  78. src_path.rename(dest_path)
  79. relative_root.rmdir()
  80. def _extract_tar_with_7z(binary, archive_path, output_dir, relative_to, skip_unused):
  81. get_logger().debug('Using 7-zip extractor')
  82. if not relative_to is None and (output_dir / relative_to).exists():
  83. get_logger().error('Temporary unpacking directory already exists: %s',
  84. output_dir / relative_to)
  85. raise ExtractionError()
  86. cmd1 = (binary, 'x', str(archive_path), '-so')
  87. cmd2 = (binary, 'x', '-si', '-aoa', '-ttar', '-o{}'.format(str(output_dir)))
  88. if skip_unused:
  89. for cpath in CONTINGENT_PATHS:
  90. cmd2 += ('-x!%s/%s' % (str(relative_to), cpath[:-1]), )
  91. get_logger().debug('7z command line: %s | %s', ' '.join(cmd1), ' '.join(cmd2))
  92. proc1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
  93. proc2 = subprocess.Popen(cmd2, stdin=proc1.stdout, stdout=subprocess.PIPE)
  94. proc1.stdout.close()
  95. (stdout_data, stderr_data) = proc2.communicate()
  96. if proc2.returncode != 0:
  97. get_logger().error('7z commands returned non-zero status: %s', proc2.returncode)
  98. get_logger().debug('stdout: %s', stdout_data)
  99. get_logger().debug('stderr: %s', stderr_data)
  100. raise ExtractionError()
  101. _process_relative_to(output_dir, relative_to)
  102. def _extract_tar_with_tar(binary, archive_path, output_dir, relative_to, skip_unused):
  103. get_logger().debug('Using BSD or GNU tar extractor')
  104. output_dir.mkdir(exist_ok=True)
  105. cmd = (binary, '-xf', str(archive_path), '-C', str(output_dir))
  106. if skip_unused:
  107. for cpath in CONTINGENT_PATHS:
  108. cmd += ('--exclude=%s/%s' % (str(relative_to), cpath[:-1]), )
  109. get_logger().debug('tar command line: %s', ' '.join(cmd))
  110. result = subprocess.run(cmd)
  111. if result.returncode != 0:
  112. get_logger().error('tar command returned %s', result.returncode)
  113. raise ExtractionError()
  114. # for gnu tar, the --transform option could be used. but to keep compatibility with
  115. # bsdtar on macos, we just do this ourselves
  116. _process_relative_to(output_dir, relative_to)
  117. def _extract_tar_with_winrar(binary, archive_path, output_dir, relative_to, skip_unused):
  118. get_logger().debug('Using WinRAR extractor')
  119. output_dir.mkdir(exist_ok=True)
  120. cmd = (binary, 'x', '-o+', str(archive_path), str(output_dir))
  121. if skip_unused:
  122. for cpath in CONTINGENT_PATHS:
  123. cmd += ('-x%s%s%s' % (str(relative_to), os.sep, cpath[:-1].replace('/'), os.sep), )
  124. get_logger().debug('WinRAR command line: %s', ' '.join(cmd))
  125. result = subprocess.run(cmd)
  126. if result.returncode != 0:
  127. get_logger().error('WinRAR command returned %s', result.returncode)
  128. raise ExtractionError()
  129. _process_relative_to(output_dir, relative_to)
  130. def _extract_tar_with_python(archive_path, output_dir, relative_to, skip_unused):
  131. get_logger().debug('Using pure Python tar extractor')
  132. class NoAppendList(list):
  133. """Hack to workaround memory issues with large tar files"""
  134. def append(self, obj):
  135. pass
  136. # Simple hack to check if symlinks are supported
  137. try:
  138. os.symlink('', '')
  139. except FileNotFoundError:
  140. # Symlinks probably supported
  141. symlink_supported = True
  142. except OSError:
  143. # Symlinks probably not supported
  144. get_logger().info('System does not support symlinks. Ignoring them.')
  145. symlink_supported = False
  146. except BaseException:
  147. # Unexpected exception
  148. get_logger().exception('Unexpected exception during symlink support check.')
  149. raise ExtractionError()
  150. with tarfile.open(str(archive_path), 'r|%s' % archive_path.suffix[1:]) as tar_file_obj:
  151. tar_file_obj.members = NoAppendList()
  152. for tarinfo in tar_file_obj:
  153. try:
  154. if skip_unused and [
  155. cpath for cpath in CONTINGENT_PATHS
  156. if tarinfo.name.startswith(str(relative_to) + '/' + cpath)
  157. ]:
  158. continue
  159. if relative_to is None:
  160. destination = output_dir / PurePosixPath(tarinfo.name)
  161. else:
  162. destination = output_dir / PurePosixPath(tarinfo.name).relative_to(relative_to)
  163. if tarinfo.issym() and not symlink_supported:
  164. # In this situation, TarFile.makelink() will try to create a copy of the
  165. # target. But this fails because TarFile.members is empty
  166. # But if symlinks are not supported, it's safe to assume that symlinks
  167. # aren't needed. The only situation where this happens is on Windows.
  168. continue
  169. if tarinfo.islnk():
  170. # Derived from TarFile.extract()
  171. new_target = output_dir / PurePosixPath(
  172. tarinfo.linkname).relative_to(relative_to)
  173. tarinfo._link_target = new_target.as_posix() # pylint: disable=protected-access
  174. if destination.is_symlink():
  175. destination.unlink()
  176. tar_file_obj._extract_member(tarinfo, str(destination)) # pylint: disable=protected-access
  177. except BaseException:
  178. get_logger().exception('Exception thrown for tar member: %s', tarinfo.name)
  179. raise ExtractionError()
  180. def extract_tar_file(archive_path, output_dir, relative_to, skip_unused, extractors=None):
  181. """
  182. Extract regular or compressed tar archive into the output directory.
  183. archive_path is the pathlib.Path to the archive to unpack
  184. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  185. relative_to is a pathlib.Path for directories that should be stripped relative to the
  186. root of the archive, or None if no path components should be stripped.
  187. extractors is a dictionary of PlatformEnum to a command or path to the
  188. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for 7-Zip and WinRAR.
  189. Raises ExtractionError if unexpected issues arise during unpacking.
  190. """
  191. if extractors is None:
  192. extractors = DEFAULT_EXTRACTORS
  193. current_platform = get_running_platform()
  194. if current_platform == PlatformEnum.WINDOWS:
  195. # Try to use 7-zip first
  196. sevenzip_cmd = extractors.get(ExtractorEnum.SEVENZIP)
  197. if sevenzip_cmd == USE_REGISTRY:
  198. sevenzip_cmd = str(_find_7z_by_registry())
  199. sevenzip_bin = _find_extractor_by_cmd(sevenzip_cmd)
  200. if sevenzip_bin is not None:
  201. _extract_tar_with_7z(sevenzip_bin, archive_path, output_dir, relative_to, skip_unused)
  202. return
  203. # Use WinRAR if 7-zip is not found
  204. winrar_cmd = extractors.get(ExtractorEnum.WINRAR)
  205. if winrar_cmd == USE_REGISTRY:
  206. winrar_cmd = str(_find_winrar_by_registry())
  207. winrar_bin = _find_extractor_by_cmd(winrar_cmd)
  208. if winrar_bin is not None:
  209. _extract_tar_with_winrar(winrar_bin, archive_path, output_dir, relative_to, skip_unused)
  210. return
  211. get_logger().warning(
  212. 'Neither 7-zip nor WinRAR were found. Falling back to Python extractor...')
  213. elif current_platform == PlatformEnum.UNIX:
  214. # NOTE: 7-zip isn't an option because it doesn't preserve file permissions
  215. tar_bin = _find_extractor_by_cmd(extractors.get(ExtractorEnum.TAR))
  216. if not tar_bin is None:
  217. _extract_tar_with_tar(tar_bin, archive_path, output_dir, relative_to, skip_unused)
  218. return
  219. else:
  220. # This is not a normal code path, so make it clear.
  221. raise NotImplementedError(current_platform)
  222. # Fallback to Python-based extractor on all platforms
  223. _extract_tar_with_python(archive_path, output_dir, relative_to, skip_unused)
  224. def extract_with_7z(
  225. archive_path,
  226. output_dir,
  227. relative_to, #pylint: disable=too-many-arguments
  228. skip_unused,
  229. extractors=None):
  230. """
  231. Extract archives with 7-zip into the output directory.
  232. Only supports archives with one layer of unpacking, so compressed tar archives don't work.
  233. archive_path is the pathlib.Path to the archive to unpack
  234. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  235. relative_to is a pathlib.Path for directories that should be stripped relative to the
  236. root of the archive.
  237. extractors is a dictionary of PlatformEnum to a command or path to the
  238. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for 7-Zip.
  239. Raises ExtractionError if unexpected issues arise during unpacking.
  240. """
  241. # TODO: It would be nice to extend this to support arbitrary standard IO chaining of 7z
  242. # instances, so _extract_tar_with_7z and other future formats could use this.
  243. if extractors is None:
  244. extractors = DEFAULT_EXTRACTORS
  245. sevenzip_cmd = extractors.get(ExtractorEnum.SEVENZIP)
  246. if sevenzip_cmd == USE_REGISTRY:
  247. if not get_running_platform() == PlatformEnum.WINDOWS:
  248. get_logger().error('"%s" for 7-zip is only available on Windows', sevenzip_cmd)
  249. raise ExtractionError()
  250. sevenzip_cmd = str(_find_7z_by_registry())
  251. sevenzip_bin = _find_extractor_by_cmd(sevenzip_cmd)
  252. if not relative_to is None and (output_dir / relative_to).exists():
  253. get_logger().error('Temporary unpacking directory already exists: %s',
  254. output_dir / relative_to)
  255. raise ExtractionError()
  256. cmd = (sevenzip_bin, 'x', str(archive_path), '-aoa', '-o{}'.format(str(output_dir)))
  257. if skip_unused:
  258. for cpath in CONTINGENT_PATHS:
  259. cmd += ('-x!%s/%s' % (str(relative_to), cpath[:-1]), )
  260. get_logger().debug('7z command line: %s', ' '.join(cmd))
  261. result = subprocess.run(cmd)
  262. if result.returncode != 0:
  263. get_logger().error('7z command returned %s', result.returncode)
  264. raise ExtractionError()
  265. _process_relative_to(output_dir, relative_to)
  266. def extract_with_winrar(
  267. archive_path,
  268. output_dir,
  269. relative_to, #pylint: disable=too-many-arguments
  270. skip_unused,
  271. extractors=None):
  272. """
  273. Extract archives with WinRAR into the output directory.
  274. Only supports archives with one layer of unpacking, so compressed tar archives don't work.
  275. archive_path is the pathlib.Path to the archive to unpack
  276. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  277. relative_to is a pathlib.Path for directories that should be stripped relative to the
  278. root of the archive.
  279. extractors is a dictionary of PlatformEnum to a command or path to the
  280. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for WinRAR.
  281. Raises ExtractionError if unexpected issues arise during unpacking.
  282. """
  283. if extractors is None:
  284. extractors = DEFAULT_EXTRACTORS
  285. winrar_cmd = extractors.get(ExtractorEnum.WINRAR)
  286. if winrar_cmd == USE_REGISTRY:
  287. if not get_running_platform() == PlatformEnum.WINDOWS:
  288. get_logger().error('"%s" for WinRAR is only available on Windows', winrar_cmd)
  289. raise ExtractionError()
  290. winrar_cmd = str(_find_winrar_by_registry())
  291. winrar_bin = _find_extractor_by_cmd(winrar_cmd)
  292. if not relative_to is None and (output_dir / relative_to).exists():
  293. get_logger().error('Temporary unpacking directory already exists: %s',
  294. output_dir / relative_to)
  295. raise ExtractionError()
  296. cmd = (winrar_bin, 'x', '-o+', str(archive_path), str(output_dir))
  297. if skip_unused:
  298. for cpath in CONTINGENT_PATHS:
  299. cmd += ('-x%s%s%s' % (str(relative_to), os.sep, cpath[:-1].replace('/', os.sep)), )
  300. get_logger().debug('WinRAR command line: %s', ' '.join(cmd))
  301. result = subprocess.run(cmd)
  302. if result.returncode != 0:
  303. get_logger().error('WinRAR command returned %s', result.returncode)
  304. raise ExtractionError()
  305. _process_relative_to(output_dir, relative_to)