123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- #!@PYTHONEXE@
- #
- # This testcase simply checks that the DHT command-line tools work.
- # It launches a single peer, stores a value "testdata" under "testkey",
- # and then gives the system 50 ms to fetch it.
- #
- # This could fail if
- # - command line tool interfaces fail
- # - DHT plugins for storage are not installed / working
- # - the block plugin for verification (the test plugin) is not installed
- #
- # The code does NOT depend on DHT routing or any actual P2P functionality.
- #
- import os
- import sys
- import shutil
- import re
- import subprocess
- import time
- import tempfile
# Put the directory holding the tools first on the search path.
# "@bindirectory@" looks like a placeholder that is substituted when this
# script is generated -- TODO confirm against the build system.
os.environ["PATH"] = "@bindirectory@" + ":" + os.environ["PATH"]

# Platform scratch directory.
if os.name == "nt":
    tmp = os.getenv("TEMP")
else:
    tmp = "/tmp"

# Executable names; the Windows variants carry an ".exe" suffix.
if os.name == 'nt':
    get = './gnunet-dht-get.exe'
    put = './gnunet-dht-put.exe'
    arm = 'gnunet-arm.exe'
else:
    get = './gnunet-dht-get'
    put = './gnunet-dht-put'
    arm = 'gnunet-arm'

# Every tool is pointed at the same peer configuration file.
cfgfile = 'test_dht_api_peer1.conf'
run_get = [get, '-c', cfgfile]
run_put = [put, '-c', cfgfile]
run_arm = [arm, '-c', cfgfile]

# Optional extra arguments for the ARM tool, taken from $DEBUG.
debug = os.getenv('DEBUG')
if debug:
    # Extend with the individual tokens.  Appending the list returned by
    # split() as ONE element would put a nested list into the argument
    # vector, which subprocess.Popen rejects.  Splitting on any whitespace
    # also avoids empty arguments when DEBUG contains repeated spaces.
    run_arm += debug.split()
def cleanup(exitcode):
    """Terminate the test script with *exitcode* as the process status."""
    # sys.exit() is a thin wrapper that raises SystemExit; raise it directly.
    raise SystemExit(exitcode)
def sub_run(args, want_stdo=True, want_stde=False, nofail=False):
    """Run *args* as a child process and wait for it to finish.

    args      -- argument vector handed to subprocess.Popen
    want_stdo -- capture the child's stdout through a pipe when true
    want_stde -- capture the child's stderr through a pipe when true
    nofail    -- when false, a non-zero child status ends this script
                 with the same status via sys.exit()

    Returns a (returncode, stdout, stderr) tuple; a stream that was not
    captured is reported as None.
    """
    out_target = subprocess.PIPE if want_stdo else None
    err_target = subprocess.PIPE if want_stde else None

    child = subprocess.Popen(args, stdout=out_target, stderr=err_target)
    captured_out, captured_err = child.communicate()
    status = child.returncode

    # Propagate the child's failure unless the caller wants to inspect it.
    if status != 0 and not nofail:
        sys.exit(status)
    return (status, captured_out, captured_err)
def fail(result):
    """Report *result*, run the ARM tool with '-e', and exit with status 1.

    result -- the failure message to print.
    """
    print(result)
    # Presumably '-e' asks ARM to stop the peer before we bail out --
    # TODO confirm against the gnunet-arm documentation.
    shutdown_args = ['-e']
    r_arm(shutdown_args, want_stdo=False)
    cleanup(1)
def r_something(to_run, extra_args, failure=None, normal=True, **kw):
    """Run a tool and optionally hand the outcome to a failure callback.

    to_run     -- base command line (list)
    extra_args -- arguments appended to the base command line (list)
    failure    -- optional callback invoked as
                  failure(command, rc, stdout, stderr, normal)
    normal     -- forwarded to the callback unchanged
    **kw       -- forwarded to sub_run (e.g. want_stdo / want_stde)

    The command is always run with nofail=True, so judging the exit status
    is left entirely to the callback.  Returns (rc, stdout, stderr).
    """
    command = to_run + extra_args
    outcome = sub_run(command, nofail=True, **kw)
    if failure is not None:
        status, out, err = outcome
        failure(command, status, out, err, normal)
    return outcome
def r_arm(extra_args, **kw):
    """Run the ARM tool (run_arm) with *extra_args* appended.

    Keyword arguments go straight to r_something, whose
    (rc, stdout, stderr) result is returned.
    """
    outcome = r_something(run_arm, extra_args, **kw)
    return outcome
def r_get(extra_args, **kw):
    """Run the DHT get tool (run_get) with *extra_args* appended.

    Keyword arguments go straight to r_something, whose
    (rc, stdout, stderr) result is returned.
    """
    outcome = r_something(run_get, extra_args, **kw)
    return outcome
def r_put(extra_args, **kw):
    """Run the DHT put tool (run_put) with *extra_args* appended.

    Keyword arguments go straight to r_something, whose
    (rc, stdout, stderr) result is returned.
    """
    outcome = r_something(run_put, extra_args, **kw)
    return outcome
def end_arm_failure(command, rc, stdo, stde, normal):
    """Failure callback that aborts the test through fail().

    command -- the command line that was run
    rc      -- its exit status
    stdo    -- captured stdout (or None)
    stde    -- captured stderr (or None)
    normal  -- true when the command was expected to succeed, false when
               it was expected to fail

    Does nothing when the exit status matches the expectation.
    """
    if normal and rc != 0:
        template = "FAIL: error running {}\nCommand output was:\n{}\n{}"
    elif not normal and rc == 0:
        template = "FAIL: expected error while running {}\nCommand output was:\n{}\n{}"
    else:
        # Outcome matches the expectation; nothing to report.
        return
    fail(template.format(command, stdo, stde))
def print_only_failure(command, rc, stdo, stde, normal):
    """Failure callback that prints the problem and exits with status 1.

    Unlike a callback that goes through fail(), this one does not run the
    ARM tool again; it only prints and calls cleanup(1).

    command -- the command line that was run
    rc      -- its exit status
    stdo    -- captured stdout (or None)
    stde    -- captured stderr (or None)
    normal  -- true when the command was expected to succeed, false when
               it was expected to fail
    """
    if normal:
        if rc == 0:
            return
        template = "FAIL: error running {}\nCommand output was:\n{}\n{}"
    else:
        if rc != 0:
            return
        template = "FAIL: expected error while running {}\nCommand output was:\n{}\n{}"
    print(template.format(command, stdo, stde))
    cleanup(1)
# --- Step 1: run the ARM tool with '-s' (presumably "start the peer" --
# TODO confirm) and give it a second before continuing.
print("TEST: Starting ARM...", end='')
r_arm(['-s'], want_stdo=False, want_stde=False, failure=end_arm_failure)
print("PASS")
time.sleep(1)

# --- Step 2: store "testdata" under "testkey" with type 8.
print("TEST: Testing put...", end='')
r_put(['-k', 'testkey', '-d', 'testdata', '-t', '8'], failure=end_arm_failure)
print("PASS")
time.sleep(1)

# --- Step 3: fetch the value back, allowing the tool 50 ms.
print("TEST: Testing get...", end='')
rc, stdo, stde = r_get(
    ['-k', 'testkey', '-T', '50 ms', '-t', '8'],
    want_stdo=True,
    failure=end_arm_failure,
)
# Normalise line endings so the comparison also works with CRLF output.
stdo = stdo.decode('utf-8').replace('\r', '').splitlines()
expect = "Result 0, type 8:\ntestdata".splitlines()
# `expect` always has exactly two lines, so comparing the lists covers both
# the length check and the per-line comparison.
if stdo != expect:
    fail("output `{}' differs from expected `{}'".format(stdo, expect))
print("PASS")

# --- Step 4: final ARM invocation; a failure here is only printed.
r_arm(['-e', '-d'], failure=print_only_failure)
|