register_new_matrix_user.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  16. import argparse
  17. import getpass
  18. import hashlib
  19. import hmac
  20. import logging
  21. import sys
  22. from typing import Callable, Optional
  23. import requests
  24. import yaml
  25. def request_registration(
  26. user: str,
  27. password: str,
  28. server_location: str,
  29. shared_secret: str,
  30. admin: bool = False,
  31. user_type: Optional[str] = None,
  32. _print: Callable[[str], None] = print,
  33. exit: Callable[[int], None] = sys.exit,
  34. ) -> None:
  35. url = "%s/_synapse/admin/v1/register" % (server_location.rstrip("/"),)
  36. # Get the nonce
  37. r = requests.get(url, verify=False)
  38. if r.status_code != 200:
  39. _print("ERROR! Received %d %s" % (r.status_code, r.reason))
  40. if 400 <= r.status_code < 500:
  41. try:
  42. _print(r.json()["error"])
  43. except Exception:
  44. pass
  45. return exit(1)
  46. nonce = r.json()["nonce"]
  47. mac = hmac.new(key=shared_secret.encode("utf8"), digestmod=hashlib.sha1)
  48. mac.update(nonce.encode("utf8"))
  49. mac.update(b"\x00")
  50. mac.update(user.encode("utf8"))
  51. mac.update(b"\x00")
  52. mac.update(password.encode("utf8"))
  53. mac.update(b"\x00")
  54. mac.update(b"admin" if admin else b"notadmin")
  55. if user_type:
  56. mac.update(b"\x00")
  57. mac.update(user_type.encode("utf8"))
  58. hex_mac = mac.hexdigest()
  59. data = {
  60. "nonce": nonce,
  61. "username": user,
  62. "password": password,
  63. "mac": hex_mac,
  64. "admin": admin,
  65. "user_type": user_type,
  66. }
  67. _print("Sending registration request...")
  68. r = requests.post(url, json=data, verify=False)
  69. if r.status_code != 200:
  70. _print("ERROR! Received %d %s" % (r.status_code, r.reason))
  71. if 400 <= r.status_code < 500:
  72. try:
  73. _print(r.json()["error"])
  74. except Exception:
  75. pass
  76. return exit(1)
  77. _print("Success!")
  78. def register_new_user(
  79. user: str,
  80. password: str,
  81. server_location: str,
  82. shared_secret: str,
  83. admin: Optional[bool],
  84. user_type: Optional[str],
  85. ) -> None:
  86. if not user:
  87. try:
  88. default_user: Optional[str] = getpass.getuser()
  89. except Exception:
  90. default_user = None
  91. if default_user:
  92. user = input("New user localpart [%s]: " % (default_user,))
  93. if not user:
  94. user = default_user
  95. else:
  96. user = input("New user localpart: ")
  97. if not user:
  98. print("Invalid user name")
  99. sys.exit(1)
  100. if not password:
  101. password = getpass.getpass("Password: ")
  102. if not password:
  103. print("Password cannot be blank.")
  104. sys.exit(1)
  105. confirm_password = getpass.getpass("Confirm password: ")
  106. if password != confirm_password:
  107. print("Passwords do not match")
  108. sys.exit(1)
  109. if admin is None:
  110. admin_inp = input("Make admin [no]: ")
  111. if admin_inp in ("y", "yes", "true"):
  112. admin = True
  113. else:
  114. admin = False
  115. request_registration(
  116. user, password, server_location, shared_secret, bool(admin), user_type
  117. )
  118. def main() -> None:
  119. logging.captureWarnings(True)
  120. parser = argparse.ArgumentParser(
  121. description="Used to register new users with a given homeserver when"
  122. " registration has been disabled. The homeserver must be"
  123. " configured with the 'registration_shared_secret' option"
  124. " set."
  125. )
  126. parser.add_argument(
  127. "-u",
  128. "--user",
  129. default=None,
  130. help="Local part of the new user. Will prompt if omitted.",
  131. )
  132. parser.add_argument(
  133. "-p",
  134. "--password",
  135. default=None,
  136. help="New password for user. Will prompt if omitted.",
  137. )
  138. parser.add_argument(
  139. "-t",
  140. "--user_type",
  141. default=None,
  142. help="User type as specified in synapse.api.constants.UserTypes",
  143. )
  144. admin_group = parser.add_mutually_exclusive_group()
  145. admin_group.add_argument(
  146. "-a",
  147. "--admin",
  148. action="store_true",
  149. help=(
  150. "Register new user as an admin. "
  151. "Will prompt if --no-admin is not set either."
  152. ),
  153. )
  154. admin_group.add_argument(
  155. "--no-admin",
  156. action="store_true",
  157. help=(
  158. "Register new user as a regular user. "
  159. "Will prompt if --admin is not set either."
  160. ),
  161. )
  162. group = parser.add_mutually_exclusive_group(required=True)
  163. group.add_argument(
  164. "-c",
  165. "--config",
  166. type=argparse.FileType("r"),
  167. help="Path to server config file. Used to read in shared secret.",
  168. )
  169. group.add_argument(
  170. "-k", "--shared-secret", help="Shared secret as defined in server config file."
  171. )
  172. parser.add_argument(
  173. "server_url",
  174. default="https://localhost:8448",
  175. nargs="?",
  176. help="URL to use to talk to the homeserver. Defaults to "
  177. " 'https://localhost:8448'.",
  178. )
  179. args = parser.parse_args()
  180. if "config" in args and args.config:
  181. config = yaml.safe_load(args.config)
  182. secret = config.get("registration_shared_secret", None)
  183. if not secret:
  184. print("No 'registration_shared_secret' defined in config.")
  185. sys.exit(1)
  186. else:
  187. secret = args.shared_secret
  188. admin = None
  189. if args.admin or args.no_admin:
  190. admin = args.admin
  191. register_new_user(
  192. args.user, args.password, args.server_url, secret, admin, args.user_type
  193. )
  194. if __name__ == "__main__":
  195. main()