check_signature.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. from signedjson.sign import verify_signed_json
  2. from signedjson.key import decode_verify_key_bytes, write_signing_keys
  3. from unpaddedbase64 import decode_base64
  4. import urllib2
  5. import json
  6. import sys
  7. import dns.resolver
  8. import pprint
  9. import argparse
  10. import logging
  11. def get_targets(server_name):
  12. if ":" in server_name:
  13. target, port = server_name.split(":")
  14. yield (target, int(port))
  15. return
  16. try:
  17. answers = dns.resolver.query("_matrix._tcp." + server_name, "SRV")
  18. for srv in answers:
  19. yield (srv.target, srv.port)
  20. except dns.resolver.NXDOMAIN:
  21. yield (server_name, 8448)
  22. def get_server_keys(server_name, target, port):
  23. url = "https://%s:%i/_matrix/key/v1" % (target, port)
  24. keys = json.load(urllib2.urlopen(url))
  25. verify_keys = {}
  26. for key_id, key_base64 in keys["verify_keys"].items():
  27. verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64))
  28. verify_signed_json(keys, server_name, verify_key)
  29. verify_keys[key_id] = verify_key
  30. return verify_keys
  31. def main():
  32. parser = argparse.ArgumentParser()
  33. parser.add_argument("signature_name")
  34. parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'),
  35. default=sys.stdin)
  36. args = parser.parse_args()
  37. logging.basicConfig()
  38. server_name = args.signature_name
  39. keys = {}
  40. for target, port in get_targets(server_name):
  41. try:
  42. keys = get_server_keys(server_name, target, port)
  43. print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port)
  44. write_signing_keys(sys.stdout, keys.values())
  45. break
  46. except:
  47. logging.exception("Error talking to %s:%s", target, port)
  48. json_to_check = json.load(args.input_json)
  49. print "Checking JSON:"
  50. for key_id in json_to_check["signatures"][args.signature_name]:
  51. try:
  52. key = keys[key_id]
  53. verify_signed_json(json_to_check, args.signature_name, key)
  54. print "PASS %s" % (key_id,)
  55. except:
  56. logging.exception("Check for key %s failed" % (key_id,))
  57. print "FAIL %s" % (key_id,)
  58. if __name__ == '__main__':
  59. main()