123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- #!@PYTHON@
- # This file is part of GNUnet.
- # (C) 2010, 2018 Christian Grothoff (and other contributing authors)
- #
- # GNUnet is free software: you can redistribute it and/or modify it
- # under the terms of the GNU Affero General Public License as published
- # by the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # GNUnet is distributed in the hope that it will be useful, but
- # WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- # SPDX-License-Identifier: AGPL-3.0-or-later
- #
- # Testcase for gnunet-peerinfo
- import os
- import re
- import subprocess
- import sys
- import shutil
- import time
class pexpect(object):
    """Minimal expect-style helper for running a command and checking output.

    ``spawn`` runs a command to completion and stores whatever
    ``communicate()`` returned for stdout/stderr in ``self.stdo`` and
    ``self.stde``.  ``expect`` and ``read`` then consume those buffers from
    the front.  The buffers are only usable if the corresponding stream was
    captured (e.g. ``stdout=subprocess.PIPE``); otherwise ``communicate()``
    yields ``None`` for it.
    """

    def __init__(self):
        super(pexpect, self).__init__()

    def spawn(self, stdin, arglist, *pargs, **kwargs):
        """Run ``arglist`` to completion and capture its output.

        :param stdin: data passed to ``communicate()`` as the process input,
            or ``None`` to send nothing.
        :param arglist: command line handed to ``subprocess.Popen``.
        :param pargs: extra positional arguments for ``subprocess.Popen``.
        :param kwargs: extra keyword arguments for ``subprocess.Popen``; an
            ``env`` entry is used as the process environment (default: a copy
            of ``os.environ``).
        :returns: the finished ``subprocess.Popen`` object.

        Prints a message and exits with status 1 if the process cannot be
        started.
        """
        env = kwargs.pop('env', None)
        # Work on a copy so the caller's mapping is never modified below.
        env = os.environ.copy() if env is None else dict(env)
        # This messes up some testcases, disable log redirection
        env.pop('GNUNET_FORCE_LOGFILE', None)
        try:
            self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
        except OSError:
            # Popen never returns None; a failed spawn surfaces as OSError.
            print("Failed to spawn a process {0}".format(arglist))
            sys.exit(1)
        # communicate(None) is equivalent to communicate() with no input.
        self.stdo, self.stde = self.proc.communicate(stdin)
        return self.proc

    def expect(self, s, r, flags=0):
        """Match ``r`` against a captured stream and consume up to the match.

        :param s: ``'stdout'`` selects the stdout buffer; any other value
            selects the stderr buffer.
        :param r: a compiled regular expression, or the special string
            ``"EOF"`` which asserts that the buffer is empty.
        :param flags: additional ``re`` flags applied on top of those the
            pattern was compiled with.
        :returns: ``True`` for a satisfied ``"EOF"`` check, otherwise the
            match object.
        :raises ValueError: if ``r`` is a string other than ``"EOF"``.

        Prints a diagnostic and exits with status 2 when the expectation is
        not met.
        """
        stream = self.stdo if s == 'stdout' else self.stde
        if isinstance(r, str):
            if r == "EOF":
                if len(stream) == 0:
                    return True
                print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
                sys.exit(2)
            raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
        if flags:
            # Pattern.search() takes (string, pos, endpos), not flags, so the
            # extra flags have to be folded in by recompiling the pattern.
            r = re.compile(r.pattern, r.flags | flags)
        is_bytes = isinstance(stream, bytes)
        text = stream.decode() if is_bytes else stream
        m = r.search(text)
        if not m:
            print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
            sys.exit(2)
        if is_bytes:
            # m.end() counts characters of the decoded text; convert it to a
            # byte offset so multi-byte characters are consumed completely.
            consumed = len(text[:m.end()].encode())
        else:
            consumed = m.end()
        stream = stream[consumed:]
        if s == 'stdout':
            self.stdo = stream
        else:
            self.stde = stream
        return m

    def read(self, s, size=-1):
        """Remove and return data from the front of a captured stream.

        :param s: ``'stdout'`` selects the stdout buffer; any other value
            selects the stderr buffer.
        :param size: number of leading items to take; a negative value takes
            everything that is left.
        :returns: the removed data, of the same type as the buffer.
        """
        stream = self.stdo if s == 'stdout' else self.stde
        if size < 0:
            result = stream
            # Empty slice keeps the buffer's type (bytes stays bytes), so a
            # later expect() can still decode it.
            new_stream = stream[:0]
        else:
            result = stream[:size]
            new_stream = stream[size:]
        if s == 'stdout':
            self.stdo = new_stream
        else:
            self.stde = new_stream
        return result
|