_extraction.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. # -*- coding: UTF-8 -*-
  2. # Copyright (c) 2019 The ungoogled-chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """
  6. Archive extraction utilities
  7. """
  8. import os
  9. import shutil
  10. import subprocess
  11. import tarfile
  12. from pathlib import Path, PurePosixPath
  13. from _common import (USE_REGISTRY, PlatformEnum, ExtractorEnum, get_logger, get_running_platform)
# Default command (or lookup strategy) for each supported extractor.
# Entries set to USE_REGISTRY are resolved through the Windows Registry helpers in this
# module; 'tar' is resolved by _find_extractor_by_cmd() as a file path or a command on PATH.
DEFAULT_EXTRACTORS = {
    ExtractorEnum.SEVENZIP: USE_REGISTRY,
    ExtractorEnum.TAR: 'tar',
    ExtractorEnum.WINRAR: USE_REGISTRY,
}
  19. class ExtractionError(BaseException):
  20. """Exceptions thrown in this module's methods"""
  21. def _find_7z_by_registry():
  22. """
  23. Return a string to 7-zip's 7z.exe from the Windows Registry.
  24. Raises ExtractionError if it fails.
  25. """
  26. import winreg #pylint: disable=import-error
  27. sub_key_7zfm = 'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\7zFM.exe'
  28. try:
  29. with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, sub_key_7zfm) as key_handle:
  30. sevenzipfm_dir = winreg.QueryValueEx(key_handle, 'Path')[0]
  31. except OSError:
  32. get_logger().exception('Unable to locate 7-zip from the Windows Registry')
  33. raise ExtractionError()
  34. sevenzip_path = Path(sevenzipfm_dir, '7z.exe')
  35. if not sevenzip_path.is_file():
  36. get_logger().error('7z.exe not found at path from registry: %s', sevenzip_path)
  37. return sevenzip_path
  38. def _find_winrar_by_registry():
  39. """
  40. Return a string to WinRAR's WinRAR.exe from the Windows Registry.
  41. Raises ExtractionError if it fails.
  42. """
  43. import winreg #pylint: disable=import-error
  44. sub_key_winrar = 'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\WinRAR.exe'
  45. try:
  46. with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, sub_key_winrar) as key_handle:
  47. winrar_dir = winreg.QueryValueEx(key_handle, 'Path')[0]
  48. except OSError:
  49. get_logger().exception('Unable to locale WinRAR from the Windows Registry')
  50. raise ExtractionError()
  51. winrar_path = Path(winrar_dir, 'WinRAR.exe')
  52. if not winrar_path.is_file():
  53. get_logger().error('WinRAR.exe not found at path from registry: %s', winrar_path)
  54. return winrar_path
  55. def _find_extractor_by_cmd(extractor_cmd):
  56. """Returns a string path to the binary; None if it couldn't be found"""
  57. if not extractor_cmd:
  58. return None
  59. if Path(extractor_cmd).is_file():
  60. return extractor_cmd
  61. return shutil.which(extractor_cmd)
  62. def _process_relative_to(unpack_root, relative_to):
  63. """
  64. For an extractor that doesn't support an automatic transform, move the extracted
  65. contents from the relative_to/ directory to the unpack_root
  66. If relative_to is None, nothing is done.
  67. """
  68. if relative_to is None:
  69. return
  70. relative_root = unpack_root / relative_to
  71. if not relative_root.is_dir():
  72. get_logger().error('Could not find relative_to directory in extracted files: %s',
  73. relative_to)
  74. raise ExtractionError()
  75. for src_path in relative_root.iterdir():
  76. dest_path = unpack_root / src_path.name
  77. src_path.rename(dest_path)
  78. relative_root.rmdir()
  79. def _extract_tar_with_7z(binary, archive_path, output_dir, relative_to):
  80. get_logger().debug('Using 7-zip extractor')
  81. if not relative_to is None and (output_dir / relative_to).exists():
  82. get_logger().error('Temporary unpacking directory already exists: %s',
  83. output_dir / relative_to)
  84. raise ExtractionError()
  85. cmd1 = (binary, 'x', str(archive_path), '-so')
  86. cmd2 = (binary, 'x', '-si', '-aoa', '-ttar', '-o{}'.format(str(output_dir)))
  87. get_logger().debug('7z command line: %s | %s', ' '.join(cmd1), ' '.join(cmd2))
  88. proc1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
  89. proc2 = subprocess.Popen(cmd2, stdin=proc1.stdout, stdout=subprocess.PIPE)
  90. proc1.stdout.close()
  91. (stdout_data, stderr_data) = proc2.communicate()
  92. if proc2.returncode != 0:
  93. get_logger().error('7z commands returned non-zero status: %s', proc2.returncode)
  94. get_logger().debug('stdout: %s', stdout_data)
  95. get_logger().debug('stderr: %s', stderr_data)
  96. raise ExtractionError()
  97. _process_relative_to(output_dir, relative_to)
  98. def _extract_tar_with_tar(binary, archive_path, output_dir, relative_to):
  99. get_logger().debug('Using BSD or GNU tar extractor')
  100. output_dir.mkdir(exist_ok=True)
  101. cmd = (binary, '-xf', str(archive_path), '-C', str(output_dir))
  102. get_logger().debug('tar command line: %s', ' '.join(cmd))
  103. result = subprocess.run(cmd)
  104. if result.returncode != 0:
  105. get_logger().error('tar command returned %s', result.returncode)
  106. raise ExtractionError()
  107. # for gnu tar, the --transform option could be used. but to keep compatibility with
  108. # bsdtar on macos, we just do this ourselves
  109. _process_relative_to(output_dir, relative_to)
  110. def _extract_tar_with_winrar(binary, archive_path, output_dir, relative_to):
  111. get_logger().debug('Using WinRAR extractor')
  112. output_dir.mkdir(exist_ok=True)
  113. cmd = (binary, 'x', '-o+', str(archive_path), str(output_dir))
  114. get_logger().debug('WinRAR command line: %s', ' '.join(cmd))
  115. result = subprocess.run(cmd)
  116. if result.returncode != 0:
  117. get_logger().error('WinRAR command returned %s', result.returncode)
  118. raise ExtractionError()
  119. _process_relative_to(output_dir, relative_to)
  120. def _extract_tar_with_python(archive_path, output_dir, relative_to):
  121. get_logger().debug('Using pure Python tar extractor')
  122. class NoAppendList(list):
  123. """Hack to workaround memory issues with large tar files"""
  124. def append(self, obj):
  125. pass
  126. # Simple hack to check if symlinks are supported
  127. try:
  128. os.symlink('', '')
  129. except FileNotFoundError:
  130. # Symlinks probably supported
  131. symlink_supported = True
  132. except OSError:
  133. # Symlinks probably not supported
  134. get_logger().info('System does not support symlinks. Ignoring them.')
  135. symlink_supported = False
  136. except BaseException:
  137. # Unexpected exception
  138. get_logger().exception('Unexpected exception during symlink support check.')
  139. raise ExtractionError()
  140. with tarfile.open(str(archive_path), 'r|%s' % archive_path.suffix[1:]) as tar_file_obj:
  141. tar_file_obj.members = NoAppendList()
  142. for tarinfo in tar_file_obj:
  143. try:
  144. if relative_to is None:
  145. destination = output_dir / PurePosixPath(tarinfo.name)
  146. else:
  147. destination = output_dir / PurePosixPath(tarinfo.name).relative_to(relative_to)
  148. if tarinfo.issym() and not symlink_supported:
  149. # In this situation, TarFile.makelink() will try to create a copy of the
  150. # target. But this fails because TarFile.members is empty
  151. # But if symlinks are not supported, it's safe to assume that symlinks
  152. # aren't needed. The only situation where this happens is on Windows.
  153. continue
  154. if tarinfo.islnk():
  155. # Derived from TarFile.extract()
  156. new_target = output_dir / PurePosixPath(
  157. tarinfo.linkname).relative_to(relative_to)
  158. tarinfo._link_target = new_target.as_posix() # pylint: disable=protected-access
  159. if destination.is_symlink():
  160. destination.unlink()
  161. tar_file_obj._extract_member(tarinfo, str(destination)) # pylint: disable=protected-access
  162. except BaseException:
  163. get_logger().exception('Exception thrown for tar member: %s', tarinfo.name)
  164. raise ExtractionError()
  165. def extract_tar_file(archive_path, output_dir, relative_to, extractors=None):
  166. """
  167. Extract regular or compressed tar archive into the output directory.
  168. archive_path is the pathlib.Path to the archive to unpack
  169. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  170. relative_to is a pathlib.Path for directories that should be stripped relative to the
  171. root of the archive, or None if no path components should be stripped.
  172. extractors is a dictionary of PlatformEnum to a command or path to the
  173. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for 7-Zip and WinRAR.
  174. Raises ExtractionError if unexpected issues arise during unpacking.
  175. """
  176. if extractors is None:
  177. extractors = DEFAULT_EXTRACTORS
  178. current_platform = get_running_platform()
  179. if current_platform == PlatformEnum.WINDOWS:
  180. # Try to use 7-zip first
  181. sevenzip_cmd = extractors.get(ExtractorEnum.SEVENZIP)
  182. if sevenzip_cmd == USE_REGISTRY:
  183. sevenzip_cmd = str(_find_7z_by_registry())
  184. sevenzip_bin = _find_extractor_by_cmd(sevenzip_cmd)
  185. if sevenzip_bin is not None:
  186. _extract_tar_with_7z(sevenzip_bin, archive_path, output_dir, relative_to)
  187. return
  188. # Use WinRAR if 7-zip is not found
  189. winrar_cmd = extractors.get(ExtractorEnum.WINRAR)
  190. if winrar_cmd == USE_REGISTRY:
  191. winrar_cmd = str(_find_winrar_by_registry())
  192. winrar_bin = _find_extractor_by_cmd(winrar_cmd)
  193. if winrar_bin is not None:
  194. _extract_tar_with_winrar(winrar_bin, archive_path, output_dir, relative_to)
  195. return
  196. get_logger().warning(
  197. 'Neither 7-zip nor WinRAR were found. Falling back to Python extractor...')
  198. elif current_platform == PlatformEnum.UNIX:
  199. # NOTE: 7-zip isn't an option because it doesn't preserve file permissions
  200. tar_bin = _find_extractor_by_cmd(extractors.get(ExtractorEnum.TAR))
  201. if not tar_bin is None:
  202. _extract_tar_with_tar(tar_bin, archive_path, output_dir, relative_to)
  203. return
  204. else:
  205. # This is not a normal code path, so make it clear.
  206. raise NotImplementedError(current_platform)
  207. # Fallback to Python-based extractor on all platforms
  208. _extract_tar_with_python(archive_path, output_dir, relative_to)
  209. def extract_with_7z(
  210. archive_path,
  211. output_dir,
  212. relative_to, #pylint: disable=too-many-arguments
  213. extractors=None):
  214. """
  215. Extract archives with 7-zip into the output directory.
  216. Only supports archives with one layer of unpacking, so compressed tar archives don't work.
  217. archive_path is the pathlib.Path to the archive to unpack
  218. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  219. relative_to is a pathlib.Path for directories that should be stripped relative to the
  220. root of the archive.
  221. extractors is a dictionary of PlatformEnum to a command or path to the
  222. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for 7-Zip.
  223. Raises ExtractionError if unexpected issues arise during unpacking.
  224. """
  225. # TODO: It would be nice to extend this to support arbitrary standard IO chaining of 7z
  226. # instances, so _extract_tar_with_7z and other future formats could use this.
  227. if extractors is None:
  228. extractors = DEFAULT_EXTRACTORS
  229. sevenzip_cmd = extractors.get(ExtractorEnum.SEVENZIP)
  230. if sevenzip_cmd == USE_REGISTRY:
  231. if not get_running_platform() == PlatformEnum.WINDOWS:
  232. get_logger().error('"%s" for 7-zip is only available on Windows', sevenzip_cmd)
  233. raise ExtractionError()
  234. sevenzip_cmd = str(_find_7z_by_registry())
  235. sevenzip_bin = _find_extractor_by_cmd(sevenzip_cmd)
  236. if not relative_to is None and (output_dir / relative_to).exists():
  237. get_logger().error('Temporary unpacking directory already exists: %s',
  238. output_dir / relative_to)
  239. raise ExtractionError()
  240. cmd = (sevenzip_bin, 'x', str(archive_path), '-aoa', '-o{}'.format(str(output_dir)))
  241. get_logger().debug('7z command line: %s', ' '.join(cmd))
  242. result = subprocess.run(cmd)
  243. if result.returncode != 0:
  244. get_logger().error('7z command returned %s', result.returncode)
  245. raise ExtractionError()
  246. _process_relative_to(output_dir, relative_to)
  247. def extract_with_winrar(
  248. archive_path,
  249. output_dir,
  250. relative_to, #pylint: disable=too-many-arguments
  251. extractors=None):
  252. """
  253. Extract archives with WinRAR into the output directory.
  254. Only supports archives with one layer of unpacking, so compressed tar archives don't work.
  255. archive_path is the pathlib.Path to the archive to unpack
  256. output_dir is a pathlib.Path to the directory to unpack. It must already exist.
  257. relative_to is a pathlib.Path for directories that should be stripped relative to the
  258. root of the archive.
  259. extractors is a dictionary of PlatformEnum to a command or path to the
  260. extractor binary. Defaults to 'tar' for tar, and '_use_registry' for WinRAR.
  261. Raises ExtractionError if unexpected issues arise during unpacking.
  262. """
  263. if extractors is None:
  264. extractors = DEFAULT_EXTRACTORS
  265. winrar_cmd = extractors.get(ExtractorEnum.WINRAR)
  266. if winrar_cmd == USE_REGISTRY:
  267. if not get_running_platform() == PlatformEnum.WINDOWS:
  268. get_logger().error('"%s" for WinRAR is only available on Windows', winrar_cmd)
  269. raise ExtractionError()
  270. winrar_cmd = str(_find_winrar_by_registry())
  271. winrar_bin = _find_extractor_by_cmd(winrar_cmd)
  272. if not relative_to is None and (output_dir / relative_to).exists():
  273. get_logger().error('Temporary unpacking directory already exists: %s',
  274. output_dir / relative_to)
  275. raise ExtractionError()
  276. cmd = (winrar_bin, 'x', '-o+', str(archive_path), str(output_dir))
  277. get_logger().debug('WinRAR command line: %s', ' '.join(cmd))
  278. result = subprocess.run(cmd)
  279. if result.returncode != 0:
  280. get_logger().error('WinRAR command returned %s', result.returncode)
  281. raise ExtractionError()
  282. _process_relative_to(output_dir, relative_to)