tail-synapse.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import collections
  2. import json
  3. import sys
  4. import time
  5. import requests
  6. Entry = collections.namedtuple("Entry", "name position rows")
  7. ROW_TYPES = {}
  8. def row_type_for_columns(name, column_names):
  9. column_names = tuple(column_names)
  10. row_type = ROW_TYPES.get((name, column_names))
  11. if row_type is None:
  12. row_type = collections.namedtuple(name, column_names)
  13. ROW_TYPES[(name, column_names)] = row_type
  14. return row_type
  15. def parse_response(content):
  16. streams = json.loads(content)
  17. result = {}
  18. for name, value in streams.items():
  19. row_type = row_type_for_columns(name, value["field_names"])
  20. position = value["position"]
  21. rows = [row_type(*row) for row in value["rows"]]
  22. result[name] = Entry(name, position, rows)
  23. return result
  24. def replicate(server, streams):
  25. return parse_response(
  26. requests.get(
  27. server + "/_synapse/replication", verify=False, params=streams
  28. ).content
  29. )
  30. def main():
  31. server = sys.argv[1]
  32. streams = None
  33. while not streams:
  34. try:
  35. streams = {
  36. row.name: row.position
  37. for row in replicate(server, {"streams": "-1"})["streams"].rows
  38. }
  39. except requests.exceptions.ConnectionError:
  40. time.sleep(0.1)
  41. while True:
  42. try:
  43. results = replicate(server, streams)
  44. except Exception:
  45. sys.stdout.write("connection_lost(" + repr(streams) + ")\n")
  46. break
  47. for update in results.values():
  48. for row in update.rows:
  49. sys.stdout.write(repr(row) + "\n")
  50. streams[update.name] = update.position
# Run the tailer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()