# check_signature.py (2.2 KB)
  1. from syutil.crypto.jsonsign import verify_signed_json
  2. from syutil.crypto.signing_key import (
  3. decode_verify_key_bytes, write_signing_keys
  4. )
  5. from syutil.base64util import decode_base64
  6. import urllib2
  7. import json
  8. import sys
  9. import dns.resolver
  10. import pprint
  11. import argparse
  12. import logging
  13. def get_targets(server_name):
  14. if ":" in server_name:
  15. target, port = server_name.split(":")
  16. yield (target, int(port))
  17. return
  18. try:
  19. answers = dns.resolver.query("_matrix._tcp." + server_name, "SRV")
  20. for srv in answers:
  21. yield (srv.target, srv.port)
  22. except dns.resolver.NXDOMAIN:
  23. yield (server_name, 8448)
  24. def get_server_keys(server_name, target, port):
  25. url = "https://%s:%i/_matrix/key/v1" % (target, port)
  26. keys = json.load(urllib2.urlopen(url))
  27. verify_keys = {}
  28. for key_id, key_base64 in keys["verify_keys"].items():
  29. verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64))
  30. verify_signed_json(keys, server_name, verify_key)
  31. verify_keys[key_id] = verify_key
  32. return verify_keys
  33. def main():
  34. parser = argparse.ArgumentParser()
  35. parser.add_argument("signature_name")
  36. parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'),
  37. default=sys.stdin)
  38. args = parser.parse_args()
  39. logging.basicConfig()
  40. server_name = args.signature_name
  41. keys = {}
  42. for target, port in get_targets(server_name):
  43. try:
  44. keys = get_server_keys(server_name, target, port)
  45. print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port)
  46. write_signing_keys(sys.stdout, keys.values())
  47. break
  48. except:
  49. logging.exception("Error talking to %s:%s", target, port)
  50. json_to_check = json.load(args.input_json)
  51. print "Checking JSON:"
  52. for key_id in json_to_check["signatures"][args.signature_name]:
  53. try:
  54. key = keys[key_id]
  55. verify_signed_json(json_to_check, args.signature_name, key)
  56. print "PASS %s" % (key_id,)
  57. except:
  58. logging.exception("Check for key %s failed" % (key_id,))
  59. print "FAIL %s" % (key_id,)
  60. if __name__ == '__main__':
  61. main()