me_cleaner.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. #!/usr/bin/python
  2. # me_cleaner - Tool for partial deblobbing of Intel ME/TXE firmware images
  3. # Copyright (C) 2016, 2017 Nicola Corna <nicola@corna.info>
  4. #
  5. # This program is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation; either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. import sys
  16. import itertools
  17. import binascii
  18. import hashlib
  19. import argparse
  20. from struct import pack, unpack
# Offset, relative to the start of the ME region, that the FTPR partition is
# moved to when relocation is requested (the script adds me_start to it
# before calling relocate_partition).
new_ftpr_offset = 0x1000
# Names of the FTPR modules that remove_modules() reports as essential and
# never overwrites.
unremovable_modules = ("BUP", "ROMP")
  23. def get_chunks_offsets(llut, me_start):
  24. chunk_count = unpack("<I", llut[0x04:0x08])[0]
  25. huffman_stream_end = sum(unpack("<II", llut[0x10:0x18])) + me_start
  26. nonzero_offsets = [huffman_stream_end]
  27. offsets = []
  28. for i in range(0, chunk_count):
  29. chunk = llut[0x40 + i * 4:0x44 + i * 4]
  30. offset = 0
  31. if chunk[3] != 0x80:
  32. offset = unpack("<I", chunk[0:3] + b"\x00")[0] + me_start
  33. offsets.append([offset, 0])
  34. if offset != 0:
  35. nonzero_offsets.append(offset)
  36. nonzero_offsets.sort()
  37. for i in offsets:
  38. if i[0] != 0:
  39. i[1] = nonzero_offsets[nonzero_offsets.index(i[0]) + 1]
  40. return offsets
  41. def fill_range(f, start, end, fill):
  42. block = fill * 4096
  43. f.seek(start)
  44. f.writelines(itertools.repeat(block, (end - start) // 4096))
  45. f.write(block[:(end - start) % 4096])
  46. def remove_modules(f, mod_headers, ftpr_offset):
  47. comp_str = ("Uncomp.", "Huffman", "LZMA")
  48. unremovable_huff_chunks = []
  49. chunks_offsets = []
  50. base = 0
  51. chunk_size = 0
  52. end_addr = 0
  53. for mod_header in mod_headers:
  54. name = mod_header[0x04:0x14].rstrip(b"\x00").decode("ascii")
  55. offset = unpack("<I", mod_header[0x38:0x3C])[0] + ftpr_offset
  56. size = unpack("<I", mod_header[0x40:0x44])[0]
  57. flags = unpack("<I", mod_header[0x50:0x54])[0]
  58. comp_type = (flags >> 4) & 7
  59. sys.stdout.write(" {:<16} ({:<7}, ".format(name, comp_str[comp_type]))
  60. if comp_type == 0x00 or comp_type == 0x02:
  61. sys.stdout.write("0x{:06x} - 0x{:06x}): "
  62. .format(offset, offset + size))
  63. if name in unremovable_modules:
  64. end_addr = max(end_addr, offset + size)
  65. print("NOT removed, essential")
  66. else:
  67. fill_range(f, offset, offset + size, b"\xff")
  68. print("removed")
  69. elif comp_type == 0x01:
  70. sys.stdout.write("fragmented data ): ")
  71. if not chunks_offsets:
  72. f.seek(offset)
  73. llut = f.read(4)
  74. if llut == b"LLUT":
  75. llut += f.read(0x3c)
  76. chunk_count = unpack("<I", llut[0x4:0x8])[0]
  77. base = unpack("<I", llut[0x8:0xc])[0] + 0x10000000
  78. huff_data_len = unpack("<I", llut[0x10:0x14])[0]
  79. chunk_size = unpack("<I", llut[0x30:0x34])[0]
  80. llut += f.read(chunk_count * 4 + huff_data_len)
  81. chunks_offsets = get_chunks_offsets(llut, me_start)
  82. else:
  83. sys.exit("Huffman modules found, but LLUT is not present")
  84. if name in unremovable_modules:
  85. print("NOT removed, essential")
  86. module_base = unpack("<I", mod_header[0x34:0x38])[0]
  87. module_size = unpack("<I", mod_header[0x3c:0x40])[0]
  88. first_chunk_num = (module_base - base) // chunk_size
  89. last_chunk_num = first_chunk_num + module_size // chunk_size
  90. unremovable_huff_chunks += \
  91. [x for x in chunks_offsets[first_chunk_num:
  92. last_chunk_num + 1] if x[0] != 0]
  93. else:
  94. print("removed")
  95. else:
  96. sys.stdout.write("0x{:06x} - 0x{:06x}): unknown compression, "
  97. "skipping".format(offset, offset + size))
  98. if chunks_offsets:
  99. removable_huff_chunks = []
  100. for chunk in chunks_offsets:
  101. if all(not(unremovable_chk[0] <= chunk[0] < unremovable_chk[1] or
  102. unremovable_chk[0] < chunk[1] <= unremovable_chk[1])
  103. for unremovable_chk in unremovable_huff_chunks):
  104. removable_huff_chunks.append(chunk)
  105. for removable_chunk in removable_huff_chunks:
  106. if removable_chunk[1] > removable_chunk[0]:
  107. fill_range(f, removable_chunk[0], removable_chunk[1], b"\xff")
  108. end_addr = max(end_addr,
  109. max(unremovable_huff_chunks, key=lambda x: x[1])[1])
  110. return end_addr
  111. def check_partition_signature(f, offset):
  112. f.seek(offset)
  113. header = f.read(0x80)
  114. modulus = int(binascii.hexlify(f.read(0x100)[::-1]), 16)
  115. public_exponent = unpack("<I", f.read(4))[0]
  116. signature = int(binascii.hexlify(f.read(0x100)[::-1]), 16)
  117. header_len = unpack("<I", header[0x4:0x8])[0] * 4
  118. manifest_len = unpack("<I", header[0x18:0x1c])[0] * 4
  119. f.seek(offset + header_len)
  120. sha256 = hashlib.sha256()
  121. sha256.update(header)
  122. sha256.update(f.read(manifest_len - header_len))
  123. decrypted_sig = pow(signature, public_exponent, modulus)
  124. return "{:#x}".format(decrypted_sig).endswith(sha256.hexdigest()) # FIXME
  125. def move_range(f, offset_from, size, offset_to, fill):
  126. for i in range(0, size, 4096):
  127. f.seek(offset_from + i, 0)
  128. block = f.read(4096 if size - i >= 4096 else size - i)
  129. f.seek(offset_from + i, 0)
  130. f.write(fill * 4096 if size - i >= 4096 else fill * (size - i))
  131. f.seek(offset_to + i, 0)
  132. f.write(block)
  133. def relocate_partition(f, me_start, partition_header_offset, new_offset,
  134. mod_headers):
  135. f.seek(partition_header_offset)
  136. name = f.read(4).rstrip(b"\x00").decode("ascii")
  137. f.seek(partition_header_offset + 0x8)
  138. old_offset, partition_size = unpack("<II", f.read(0x8))
  139. old_offset += me_start
  140. offset_diff = new_offset - old_offset
  141. print("Relocating {} to {:#x} - {:#x}..."
  142. .format(name, new_offset, new_offset + partition_size))
  143. print(" Adjusting FPT entry...")
  144. f.seek(partition_header_offset + 0x8)
  145. f.write(pack("<I", new_offset - me_start))
  146. llut_start = 0
  147. for mod_header in mod_headers:
  148. if (unpack("<I", mod_header[0x50:0x54])[0] >> 4) & 7 == 0x01:
  149. llut_start = unpack("<I", mod_header[0x38:0x3C])[0] + old_offset
  150. break
  151. if llut_start != 0:
  152. f.seek(llut_start, 0)
  153. if f.read(4) == b"LLUT":
  154. print(" Adjusting LUT start offset...")
  155. f.seek(llut_start + 0x0c, 0)
  156. old_lut_offset = unpack("<I", f.read(4))[0]
  157. f.seek(llut_start + 0x0c, 0)
  158. f.write(pack("<I", old_lut_offset + offset_diff))
  159. print(" Adjusting Huffman start offset...")
  160. f.seek(llut_start + 0x14, 0)
  161. old_huff_offset = unpack("<I", f.read(4))[0]
  162. f.seek(llut_start + 0x14, 0)
  163. f.write(pack("<I", old_huff_offset + offset_diff))
  164. print(" Adjusting chunks offsets...")
  165. f.seek(llut_start + 0x4, 0)
  166. chunk_count = unpack("<I", f.read(4))[0]
  167. f.seek(llut_start + 0x40, 0)
  168. chunks = bytearray(chunk_count * 4)
  169. f.readinto(chunks)
  170. for i in range(0, chunk_count * 4, 4):
  171. if chunks[i + 3] != 0x80:
  172. chunks[i:i + 3] = \
  173. pack("<I", unpack("<I", chunks[i:i + 3] +
  174. b"\x00")[0] + offset_diff)[0:3]
  175. f.seek(llut_start + 0x40, 0)
  176. f.write(chunks)
  177. else:
  178. sys.exit("Huffman modules present but no LLUT found!")
  179. else:
  180. print(" No Huffman modules found")
  181. print(" Moving data...")
  182. move_range(f, old_offset, partition_size, new_offset, b"\xff")
  183. if __name__ == "__main__":
  184. parser = argparse.ArgumentParser(description="Tool to remove as much code "
  185. "as possible from Intel ME/TXE firmwares")
  186. parser.add_argument("file", help="ME/TXE image or full dump")
  187. parser.add_argument("-r", "--relocate", help="relocate the FTPR partition "
  188. "to the top of the ME region", action="store_true")
  189. parser.add_argument("-k", "--keep-modules", help="don't remove the FTPR "
  190. "modules, even when possible", action="store_true")
  191. parser.add_argument("-c", "--check", help="verify the integrity of the "
  192. "fundamental parts of the firmware and exit",
  193. action="store_true")
  194. args = parser.parse_args()
  195. with open(args.file, "rb" if args.check else "r+b") as f:
  196. f.seek(0x10)
  197. magic = f.read(4)
  198. if magic == b"$FPT":
  199. print("ME/TXE image detected")
  200. me_start = 0
  201. f.seek(0, 2)
  202. me_end = f.tell()
  203. elif magic == b"\x5a\xa5\xf0\x0f":
  204. print("Full image detected")
  205. f.seek(0x14)
  206. flmap0 = unpack("<I", f.read(4))[0]
  207. nr = flmap0 >> 24 & 0x7
  208. frba = flmap0 >> 12 & 0xff0
  209. if nr >= 2:
  210. f.seek(frba + 0x8)
  211. flreg2 = unpack("<I", f.read(4))[0]
  212. me_start = (flreg2 & 0x1fff) << 12
  213. me_end = flreg2 >> 4 & 0x1fff000 | 0xfff
  214. if me_start >= me_end:
  215. sys.exit("The ME/TXE region in this image has been "
  216. "disabled")
  217. f.seek(me_start + 0x10)
  218. if f.read(4) != b"$FPT":
  219. sys.exit("The ME/TXE region is corrupted or missing")
  220. print("The ME/TXE region goes from {:#x} to {:#x}"
  221. .format(me_start, me_end))
  222. else:
  223. sys.exit("This image does not contains a ME/TXE firmware "
  224. "(NR = {})".format(nr))
  225. else:
  226. sys.exit("Unknown image")
  227. print("Found FPT header at {:#x}".format(me_start + 0x10))
  228. f.seek(me_start + 0x14)
  229. entries = unpack("<I", f.read(4))[0]
  230. print("Found {} partition(s)".format(entries))
  231. f.seek(me_start + 0x14)
  232. header_len = unpack("B", f.read(1))[0]
  233. f.seek(me_start + 0x30)
  234. partitions = f.read(entries * 0x20)
  235. ftpr_header = b""
  236. for i in range(entries):
  237. if partitions[i * 0x20:(i * 0x20) + 4] == b"FTPR":
  238. ftpr_header = partitions[i * 0x20:(i + 1) * 0x20]
  239. break
  240. if ftpr_header == b"":
  241. sys.exit("FTPR header not found, this image doesn't seem to be "
  242. "valid")
  243. ftpr_offset, ftpr_lenght = unpack("<II", ftpr_header[0x08:0x10])
  244. ftpr_offset += me_start
  245. print("Found FTPR header: FTPR partition spans from {:#x} to {:#x}"
  246. .format(ftpr_offset, ftpr_offset + ftpr_lenght))
  247. f.seek(ftpr_offset)
  248. if f.read(4) == b"$CPD":
  249. me11 = True
  250. num_entries = unpack("<I", f.read(4))[0]
  251. ftpr_mn2_offset = 0x10 + num_entries * 0x18
  252. else:
  253. me11 = False
  254. ftpr_mn2_offset = 0
  255. f.seek(ftpr_offset + ftpr_mn2_offset + 0x24)
  256. version = unpack("<HHHH", f.read(0x08))
  257. print("ME/TXE firmware version {}"
  258. .format('.'.join(str(i) for i in version)))
  259. if not args.check:
  260. print("Removing extra partitions...")
  261. fill_range(f, me_start + 0x30, ftpr_offset, b"\xff")
  262. fill_range(f, ftpr_offset + ftpr_lenght, me_end, b"\xff")
  263. print("Removing extra partition entries in FPT...")
  264. f.seek(me_start + 0x30)
  265. f.write(ftpr_header)
  266. f.seek(me_start + 0x14)
  267. f.write(pack("<I", 1))
  268. print("Removing EFFS presence flag...")
  269. f.seek(me_start + 0x24)
  270. flags = unpack("<I", f.read(4))[0]
  271. flags &= ~(0x00000001)
  272. f.seek(me_start + 0x24)
  273. f.write(pack("<I", flags))
  274. f.seek(me_start, 0)
  275. header = bytearray(f.read(0x30))
  276. checksum = (0x100 - (sum(header) - header[0x1b]) & 0xff) & 0xff
  277. print("Correcting checksum (0x{:02x})...".format(checksum))
  278. # The checksum is just the two's complement of the sum of the
  279. # first 0x30 bytes (except for 0x1b, the checksum itself). In other
  280. # words, the sum of the first 0x30 bytes must be always 0x00.
  281. f.seek(me_start + 0x1b)
  282. f.write(pack("B", checksum))
  283. if not me11:
  284. print("Reading FTPR modules list...")
  285. f.seek(ftpr_offset + 0x1c)
  286. tag = f.read(4)
  287. if tag == b"$MN2":
  288. f.seek(ftpr_offset + 0x20)
  289. num_modules = unpack("<I", f.read(4))[0]
  290. f.seek(ftpr_offset + 0x290)
  291. data = f.read(0x84)
  292. module_header_size = 0
  293. if data[0x0:0x4] == b"$MME":
  294. if data[0x60:0x64] == b"$MME" or num_modules == 1:
  295. module_header_size = 0x60
  296. elif data[0x80:0x84] == b"$MME":
  297. module_header_size = 0x80
  298. if module_header_size != 0:
  299. f.seek(ftpr_offset + 0x290)
  300. mod_headers = [f.read(module_header_size)
  301. for i in range(0, num_modules)]
  302. if all(hdr.startswith(b"$MME") for hdr in mod_headers):
  303. if args.keep_modules:
  304. end_addr = ftpr_offset + ftpr_lenght
  305. else:
  306. end_addr = remove_modules(f, mod_headers,
  307. ftpr_offset)
  308. if args.relocate:
  309. new_ftpr_offset += me_start
  310. relocate_partition(f, me_start,
  311. me_start + 0x30,
  312. new_ftpr_offset,
  313. mod_headers)
  314. end_addr += new_ftpr_offset - ftpr_offset
  315. ftpr_offset = new_ftpr_offset
  316. end_addr = (end_addr // 0x1000 + 1) * 0x1000
  317. print("The ME minimum size is {0} bytes "
  318. "({0:#x} bytes)".format(end_addr - me_start))
  319. if me_start > 0:
  320. print("The ME region can be reduced up to:\n"
  321. " {:08x}:{:08x} me"
  322. .format(me_start, end_addr - 1))
  323. else:
  324. print("Found less modules than expected in the "
  325. "FTPR partition; skipping modules removal")
  326. else:
  327. print("Can't find the module header size; skipping "
  328. "modules removal")
  329. else:
  330. print("Wrong FTPR partition tag ({}); skipping modules "
  331. "removal".format(tag))
  332. else:
  333. print("Modules removal in ME v11 or greater is not yet "
  334. "supported")
  335. sys.stdout.write("Checking FTPR RSA signature... ")
  336. if check_partition_signature(f, ftpr_offset + ftpr_mn2_offset):
  337. print("VALID")
  338. else:
  339. print("INVALID!!")
  340. sys.exit("The FTPR partition signature is not valid. Is the input "
  341. "ME/TXE image valid?")
  342. if not args.check:
  343. print("Done! Good luck!")