dynamicEndpoints.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. #!/usr/bin/python2
  2. """
  3. dynamicEndpoints.py: make cjdns reliably connect to remote nodes with dynamic IP
  4. addresses, identified by a DNS name.
  5. Requires a config file with a section for each dynamic-IP node, like this:
  6. [lhjs0njqtvh1z4p2922bbyp2mksmyzf5lb63kvs3ppy78y1dj130.k]
  7. hostname: verge.info.tm
  8. port: 6324
  9. password: ns6vn00hw0buhrtc4wbk8sv230
  10. The section name (in square brackets) is the public key of the node. Then the
  11. hostname, port, and peering password for the node are given.
  12. By default, this program looks up the current Internet IP of each node defined
  13. in the config file, and add that node at that IP to the local cjdns instance.
  14. Unless the --noWait option is given, or the $nowait environment variable is
  15. true, the program then continues running, waiting for cjdns to log messages
  16. about those peers being unresponsive and updating the peers' Internet IP
  17. addresses as needed.
  18. If cjdns dies while the program is monitoring for messages, the program will
  19. hang indefinitely.
  20. Requires that the $HOME/.cjdnsadmin file be correctly set up. See
  21. cjdnsadminmaker.py if that is not the case.
  22. """
  23. # You may redistribute this program and/or modify it under the terms of
  24. # the GNU General Public License as published by the Free Software Foundation,
  25. # either version 3 of the License, or (at your option) any later version.
  26. #
  27. # This program is distributed in the hope that it will be useful,
  28. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  29. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  30. # GNU General Public License for more details.
  31. #
  32. # You should have received a copy of the GNU General Public License
  33. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  34. from cjdnsadmin.cjdnsadmin import connectWithAdminInfo;
  35. from cjdnsadmin.publicToIp6 import PublicToIp6_convert;
  36. from cjdnsadmin.bencode import *
  37. import sys
  38. import socket, re
  39. import select
  40. import time
  41. import os
  42. import logging
  43. import argparse
  44. import ConfigParser
  45. # This holds a regex that matches the message we get from the roiuter when it
  46. # sees an unresponsive peer.
  47. IS_UNRESPONSIVE = re.compile(
  48. "Pinging unresponsive peer \\[(.*\\.k)\\] lag \\[.*\\]")
  49. # Make sure that it works
  50. assert(IS_UNRESPONSIVE.match("Pinging unresponsive peer " +
  51. "[6fmmn3qurcjg6d8hplq1rrcsspfhvm1900s13f3p5bv2bb4f4mm0.k] lag [207147]"))
  52. # What file and line do these messages come from? TODO: don't depend so tightly
  53. # on the other end of the codebase. Use the API to watch peers.
  54. MESSAGE_FILE = "InterfaceController.c"
  55. MESSAGE_LINE = 252
  56. class Node(object):
  57. """
  58. Represents a remote peer. A remoter peer has:
  59. - A hostname to repeatedly look up
  60. - A port to connect to
  61. - A password to connect with
  62. - A public key to authenticate the remote peer with
  63. - A last known Internet IP address.
  64. """
  65. __slots__ = ("host","port","password","key","lastAddr")
  66. def __init__(self,host,port,password,key):
  67. self.host = host
  68. self.port = port
  69. self.password = password
  70. self.key = key
  71. self.lastAddr = None
  72. class DynamicEndpointWatcher(object):
  73. """
  74. Encapsulates all the stuff we need to actually keep an eye on our remote
  75. nodes and see if they change IPs. When a node with a dynamic IP is
  76. unresponsive, we look up its IP address and tell cjdns to go connect to it.
  77. """
  78. def __init__(self, cjdns, configuration):
  79. """
  80. Set up a new DynamicEndpointWatcher operating on the given CJDNS admin
  81. connection, using the specified ConfigParser parsed configuration.
  82. """
  83. # Keep the cjdns admin connection
  84. self.cjdns = cjdns
  85. # Holds a dict from public key string to Node object for the remote
  86. # peer, for all known nodes.
  87. self.nodes = dict()
  88. # Holds a dict from public key to Node object for those nodes which are
  89. # unresponsive.
  90. self.unresponsive = dict()
  91. # Holds a cjdns log message subscription to messages about unresponsive
  92. # nodes. Note that this points specifically to a source line number in
  93. # the cjdns C code and is thus going to break whenever anyone touches
  94. # that file. TODO: check node responsiveness through the API.
  95. self.sub = self.cjdns.AdminLog_subscribe(MESSAGE_LINE, MESSAGE_FILE,
  96. 'DEBUG')
  97. # Add nodes from the given ConfigParser parser.
  98. for section in configuration.sections():
  99. # Each section is named with a node key, and contains a
  100. # hostname, port, and password.
  101. peerHostname = configuration.get(section, "hostname")
  102. peerPort = configuration.get(section, "port")
  103. peerPassword = configuration.get(section, "password")
  104. # Add the node
  105. self.addNode(peerHostname, peerPort, peerPassword, section)
  106. if self.sub['error'] == 'none':
  107. # We successfully subscribed to messages. Add all the nodes we're
  108. # supposed to watch.
  109. for node in self.nodes.values():
  110. self.lookup(node)
  111. logging.info("{} peers added!".format(len(self.nodes)))
  112. else:
  113. logging.error(self.sub)
  114. def run(self):
  115. """
  116. Run forever, monitoring the peers we are responsible for.
  117. """
  118. logging.info("Watching for unresponsive peers")
  119. # Watch for any messages from our log message subscription.
  120. self.recieve(self.sub['txid'])
  121. def addNode(self, host, port, password, key):
  122. """
  123. Add a new remote peer with the given hostname, port, password, and
  124. public key. Does not automatically try to connect to the remote node.
  125. """
  126. self.nodes[key] = Node(host, port, password, key)
  127. def lookup(self, node):
  128. """
  129. Look up the current IP address for the given Node object, and tell the
  130. cjdns router to try to connect to it.
  131. """
  132. try:
  133. # Use AF_INET here to make sure we don't get an IPv6 address and try
  134. # to connect to it when the cjdns UDPInterface is using only IPv4.
  135. # TODO: Make cjdns bind its UDPInterface to IPv6 as well as IPv4.
  136. for info in socket.getaddrinfo(node.host,node.port,
  137. socket.AF_INET,socket.SOCK_DGRAM):
  138. # For every IP address the node has in DNS, with the port we
  139. # wanted attached...
  140. # Save the address we get in a field in the node.
  141. sockaddr = info[-1]
  142. node.lastAddr = sockaddr
  143. # Grab the IP:port string
  144. sockaddr = sockaddr[0] + ":" + str(sockaddr[1])
  145. # Announce we are going to connect
  146. logging.info("Connecting to {} at {}".format(
  147. PublicToIp6_convert(node.key), sockaddr))
  148. # Tell CJDNS to begin a UDPInterface connection to the given
  149. # IP:port, with the given public key and password. Always use
  150. # the 0th UDPInterface, which is the default.
  151. reply = self.cjdns.UDPInterface_beginConnection(
  152. password=node.password, publicKey=node.key,
  153. address=sockaddr)
  154. if reply["error"] != "none":
  155. # The router didn't like our request. Complain.
  156. logging.error(
  157. "Router refuses to connect to remote peer. {}".format(
  158. reply["error"]))
  159. # Maybe try the next address?
  160. break
  161. # Mark this node as no longer unresponsive
  162. try: del self.unresponsive[node.key]
  163. except KeyError: pass
  164. # Don't try any more addresses. Stop after the first.
  165. return
  166. except socket.gaierror as e:
  167. # The lookup failed at the OS level. Did we put in a bad hostname?
  168. logging.error("Could not resolve DNS name {}: {}".format(
  169. node.host, e))
  170. # If we get here, we found no addresses that worked.
  171. logging.error("No working addresses found for node {}".format(
  172. PublicToIp6_convert(node.key)))
  173. def doLog(self, message):
  174. """
  175. Process a log line from cjdns to see if it indicates that a peer we are
  176. responsible for is unresponsive.
  177. """
  178. logging.debug(message)
  179. # Short-circuit messages that start with the wrong l;etter and can't
  180. # possibly match.
  181. if message[0]!='P': return;
  182. # Test plausible messages against the regex
  183. p = IS_UNRESPONSIVE.match(message)
  184. # If they don't match, ignore them.
  185. if not p: return
  186. # Otherwise, get the key of the unresponsive node from the regex match
  187. # group.
  188. downKey = p.group(1)
  189. # And get the nodfe for that key
  190. node = self.nodes.get(downKey,None)
  191. if not node:
  192. # Complain we aren't responsible for that node.
  193. logging.warning("Unmonitored neighbor {} is down".format(
  194. PublicToIp6_convert(downKey)))
  195. return
  196. # Otherwise, announce we are doing our job.
  197. logging.warning("Monitored neighbor {} is down.".format(
  198. PublicToIp6_convert(downKey)))
  199. # If we are responsible for it, register it as unresponsive.
  200. self.unresponsive[downKey] = node
  201. # Go get its address and try reconnecting.
  202. self.lookup(node)
  203. def recieve(self, txid):
  204. """
  205. Loop forever porcessing messages from the cjdns router. Takes a txid
  206. pointing to the subscription to such messages.
  207. """
  208. while True:
  209. # Repeatedly get and process log messages.
  210. self.doLog(self.cjdns.getMessage(txid)["message"])
  211. def main(argv):
  212. """
  213. Run the program.
  214. """
  215. # Set up logging. See the logging module docs.
  216. logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
  217. # Parse command-line arguments. Make sure to give our docstring as program
  218. # help.
  219. parser = argparse.ArgumentParser(description=__doc__,
  220. formatter_class=argparse.RawDescriptionHelpFormatter)
  221. parser.add_argument("configFile", type=argparse.FileType("r"),
  222. help="configuration file of hosts to read")
  223. parser.add_argument("--noWait", action="store_true",
  224. help="look up dynamic peers once and exit")
  225. # Parse all the command-line arguments
  226. options = parser.parse_args(argv[1:])
  227. # Now we can load the config file. It is now required.
  228. # Maker a new parser to parse the config file
  229. parsedConfig = ConfigParser.SafeConfigParser()
  230. # Be case sensitive
  231. parsedConfig.optionxform = str
  232. # Read the config from the file
  233. parsedConfig.readfp(options.configFile)
  234. # Connect to the router
  235. cjdns = connectWithAdminInfo()
  236. # Make a new watcher on that connection, with the config from the config
  237. # file. This automatically looks up all the peers and tries to connect to
  238. # them once.
  239. watcher = DynamicEndpointWatcher(cjdns, parsedConfig)
  240. if options.noWait or os.environ.get('nowait', False):
  241. # We're not supposed to wait. Quit while we're ahead
  242. sys.exit(0)
  243. else:
  244. # Monitor for unresponsive nodes. This will loop until cjdns restarts,
  245. # at which point it will keep looping but won't actually work anymore.
  246. watcher.run()
  247. if __name__ == "__main__":
  248. # Run our main method
  249. sys.exit(main(sys.argv))