_extraction.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. # -*- coding: UTF-8 -*-
  2. # Copyright (c) 2019 The ungoogled-chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """
  6. Archive extraction utilities
  7. """
  8. import os
  9. import shutil
  10. import subprocess
  11. import tarfile
  12. from pathlib import Path, PurePosixPath
  13. from _common import (USE_REGISTRY, PlatformEnum, ExtractorEnum, get_logger, get_running_platform)
  14. from prune_binaries import CONTINGENT_PATHS
  15. DEFAULT_EXTRACTORS = {
  16. ExtractorEnum.SEVENZIP: USE_REGISTRY,
  17. ExtractorEnum.TAR: 'tar',
  18. ExtractorEnum.WINRAR: USE_REGISTRY,
  19. }
  20. def _find_7z_by_registry():
  21. """
  22. Return a string to 7-zip's 7z.exe from the Windows Registry.
  23. """
  24. import winreg #pylint: disable=import-error, import-outside-toplevel
  25. sub_key_7zfm = 'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\7zFM.exe'
  26. try:
  27. with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, sub_key_7zfm) as key_handle:
  28. sevenzipfm_dir = winreg.QueryValueEx(key_handle, 'Path')[0]
  29. except OSError:
  30. get_logger().exception('Unable to locate 7-zip from the Windows Registry')
  31. raise
  32. sevenzip_path = Path(sevenzipfm_dir, '7z.exe')
  33. if not sevenzip_path.is_file():
  34. get_logger().error('7z.exe not found at path from registry: %s', sevenzip_path)
  35. return sevenzip_path
  36. def _find_winrar_by_registry():
  37. """
  38. Return a string to WinRAR's WinRAR.exe from the Windows Registry.
  39. """
  40. import winreg #pylint: disable=import-error, import-outside-toplevel
  41. sub_key_winrar = 'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\WinRAR.exe'
  42. try:
  43. with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, sub_key_winrar) as key_handle:
  44. winrar_dir = winreg.QueryValueEx(key_handle, 'Path')[0]
  45. except OSError:
  46. get_logger().exception('Unable to locale WinRAR from the Windows Registry')
  47. raise
  48. winrar_path = Path(winrar_dir, 'WinRAR.exe')
  49. if not winrar_path.is_file():
  50. get_logger().error('WinRAR.exe not found at path from registry: %s', winrar_path)
  51. return winrar_path
  52. def _find_extractor_by_cmd(extractor_cmd):
  53. """Returns a string path to the binary; None if it couldn't be found"""
  54. if not extractor_cmd:
  55. return None
  56. if Path(extractor_cmd).is_file():
  57. return extractor_cmd
  58. return shutil.which(extractor_cmd)
  59. def _process_relative_to(unpack_root, relative_to):
  60. """
  61. For an extractor that doesn't support an automatic transform, move the extracted
  62. contents from the relative_to/ directory to the unpack_root
  63. If relative_to is None, nothing is done.
  64. """
  65. if relative_to is None:
  66. return
  67. relative_root = unpack_root / relative_to
  68. if not relative_root.is_dir():
  69. get_logger().error('Could not find relative_to directory in extracted files: %s',
  70. relative_to)
  71. raise Exception()
  72. for src_path in relative_root.iterdir():
  73. dest_path = unpack_root / src_path.name
  74. src_path.rename(dest_path)
  75. relative_root.rmdir()
  76. def _extract_tar_with_7z(binary, archive_path, output_dir, relative_to, skip_unused):
  77. get_logger().debug('Using 7-zip extractor')
  78. if not relative_to is None and (output_dir / relative_to).exists():
  79. get_logger().error('Temporary unpacking directory already exists: %s',
  80. output_dir / relative_to)
  81. raise Exception()
  82. cmd1 = (binary, 'x', str(archive_path), '-so')
  83. cmd2 = (binary, 'x', '-si', '-aoa', '-ttar', '-o{}'.format(str(output_dir)))
  84. if skip_unused:
  85. for cpath in CONTINGENT_PATHS:
  86. cmd2 += ('-x!%s/%s' % (str(relative_to), cpath[:-1]), )
  87. get_logger().debug('7z command line: %s | %s', ' '.join(cmd1), ' '.join(cmd2))
  88. proc1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
  89. proc2 = subprocess.Popen(cmd2, stdin=proc1.stdout, stdout=subprocess.PIPE)
  90. proc1.stdout.close()
  91. (stdout_data, stderr_data) = proc2.communicate()
  92. if proc2.returncode != 0:
  93. get_logger().error('7z commands returned non-zero status: %s', proc2.returncode)
  94. get_logger().debug('stdout: %s', stdout_data)
  95. get_logger().debug('stderr: %s', stderr_data)
  96. raise Exception()
  97. _process_relative_to(output_dir, relative_to)
  98. def _extract_tar_with_tar(binary, archive_path, output_dir, relative_to, skip_unused):
  99. get_logger().debug('Using BSD or GNU tar extractor')
  100. output_dir.mkdir(exist_ok=True)
  101. cmd = (binary, '-xf', str(archive_path), '-C', str(output_dir))
  102. if skip_unused:
  103. for cpath in CONTINGENT_PATHS:
  104. cmd += ('--exclude=%s/%s' % (str(relative_to), cpath[:-1]), )
  105. get_logger().debug('tar command line: %s', ' '.join(cmd))
  106. result = subprocess.run(cmd, check=False)
  107. if result.returncode != 0:
  108. get_logger().error('tar command returned %s', result.returncode)
  109. raise Exception()
  110. # for gnu tar, the --transform option could be used. but to keep compatibility with
  111. # bsdtar on macos, we just do this ourselves
  112. _process_relative_to(output_dir, relative_to)
  113. def _extract_tar_with_winrar(binary, archive_path, output_dir, relative_to, skip_unused):
  114. get_logger().debug('Using WinRAR extractor')
  115. output_dir.mkdir(exist_ok=True)
  116. cmd = (binary, 'x', '-o+', str(archive_path), str(output_dir))
  117. if skip_unused:
  118. for cpath in CONTINGENT_PATHS:
  119. cmd += ('-x%s%s%s' % (str(relative_to), os.sep, cpath[:-1].replace('/')), )
  120. get_logger().debug('WinRAR command line: %s', ' '.join(cmd))
  121. result = subprocess.run(cmd, check=False)
  122. if result.returncode != 0:
  123. get_logger().error('WinRAR command returned %s', result.returncode)
  124. raise Exception()
  125. _process_relative_to(output_dir, relative_to)
  126. def _extract_tar_with_python(archive_path, output_dir, relative_to, skip_unused):
  127. get_logger().debug('Using pure Python tar extractor')
  128. class NoAppendList(list):
  129. """Hack to workaround memory issues with large tar files"""
  130. def append(self, obj):
  131. pass
  132. # Simple hack to check if symlinks are supported
  133. try:
  134. os.symlink('', '')
  135. except FileNotFoundError:
  136. # Symlinks probably supported
  137. symlink_supported = True
  138. except OSError:
  139. # Symlinks probably not supported
  140. get_logger().info('System does not support symlinks. Ignoring them.')
  141. symlink_supported = False
  142. except BaseException:
  143. # Unexpected exception
  144. get_logger().exception('Unexpected exception during symlink support check.')
  145. raise
  146. with tarfile.open(str(archive_path), 'r|%s' % archive_path.suffix[1:]) as tar_file_obj:
  147. tar_file_obj.members = NoAppendList()
  148. for tarinfo in tar_file_obj:
  149. try:
  150. if skip_unused and [
  151. cpath for cpath in CONTINGENT_PATHS
  152. if tarinfo.name.startswith(str(relative_to) + '/' + cpath)
  153. ]:
  154. continue
  155. if relative_to is None:
  156. destination = output_dir / PurePosixPath(tarinfo.name)
  157. else:
  158. destination = output_dir / PurePosixPath(tarinfo.name).relative_to(relative_to)
  159. if tarinfo.issym() and not symlink_supported:
  160. # In this situation, TarFile.makelink() will try to create a copy of the
  161. # target. But this fails because TarFile.members is empty
  162. # But if symlinks are not supported, it's safe to assume that symlinks
  163. # aren't needed. The only situation where this happens is on Windows.
  164. continue
  165. if tarinfo.islnk():
  166. # Derived from TarFile.extract()
  167. new_target = output_dir / PurePosixPath(
  168. tarinfo.linkname).relative_to(relative_to)
  169. tarinfo._link_target = new_target.as_posix() # pylint: disable=protected-access
  170. if destination.is_symlink():
  171. destination.unlink()
  172. tar_file_obj._extract_member(tarinfo, str(destination)) # pylint: disable=protected-access
  173. except BaseException:
  174. get_logger().exception('Exception thrown for tar member: %s', tarinfo.name)
  175. raise
  176. def extract_tar_file(archive_path, output_dir, relative_to, skip_unused, extractors=None):
  177. """
  178. Extract regular or compressed tar archive into the output directory.
  179. archive_path is the pathlib.Path to the archive to unpack
  180. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  181. relative_to is a pathlib.Path for directories that should be stripped relative to the
  182. root of the archive, or None if no path components should be stripped.
  183. extractors is a dictionary of PlatformEnum to a command or path to the
  184. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for 7-Zip and WinRAR.
  185. """
  186. if extractors is None:
  187. extractors = DEFAULT_EXTRACTORS
  188. current_platform = get_running_platform()
  189. if current_platform == PlatformEnum.WINDOWS:
  190. # Try to use 7-zip first
  191. sevenzip_cmd = extractors.get(ExtractorEnum.SEVENZIP)
  192. if sevenzip_cmd == USE_REGISTRY:
  193. sevenzip_cmd = str(_find_7z_by_registry())
  194. sevenzip_bin = _find_extractor_by_cmd(sevenzip_cmd)
  195. if sevenzip_bin is not None:
  196. _extract_tar_with_7z(sevenzip_bin, archive_path, output_dir, relative_to, skip_unused)
  197. return
  198. # Use WinRAR if 7-zip is not found
  199. winrar_cmd = extractors.get(ExtractorEnum.WINRAR)
  200. if winrar_cmd == USE_REGISTRY:
  201. winrar_cmd = str(_find_winrar_by_registry())
  202. winrar_bin = _find_extractor_by_cmd(winrar_cmd)
  203. if winrar_bin is not None:
  204. _extract_tar_with_winrar(winrar_bin, archive_path, output_dir, relative_to, skip_unused)
  205. return
  206. get_logger().warning(
  207. 'Neither 7-zip nor WinRAR were found. Falling back to Python extractor...')
  208. elif current_platform == PlatformEnum.UNIX:
  209. # NOTE: 7-zip isn't an option because it doesn't preserve file permissions
  210. tar_bin = _find_extractor_by_cmd(extractors.get(ExtractorEnum.TAR))
  211. if not tar_bin is None:
  212. _extract_tar_with_tar(tar_bin, archive_path, output_dir, relative_to, skip_unused)
  213. return
  214. else:
  215. # This is not a normal code path, so make it clear.
  216. raise NotImplementedError(current_platform)
  217. # Fallback to Python-based extractor on all platforms
  218. _extract_tar_with_python(archive_path, output_dir, relative_to, skip_unused)
  219. def extract_with_7z(
  220. archive_path,
  221. output_dir,
  222. relative_to, #pylint: disable=too-many-arguments
  223. skip_unused,
  224. extractors=None):
  225. """
  226. Extract archives with 7-zip into the output directory.
  227. Only supports archives with one layer of unpacking, so compressed tar archives don't work.
  228. archive_path is the pathlib.Path to the archive to unpack
  229. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  230. relative_to is a pathlib.Path for directories that should be stripped relative to the
  231. root of the archive.
  232. extractors is a dictionary of PlatformEnum to a command or path to the
  233. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for 7-Zip.
  234. """
  235. # TODO: It would be nice to extend this to support arbitrary standard IO chaining of 7z
  236. # instances, so _extract_tar_with_7z and other future formats could use this.
  237. if extractors is None:
  238. extractors = DEFAULT_EXTRACTORS
  239. sevenzip_cmd = extractors.get(ExtractorEnum.SEVENZIP)
  240. if sevenzip_cmd == USE_REGISTRY:
  241. if not get_running_platform() == PlatformEnum.WINDOWS:
  242. get_logger().error('"%s" for 7-zip is only available on Windows', sevenzip_cmd)
  243. raise Exception()
  244. sevenzip_cmd = str(_find_7z_by_registry())
  245. sevenzip_bin = _find_extractor_by_cmd(sevenzip_cmd)
  246. if not relative_to is None and (output_dir / relative_to).exists():
  247. get_logger().error('Temporary unpacking directory already exists: %s',
  248. output_dir / relative_to)
  249. raise Exception()
  250. cmd = (sevenzip_bin, 'x', str(archive_path), '-aoa', '-o{}'.format(str(output_dir)))
  251. if skip_unused:
  252. for cpath in CONTINGENT_PATHS:
  253. cmd += ('-x!%s/%s' % (str(relative_to), cpath[:-1]), )
  254. get_logger().debug('7z command line: %s', ' '.join(cmd))
  255. result = subprocess.run(cmd, check=False)
  256. if result.returncode != 0:
  257. get_logger().error('7z command returned %s', result.returncode)
  258. raise Exception()
  259. _process_relative_to(output_dir, relative_to)
  260. def extract_with_winrar(
  261. archive_path,
  262. output_dir,
  263. relative_to, #pylint: disable=too-many-arguments
  264. skip_unused,
  265. extractors=None):
  266. """
  267. Extract archives with WinRAR into the output directory.
  268. Only supports archives with one layer of unpacking, so compressed tar archives don't work.
  269. archive_path is the pathlib.Path to the archive to unpack
  270. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  271. relative_to is a pathlib.Path for directories that should be stripped relative to the
  272. root of the archive.
  273. extractors is a dictionary of PlatformEnum to a command or path to the
  274. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for WinRAR.
  275. """
  276. if extractors is None:
  277. extractors = DEFAULT_EXTRACTORS
  278. winrar_cmd = extractors.get(ExtractorEnum.WINRAR)
  279. if winrar_cmd == USE_REGISTRY:
  280. if not get_running_platform() == PlatformEnum.WINDOWS:
  281. get_logger().error('"%s" for WinRAR is only available on Windows', winrar_cmd)
  282. raise Exception()
  283. winrar_cmd = str(_find_winrar_by_registry())
  284. winrar_bin = _find_extractor_by_cmd(winrar_cmd)
  285. if not relative_to is None and (output_dir / relative_to).exists():
  286. get_logger().error('Temporary unpacking directory already exists: %s',
  287. output_dir / relative_to)
  288. raise Exception()
  289. cmd = (winrar_bin, 'x', '-o+', str(archive_path), str(output_dir))
  290. if skip_unused:
  291. for cpath in CONTINGENT_PATHS:
  292. cmd += ('-x%s%s%s' % (str(relative_to), os.sep, cpath[:-1].replace('/', os.sep)), )
  293. get_logger().debug('WinRAR command line: %s', ' '.join(cmd))
  294. result = subprocess.run(cmd, check=False)
  295. if result.returncode != 0:
  296. get_logger().error('WinRAR command returned %s', result.returncode)
  297. raise Exception()
  298. _process_relative_to(output_dir, relative_to)