gnunet_pyexpect.py.in 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #!@PYTHONEXE@
  2. # This file is part of GNUnet.
  3. # (C) 2010, 2018 Christian Grothoff (and other contributing authors)
  4. #
  5. # GNUnet is free software: you can redistribute it and/or modify it
  6. # under the terms of the GNU Affero General Public License as published
  7. # by the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # GNUnet is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # Affero General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Affero General Public License
  16. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. #
  18. # SPDX-License-Identifier: AGPL3.0-or-later
  19. #
  20. # Testcase for gnunet-peerinfo
  21. import os
  22. import re
  23. import subprocess
  24. import sys
  25. import shutil
  26. import time
  27. class pexpect(object):
  28. def __init__(self):
  29. super(pexpect, self).__init__()
  30. def spawn(self, stdin, arglist, *pargs, **kwargs):
  31. env = kwargs.pop('env', None)
  32. if env is None:
  33. env = os.environ.copy()
  34. # This messes up some testcases, disable log redirection
  35. env.pop('GNUNET_FORCE_LOGFILE', None)
  36. self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
  37. if self.proc is None:
  38. print("Failed to spawn a process {0}".format(arglist))
  39. sys.exit(1)
  40. if stdin is not None:
  41. self.stdo, self.stde = self.proc.communicate(stdin)
  42. else:
  43. self.stdo, self.stde = self.proc.communicate()
  44. return self.proc
  45. def expect(self, s, r, flags=0):
  46. stream = self.stdo if s == 'stdout' else self.stde
  47. if isinstance(r, str):
  48. if r == "EOF":
  49. if len(stream) == 0:
  50. return True
  51. else:
  52. print(
  53. "Failed to find `{1}' in {0}, which is `{2}' ({3})".
  54. format(s, r, stream, len(stream))
  55. )
  56. sys.exit(2)
  57. raise ValueError(
  58. "Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'"
  59. .format(r)
  60. )
  61. m = r.search(stream.decode(), flags)
  62. if not m:
  63. print(
  64. "Failed to find `{1}' in {0}, which is is `{2}'".format(
  65. s, r.pattern, stream
  66. )
  67. )
  68. sys.exit(2)
  69. stream = stream[m.end():]
  70. if s == 'stdout':
  71. self.stdo = stream
  72. else:
  73. self.stde = stream
  74. return m
  75. def read(self, s, size=-1):
  76. stream = self.stdo if s == 'stdout' else self.stde
  77. result = ""
  78. if size < 0:
  79. result = stream
  80. new_stream = ""
  81. else:
  82. result = stream[0:size]
  83. new_stream = stream[size:]
  84. if s == 'stdout':
  85. self.stdo = new_stream
  86. else:
  87. self.stde = new_stream
  88. return result