123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- import re
- import urllib2
- import httplib
- import logging
- from urlparse import urlparse
- from xml.dom.minidom import parseString
- from xml.parsers.expat import ExpatError
- from gevent import socket
- import gevent
- # Relevant UPnP spec:
- # http://www.upnp.org/specs/gw/UPnP-gw-WANIPConnection-v1-Service.pdf
- # General TODOs:
- # Handle 0 or >1 IGDs
- class UpnpError(Exception):
- pass
- class IGDError(UpnpError):
- """
- Signifies a problem with the IGD.
- """
- pass
- REMOVE_WHITESPACE = re.compile(r'>\s*<')
- def perform_m_search(local_ip):
- """
- Broadcast a UDP SSDP M-SEARCH packet and return response.
- """
- search_target = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
- ssdp_request = ''.join(
- ['M-SEARCH * HTTP/1.1\r\n',
- 'HOST: 239.255.255.250:1900\r\n',
- 'MAN: "ssdp:discover"\r\n',
- 'MX: 2\r\n',
- 'ST: {0}\r\n'.format(search_target),
- '\r\n']
- )
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.bind((local_ip, 0))
- sock.sendto(ssdp_request, ('239.255.255.250', 1900))
- if local_ip == "127.0.0.1":
- sock.settimeout(1)
- else:
- sock.settimeout(5)
- try:
- return sock.recv(2048)
- except socket.error:
- raise UpnpError("No reply from IGD using {} as IP".format(local_ip))
- finally:
- sock.close()
- def _retrieve_location_from_ssdp(response):
- """
- Parse raw HTTP response to retrieve the UPnP location header
- and return a ParseResult object.
- """
- parsed_headers = re.findall(r'(?P<name>.*?): (?P<value>.*?)\r\n', response)
- header_locations = [header[1]
- for header in parsed_headers
- if header[0].lower() == 'location']
- if len(header_locations) < 1:
- raise IGDError('IGD response does not contain a "location" header.')
- return urlparse(header_locations[0])
- def _retrieve_igd_profile(url):
- """
- Retrieve the device's UPnP profile.
- """
- try:
- return urllib2.urlopen(url.geturl(), timeout=5).read().decode('utf-8')
- except socket.error:
- raise IGDError('IGD profile query timed out')
- def _get_first_child_data(node):
- """
- Get the text value of the first child text node of a node.
- """
- return node.childNodes[0].data
- def _parse_igd_profile(profile_xml):
- """
- Traverse the profile xml DOM looking for either
- WANIPConnection or WANPPPConnection and return
- the 'controlURL' and the service xml schema.
- """
- try:
- dom = parseString(profile_xml)
- except ExpatError as e:
- raise IGDError(
- 'Unable to parse IGD reply: {0} \n\n\n {1}'.format(profile_xml, e))
- service_types = dom.getElementsByTagName('serviceType')
- for service in service_types:
- if _get_first_child_data(service).find('WANIPConnection') > 0 or \
- _get_first_child_data(service).find('WANPPPConnection') > 0:
- try:
- control_url = _get_first_child_data(
- service.parentNode.getElementsByTagName('controlURL')[0])
- upnp_schema = _get_first_child_data(service).split(':')[-2]
- return control_url, upnp_schema
- except IndexError:
- # Pass the error because any error here should raise the
- # that's specified outside the for loop.
- pass
- raise IGDError(
- 'Could not find a control url or UPNP schema in IGD response.')
- # add description
- def _get_local_ips():
- local_ips = []
- try:
- # get local ip using UDP and a broadcast address
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
- # Not using <broadcast> because gevents getaddrinfo doesn't like that
- # using port 1 as per hobbldygoop's comment about port 0 not working on osx:
- # https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
- s.connect(('239.255.255.250', 1))
- local_ips.append(s.getsockname()[0])
- except:
- pass
- # Get ip by using UDP and a normal address (google dns ip)
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(('8.8.8.8', 0))
- local_ips.append(s.getsockname()[0])
- except:
- pass
- # Get ip by '' hostname . Not supported on all platforms.
- try:
- local_ips += socket.gethostbyname_ex('')[2]
- except:
- pass
- # Delete duplicates
- local_ips = list(set(local_ips))
- # Probably we looking for an ip starting with 192
- local_ips = sorted(local_ips, key=lambda a: a.startswith("192"), reverse=True)
- return local_ips
- def _create_open_message(local_ip,
- port,
- description="UPnPPunch",
- protocol="TCP",
- upnp_schema='WANIPConnection'):
- """
- Build a SOAP AddPortMapping message.
- """
- soap_message = """<?xml version="1.0"?>
- <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
- <s:Body>
- <u:AddPortMapping xmlns:u="urn:schemas-upnp-org:service:{upnp_schema}:1">
- <NewRemoteHost></NewRemoteHost>
- <NewExternalPort>{port}</NewExternalPort>
- <NewProtocol>{protocol}</NewProtocol>
- <NewInternalPort>{port}</NewInternalPort>
- <NewInternalClient>{host_ip}</NewInternalClient>
- <NewEnabled>1</NewEnabled>
- <NewPortMappingDescription>{description}</NewPortMappingDescription>
- <NewLeaseDuration>0</NewLeaseDuration>
- </u:AddPortMapping>
- </s:Body>
- </s:Envelope>""".format(port=port,
- protocol=protocol,
- host_ip=local_ip,
- description=description,
- upnp_schema=upnp_schema)
- return (REMOVE_WHITESPACE.sub('><', soap_message), 'AddPortMapping')
- def _create_close_message(local_ip,
- port,
- description=None,
- protocol='TCP',
- upnp_schema='WANIPConnection'):
- soap_message = """<?xml version="1.0"?>
- <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
- <s:Body>
- <u:DeletePortMapping xmlns:u="urn:schemas-upnp-org:service:{upnp_schema}:1">
- <NewRemoteHost></NewRemoteHost>
- <NewExternalPort>{port}</NewExternalPort>
- <NewProtocol>{protocol}</NewProtocol>
- </u:DeletePortMapping>
- </s:Body>
- </s:Envelope>""".format(port=port,
- protocol=protocol,
- upnp_schema=upnp_schema)
- return (REMOVE_WHITESPACE.sub('><', soap_message), 'DeletePortMapping')
- def _parse_for_errors(soap_response):
- logging.debug(soap_response.status)
- if soap_response.status >= 400:
- response_data = soap_response.read()
- logging.debug(response_data)
- try:
- err_dom = parseString(response_data)
- err_code = _get_first_child_data(err_dom.getElementsByTagName(
- 'errorCode')[0])
- err_msg = _get_first_child_data(
- err_dom.getElementsByTagName('errorDescription')[0]
- )
- except Exception as err:
- raise IGDError(
- 'Unable to parse SOAP error: {0}. Got: "{1}"'.format(
- err, response_data))
- raise IGDError(
- 'SOAP request error: {0} - {1}'.format(err_code, err_msg)
- )
- return soap_response
- def _send_soap_request(location, upnp_schema, control_path, soap_fn,
- soap_message):
- """
- Send out SOAP request to UPnP device and return a response.
- """
- headers = {
- 'SOAPAction': (
- '"urn:schemas-upnp-org:service:{schema}:'
- '1#{fn_name}"'.format(schema=upnp_schema, fn_name=soap_fn)
- ),
- 'Content-Type': 'text/xml'
- }
- logging.debug("Sending UPnP request to {0}:{1}...".format(
- location.hostname, location.port))
- conn = httplib.HTTPConnection(location.hostname, location.port)
- conn.request('POST', control_path, soap_message, headers)
- response = conn.getresponse()
- conn.close()
- return _parse_for_errors(response)
- def _collect_idg_data(ip_addr):
- idg_data = {}
- idg_response = perform_m_search(ip_addr)
- idg_data['location'] = _retrieve_location_from_ssdp(idg_response)
- idg_data['control_path'], idg_data['upnp_schema'] = _parse_igd_profile(
- _retrieve_igd_profile(idg_data['location']))
- return idg_data
- def _send_requests(messages, location, upnp_schema, control_path):
- responses = [_send_soap_request(location, upnp_schema, control_path,
- message_tup[1], message_tup[0])
- for message_tup in messages]
- if all(rsp.status == 200 for rsp in responses):
- return
- raise UpnpError('Sending requests using UPnP failed.')
- def _orchestrate_soap_request(ip, port, msg_fn, desc=None, protos=("TCP", "UDP")):
- logging.debug("Trying using local ip: %s" % ip)
- idg_data = _collect_idg_data(ip)
- soap_messages = [
- msg_fn(ip, port, desc, proto, idg_data['upnp_schema'])
- for proto in protos
- ]
- _send_requests(soap_messages, **idg_data)
- def _communicate_with_igd(port=15441,
- desc="UpnpPunch",
- retries=3,
- fn=_create_open_message,
- protos=("TCP", "UDP")):
- """
- Manage sending a message generated by 'fn'.
- """
- local_ips = _get_local_ips()
- success = False
- def job(local_ip):
- for retry in range(retries):
- try:
- _orchestrate_soap_request(local_ip, port, fn, desc, protos)
- return True
- except Exception as e:
- logging.debug('Upnp request using "{0}" failed: {1}'.format(local_ip, e))
- gevent.sleep(1)
- return False
- threads = []
- for local_ip in local_ips:
- job_thread = gevent.spawn(job, local_ip)
- threads.append(job_thread)
- gevent.sleep(0.1)
- if any([thread.value for thread in threads]):
- success = True
- break
- # Wait another 10sec for competition or any positibe result
- for _ in range(10):
- all_done = all([thread.value is not None for thread in threads])
- any_succeed = any([thread.value for thread in threads])
- if all_done or any_succeed:
- break
- gevent.sleep(1)
- if any([thread.value for thread in threads]):
- success = True
- if not success:
- raise UpnpError(
- 'Failed to communicate with igd using port {0} on local machine after {1} tries.'.format(
- port, retries))
- def ask_to_open_port(port=15441, desc="UpnpPunch", retries=3, protos=("TCP", "UDP")):
- logging.debug("Trying to open port %d." % port)
- _communicate_with_igd(port=port,
- desc=desc,
- retries=retries,
- fn=_create_open_message,
- protos=protos)
- def ask_to_close_port(port=15441, desc="UpnpPunch", retries=3, protos=("TCP", "UDP")):
- logging.debug("Trying to close port %d." % port)
- # retries=1 because multiple successes cause 500 response and failure
- _communicate_with_igd(port=port,
- desc=desc,
- retries=retries,
- fn=_create_close_message,
- protos=protos)
- if __name__ == "__main__":
- from gevent import monkey
- monkey.patch_all()
- logging.getLogger().setLevel(logging.DEBUG)
- import time
- s = time.time()
- print "Opening port..."
- print ask_to_open_port(15443, "ZeroNet", protos=["TCP"])
- print "Done in", time.time() - s
- print "Closing port..."
- print ask_to_close_port(15443, "ZeroNet", protos=["TCP"])
- print "Done in", time.time() - s
|