cmd_fsck.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. #!/usr/bin/env python3
  2. """Test 'tinc fsck' command."""
  3. import os
  4. import sys
  5. import typing as T
  6. from testlib import check
  7. from testlib.log import log
  8. from testlib.proc import Tinc, Feature
  9. from testlib.util import read_text, read_lines, write_lines, append_line, write_text
  10. run_legacy_checks = Feature.LEGACY_PROTOCOL in Tinc().features
  11. run_access_checks = os.name != "nt" and os.geteuid() != 0
  12. run_executability_checks = os.name != "nt"
  13. run_permission_checks = run_executability_checks
  14. # Sample RSA key pair (old format). Uses e = 0xFFFF.
  15. RSA_N = """
  16. BB82C3A9B906E98ABF2D99FF9B320B229F5C1E58EC784762DA1F4D3509FFF78ECA7FFF19BA17073\
  17. 6CDE458EC8E732DDE2C02009632DF731B4A6BD6C504E50B7B875484506AC1E49FD0DF624F6612F5\
  18. 64C562BD20F870592A49195023D744963229C35081C8AE48BE2EBB5CC9A0D64924022DC0EB782A3\
  19. A8F3EABCA04AA42B24B2A6BD2353A6893A73AE01FA54891DD24BF36CA032F19F7E78C01273334BA\
  20. A2ECF36B6998754CB012BC985C975503D945E4D925F6F719ACC8FBA7B18C810FF850C3CCACD6056\
  21. 5D4FCFE02A98FE793E2D45D481A34D1F90584D096561FF3184C462C606535F3F9BB260541DF0D1F\
  22. EB16938FFDEC2FF96ACCC6BD5BFBC19471F6AB
  23. """.strip()
  24. RSA_D = """
  25. 8CEC9A4316FE45E07900197D8FBB52D3AF01A51C4F8BD08A1E21A662E3CFCF7792AD7680673817B\
  26. 70AC1888A08B49E8C5835357016D9BF56A0EBDE8B5DF214EC422809BC8D88177F273419116EF2EC\
  27. 7951453F129768DE9BC31D963515CC7481559E4C0E65C549169F2B94AE68DB944171189DD654DC6\
  28. 970F2F5843FB7C8E9D057E2B5716752F1F5686811AC075ED3D3CBD06B5D35AE33D01260D9E0560A\
  29. F545D0C9D89A31D5EAF96D5422F6567FE8A90E23906B840545805644DFD656E526A686D3B978DD2\
  30. 71578CA3DA0F7D23FC1252A702A5D597CAE9D4A5BBF6398A75AF72582C7538A7937FB71A2610DCB\
  31. C39625B77103FA3B7D0A55177FD98C39CD4A27
  32. """.strip()
  33. class Context:
  34. """Test context. Used to store paths to configuration files."""
  35. def __init__(self) -> None:
  36. node = Tinc()
  37. node.cmd("init", node.name)
  38. self.node = node
  39. self.host = node.sub("hosts", node.name)
  40. self.conf = node.sub("tinc.conf")
  41. self.rsa_priv = node.sub("rsa_key.priv")
  42. self.ec_priv = node.sub("ed25519_key.priv")
  43. self.tinc_up = node.sub("tinc-up")
  44. self.host_up = node.sub("host-up")
  45. if os.name == "nt":
  46. self.tinc_up = f"{self.tinc_up}.cmd"
  47. self.host_up = f"{self.host_up}.cmd"
  48. def expect_msg(
  49. self, msg: str, force: bool = False, code: int = 1, present: bool = True
  50. ) -> None:
  51. """Checks that tinc output contains (or does not contain) the expected message."""
  52. args = ["fsck"]
  53. if force:
  54. args.insert(0, "--force")
  55. out, err = self.node.cmd(*args, code=code)
  56. if present:
  57. check.is_in(msg, out, err)
  58. else:
  59. check.not_in(msg, out, err)
  60. def test(msg: str) -> Context:
  61. """Create test context."""
  62. context = Context()
  63. log.info("TEST: %s", msg)
  64. return context
  65. def remove_pem(config: str) -> T.List[str]:
  66. """Remove PEM from a config file, leaving everything else untouched."""
  67. key, result = False, []
  68. for line in read_lines(config):
  69. if line.startswith("-----BEGIN"):
  70. key = True
  71. continue
  72. if line.startswith("-----END"):
  73. key = False
  74. continue
  75. if not key:
  76. result.append(line)
  77. write_lines(config, result)
  78. return result
  79. def extract_pem(config: str) -> T.List[str]:
  80. """Extract PEM from a config file, ignoring everything else."""
  81. key = False
  82. result: T.List[str] = []
  83. for line in read_lines(config):
  84. if line.startswith("-----BEGIN"):
  85. key = True
  86. continue
  87. if line.startswith("-----END"):
  88. return result
  89. if key:
  90. result.append(line)
  91. raise Exception("key not found")
  92. def replace_line(file_path: str, prefix: str, replace: str = "") -> None:
  93. """Replace lines in a file that start with the prefix."""
  94. lines = read_lines(file_path)
  95. lines = [replace if line.startswith(prefix) else line for line in lines]
  96. write_lines(file_path, lines)
  97. def test_private_key_var(var: str, file: str) -> None:
  98. """Test inline private keys with variable var."""
  99. context = test(f"private key variable {var} in file {file}")
  100. renamed = os.path.realpath(context.node.sub("renamed_key"))
  101. os.rename(src=context.node.sub(file), dst=renamed)
  102. append_line(context.host, f"{var} = {renamed}")
  103. context.expect_msg("key was found but no private key", present=False, code=0)
  104. def test_private_keys(keyfile: str) -> None:
  105. """Test private keys in file keyfile."""
  106. context = test(f"fail on broken {keyfile}")
  107. keyfile_path = context.node.sub(keyfile)
  108. os.truncate(keyfile_path, 0)
  109. if run_legacy_checks:
  110. context.expect_msg("no private key is known", code=0)
  111. else:
  112. context.expect_msg("No Ed25519 private key found")
  113. if run_access_checks:
  114. context = test(f"fail on inaccessible {keyfile}")
  115. keyfile_path = context.node.sub(keyfile)
  116. os.chmod(keyfile_path, 0)
  117. context.expect_msg("Error reading", code=0 if run_legacy_checks else 1)
  118. if run_permission_checks:
  119. context = test(f"warn about unsafe permissions on {keyfile}")
  120. keyfile_path = context.node.sub(keyfile)
  121. os.chmod(keyfile_path, 0o666)
  122. context.expect_msg("unsafe file permissions", code=0)
  123. if run_legacy_checks:
  124. context = test(f"pass on missing {keyfile} when the other key is present")
  125. keyfile_path = context.node.sub(keyfile)
  126. os.remove(keyfile_path)
  127. context.node.cmd("fsck")
  128. def test_ec_public_key_file_var(context: Context, *paths: str) -> None:
  129. """Test EC public keys in config *paths."""
  130. ec_pubkey = os.path.realpath(context.node.sub("ec_pubkey"))
  131. ec_key = ""
  132. for line in read_lines(context.host):
  133. if line.startswith("Ed25519PublicKey"):
  134. _, _, ec_key = line.split()
  135. break
  136. assert ec_key
  137. pem = f"""
  138. -----BEGIN ED25519 PUBLIC KEY-----
  139. {ec_key}
  140. -----END ED25519 PUBLIC KEY-----
  141. """
  142. write_text(ec_pubkey, pem)
  143. replace_line(context.host, "Ed25519PublicKey")
  144. config = context.node.sub(*paths)
  145. append_line(config, f"Ed25519PublicKeyFile = {ec_pubkey}")
  146. context.expect_msg("No (usable) public Ed25519", code=0, present=False)
  147. ###############################################################################
  148. # Common tests
  149. ###############################################################################
  150. ctx = test("pass freshly created configuration")
  151. ctx.node.cmd("fsck")
  152. ctx = test("fail on missing tinc.conf")
  153. os.remove(ctx.conf)
  154. ctx.expect_msg("No tinc configuration found")
  155. for suffix in "up", "down":
  156. ctx = test(f"unknown -{suffix} script warning")
  157. fake_path = ctx.node.sub(f"fake-{suffix}")
  158. write_text(fake_path, "")
  159. ctx.expect_msg("Unknown script", code=0)
  160. ctx = test("fix broken Ed25519 public key with --force")
  161. replace_line(ctx.host, "Ed25519PublicKey", "Ed25519PublicKey = foobar")
  162. ctx.expect_msg("No (usable) public Ed25519 key", force=True, code=0)
  163. ctx.node.cmd("fsck")
  164. ctx = test("fix missing Ed25519 public key with --force")
  165. replace_line(ctx.host, "Ed25519PublicKey")
  166. ctx.expect_msg("No (usable) public Ed25519 key", force=True, code=0)
  167. ctx.node.cmd("fsck")
  168. ctx = test("fail when all private keys are missing")
  169. os.remove(ctx.ec_priv)
  170. if run_legacy_checks:
  171. os.remove(ctx.rsa_priv)
  172. ctx.expect_msg("Neither RSA or Ed25519 private")
  173. else:
  174. ctx.expect_msg("No Ed25519 private")
  175. ctx = test("warn about missing EC public key and NOT fix without --force")
  176. replace_line(ctx.host, "Ed25519PublicKey")
  177. ctx.expect_msg("No (usable) public Ed25519", code=0)
  178. host = read_text(ctx.host)
  179. check.not_in("ED25519 PUBLIC KEY", host)
  180. ctx = test("fix missing EC public key on --force")
  181. replace_line(ctx.host, "Ed25519PublicKey")
  182. ctx.expect_msg("Wrote Ed25519 public key", force=True, code=0)
  183. host = read_text(ctx.host)
  184. check.is_in("ED25519 PUBLIC KEY", host)
  185. ctx = test("warn about obsolete variables")
  186. append_line(ctx.host, "GraphDumpFile = /dev/null")
  187. ctx.expect_msg("obsolete variable GraphDumpFile", code=0)
  188. ctx = test("warn about missing values")
  189. append_line(ctx.host, "Weight = ")
  190. ctx.expect_msg("No value for variable `Weight")
  191. ctx = test("warn about duplicate variables")
  192. append_line(ctx.host, f"Weight = 0{os.linesep}Weight = 1")
  193. ctx.expect_msg("multiple instances of variable Weight", code=0)
  194. ctx = test("warn about server variables in host config")
  195. append_line(ctx.host, "Interface = fake0")
  196. ctx.expect_msg("server variable Interface found", code=0)
  197. ctx = test("warn about host variables in server config")
  198. append_line(ctx.conf, "Port = 1337")
  199. ctx.expect_msg("host variable Port found", code=0)
  200. ctx = test("warn about missing Name")
  201. replace_line(ctx.conf, "Name =")
  202. ctx.expect_msg("without a valid Name")
  203. test_private_keys("ed25519_key.priv")
  204. test_private_key_var("Ed25519PrivateKeyFile", "ed25519_key.priv")
  205. ctx = test("test EC public key in tinc.conf")
  206. test_ec_public_key_file_var(ctx, "tinc.conf")
  207. ctx = test("test EC public key in hosts/")
  208. test_ec_public_key_file_var(ctx, "hosts", ctx.node.name)
  209. if run_access_checks:
  210. ctx = test("fail on inaccessible tinc.conf")
  211. os.chmod(ctx.conf, 0)
  212. ctx.expect_msg("not running tinc as root")
  213. ctx = test("fail on inaccessible hosts/foo")
  214. os.chmod(ctx.host, 0)
  215. ctx.expect_msg("Cannot open config file")
  216. if run_executability_checks:
  217. ctx = test("non-executable tinc-up MUST be fixed by tinc --force")
  218. os.chmod(ctx.tinc_up, 0o644)
  219. ctx.expect_msg("cannot read and execute", force=True, code=0)
  220. assert os.access(ctx.tinc_up, os.X_OK)
  221. ctx = test("non-executable tinc-up MUST NOT be fixed by tinc without --force")
  222. os.chmod(ctx.tinc_up, 0o644)
  223. ctx.expect_msg("cannot read and execute", code=0)
  224. assert not os.access(ctx.tinc_up, os.X_OK)
  225. ctx = test("non-executable foo-up MUST be fixed by tinc --force")
  226. write_text(ctx.host_up, "")
  227. os.chmod(ctx.host_up, 0o644)
  228. ctx.expect_msg("cannot read and execute", force=True, code=0)
  229. assert os.access(ctx.tinc_up, os.X_OK)
  230. ctx = test("non-executable bar-up MUST NOT be fixed by tinc")
  231. path = ctx.node.sub("hosts", "bar-up")
  232. write_text(path, "")
  233. os.chmod(path, 0o644)
  234. ctx.expect_msg("cannot read and execute", code=0)
  235. assert not os.access(path, os.X_OK)
  236. ###############################################################################
  237. # Legacy protocol
  238. ###############################################################################
  239. if not run_legacy_checks:
  240. log.info("skipping legacy protocol tests")
  241. sys.exit(0)
  242. def test_rsa_public_key_file_var(context: Context, *paths: str) -> None:
  243. """Test RSA public keys in config *paths."""
  244. key = extract_pem(context.host)
  245. remove_pem(context.host)
  246. rsa_pub = os.path.realpath(context.node.sub("rsa_pubkey"))
  247. write_lines(rsa_pub, key)
  248. config = context.node.sub(*paths)
  249. append_line(config, f"PublicKeyFile = {rsa_pub}")
  250. context.expect_msg("Error reading RSA public key", code=0, present=False)
  251. test_private_keys("rsa_key.priv")
  252. test_private_key_var("PrivateKeyFile", "rsa_key.priv")
  253. ctx = test("test rsa public key in tinc.conf")
  254. test_rsa_public_key_file_var(ctx, "tinc.conf")
  255. ctx = test("test rsa public key in hosts/")
  256. test_rsa_public_key_file_var(ctx, "hosts", ctx.node.name)
  257. ctx = test("warn about missing RSA private key if public key is present")
  258. os.remove(ctx.rsa_priv)
  259. ctx.expect_msg("public RSA key was found but no private key", code=0)
  260. ctx = test("warn about missing RSA public key")
  261. remove_pem(ctx.host)
  262. ctx.expect_msg("No (usable) public RSA", code=0)
  263. check.not_in("BEGIN RSA PUBLIC KEY", read_text(ctx.host))
  264. ctx = test("fix missing RSA public key on --force")
  265. remove_pem(ctx.host)
  266. ctx.expect_msg("Wrote RSA public key", force=True, code=0)
  267. check.is_in("BEGIN RSA PUBLIC KEY", read_text(ctx.host))
  268. ctx = test("RSA PublicKey + PrivateKey must work")
  269. os.remove(ctx.rsa_priv)
  270. remove_pem(ctx.host)
  271. append_line(ctx.conf, f"PrivateKey = {RSA_D}")
  272. append_line(ctx.host, f"PublicKey = {RSA_N}")
  273. ctx.expect_msg("no (usable) public RSA", code=0, present=False)
  274. ctx = test("RSA PrivateKey without PublicKey must warn")
  275. os.remove(ctx.rsa_priv)
  276. remove_pem(ctx.host)
  277. append_line(ctx.conf, f"PrivateKey = {RSA_D}")
  278. ctx.expect_msg("PrivateKey used but no PublicKey found", code=0)
  279. ctx = test("warn about missing EC private key if public key is present")
  280. os.remove(ctx.ec_priv)
  281. ctx.expect_msg("public Ed25519 key was found but no private key", code=0)
  282. ctx = test("fix broken RSA public key with --force")
  283. host_lines = read_lines(ctx.host)
  284. del host_lines[1]
  285. write_lines(ctx.host, host_lines)
  286. ctx.expect_msg("old key(s) found and disabled", force=True, code=0)
  287. ctx.node.cmd("fsck")
  288. ctx = test("fix missing RSA public key with --force")
  289. remove_pem(ctx.host)
  290. ctx.expect_msg("No (usable) public RSA key found", force=True, code=0)
  291. ctx.node.cmd("fsck")
  292. if run_permission_checks:
  293. ctx = test("warn about unsafe permissions on tinc.conf with PrivateKey")
  294. os.remove(ctx.rsa_priv)
  295. append_line(ctx.conf, f"PrivateKey = {RSA_D}")
  296. append_line(ctx.host, f"PublicKey = {RSA_N}")
  297. os.chmod(ctx.conf, 0o666)
  298. ctx.expect_msg("unsafe file permissions", code=0)