123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278 |
- # -*- coding: utf-8 -*-
- """
- (c) 2015-2018 - Copyright Red Hat Inc
- Authors:
- Pierre-Yves Chibon <pingou@pingoured.fr>
- """
- from __future__ import unicode_literals, absolute_import
- import imp
- import json
- import logging
- import os
- import re
- import resource
- import shutil
- import subprocess
- import sys
- import tempfile
- import time
- import unittest
- from io import open, StringIO
- logging.basicConfig(stream=sys.stderr)
- from bs4 import BeautifulSoup
- from contextlib import contextmanager
- from datetime import date
- from datetime import datetime
- from datetime import timedelta
- from functools import wraps
- from six.moves.urllib.parse import urlparse, parse_qs
- import mock
- import pygit2
- import redis
- import six
- from bs4 import BeautifulSoup
- from celery.app.task import EagerResult
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.orm import scoped_session
- if six.PY2:
- # Always enable performance counting for tests
- os.environ["PAGURE_PERFREPO"] = "true"
- sys.path.insert(
- 0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
- )
- import pagure
- import pagure.api
- from pagure.api.ci import jenkins
- import pagure.flask_app
- import pagure.lib.git
- import pagure.lib.model
- import pagure.lib.query
- import pagure.lib.tasks_mirror
- import pagure.perfrepo as perfrepo
- from pagure.config import config as pagure_config, reload_config
- from pagure.lib.repo import PagureRepo
- HERE = os.path.join(os.path.dirname(os.path.abspath(__file__)))
- LOG = logging.getLogger(__name__)
- LOG.setLevel(logging.INFO)
- PAGLOG = logging.getLogger("pagure")
- PAGLOG.setLevel(logging.CRITICAL)
- PAGLOG.handlers = []
- if "PYTHONPATH" not in os.environ:
- os.environ["PYTHONPATH"] = os.path.normpath(os.path.join(HERE, "../"))
- CONFIG_TEMPLATE = """
- GIT_FOLDER = '%(path)s/repos'
- ENABLE_DOCS = %(enable_docs)s
- ENABLE_TICKETS = %(enable_tickets)s
- REMOTE_GIT_FOLDER = '%(path)s/remotes'
- DB_URL = '%(dburl)s'
- ALLOW_PROJECT_DOWAIT = True
- PAGURE_CI_SERVICES = ['jenkins']
- EMAIL_SEND = False
- TESTING = True
- GIT_FOLDER = '%(path)s/repos'
- REQUESTS_FOLDER = '%(path)s/repos/requests'
- TICKETS_FOLDER = %(tickets_folder)r
- DOCS_FOLDER = %(docs_folder)r
- REPOSPANNER_PSEUDO_FOLDER = '%(path)s/repos/pseudo'
- ATTACHMENTS_FOLDER = '%(path)s/attachments'
- BROKER_URL = 'redis+socket://%(global_path)s/broker'
- CELERY_CONFIG = {
- "task_always_eager": True,
- #"task_eager_propagates": True,
- }
- GIT_AUTH_BACKEND = '%(authbackend)s'
- TEST_AUTH_STATUS = '%(path)s/testauth_status.json'
- REPOBRIDGE_BINARY = '%(repobridge_binary)s'
- REPOSPANNER_NEW_REPO = %(repospanner_new_repo)s
- REPOSPANNER_NEW_REPO_ADMIN_OVERRIDE = %(repospanner_admin_override)s
- REPOSPANNER_NEW_FORK = %(repospanner_new_fork)s
- REPOSPANNER_ADMIN_MIGRATION = %(repospanner_admin_migration)s
- REPOSPANNER_REGIONS = {
- 'default': {'url': 'https://repospanner.localhost.localdomain:%(repospanner_gitport)s',
- 'repo_prefix': 'pagure/',
- 'hook': None,
- 'ca': '%(path)s/repospanner/pki/ca.crt',
- 'admin_cert': {'cert': '%(path)s/repospanner/pki/admin.crt',
- 'key': '%(path)s/repospanner/pki/admin.key'},
- 'push_cert': {'cert': '%(path)s/repospanner/pki/pagure.crt',
- 'key': '%(path)s/repospanner/pki/pagure.key'}}
- }
- """
- # The Celery docs warn against using task_always_eager:
- # http://docs.celeryproject.org/en/latest/userguide/testing.html
- # but that warning is only valid when testing the async nature of the task, not
- # what the task actually does.
- LOG.info("BUILD_ID: %s", os.environ.get("BUILD_ID"))
- WAIT_REGEX = re.compile(r"""var _url = '(\/wait\/[a-z0-9-]+\??.*)'""")
- def get_wait_target(html):
- """ This parses the window.location out of the HTML for the wait page. """
- found = WAIT_REGEX.findall(html)
- if len(found) == 0:
- raise Exception("Not able to get wait target in %s" % html)
- return found[-1]
- def get_post_target(html):
- """ This parses the wait page form to get the POST url. """
- soup = BeautifulSoup(html, "html.parser")
- form = soup.find(id="waitform")
- if not form:
- raise Exception("Not able to get the POST url in %s" % html)
- return form.get("action")
- def get_post_args(html):
- """ This parses the wait page for the hidden arguments of the form. """
- soup = BeautifulSoup(html, "html.parser")
- output = {}
- inputs = soup.find_all("input")
- if not inputs:
- raise Exception("Not able to get the POST arguments in %s" % html)
- for inp in inputs:
- if inp.get("type") == "hidden":
- output[inp.get("name")] = inp.get("value")
- return output
- def create_maybe_waiter(method, getter):
- def maybe_waiter(*args, **kwargs):
- """ A wrapper for self.app.get()/.post() that will resolve wait's """
- result = method(*args, **kwargs)
- # Handle the POST wait case
- form_url = None
- form_args = None
- try:
- result_text = result.get_data(as_text=True)
- except UnicodeDecodeError:
- return result
- if 'id="waitform"' in result_text:
- form_url = get_post_target(result_text)
- form_args = get_post_args(result_text)
- form_args["csrf_token"] = result_text.split(
- 'name="csrf_token" type="hidden" value="'
- )[1].split('">')[0]
- count = 0
- while "We are waiting for your task to finish." in result_text:
- # Resolve wait page
- target_url = get_wait_target(result_text)
- if count > 10:
- time.sleep(0.5)
- else:
- time.sleep(0.1)
- result = getter(target_url, follow_redirects=True)
- try:
- result_text = result.get_data(as_text=True)
- except UnicodeDecodeError:
- return result
- if count > 50:
- raise Exception("Had to wait too long")
- else:
- if form_url and form_args:
- return method(form_url, data=form_args, follow_redirects=True)
- return result
- return maybe_waiter
- @contextmanager
- def user_set(APP, user, keep_get_user=False):
- """ Set the provided user as fas_user in the provided application."""
- # Hack used to remove the before_request function set by
- # flask.ext.fas_openid.FAS which otherwise kills our effort to set a
- # flask.g.fas_user.
- from flask import appcontext_pushed, g
- keep = []
- for meth in APP.before_request_funcs[None]:
- if "flask_fas_openid.FAS" in str(meth):
- continue
- keep.append(meth)
- APP.before_request_funcs[None] = keep
- def handler(sender, **kwargs):
- g.fas_user = user
- g.fas_session_id = b"123"
- g.authenticated = True
- old_get_user = pagure.flask_app._get_user
- if not keep_get_user:
- pagure.flask_app._get_user = mock.MagicMock(
- return_value=pagure.lib.model.User()
- )
- with appcontext_pushed.connected_to(handler, APP):
- yield
- pagure.flask_app._get_user = old_get_user
- tests_state = {
- "path": tempfile.mkdtemp(prefix="pagure-tests-"),
- "broker": None,
- "broker_client": None,
- "results": {},
- }
- def create_user(session, username, fullname, emails):
- """ Create an user with the provided information.
- Note that `emails` should be a list of emails.
- """
- user = pagure.lib.model.User(
- user=username,
- fullname=fullname,
- password=b"foo",
- default_email=emails[0],
- )
- session.add(user)
- session.flush()
- for email in emails:
- item = pagure.lib.model.UserEmail(user_id=user.id, email=email)
- session.add(item)
- session.commit()
- def _populate_db(session):
- # Create a couple of users
- create_user(
- session, "pingou", "PY C", ["bar@pingou.com", "foo@pingou.com"]
- )
- create_user(session, "foo", "foo bar", ["foo@bar.com"])
- def store_eager_results(*args, **kwargs):
- """A wrapper for EagerResult that stores the instance."""
- result = EagerResult(*args, **kwargs)
- tests_state["results"][result.id] = result
- return result
- def setUp():
- # In order to save time during local test execution, we create sqlite DB
- # file only once and then we populate it and empty it for every test case
- # (as opposed to creating DB file for every test case).
- session = pagure.lib.model.create_tables(
- "sqlite:///%s/db.sqlite" % tests_state["path"],
- acls=pagure_config.get("ACLS", {}),
- )
- tests_state["db_session"] = session
- # Create a broker
- broker_url = os.path.join(tests_state["path"], "broker")
- tests_state["broker"] = broker = subprocess.Popen(
- [
- "/usr/bin/redis-server",
- "--unixsocket",
- broker_url,
- "--port",
- "0",
- "--loglevel",
- "warning",
- "--logfile",
- "/dev/null",
- ],
- stdout=None,
- stderr=None,
- )
- broker.poll()
- if broker.returncode is not None:
- raise Exception("Broker failed to start")
- tests_state["broker_client"] = redis.Redis(unix_socket_path=broker_url)
- # Store the EagerResults to be able to retrieve them later
- tests_state["eg_patcher"] = mock.patch("celery.app.task.EagerResult")
- eg_mock = tests_state["eg_patcher"].start()
- eg_mock.side_effect = store_eager_results
- def tearDown():
- tests_state["db_session"].close()
- tests_state["eg_patcher"].stop()
- broker = tests_state["broker"]
- broker.kill()
- broker.wait()
- shutil.rmtree(tests_state["path"])
- class SimplePagureTest(unittest.TestCase):
- """
- Simple Test class that does not set a broker/worker
- """
- populate_db = True
- config_values = {}
- @mock.patch("pagure.lib.notify.fedmsg_publish", mock.MagicMock())
- def __init__(self, method_name="runTest"):
- """ Constructor. """
- unittest.TestCase.__init__(self, method_name)
- self.session = None
- self.path = None
- self.gitrepo = None
- self.gitrepos = None
- def perfMaxWalks(self, max_walks, max_steps):
- """ Check that we have not performed too many walks/steps. """
- num_walks = 0
- num_steps = 0
- for reqstat in perfrepo.REQUESTS:
- for walk in reqstat["walks"].values():
- num_walks += 1
- num_steps += walk["steps"]
- self.assertLessEqual(
- num_walks,
- max_walks,
- "%s git repo walks performed, at most %s allowed"
- % (num_walks, max_walks),
- )
- self.assertLessEqual(
- num_steps,
- max_steps,
- "%s git repo steps performed, at most %s allowed"
- % (num_steps, max_steps),
- )
- def perfReset(self):
- """ Reset perfrepo stats. """
- perfrepo.reset_stats()
- perfrepo.REQUESTS = []
- def setUp(self):
- if self.path:
- # This prevents test state leakage.
- # This should be None if the previous runs' tearDown didn't finish,
- # leaving behind a self.path.
- # If we continue in this case, not only did the previous worker and
- # redis instances not exit, we also might accidentally use the
- # old database connection.
- # @pingou, don't delete this again... :)
- raise Exception("Previous test failed!")
- self.perfReset()
- self.path = tempfile.mkdtemp(prefix="pagure-tests-path-")
- LOG.debug("Testdir: %s", self.path)
- for folder in ["repos", "forks", "releases", "remotes", "attachments"]:
- os.mkdir(os.path.join(self.path, folder))
- if hasattr(pagure.lib.query, "REDIS") and pagure.lib.query.REDIS:
- pagure.lib.query.REDIS.connection_pool.disconnect()
- pagure.lib.query.REDIS = None
- # Database
- self._prepare_db()
- # Write a config file
- config_values = {
- "path": self.path,
- "dburl": self.dbpath,
- "enable_docs": True,
- "docs_folder": "%s/repos/docs" % self.path,
- "enable_tickets": True,
- "tickets_folder": "%s/repos/tickets" % self.path,
- "global_path": tests_state["path"],
- "authbackend": "gitolite3",
- "repobridge_binary": "/usr/libexec/repobridge",
- "repospanner_gitport": str(8443 + sys.version_info.major),
- "repospanner_new_repo": "None",
- "repospanner_admin_override": "False",
- "repospanner_new_fork": "True",
- "repospanner_admin_migration": "False",
- }
- config_values.update(self.config_values)
- self.config_values = config_values
- config_path = os.path.join(self.path, "config")
- if not os.path.exists(config_path):
- with open(config_path, "w") as f:
- f.write(CONFIG_TEMPLATE % self.config_values)
- os.environ["PAGURE_CONFIG"] = config_path
- pagure_config.update(reload_config())
- imp.reload(pagure.lib.tasks)
- imp.reload(pagure.lib.tasks_mirror)
- imp.reload(pagure.lib.tasks_services)
- self._app = pagure.flask_app.create_app({"DB_URL": self.dbpath})
- self.app = self._app.test_client()
- self.gr_patcher = mock.patch("pagure.lib.tasks.get_result")
- gr_mock = self.gr_patcher.start()
- gr_mock.side_effect = lambda tid: tests_state["results"][tid]
- # Refresh the DB session
- self.session = pagure.lib.query.create_session(self.dbpath)
- def tearDown(self):
- self.gr_patcher.stop()
- self.session.rollback()
- self._clear_database()
- # Remove testdir
- try:
- shutil.rmtree(self.path)
- except:
- # Sometimes there is a race condition that makes deleting the folder
- # fail during the first attempt. So just try a second time if that's
- # the case.
- shutil.rmtree(self.path)
- self.path = None
- del self.app
- del self._app
- def shortDescription(self):
- doc = self.__str__() + ": " + self._testMethodDoc
- return doc or None
- def _prepare_db(self):
- self.dbpath = "sqlite:///%s" % os.path.join(
- tests_state["path"], "db.sqlite"
- )
- self.session = tests_state["db_session"]
- pagure.lib.model.create_default_status(
- self.session, acls=pagure_config.get("ACLS", {})
- )
- if self.populate_db:
- _populate_db(self.session)
- def _clear_database(self):
- tables = reversed(pagure.lib.model_base.BASE.metadata.sorted_tables)
- if self.dbpath.startswith("postgresql"):
- self.session.execute(
- "TRUNCATE %s CASCADE" % ", ".join([t.name for t in tables])
- )
- elif self.dbpath.startswith("sqlite"):
- for table in tables:
- self.session.execute("DELETE FROM %s" % table.name)
- elif self.dbpath.startswith("mysql"):
- self.session.execute("SET FOREIGN_KEY_CHECKS = 0")
- for table in tables:
- self.session.execute("TRUNCATE %s" % table.name)
- self.session.execute("SET FOREIGN_KEY_CHECKS = 1")
- self.session.commit()
- def set_auth_status(self, value):
- """ Set the return value for the test auth """
- with open(
- os.path.join(self.path, "testauth_status.json"), "w"
- ) as statusfile:
- statusfile.write(six.u(json.dumps(value)))
- def get_csrf(self, url="/new", output=None):
- """Retrieve a CSRF token from given URL."""
- if output is None:
- output = self.app.get(url)
- self.assertEqual(output.status_code, 200)
- return (
- output.get_data(as_text=True)
- .split('name="csrf_token" type="hidden" value="')[1]
- .split('">')[0]
- )
- def get_wtforms_version(self):
- """Returns the wtforms version as a tuple."""
- import wtforms
- wtforms_v = wtforms.__version__.split(".")
- for idx, val in enumerate(wtforms_v):
- try:
- val = int(val)
- except ValueError:
- pass
- wtforms_v[idx] = val
- return tuple(wtforms_v)
- def get_arrow_version(self):
- """ Returns the arrow version as a tuple."""
- import arrow
- arrow_v = arrow.__version__.split(".")
- for idx, val in enumerate(arrow_v):
- try:
- val = int(val)
- except ValueError:
- pass
- arrow_v[idx] = val
- return tuple(arrow_v)
- def assertURLEqual(self, url_1, url_2):
- url_parsed_1 = list(urlparse(url_1))
- url_parsed_1[4] = parse_qs(url_parsed_1[4])
- url_parsed_2 = list(urlparse(url_2))
- url_parsed_2[4] = parse_qs(url_parsed_2[4])
- return self.assertListEqual(url_parsed_1, url_parsed_2)
- def assertJSONEqual(self, json_1, json_2):
- return self.assertEqual(json.loads(json_1), json.loads(json_2))
- class Modeltests(SimplePagureTest):
- """ Model tests. """
- def setUp(self): # pylint: disable=invalid-name
- """ Set up the environnment, ran before every tests. """
- # Clean up test performance info
- super(Modeltests, self).setUp()
- self.app.get = create_maybe_waiter(self.app.get, self.app.get)
- self.app.post = create_maybe_waiter(self.app.post, self.app.get)
- # Refresh the DB session
- self.session = pagure.lib.query.create_session(self.dbpath)
- def tearDown(self): # pylint: disable=invalid-name
- """ Remove the test.db database if there is one. """
- tests_state["broker_client"].flushall()
- super(Modeltests, self).tearDown()
- def create_project_full(self, projectname, extra=None):
- """ Create a project via the API.
- This makes sure that the repo is fully setup the way a normal new
- project would be, with hooks and all setup.
- """
- headers = {"Authorization": "token aaabbbcccddd"}
- data = {"name": projectname, "description": "A test repo"}
- if extra:
- data.update(extra)
- # Valid request
- output = self.app.post("/api/0/new/", data=data, headers=headers)
- self.assertEqual(output.status_code, 200)
- data = json.loads(output.get_data(as_text=True))
- self.assertDictEqual(
- data, {"message": 'Project "%s" created' % projectname}
- )
- class FakeGroup(object): # pylint: disable=too-few-public-methods
- """ Fake object used to make the FakeUser object closer to the
- expectations.
- """
- def __init__(self, name):
- """ Constructor.
- :arg name: the name given to the name attribute of this object.
- """
- self.name = name
- self.group_type = "cla"
- class FakeUser(object): # pylint: disable=too-few-public-methods
- """ Fake user used to test the fedocallib library. """
- def __init__(
- self, groups=None, username="username", cla_done=True, id=None
- ):
- """ Constructor.
- :arg groups: list of the groups in which this fake user is
- supposed to be.
- """
- if isinstance(groups, six.string_types):
- groups = [groups]
- self.id = id
- self.groups = groups or []
- self.user = username
- self.username = username
- self.name = username
- self.email = "foo@bar.com"
- self.default_email = "foo@bar.com"
- self.approved_memberships = [
- FakeGroup("packager"),
- FakeGroup("design-team"),
- ]
- self.dic = {}
- self.dic["timezone"] = "Europe/Paris"
- self.login_time = datetime.utcnow()
- self.cla_done = cla_done
- def __getitem__(self, key):
- return self.dic[key]
- def create_locks(session, project):
- for ltype in ("WORKER", "WORKER_TICKET", "WORKER_REQUEST"):
- lock = pagure.lib.model.ProjectLock(
- project_id=project.id, lock_type=ltype
- )
- session.add(lock)
- def create_projects(session, is_fork=False, user_id=1, hook_token_suffix=""):
- """ Create some projects in the database. """
- item = pagure.lib.model.Project(
- user_id=user_id, # pingou
- name="test",
- is_fork=is_fork,
- parent_id=1 if is_fork else None,
- description="test project #1",
- hook_token="aaabbbccc" + hook_token_suffix,
- )
- item.close_status = ["Invalid", "Insufficient data", "Fixed", "Duplicate"]
- session.add(item)
- session.flush()
- create_locks(session, item)
- item = pagure.lib.model.Project(
- user_id=user_id, # pingou
- name="test2",
- is_fork=is_fork,
- parent_id=2 if is_fork else None,
- description="test project #2",
- hook_token="aaabbbddd" + hook_token_suffix,
- )
- item.close_status = ["Invalid", "Insufficient data", "Fixed", "Duplicate"]
- session.add(item)
- session.flush()
- create_locks(session, item)
- item = pagure.lib.model.Project(
- user_id=user_id, # pingou
- name="test3",
- is_fork=is_fork,
- parent_id=3 if is_fork else None,
- description="namespaced test project",
- hook_token="aaabbbeee" + hook_token_suffix,
- namespace="somenamespace",
- )
- item.close_status = ["Invalid", "Insufficient data", "Fixed", "Duplicate"]
- session.add(item)
- session.flush()
- create_locks(session, item)
- session.commit()
- def create_projects_git(folder, bare=False):
- """ Create some projects in the database. """
- repos = []
- for project in [
- "test.git",
- "test2.git",
- os.path.join("somenamespace", "test3.git"),
- ]:
- repo_path = os.path.join(folder, project)
- repos.append(repo_path)
- if not os.path.exists(repo_path):
- os.makedirs(repo_path)
- pygit2.init_repository(repo_path, bare=bare)
- return repos
- def create_tokens(session, user_id=1, project_id=1, suffix=None):
- """ Create some tokens for the project in the database. """
- token = "aaabbbcccddd"
- if suffix:
- token += suffix
- item = pagure.lib.model.Token(
- id=token,
- user_id=user_id,
- project_id=project_id,
- expiration=datetime.utcnow() + timedelta(days=30),
- )
- session.add(item)
- token = "foo_token"
- if suffix:
- token += suffix
- item = pagure.lib.model.Token(
- id=token,
- user_id=user_id,
- project_id=project_id,
- expiration=datetime.utcnow() + timedelta(days=30),
- )
- session.add(item)
- token = "expired_token"
- if suffix:
- token += suffix
- item = pagure.lib.model.Token(
- id=token,
- user_id=user_id,
- project_id=project_id,
- expiration=datetime.utcnow() - timedelta(days=1),
- )
- session.add(item)
- session.commit()
- def create_tokens_acl(session, token_id="aaabbbcccddd", acl_name=None):
- """ Create some ACLs for the token. If acl_name is not set, the token will
- have all the ACLs enabled.
- """
- if acl_name is None:
- for aclid in range(len(pagure_config["ACLS"])):
- token_acl = pagure.lib.model.TokenAcl(
- token_id=token_id, acl_id=aclid + 1
- )
- session.add(token_acl)
- else:
- acl = (
- session.query(pagure.lib.model.ACL).filter_by(name=acl_name).one()
- )
- token_acl = pagure.lib.model.TokenAcl(token_id=token_id, acl_id=acl.id)
- session.add(token_acl)
- session.commit()
- def _clone_and_top_commits(folder, branch, branch_ref=False):
- """ Clone the repository, checkout the specified branch and return
- the top commit of that branch if there is one.
- Returns the repo, the path to the clone and the top commit(s) in a tuple
- or the repo, the path to the clone and the reference to the branch
- object if branch_ref is True.
- """
- if not os.path.exists(folder):
- os.makedirs(folder)
- brepo = pygit2.init_repository(folder, bare=True)
- newfolder = tempfile.mkdtemp(prefix="pagure-tests")
- repo = pygit2.clone_repository(folder, newfolder)
- branch_ref_obj = None
- if "origin/%s" % branch in repo.listall_branches(pygit2.GIT_BRANCH_ALL):
- branch_ref_obj = pagure.lib.git.get_branch_ref(repo, branch)
- repo.checkout(branch_ref_obj)
- if branch_ref:
- return (repo, newfolder, branch_ref_obj)
- parents = []
- commit = None
- try:
- if branch_ref_obj:
- commit = repo[branch_ref_obj.peel().hex]
- else:
- commit = repo.revparse_single("HEAD")
- except KeyError:
- pass
- if commit:
- parents = [commit.oid.hex]
- return (repo, newfolder, parents)
- def add_content_git_repo(folder, branch="master", append=None):
- """ Create some content for the specified git repo. """
- repo, newfolder, parents = _clone_and_top_commits(folder, branch)
- # Create a file in that git repo
- filename = os.path.join(newfolder, "sources")
- content = "foo\n bar"
- if os.path.exists(filename):
- content = "foo\n bar\nbaz"
- if append:
- content += append
- with open(filename, "w") as stream:
- stream.write(content)
- repo.index.add("sources")
- repo.index.write()
- # Commits the files added
- tree = repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- commit = repo.create_commit(
- "refs/heads/%s" % branch, # the name of the reference to update
- author,
- committer,
- "Add sources file for testing",
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- parents,
- )
- if commit:
- parents = [commit.hex]
- subfolder = os.path.join("folder1", "folder2")
- if not os.path.exists(os.path.join(newfolder, subfolder)):
- os.makedirs(os.path.join(newfolder, subfolder))
- # Create a file in that git repo
- with open(os.path.join(newfolder, subfolder, "file"), "w") as stream:
- stream.write("foo\n bar\nbaz")
- repo.index.add(os.path.join(subfolder, "file"))
- with open(os.path.join(newfolder, subfolder, "fileŠ"), "w") as stream:
- stream.write("foo\n bar\nbaz")
- repo.index.add(os.path.join(subfolder, "fileŠ"))
- repo.index.write()
- # Commits the files added
- tree = repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- commit = repo.create_commit(
- "refs/heads/%s" % branch, # the name of the reference to update
- author,
- committer,
- "Add some directory and a file for more testing",
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- parents,
- )
- # Push to origin
- ori_remote = repo.remotes[0]
- master_ref = repo.lookup_reference(
- "HEAD" if branch == "master" else "refs/heads/%s" % branch
- ).resolve()
- refname = "%s:%s" % (master_ref.name, master_ref.name)
- PagureRepo.push(ori_remote, refname)
- shutil.rmtree(newfolder)
- def add_readme_git_repo(folder, readme_name="README.rst", branch="master"):
- """ Create a README file for the specified git repo. """
- repo, newfolder, parents = _clone_and_top_commits(folder, branch)
- if readme_name == "README.rst":
- content = """Pagure
- ======
- :Author: Pierre-Yves Chibon <pingou@pingoured.fr>
- Pagure is a light-weight git-centered forge based on pygit2.
- Currently, Pagure offers a web-interface for git repositories, a ticket
- system and possibilities to create new projects, fork existing ones and
- create/merge pull-requests across or within projects.
- Homepage: https://github.com/pypingou/pagure
- Dev instance: http://209.132.184.222/ (/!\\ May change unexpectedly, it's a dev instance ;-))
- """
- else:
- content = (
- """Pagure
- ======
- This is a placeholder """
- + readme_name
- + """
- that should never get displayed on the website if there is a README.rst in the repo.
- """
- )
- # Create a file in that git repo
- with open(os.path.join(newfolder, readme_name), "w") as stream:
- stream.write(content)
- repo.index.add(readme_name)
- repo.index.write()
- # Commits the files added
- tree = repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- branch_ref = "refs/heads/%s" % branch
- repo.create_commit(
- branch_ref, # the name of the reference to update
- author,
- committer,
- "Add a README file",
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- parents,
- )
- # Push to origin
- ori_remote = repo.remotes[0]
- PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))
- shutil.rmtree(newfolder)
- def add_commit_git_repo(
- folder, ncommits=10, filename="sources", branch="master", symlink_to=None
- ):
- """ Create some more commits for the specified git repo. """
- repo, newfolder, branch_ref_obj = _clone_and_top_commits(
- folder, branch, branch_ref=True
- )
- for index in range(ncommits):
- # Create a file in that git repo
- if symlink_to:
- os.symlink(symlink_to, os.path.join(newfolder, filename))
- else:
- with open(os.path.join(newfolder, filename), "a") as stream:
- stream.write("Row %s\n" % index)
- repo.index.add(filename)
- repo.index.write()
- parents = []
- commit = None
- try:
- if branch_ref_obj:
- commit = repo[branch_ref_obj.peel().hex]
- else:
- commit = repo.revparse_single("HEAD")
- except (KeyError, AttributeError):
- pass
- if commit:
- parents = [commit.oid.hex]
- # Commits the files added
- tree = repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- branch_ref = "refs/heads/%s" % branch
- repo.create_commit(
- branch_ref,
- author,
- committer,
- "Add row %s to %s file" % (index, filename),
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- parents,
- )
- branch_ref_obj = pagure.lib.git.get_branch_ref(repo, branch)
- # Push to origin
- ori_remote = repo.remotes[0]
- PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))
- shutil.rmtree(newfolder)
- def add_tag_git_repo(folder, tagname, obj_hash, message):
- """ Add a tag to the given object of the given repo annotated by given message. """
- repo, newfolder, branch_ref_obj = _clone_and_top_commits(
- folder, "master", branch_ref=True
- )
- tag_sha = repo.create_tag(
- tagname,
- obj_hash,
- repo.get(obj_hash).type,
- pygit2.Signature("Alice Author", "alice@authors.tld"),
- message,
- )
- # Push to origin
- ori_remote = repo.remotes[0]
- PagureRepo.push(
- ori_remote, "refs/tags/%s:refs/tags/%s" % (tagname, tagname)
- )
- shutil.rmtree(newfolder)
- return tag_sha
- def add_content_to_git(
- folder,
- branch="master",
- folders=None,
- filename="sources",
- content="foo",
- message=None,
- author=("Alice Author", "alice@authors.tld"),
- commiter=("Cecil Committer", "cecil@committers.tld"),
- ):
- """ Create some more commits for the specified git repo. """
- repo, newfolder, branch_ref_obj = _clone_and_top_commits(
- folder, branch, branch_ref=True
- )
- # Create a file in that git repo
- if folders:
- if not os.path.exists(os.path.join(newfolder, folders)):
- os.makedirs(os.path.join(newfolder, folders))
- filename = os.path.join(folders, filename)
- filepath = os.path.join(newfolder, filename)
- with open(filepath, "a", encoding="utf-8") as stream:
- stream.write("%s\n" % content)
- repo.index.add(filename)
- repo.index.write()
- parents = []
- commit = None
- try:
- if branch_ref_obj:
- commit = repo[branch_ref_obj.peel().hex]
- else:
- commit = repo.revparse_single("HEAD")
- except (KeyError, AttributeError):
- pass
- if commit:
- parents = [commit.oid.hex]
- # Commits the files added
- tree = repo.index.write_tree()
- author = pygit2.Signature(*author)
- committer = pygit2.Signature(*commiter)
- branch_ref = "refs/heads/%s" % branch
- message = message or "Add content to file %s" % (filename)
- repo.create_commit(
- branch_ref, # the name of the reference to update
- author,
- committer,
- message,
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- parents,
- )
- # Push to origin
- ori_remote = repo.remotes[0]
- PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))
- shutil.rmtree(newfolder)
- def add_binary_git_repo(folder, filename):
- """ Create a fake image file for the specified git repo. """
- repo, newfolder, parents = _clone_and_top_commits(folder, "master")
- content = b"""\x00\x00\x01\x00\x01\x00\x18\x18\x00\x00\x01\x00 \x00\x88
- \t\x00\x00\x16\x00\x00\x00(\x00\x00\x00\x18\x00x00\x00\x01\x00 \x00\x00\x00
- \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
- 00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa7lM\x01\xa6kM\t\xa6kM\x01
- \xa4fF\x04\xa2dE\x95\xa2cD8\xa1a
- """
- # Create a file in that git repo
- with open(os.path.join(newfolder, filename), "wb") as stream:
- stream.write(content)
- repo.index.add(filename)
- repo.index.write()
- # Commits the files added
- tree = repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- repo.create_commit(
- "refs/heads/master", # the name of the reference to update
- author,
- committer,
- "Add a fake image file",
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- parents,
- )
- # Push to origin
- ori_remote = repo.remotes[0]
- master_ref = repo.lookup_reference("HEAD").resolve()
- refname = "%s:%s" % (master_ref.name, master_ref.name)
- PagureRepo.push(ori_remote, refname)
- shutil.rmtree(newfolder)
- def remove_file_git_repo(folder, filename, branch="master"):
- """ Delete the specified file on the give git repo and branch. """
- repo, newfolder, parents = _clone_and_top_commits(folder, branch)
- # Remove file
- repo.index.remove(filename)
- # Write the change and commit it
- tree = repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- branch_ref = "refs/heads/%s" % branch
- repo.create_commit(
- branch_ref, # the name of the reference to update
- author,
- committer,
- "Remove file %s" % filename,
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- parents,
- )
- # Push to origin
- ori_remote = repo.remotes[0]
- PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))
- shutil.rmtree(newfolder)
- def add_pull_request_git_repo(
- folder,
- session,
- repo,
- fork,
- branch_from="feature",
- user="pingou",
- allow_rebase=False,
- ):
- """ Set up the git repo and create the corresponding PullRequest
- object.
- """
- # Clone the main repo
- gitrepo = os.path.join(folder, "repos", repo.path)
- newpath = tempfile.mkdtemp(prefix="pagure-fork-test")
- repopath = os.path.join(newpath, "test")
- clone_repo = pygit2.clone_repository(gitrepo, repopath)
- # Create a file in that git repo
- with open(os.path.join(repopath, "sources"), "w") as stream:
- stream.write("foo\n bar")
- clone_repo.index.add("sources")
- clone_repo.index.write()
- # Commits the files added
- tree = clone_repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- clone_repo.create_commit(
- "refs/heads/master", # the name of the reference to update
- author,
- committer,
- "Add sources file for testing",
- # binary string representing the tree object ID
- tree,
- # list of binary strings representing parents of the new commit
- [],
- )
- refname = "refs/heads/master:refs/heads/master"
- ori_remote = clone_repo.remotes[0]
- PagureRepo.push(ori_remote, refname)
- first_commit = clone_repo.revparse_single("HEAD")
- # Set the second repo
- repopath = os.path.join(folder, "repos", fork.path)
- new_gitrepo = os.path.join(newpath, "fork_test")
- clone_repo = pygit2.clone_repository(repopath, new_gitrepo)
- # Add the main project as remote repo
- upstream_path = os.path.join(folder, "repos", repo.path)
- remote = clone_repo.create_remote("upstream", upstream_path)
- remote.fetch()
- # Edit the sources file again
- with open(os.path.join(new_gitrepo, "sources"), "w") as stream:
- stream.write("foo\n bar\nbaz\n boose")
- clone_repo.index.add("sources")
- clone_repo.index.write()
- # Commits the files added
- tree = clone_repo.index.write_tree()
- author = pygit2.Signature("Alice Author", "alice@authors.tld")
- committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
- clone_repo.create_commit(
- "refs/heads/%s" % branch_from,
- author,
- committer,
- "A commit on branch %s" % branch_from,
- tree,
- [first_commit.oid.hex],
- )
- refname = "refs/heads/%s" % (branch_from)
- ori_remote = clone_repo.remotes[0]
- PagureRepo.push(ori_remote, refname)
- # Create a PR for these changes
- project = pagure.lib.query.get_authorized_project(session, "test")
- req = pagure.lib.query.new_pull_request(
- session=session,
- repo_from=fork,
- branch_from=branch_from,
- repo_to=project,
- branch_to="master",
- title="PR from the %s branch" % branch_from,
- allow_rebase=allow_rebase,
- user=user,
- )
- session.commit()
- return req
- def clean_pull_requests_path():
- newpath = tempfile.mkdtemp(prefix="pagure-fork-test")
- shutil.rmtree(newpath)
- @contextmanager
- def capture_output(merge_stderr=True):
- oldout, olderr = sys.stdout, sys.stderr
- try:
- out = StringIO()
- err = StringIO()
- if merge_stderr:
- sys.stdout = sys.stderr = out
- yield out
- else:
- sys.stdout, sys.stderr = out, err
- yield out, err
- finally:
- sys.stdout, sys.stderr = oldout, olderr
- def get_alerts(html):
- soup = BeautifulSoup(html, "html.parser")
- alerts = []
- for element in soup.find_all("div", class_="alert"):
- severity = None
- for class_ in element["class"]:
- if not class_.startswith("alert-"):
- continue
- if class_ == "alert-dismissible":
- continue
- severity = class_[len("alert-") :]
- break
- element.find("button").decompose() # close button
- alerts.append(
- dict(severity=severity, text="".join(element.stripped_strings))
- )
- return alerts
- def definitely_wait(result):
- """ Helper function for definitely waiting in _maybe_wait. """
- result.wait()
- if __name__ == "__main__":
- SUITE = unittest.TestLoader().loadTestsFromTestCase(Modeltests)
- unittest.TextTestRunner(verbosity=2).run(SUITE)
|