check_signature.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import argparse
  2. import json
  3. import logging
  4. import sys
  5. import urllib2
  6. import dns.resolver
  7. from signedjson.key import decode_verify_key_bytes, write_signing_keys
  8. from signedjson.sign import verify_signed_json
  9. from unpaddedbase64 import decode_base64
  10. def get_targets(server_name):
  11. if ":" in server_name:
  12. target, port = server_name.split(":")
  13. yield (target, int(port))
  14. return
  15. try:
  16. answers = dns.resolver.query("_matrix._tcp." + server_name, "SRV")
  17. for srv in answers:
  18. yield (srv.target, srv.port)
  19. except dns.resolver.NXDOMAIN:
  20. yield (server_name, 8448)
  21. def get_server_keys(server_name, target, port):
  22. url = "https://%s:%i/_matrix/key/v1" % (target, port)
  23. keys = json.load(urllib2.urlopen(url))
  24. verify_keys = {}
  25. for key_id, key_base64 in keys["verify_keys"].items():
  26. verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64))
  27. verify_signed_json(keys, server_name, verify_key)
  28. verify_keys[key_id] = verify_key
  29. return verify_keys
  30. def main():
  31. parser = argparse.ArgumentParser()
  32. parser.add_argument("signature_name")
  33. parser.add_argument(
  34. "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin
  35. )
  36. args = parser.parse_args()
  37. logging.basicConfig()
  38. server_name = args.signature_name
  39. keys = {}
  40. for target, port in get_targets(server_name):
  41. try:
  42. keys = get_server_keys(server_name, target, port)
  43. print("Using keys from https://%s:%s/_matrix/key/v1" % (target, port))
  44. write_signing_keys(sys.stdout, keys.values())
  45. break
  46. except Exception:
  47. logging.exception("Error talking to %s:%s", target, port)
  48. json_to_check = json.load(args.input_json)
  49. print("Checking JSON:")
  50. for key_id in json_to_check["signatures"][args.signature_name]:
  51. try:
  52. key = keys[key_id]
  53. verify_signed_json(json_to_check, args.signature_name, key)
  54. print("PASS %s" % (key_id,))
  55. except Exception:
  56. logging.exception("Check for key %s failed" % (key_id,))
  57. print("FAIL %s" % (key_id,))
  58. if __name__ == '__main__':
  59. main()