check_auth.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. from __future__ import print_function
  2. import argparse
  3. import itertools
  4. import json
  5. import sys
  6. from mock import Mock
  7. from synapse.api.auth import Auth
  8. from synapse.events import FrozenEvent
  9. def check_auth(auth, auth_chain, events):
  10. auth_chain.sort(key=lambda e: e.depth)
  11. auth_map = {e.event_id: e for e in auth_chain}
  12. create_events = {}
  13. for e in auth_chain:
  14. if e.type == "m.room.create":
  15. create_events[e.room_id] = e
  16. for e in itertools.chain(auth_chain, events):
  17. auth_events_list = [auth_map[i] for i, _ in e.auth_events]
  18. auth_events = {(e.type, e.state_key): e for e in auth_events_list}
  19. auth_events[("m.room.create", "")] = create_events[e.room_id]
  20. try:
  21. auth.check(e, auth_events=auth_events)
  22. except Exception as ex:
  23. print("Failed:", e.event_id, e.type, e.state_key)
  24. print("Auth_events:", auth_events)
  25. print(ex)
  26. print(json.dumps(e.get_dict(), sort_keys=True, indent=4))
  27. # raise
  28. print("Success:", e.event_id, e.type, e.state_key)
  29. if __name__ == "__main__":
  30. parser = argparse.ArgumentParser()
  31. parser.add_argument(
  32. "json", nargs="?", type=argparse.FileType("r"), default=sys.stdin
  33. )
  34. args = parser.parse_args()
  35. js = json.load(args.json)
  36. auth = Auth(Mock())
  37. check_auth(
  38. auth,
  39. [FrozenEvent(d) for d in js["auth_chain"]],
  40. [FrozenEvent(d) for d in js.get("pdus", [])],
  41. )