dynamicEndpoints.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. #!/usr/bin/env python2
  2. """
  3. dynamicEndpoints.py: make cjdns reliably connect to remote nodes with dynamic IP
  4. addresses, identified by a DNS name.
  5. Requires a config file with a section for each dynamic-IP node, like this:
  6. [lhjs0njqtvh1z4p2922bbyp2mksmyzf5lb63kvs3ppy78y1dj130.k]
  7. hostname: verge.info.tm
  8. port: 6324
  9. password: ns6vn00hw0buhrtc4wbk8sv230
  10. The section name (in square brackets) is the public key of the node. Then the
  11. hostname, port, and peering password for the node are given.
  12. By default, this program looks up the current Internet IP of each node defined
  13. in the config file, and adds that node at that IP to the local cjdns instance.
  14. Unless the --noWait option is given, or the $nowait environment variable is
  15. true, the program then continues running, waiting for cjdns to log messages
  16. about those peers being unresponsive and updating the peers' Internet IP
  17. addresses as needed.
  18. If cjdns dies while the program is monitoring for messages, the program will
  19. hang indefinitely.
  20. Requires that the $HOME/.cjdnsadmin file be correctly set up. See
  21. cjdnsadminmaker.py if that is not the case.
  22. """
  23. # You may redistribute this program and/or modify it under the terms of
  24. # the GNU General Public License as published by the Free Software Foundation,
  25. # either version 3 of the License, or (at your option) any later version.
  26. #
  27. # This program is distributed in the hope that it will be useful,
  28. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  29. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  30. # GNU General Public License for more details.
  31. #
  32. # You should have received a copy of the GNU General Public License
  33. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  34. from cjdnsadmin.cjdnsadmin import connectWithAdminInfo;
  35. from cjdnsadmin.publicToIp6 import PublicToIp6_convert;
  36. from cjdnsadmin.bencode import *
  37. import sys
  38. import socket, re
  39. import select
  40. import time
  41. import os
  42. import pwd
  43. import grp
  44. import logging
  45. import argparse
  46. import atexit
  47. import ConfigParser
  48. # This holds a regex that matches the message we get from the roiuter when it
  49. # sees an unresponsive peer.
  50. IS_UNRESPONSIVE = re.compile(
  51. "Pinging unresponsive peer \\[(.*\\.k)\\] lag \\[.*\\]")
  52. # Make sure that it works
  53. assert(IS_UNRESPONSIVE.match("Pinging unresponsive peer " +
  54. "[6fmmn3qurcjg6d8hplq1rrcsspfhvm1900s13f3p5bv2bb4f4mm0.k] lag [207147]"))
  55. # What file do these messages come from? TODO: don't depend so tightly on the
  56. # other end of the codebase. Use the API to watch peers.
  57. MESSAGE_FILE = "InterfaceController.c"
  58. MESSAGE_LINE = 0 # All lines
  59. class Node(object):
  60. """
  61. Represents a remote peer. A remoter peer has:
  62. - A hostname to repeatedly look up
  63. - A port to connect to
  64. - A password to connect with
  65. - A public key to authenticate the remote peer with
  66. - A last known Internet IP address.
  67. """
  68. __slots__ = ("host","port","password","key","lastAddr")
  69. def __init__(self,host,port,password,key):
  70. self.host = host
  71. self.port = port
  72. self.password = password
  73. self.key = key
  74. self.lastAddr = None
  75. class DynamicEndpointWatcher(object):
  76. """
  77. Encapsulates all the stuff we need to actually keep an eye on our remote
  78. nodes and see if they change IPs. When a node with a dynamic IP is
  79. unresponsive, we look up its IP address and tell cjdns to go connect to it.
  80. """
  81. def __init__(self, cjdns, configuration):
  82. """
  83. Set up a new DynamicEndpointWatcher operating on the given CJDNS admin
  84. connection, using the specified ConfigParser parsed configuration.
  85. """
  86. # Keep the cjdns admin connection
  87. self.cjdns = cjdns
  88. # Holds a dict from public key string to Node object for the remote
  89. # peer, for all known nodes.
  90. self.nodes = dict()
  91. # Holds a dict from public key to Node object for those nodes which are
  92. # unresponsive.
  93. self.unresponsive = dict()
  94. # Holds a cjdns log message subscription to messages about unresponsive
  95. # nodes.
  96. self.sub = self.cjdns.AdminLog_subscribe(MESSAGE_LINE, MESSAGE_FILE,
  97. 'DEBUG')
  98. # Add nodes from the given ConfigParser parser.
  99. for section in configuration.sections():
  100. # Each section is named with a node key, and contains a
  101. # hostname, port, and password.
  102. peerHostname = configuration.get(section, "hostname")
  103. peerPort = configuration.get(section, "port")
  104. peerPassword = configuration.get(section, "password")
  105. # Add the node
  106. self.addNode(peerHostname, peerPort, peerPassword, section)
  107. if self.sub['error'] == 'none':
  108. # We successfully subscribed to messages.
  109. # When we die, try to unsubscribe
  110. atexit.register(self.stop)
  111. # Add all the nodes we're supposed to watch.
  112. for node in self.nodes.values():
  113. self.lookup(node)
  114. logging.info("{} peers added!".format(len(self.nodes)))
  115. else:
  116. logging.error(self.sub)
  117. def run(self):
  118. """
  119. Run forever, monitoring the peers we are responsible for.
  120. """
  121. logging.info("Watching for unresponsive peers")
  122. # Watch for any messages from our log message subscription.
  123. self.recieve(self.sub['txid'])
  124. def stop(self):
  125. """
  126. Unsubscribe from the admin log and close the connection to cjdns because
  127. we are shutting down the program. If we don't do this, cjdns might
  128. crash. If we do do it, cjdns might still crash.
  129. """
  130. # Unsubscribe cleanly
  131. logging.info("Unsubscribing from stream {}".format(
  132. self.sub['streamId']))
  133. unsub = self.cjdns.AdminLog_unsubscribe(self.sub['streamId'])
  134. if unsub['error'] != 'none':
  135. logging.error(unsub['error'])
  136. # Close the connection
  137. logging.info("Closing admin connection")
  138. self.cjdns.disconnect()
  139. def addNode(self, host, port, password, key):
  140. """
  141. Add a new remote peer with the given hostname, port, password, and
  142. public key. Does not automatically try to connect to the remote node.
  143. """
  144. self.nodes[key] = Node(host, port, password, key)
  145. def lookup(self, node):
  146. """
  147. Look up the current IP address for the given Node object, and tell the
  148. cjdns router to try to connect to it.
  149. """
  150. try:
  151. # Use AF_INET here to make sure we don't get an IPv6 address and try
  152. # to connect to it when the cjdns UDPInterface is using only IPv4.
  153. # TODO: Make cjdns bind its UDPInterface to IPv6 as well as IPv4.
  154. for info in socket.getaddrinfo(node.host,node.port,
  155. socket.AF_INET,socket.SOCK_DGRAM):
  156. # For every IP address the node has in DNS, with the port we
  157. # wanted attached...
  158. # Save the address we get in a field in the node.
  159. sockaddr = info[-1]
  160. node.lastAddr = sockaddr
  161. # Grab the IP:port string
  162. sockaddr = sockaddr[0] + ":" + str(sockaddr[1])
  163. # Announce we are going to connect
  164. logging.info("Connecting to {} at {}".format(
  165. PublicToIp6_convert(node.key), sockaddr))
  166. # Tell CJDNS to begin a UDPInterface connection to the given
  167. # IP:port, with the given public key and password. Always use
  168. # the 0th UDPInterface, which is the default.
  169. reply = self.cjdns.UDPInterface_beginConnection(
  170. password=node.password, publicKey=node.key,
  171. address=sockaddr)
  172. if reply["error"] != "none":
  173. # The router didn't like our request. Complain.
  174. logging.error(
  175. "Router refuses to connect to remote peer. {}".format(
  176. reply["error"]))
  177. # Maybe try the next address?
  178. break
  179. # Mark this node as no longer unresponsive
  180. try: del self.unresponsive[node.key]
  181. except KeyError: pass
  182. # Don't try any more addresses. Stop after the first.
  183. return
  184. except socket.gaierror as e:
  185. # The lookup failed at the OS level. Did we put in a bad hostname?
  186. logging.error("Could not resolve DNS name {}: {}".format(
  187. node.host, e))
  188. # If we get here, we found no addresses that worked.
  189. logging.error("No working addresses found for node {}".format(
  190. PublicToIp6_convert(node.key)))
  191. def doLog(self, message):
  192. """
  193. Process a log line from cjdns to see if it indicates that a peer we are
  194. responsible for is unresponsive.
  195. """
  196. logging.debug(message)
  197. # Short-circuit messages that start with the wrong l;etter and can't
  198. # possibly match.
  199. if message[0]!='P': return;
  200. # Test plausible messages against the regex
  201. p = IS_UNRESPONSIVE.match(message)
  202. # If they don't match, ignore them.
  203. if not p: return
  204. # Otherwise, get the key of the unresponsive node from the regex match
  205. # group.
  206. downKey = p.group(1)
  207. # And get the nodfe for that key
  208. node = self.nodes.get(downKey,None)
  209. if not node:
  210. # Complain we aren't responsible for that node.
  211. logging.debug("Unmonitored neighbor {} is down".format(
  212. PublicToIp6_convert(downKey)))
  213. return
  214. # Otherwise, announce we are doing our job.
  215. logging.warning("Monitored neighbor {} is down.".format(
  216. PublicToIp6_convert(downKey)))
  217. # If we are responsible for it, register it as unresponsive.
  218. self.unresponsive[downKey] = node
  219. # Go get its address and try reconnecting.
  220. self.lookup(node)
  221. def recieve(self, txid):
  222. """
  223. Loop forever porcessing messages from the cjdns router. Takes a txid
  224. pointing to the subscription to such messages.
  225. """
  226. while True:
  227. # Repeatedly get and process log messages.
  228. self.doLog(self.cjdns.getMessage(txid)["message"])
  229. def main(argv):
  230. """
  231. Run the program.
  232. """
  233. # Set up logging. See the logging module docs.
  234. logging.basicConfig(format="[%(asctime)s] %(levelname)s: %(message)s",
  235. level=logging.INFO)
  236. # Parse command-line arguments. Make sure to give our docstring as program
  237. # help.
  238. parser = argparse.ArgumentParser(description=__doc__,
  239. formatter_class=argparse.RawDescriptionHelpFormatter)
  240. parser.add_argument("configFile", type=argparse.FileType("r"),
  241. help="configuration file of hosts to read")
  242. parser.add_argument("--noWait", action="store_true",
  243. help="look up dynamic peers once and exit")
  244. parser.add_argument("--adminInfo",
  245. help="use this file to load the cjdns admin password")
  246. # Parse all the command-line arguments
  247. options = parser.parse_args(argv[1:])
  248. while True:
  249. try:
  250. # Connect to the router, using the specified admin info file, if
  251. # given.
  252. cjdns = connectWithAdminInfo(path=options.adminInfo)
  253. break
  254. except socket.error:
  255. # Connection probably refused. Retry in a bit
  256. logging.error("Error connecting to cjdns. Retrying in 1 minute...")
  257. time.sleep(60)
  258. # Drop root if we have it. We can't do it before we load the admin info
  259. # file, for the use case where that file is root-only.
  260. try:
  261. # Switch group to nogroup
  262. os.setgid(grp.getgrnam("nogroup").gr_gid)
  263. # Switch user to nobody
  264. os.setuid(pwd.getpwnam("nobody").pw_uid)
  265. # Announce we dropped privs
  266. logging.info("Dropped privileges: running as {}:{}".format(
  267. pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0]))
  268. except OSError:
  269. # Complain we couldn't drop privs right
  270. logging.warning("Could not drop privileges: running as {}:{}".format(
  271. pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0]))
  272. # Now we can load the config file. It is now required.
  273. # Maker a new parser to parse the config file
  274. parsedConfig = ConfigParser.SafeConfigParser()
  275. # Be case sensitive
  276. parsedConfig.optionxform = str
  277. # Read the config from the file
  278. parsedConfig.readfp(options.configFile)
  279. # Make a new watcher on the cjdroute connection, with the config from the
  280. # config file. This automatically looks up all the peers and tries to
  281. # connect to them once.
  282. watcher = DynamicEndpointWatcher(cjdns, parsedConfig)
  283. if options.noWait or os.environ.get('nowait', False):
  284. # We're not supposed to wait. Quit while we're ahead
  285. sys.exit(0)
  286. else:
  287. # Monitor for unresponsive nodes. This will loop until cjdns restarts,
  288. # at which point it will throw an exception.
  289. watcher.run()
  290. if __name__ == "__main__":
  291. # Run our main method
  292. sys.exit(main(sys.argv))