123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- #
- # Project ___| | | | _ \| |
- # / __| | | | |_) | |
- # | (__| |_| | _ <| |___
- # \___|\___/|_| \_\_____|
- #
- # Copyright (C) 2017 - 2021, Daniel Stenberg, <daniel@haxx.se>, et al.
- #
- # This software is licensed as described in the file COPYING, which
- # you should have received as part of this distribution. The terms
- # are also available at https://curl.se/docs/copyright.html.
- #
- # You may opt to use, copy, modify, merge, publish, distribute and/or sell
- # copies of the Software, and permit persons to whom the Software is
- # furnished to do so, under the terms of the COPYING file.
- #
- # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
- # KIND, either express or implied.
- #
- """ A telnet server which negotiates"""
- from __future__ import (absolute_import, division, print_function,
- unicode_literals)
- import argparse
- import logging
- import os
- import sys
- from util import ClosingFileHandler
- if sys.version_info.major >= 3:
- import socketserver
- else:
- import SocketServer as socketserver
- log = logging.getLogger(__name__)
- HOST = "localhost"
- IDENT = "NTEL"
- # The strings that indicate the test framework is checking our aliveness
- VERIFIED_REQ = "verifiedserver"
- VERIFIED_RSP = "WE ROOLZ: {pid}"
- def telnetserver(options):
- """
- Starts up a TCP server with a telnet handler and serves DICT requests
- forever.
- """
- if options.pidfile:
- pid = os.getpid()
- # see tests/server/util.c function write_pidfile
- if os.name == "nt":
- pid += 65536
- with open(options.pidfile, "w") as f:
- f.write(str(pid))
- local_bind = (HOST, options.port)
- log.info("Listening on %s", local_bind)
- # Need to set the allow_reuse on the class, not on the instance.
- socketserver.TCPServer.allow_reuse_address = True
- server = socketserver.TCPServer(local_bind, NegotiatingTelnetHandler)
- server.serve_forever()
- return ScriptRC.SUCCESS
- class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
- """Handler class for Telnet connections.
- """
- def handle(self):
- """
- Negotiates options before reading data.
- """
- neg = Negotiator(self.request)
- try:
- # Send some initial negotiations.
- neg.send_do("NEW_ENVIRON")
- neg.send_will("NEW_ENVIRON")
- neg.send_dont("NAWS")
- neg.send_wont("NAWS")
- # Get the data passed through the negotiator
- data = neg.recv(1024)
- log.debug("Incoming data: %r", data)
- if VERIFIED_REQ.encode('utf-8') in data:
- log.debug("Received verification request from test framework")
- pid = os.getpid()
- # see tests/server/util.c function write_pidfile
- if os.name == "nt":
- pid += 65536
- response = VERIFIED_RSP.format(pid=pid)
- response_data = response.encode('utf-8')
- else:
- log.debug("Received normal request - echoing back")
- response_data = data.decode('utf-8').strip().encode('utf-8')
- if response_data:
- log.debug("Sending %r", response_data)
- self.request.sendall(response_data)
- except IOError:
- log.exception("IOError hit during request")
- class Negotiator(object):
- NO_NEG = 0
- START_NEG = 1
- WILL = 2
- WONT = 3
- DO = 4
- DONT = 5
- def __init__(self, tcp):
- self.tcp = tcp
- self.state = self.NO_NEG
- def recv(self, bytes):
- """
- Read bytes from TCP, handling negotiation sequences
- :param bytes: Number of bytes to read
- :return: a buffer of bytes
- """
- buffer = bytearray()
- # If we keep receiving negotiation sequences, we won't fill the buffer.
- # Keep looping while we can, and until we have something to give back
- # to the caller.
- while len(buffer) == 0:
- data = self.tcp.recv(bytes)
- if not data:
- # TCP failed to give us any data. Break out.
- break
- for byte_int in bytearray(data):
- if self.state == self.NO_NEG:
- self.no_neg(byte_int, buffer)
- elif self.state == self.START_NEG:
- self.start_neg(byte_int)
- elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]:
- self.handle_option(byte_int)
- else:
- # Received an unexpected byte. Stop negotiations
- log.error("Unexpected byte %s in state %s",
- byte_int,
- self.state)
- self.state = self.NO_NEG
- return buffer
- def no_neg(self, byte_int, buffer):
- # Not negotiating anything thus far. Check to see if we
- # should.
- if byte_int == NegTokens.IAC:
- # Start negotiation
- log.debug("Starting negotiation (IAC)")
- self.state = self.START_NEG
- else:
- # Just append the incoming byte to the buffer
- buffer.append(byte_int)
- def start_neg(self, byte_int):
- # In a negotiation.
- log.debug("In negotiation (%s)",
- NegTokens.from_val(byte_int))
- if byte_int == NegTokens.WILL:
- # Client is confirming they are willing to do an option
- log.debug("Client is willing")
- self.state = self.WILL
- elif byte_int == NegTokens.WONT:
- # Client is confirming they are unwilling to do an
- # option
- log.debug("Client is unwilling")
- self.state = self.WONT
- elif byte_int == NegTokens.DO:
- # Client is indicating they can do an option
- log.debug("Client can do")
- self.state = self.DO
- elif byte_int == NegTokens.DONT:
- # Client is indicating they can't do an option
- log.debug("Client can't do")
- self.state = self.DONT
- else:
- # Received an unexpected byte. Stop negotiations
- log.error("Unexpected byte %s in state %s",
- byte_int,
- self.state)
- self.state = self.NO_NEG
- def handle_option(self, byte_int):
- if byte_int in [NegOptions.BINARY,
- NegOptions.CHARSET,
- NegOptions.SUPPRESS_GO_AHEAD,
- NegOptions.NAWS,
- NegOptions.NEW_ENVIRON]:
- log.debug("Option: %s", NegOptions.from_val(byte_int))
- # No further negotiation of this option needed. Reset the state.
- self.state = self.NO_NEG
- else:
- # Received an unexpected byte. Stop negotiations
- log.error("Unexpected byte %s in state %s",
- byte_int,
- self.state)
- self.state = self.NO_NEG
- def send_message(self, message_ints):
- self.tcp.sendall(bytearray(message_ints))
- def send_iac(self, arr):
- message = [NegTokens.IAC]
- message.extend(arr)
- self.send_message(message)
- def send_do(self, option_str):
- log.debug("Sending DO %s", option_str)
- self.send_iac([NegTokens.DO, NegOptions.to_val(option_str)])
- def send_dont(self, option_str):
- log.debug("Sending DONT %s", option_str)
- self.send_iac([NegTokens.DONT, NegOptions.to_val(option_str)])
- def send_will(self, option_str):
- log.debug("Sending WILL %s", option_str)
- self.send_iac([NegTokens.WILL, NegOptions.to_val(option_str)])
- def send_wont(self, option_str):
- log.debug("Sending WONT %s", option_str)
- self.send_iac([NegTokens.WONT, NegOptions.to_val(option_str)])
- class NegBase(object):
- @classmethod
- def to_val(cls, name):
- return getattr(cls, name)
- @classmethod
- def from_val(cls, val):
- for k in cls.__dict__.keys():
- if getattr(cls, k) == val:
- return k
- return "<unknown>"
- class NegTokens(NegBase):
- # The start of a negotiation sequence
- IAC = 255
- # Confirm willingness to negotiate
- WILL = 251
- # Confirm unwillingness to negotiate
- WONT = 252
- # Indicate willingness to negotiate
- DO = 253
- # Indicate unwillingness to negotiate
- DONT = 254
- # The start of sub-negotiation options.
- SB = 250
- # The end of sub-negotiation options.
- SE = 240
- class NegOptions(NegBase):
- # Binary Transmission
- BINARY = 0
- # Suppress Go Ahead
- SUPPRESS_GO_AHEAD = 3
- # NAWS - width and height of client
- NAWS = 31
- # NEW-ENVIRON - environment variables on client
- NEW_ENVIRON = 39
- # Charset option
- CHARSET = 42
- def get_options():
- parser = argparse.ArgumentParser()
- parser.add_argument("--port", action="store", default=9019,
- type=int, help="port to listen on")
- parser.add_argument("--verbose", action="store", type=int, default=0,
- help="verbose output")
- parser.add_argument("--pidfile", action="store",
- help="file name for the PID")
- parser.add_argument("--logfile", action="store",
- help="file name for the log")
- parser.add_argument("--srcdir", action="store", help="test directory")
- parser.add_argument("--id", action="store", help="server ID")
- parser.add_argument("--ipv4", action="store_true", default=0,
- help="IPv4 flag")
- return parser.parse_args()
- def setup_logging(options):
- """
- Set up logging from the command line options
- """
- root_logger = logging.getLogger()
- add_stdout = False
- formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s "
- "[{ident}] %(message)s"
- .format(ident=IDENT))
- # Write out to a logfile
- if options.logfile:
- handler = ClosingFileHandler(options.logfile)
- handler.setFormatter(formatter)
- handler.setLevel(logging.DEBUG)
- root_logger.addHandler(handler)
- else:
- # The logfile wasn't specified. Add a stdout logger.
- add_stdout = True
- if options.verbose:
- # Add a stdout logger as well in verbose mode
- root_logger.setLevel(logging.DEBUG)
- add_stdout = True
- else:
- root_logger.setLevel(logging.INFO)
- if add_stdout:
- stdout_handler = logging.StreamHandler(sys.stdout)
- stdout_handler.setFormatter(formatter)
- stdout_handler.setLevel(logging.DEBUG)
- root_logger.addHandler(stdout_handler)
- class ScriptRC(object):
- """Enum for script return codes"""
- SUCCESS = 0
- FAILURE = 1
- EXCEPTION = 2
- class ScriptException(Exception):
- pass
- if __name__ == '__main__':
- # Get the options from the user.
- options = get_options()
- # Setup logging using the user options
- setup_logging(options)
- # Run main script.
- try:
- rc = telnetserver(options)
- except Exception as e:
- log.exception(e)
- rc = ScriptRC.EXCEPTION
- if options.pidfile and os.path.isfile(options.pidfile):
- os.unlink(options.pidfile)
- log.info("Returning %d", rc)
- sys.exit(rc)
|