check_auth.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from synapse.events import FrozenEvent
  2. from synapse.api.auth import Auth
  3. from mock import Mock
  4. import argparse
  5. import itertools
  6. import json
  7. import sys
  8. def check_auth(auth, auth_chain, events):
  9. auth_chain.sort(key=lambda e: e.depth)
  10. auth_map = {
  11. e.event_id: e
  12. for e in auth_chain
  13. }
  14. create_events = {}
  15. for e in auth_chain:
  16. if e.type == "m.room.create":
  17. create_events[e.room_id] = e
  18. for e in itertools.chain(auth_chain, events):
  19. auth_events_list = [auth_map[i] for i, _ in e.auth_events]
  20. auth_events = {
  21. (e.type, e.state_key): e
  22. for e in auth_events_list
  23. }
  24. auth_events[("m.room.create", "")] = create_events[e.room_id]
  25. try:
  26. auth.check(e, auth_events=auth_events)
  27. except Exception as ex:
  28. print "Failed:", e.event_id, e.type, e.state_key
  29. print "Auth_events:", auth_events
  30. print ex
  31. print json.dumps(e.get_dict(), sort_keys=True, indent=4)
  32. # raise
  33. print "Success:", e.event_id, e.type, e.state_key
  34. if __name__ == '__main__':
  35. parser = argparse.ArgumentParser()
  36. parser.add_argument(
  37. 'json',
  38. nargs='?',
  39. type=argparse.FileType('r'),
  40. default=sys.stdin,
  41. )
  42. args = parser.parse_args()
  43. js = json.load(args.json)
  44. auth = Auth(Mock())
  45. check_auth(
  46. auth,
  47. [FrozenEvent(d) for d in js["auth_chain"]],
  48. [FrozenEvent(d) for d in js.get("pdus", [])],
  49. )