12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734 |
- # -*- coding: utf-8 -*-
- """
- (c) 2015-2017 - Copyright Red Hat Inc
- Authors:
- Pierre-Yves Chibon <pingou@pingoured.fr>
- """
- __requires__ = ['SQLAlchemy >= 0.7']
- import pkg_resources
- import logging
- import unittest
- import shutil
- import subprocess
- import sys
- import tempfile
- import os
# Route the root logger's output to stderr.
logging.basicConfig(stream=sys.stderr)

# Always enable performance counting for tests; this is set ahead of the
# pagure imports further down.
os.environ.update({'PAGURE_PERFREPO': 'true'})
- from datetime import date
- from datetime import datetime
- from datetime import timedelta
- from functools import wraps
- import pygit2
- from contextlib import contextmanager
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.orm import scoped_session
# Put the parent of the directory holding this file first on the import path.
sys.path.insert(
    0,
    os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'),
)
- import pagure
- import pagure.lib
- import pagure.lib.model
- from pagure.lib.repo import PagureRepo
- import pagure.perfrepo as perfrepo
# Database URL provided by faitout; left to None when faitout is not used or
# could not be reached, in which case the tests use a sqlite file.
DB_PATH = None

# Base URL of the faitout service, overridable via the environment.
FAITOUT_URL = os.environ.get('FAITOUT_URL') \
    or 'http://faitout.fedorainfracloud.org/'

# Directory containing this file.
HERE = os.path.dirname(os.path.abspath(__file__))

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

# Only let critical messages of the 'pagure' logger through and drop its
# handlers.
PAGLOG = logging.getLogger('pagure')
PAGLOG.setLevel(logging.CRITICAL)
PAGLOG.handlers = []

# Template of the config file written by the test set-up; filled in with the
# per-test directory (``path``) and the database URL (``dburl``).
CONFIG_TEMPLATE = """
GIT_FOLDER = '%(path)s/repos'
DOCS_FOLDER = '%(path)s/docs'
TICKETS_FOLDER = '%(path)s/tickets'
REQUESTS_FOLDER = '%(path)s/requests'
REMOTE_GIT_FOLDER = '%(path)s/remotes'
ATTACHMENTS_FOLDER = '%(path)s/attachments'
DB_URL = '%(dburl)s'
"""

LOG.info('BUILD_ID: %s', os.environ.get('BUILD_ID'))

# Ask faitout for a database only when a BUILD_ID or an explicit FAITOUT_URL
# is present in the environment.
if os.environ.get('BUILD_ID') or os.environ.get('FAITOUT_URL'):
    try:
        import requests
        req = requests.get('%s/new' % FAITOUT_URL)
        if req.status_code != 200:
            LOG.info('faitout returned: %s : %s', req.status_code, req.text)
        else:
            DB_PATH = req.text
            LOG.info('Using faitout at: %s', DB_PATH)
    except Exception as err:
        # Best effort: any failure here simply leaves DB_PATH unset.
        LOG.info('Error while querying faitout: %s', err)

# Remove the log handlers for the tests
pagure.APP.logger.handlers = []
@contextmanager
def user_set(APP, user):
    """ Context manager setting ``user`` as fas_user in the given application.

    :arg APP: the flask application to act upon.
    :arg user: the object stored as ``flask.g.fas_user`` each time an
        application context is pushed while the context manager is active.
    """
    from flask import appcontext_pushed, g

    # Hack: drop the before_request function set by flask.ext.fas_openid.FAS
    # which otherwise kills our effort to set a flask.g.fas_user.
    # Note that the filtered list is not restored on exit.
    APP.before_request_funcs[None] = [
        func for func in APP.before_request_funcs[None]
        if 'flask_fas_openid.FAS' not in str(func)
    ]

    def handler(sender, **kwargs):
        g.fas_user = user
        g.fas_session_id = b'123'

    with appcontext_pushed.connected_to(handler, APP):
        yield
class Modeltests(unittest.TestCase):
    """ Model tests.

    Base test case: ``setUp`` builds a temporary directory holding the git
    folders, a config file, a redis broker socket and the celery worker log,
    creates the database tables and two users; ``tearDown`` stops the
    services and removes that directory.
    """

    def __init__(self, method_name='runTest'):
        """ Constructor. """
        unittest.TestCase.__init__(self, method_name)
        self.session = None
        self.path = None
        self.gitrepo = None
        self.gitrepos = None

    def perfMaxWalks(self, max_walks, max_steps):
        """ Check that we have not performed too many walks/steps.

        :arg max_walks: maximum number of git repo walks allowed, summed
            over all the requests recorded by perfrepo.
        :arg max_steps: maximum number of steps allowed, summed over all
            these walks.
        """
        num_walks = 0
        num_steps = 0
        for reqstat in perfrepo.REQUESTS:
            for walk in reqstat['walks'].values():
                num_walks += 1
                num_steps += walk['steps']
        self.assertLessEqual(num_walks, max_walks,
                             '%s git repo walks performed, at most %s allowed'
                             % (num_walks, max_walks))
        self.assertLessEqual(num_steps, max_steps,
                             '%s git repo steps performed, at most %s allowed'
                             % (num_steps, max_steps))

    def perfReset(self):
        """ Reset perfrepo stats. """
        perfrepo.reset_stats()
        perfrepo.REQUESTS = []

    def setUp(self):    # pylint: disable=invalid-name
        """ Set up the environment, run before every test.

        :raises Exception: if the instance was already set up or if the
            broker or the worker exited right after being started.
        """
        # Clean up test performance info
        self.perfReset()

        pagure.REDIS = None
        pagure.lib.REDIS = None

        if self.path is not None:
            raise Exception('Double init?!')
        self.path = tempfile.mkdtemp(prefix='pagure-tests-path-')
        LOG.info('Testdir: %s', self.path)
        for folder in ['tickets', 'repos', 'forks', 'docs', 'requests',
                       'releases', 'remotes', 'attachments']:
            os.mkdir(os.path.join(self.path, folder))

        # Use the faitout database if one was obtained, sqlite otherwise
        if DB_PATH:
            self.dbpath = DB_PATH
        else:
            self.dbpath = 'sqlite:///%s' % os.path.join(self.path,
                                                        'db.sqlite')

        # Write a config file
        config_values = {'path': self.path,
                         'dburl': self.dbpath}
        with open(os.path.join(self.path, 'config'), 'w') as f:
            f.write(CONFIG_TEMPLATE % config_values)

        # Create a broker
        broker_url = os.path.join(self.path, 'broker')

        self.broker = subprocess.Popen(
            ['/usr/bin/redis-server', '--unixsocket', broker_url, '--port',
             '0', '--loglevel', 'warning', '--logfile', '/dev/null'],
            stdout=None, stderr=None)
        self.broker.poll()
        if self.broker.returncode is not None:
            raise Exception('Broker failed to start')

        self.session = pagure.lib.model.create_tables(
            self.dbpath, acls=pagure.APP.config.get('ACLS', {}))

        reload(pagure.lib.tasks)
        celery_broker_url = 'redis+socket://' + broker_url
        pagure.lib.tasks.conn.conf.broker_url = celery_broker_url
        pagure.lib.tasks.conn.conf.result_backend = celery_broker_url

        # Start a worker
        # Using concurrency 2 to test with some concurrency, but not be heavy
        # Using eventlet so that worker.terminate kills everything
        # The log file is closed again in tearDown.
        self.workerlog = open(os.path.join(self.path, 'worker.log'), 'w')
        self.worker = subprocess.Popen(
            ['/usr/bin/celery', '-A', 'pagure.lib.tasks', 'worker',
             '--loglevel=info', '--concurrency=2', '--pool=eventlet',
             '--without-gossip', '--without-mingle', '--quiet'],
            env={'PAGURE_BROKER_URL': celery_broker_url,
                 'PAGURE_CONFIG': os.path.join(self.path, 'config'),
                 'PYTHONPATH': '.'},
            cwd=os.path.normpath(os.path.join(os.path.dirname(__file__),
                                              '..')),
            stdout=self.workerlog,
            stderr=self.workerlog)
        self.worker.poll()
        if self.worker.returncode is not None:
            raise Exception('Worker failed to start')

        # Create a couple of users
        item = pagure.lib.model.User(
            user='pingou',
            fullname='PY C',
            password='foo',
            default_email='bar@pingou.com',
        )
        self.session.add(item)
        item = pagure.lib.model.UserEmail(
            user_id=1,
            email='bar@pingou.com')
        self.session.add(item)
        item = pagure.lib.model.UserEmail(
            user_id=1,
            email='foo@pingou.com')
        self.session.add(item)

        item = pagure.lib.model.User(
            user='foo',
            fullname='foo bar',
            password='foo',
            default_email='foo@bar.com',
        )
        self.session.add(item)
        item = pagure.lib.model.UserEmail(
            user_id=2,
            email='foo@bar.com')
        self.session.add(item)

        self.session.commit()

        # Prevent unit-tests to send email, globally
        pagure.APP.config['EMAIL_SEND'] = False
        pagure.APP.config['TESTING'] = True
        pagure.APP.config['GIT_FOLDER'] = os.path.join(
            self.path, 'repos')
        pagure.APP.config['TICKETS_FOLDER'] = os.path.join(
            self.path, 'tickets')
        pagure.APP.config['DOCS_FOLDER'] = os.path.join(
            self.path, 'docs')
        pagure.APP.config['REQUESTS_FOLDER'] = os.path.join(
            self.path, 'requests')
        pagure.APP.config['ATTACHMENTS_FOLDER'] = os.path.join(
            self.path, 'attachments')
        self.app = pagure.APP.test_client()

    def tearDown(self):    # pylint: disable=invalid-name
        """ Close the session, clean the database if it is a remote
        postgres one, stop the worker and the broker and remove the test
        directory.

        The services are stopped and the directory removed even when an
        earlier clean-up step raises; the exception still propagates.
        """
        try:
            self.session.close()

            # Clear DB
            if self.dbpath.startswith('postgres') \
                    and 'localhost' not in self.dbpath:
                # Imported here since the module only imports requests
                # when it queries faitout.
                import requests
                db_name = self.dbpath.rsplit('/', 1)[1]
                requests.get('%s/clean/%s' % (FAITOUT_URL, db_name))
        finally:
            try:
                # Terminate worker and broker
                self.worker.terminate()
                self.worker.wait()
                self.broker.terminate()
                self.broker.wait()
            finally:
                # Release the worker's log file before deleting its folder
                workerlog = getattr(self, 'workerlog', None)
                if workerlog is not None:
                    workerlog.close()

                # Remove testdir
                shutil.rmtree(self.path)
                self.path = None

    def get_csrf(self, url='/new'):
        """Retrieve a CSRF token from given URL.

        :kwarg url: the URL of a page containing a ``csrf_token`` hidden
            input; the request must return a 200.
        :return: the value of the first ``csrf_token`` input of the page.
        """
        output = self.app.get(url)
        self.assertEqual(output.status_code, 200)

        return output.data.split(
            'name="csrf_token" type="hidden" value="')[1].split('">')[0]
class FakeGroup(object):    # pylint: disable=too-few-public-methods
    """ Minimal group object, used to bring FakeUser closer to what is
    expected of a user.
    """

    def __init__(self, name):
        """ Constructor.

        :arg name: the value stored in the ``name`` attribute; the
            ``group_type`` attribute is always ``'cla'``.
        """
        self.group_type = 'cla'
        self.name = name
class FakeUser(object):    # pylint: disable=too-few-public-methods
    """ Fake user object for the tests.

    Items such as ``timezone`` are available via ``user[key]``.
    """

    def __init__(self, groups=None, username='username', cla_done=True, id=1):
        """ Constructor.

        :kwarg groups: list of the groups in which this fake user is
            supposed to be. A single group name given as a string is
            wrapped into a list; defaults to an empty list.
        :kwarg username: value of the ``user``, ``username`` and ``name``
            attributes.
        :kwarg cla_done: value of the ``cla_done`` attribute.
        :kwarg id: value of the ``id`` attribute.
        """
        # A fresh list per instance: a list used as default value would be
        # shared between every FakeUser built without groups.
        if groups is None:
            groups = []

        # basestring only exists on Python 2
        try:
            string_types = basestring
        except NameError:
            string_types = str
        if isinstance(groups, string_types):
            groups = [groups]

        self.id = id
        self.groups = groups
        self.user = username
        self.username = username
        self.name = username
        self.email = 'foo@bar.com'
        self.approved_memberships = [
            FakeGroup('packager'),
            FakeGroup('design-team')
        ]
        self.dic = {}
        self.dic['timezone'] = 'Europe/Paris'
        self.login_time = datetime.utcnow()
        self.cla_done = cla_done

    def __getitem__(self, key):
        """ Return the item stored under ``key`` in ``self.dic``.

        :raises KeyError: if there is no such key.
        """
        return self.dic[key]
def create_projects(session):
    """ Add three projects to the database and commit the session.

    The projects are ``test``, ``test2`` and ``test3`` (the latter in the
    ``somenamespace`` namespace), all with user_id 1 (pingou).

    :arg session: the database session used to add the projects.
    """
    statuses = ('Invalid', 'Insufficient data', 'Fixed', 'Duplicate')
    specs = [
        {
            'name': 'test',
            'description': 'test project #1',
            'hook_token': 'aaabbbccc',
        },
        {
            'name': 'test2',
            'description': 'test project #2',
            'hook_token': 'aaabbbddd',
        },
        {
            'name': 'test3',
            'description': 'namespaced test project',
            'hook_token': 'aaabbbeee',
            'namespace': 'somenamespace',
        },
    ]

    for spec in specs:
        project = pagure.lib.model.Project(user_id=1, **spec)
        # Every project gets its own list of close statuses
        project.close_status = list(statuses)
        session.add(project)

    session.commit()
def create_projects_git(folder, bare=False):
    """ Initialise the git repositories of the test projects.

    :arg folder: the directory in which ``test.git``, ``test2.git`` and
        ``somenamespace/test3.git`` are created.
    :kwarg bare: passed on to ``pygit2.init_repository``.
    :return: the list of the paths of the three repositories.
    """
    names = (
        'test.git',
        'test2.git',
        os.path.join('somenamespace', 'test3.git'),
    )
    repos = []
    for name in names:
        target = os.path.join(folder, name)
        repos.append(target)
        if not os.path.exists(target):
            os.makedirs(target)
        pygit2.init_repository(target, bare=bare)

    return repos
def create_tokens(session, user_id=1, project_id=1):
    """ Add three API tokens to the database and commit the session.

    Two tokens expire in 30 days (``aaabbbcccddd`` and ``foo_token``), the
    third one (``expired_token``) expired a day ago.

    :arg session: the database session used to add the tokens.
    :kwarg user_id: identifier of the user owning the tokens.
    :kwarg project_id: identifier of the project the tokens are for.
    """
    # (token id, offset of the expiration date relative to now)
    tokens = (
        ('aaabbbcccddd', timedelta(days=30)),
        ('foo_token', timedelta(days=30)),
        ('expired_token', timedelta(days=-1)),
    )
    for token_id, offset in tokens:
        token = pagure.lib.model.Token(
            id=token_id,
            user_id=user_id,
            project_id=project_id,
            expiration=datetime.utcnow() + offset,
        )
        session.add(token)

    session.commit()
def create_tokens_acl(session, token_id='aaabbbcccddd'):
    """ Associate the given token with the ACL ids 1 to N and commit.

    N is the number of entries of the application's ``ACLS`` configuration.

    :arg session: the database session used to add the associations.
    :kwarg token_id: identifier of the token to associate the ACLs with.
    """
    acl_count = len(pagure.APP.config['ACLS'])
    for acl_id in range(1, acl_count + 1):
        token_acl = pagure.lib.model.TokenAcl(
            token_id=token_id,
            acl_id=acl_id,
        )
        session.add(token_acl)

    session.commit()
def add_content_git_repo(folder, branch='master'):
    """ Create some content for the specified git repo.

    The repo at ``folder`` is initialised as a bare repo and cloned into a
    temporary directory. Two commits are made there on ``branch``: one
    adding a ``sources`` file, one adding two files under
    ``folder1/folder2``. The branch is then pushed back to ``folder``.

    :arg folder: path of the bare git repository to add content to, it is
        created if it does not exist.
    :kwarg branch: name of the branch receiving the commits.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    pygit2.init_repository(folder, bare=True)

    newfolder = tempfile.mkdtemp(prefix='pagure-tests')
    # The clone is only scratch space: remove it even if a step fails
    try:
        repo = pygit2.clone_repository(folder, newfolder)
        revision = 'HEAD' if branch == 'master' else branch

        def commit_index(message):
            """ Commit the index on top of the current tip, if any. """
            try:
                parents = [repo.revparse_single(revision).oid.hex]
            except KeyError:
                # The revision cannot be resolved: commit without parent
                parents = []
            repo.create_commit(
                'refs/heads/%s' % branch,  # the name of the reference to update
                pygit2.Signature('Alice Author', 'alice@authors.tld'),
                pygit2.Signature('Cecil Committer', 'cecil@committers.tld'),
                message,
                # binary string representing the tree object ID
                repo.index.write_tree(),
                # list of binary strings representing parents of the commit
                parents,
            )

        # Create a file in that git repo
        with open(os.path.join(newfolder, 'sources'), 'w') as stream:
            stream.write('foo\n bar')
        repo.index.add('sources')
        repo.index.write()
        commit_index('Add sources file for testing')

        subfolder = os.path.join('folder1', 'folder2')
        if not os.path.exists(os.path.join(newfolder, subfolder)):
            os.makedirs(os.path.join(newfolder, subfolder))
        # Create two files, one with a non-ASCII name, in a sub-folder
        for name in ('file', 'fileŠ'):
            with open(os.path.join(newfolder, subfolder, name), 'w') as stream:
                stream.write('foo\n bar\nbaz')
            repo.index.add(os.path.join(subfolder, name))
        repo.index.write()
        commit_index('Add some directory and a file for more testing')

        # Push to origin
        ori_remote = repo.remotes[0]
        master_ref = repo.lookup_reference(
            'HEAD' if branch == 'master'
            else 'refs/heads/%s' % branch).resolve()
        refname = '%s:%s' % (master_ref.name, master_ref.name)

        PagureRepo.push(ori_remote, refname)
    finally:
        shutil.rmtree(newfolder)
def add_readme_git_repo(folder):
    """ Create a README file for the specified git repo.

    The repo at ``folder`` is initialised as a bare repo and cloned into a
    temporary directory, where a ``README.rst`` file is committed on
    ``refs/heads/master``. The reference HEAD resolves to is then pushed
    back to ``folder``.

    :arg folder: path of the bare git repository to add the README to, it
        is created if it does not exist.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    pygit2.init_repository(folder, bare=True)

    content = """Pagure
======
:Author: Pierre-Yves Chibon <pingou@pingoured.fr>
Pagure is a light-weight git-centered forge based on pygit2.
Currently, Pagure offers a web-interface for git repositories, a ticket
system and possibilities to create new projects, fork existing ones and
create/merge pull-requests across or within projects.
Homepage: https://github.com/pypingou/pagure
Dev instance: http://209.132.184.222/ (/!\\ May change unexpectedly, it's a dev instance ;-))
"""

    newfolder = tempfile.mkdtemp(prefix='pagure-tests')
    # The clone is only scratch space: remove it even if a step fails
    try:
        repo = pygit2.clone_repository(folder, newfolder)

        # Use the current HEAD as parent, if HEAD can be resolved
        parents = []
        commit = None
        try:
            commit = repo.revparse_single('HEAD')
        except KeyError:
            pass
        if commit:
            parents = [commit.oid.hex]

        # Create a file in that git repo
        with open(os.path.join(newfolder, 'README.rst'), 'w') as stream:
            stream.write(content)
        repo.index.add('README.rst')
        repo.index.write()

        # Commits the files added
        tree = repo.index.write_tree()
        author = pygit2.Signature(
            'Alice Author', 'alice@authors.tld')
        committer = pygit2.Signature(
            'Cecil Committer', 'cecil@committers.tld')
        repo.create_commit(
            'refs/heads/master',  # the name of the reference to update
            author,
            committer,
            'Add a README file',
            # binary string representing the tree object ID
            tree,
            # list of binary strings representing parents of the new commit
            parents
        )

        # Push to origin
        ori_remote = repo.remotes[0]
        master_ref = repo.lookup_reference('HEAD').resolve()
        refname = '%s:%s' % (master_ref.name, master_ref.name)

        PagureRepo.push(ori_remote, refname)
    finally:
        shutil.rmtree(newfolder)
def add_commit_git_repo(folder, ncommits=10, filename='sources',
                        branch='master'):
    """ Create some more commits for the specified git repo.

    The repo at ``folder`` is initialised as a bare repo and cloned into a
    temporary directory. ``ncommits`` commits are made there on
    ``refs/heads/master``, each appending one ``Row <index>`` line to
    ``filename``. HEAD is then pushed to ``branch`` of ``folder``.

    :arg folder: path of the bare git repository to add commits to, it is
        created if it does not exist.
    :kwarg ncommits: number of commits to create.
    :kwarg filename: name of the file the rows are appended to.
    :kwarg branch: name of the branch of ``folder`` HEAD is pushed to.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    pygit2.init_repository(folder, bare=True)

    newfolder = tempfile.mkdtemp(prefix='pagure-tests')
    # The clone is only scratch space: remove it even if a step fails
    try:
        repo = pygit2.clone_repository(folder, newfolder)

        for index in range(ncommits):
            # Create a file in that git repo
            with open(os.path.join(newfolder, filename), 'a') as stream:
                stream.write('Row %s\n' % index)
            repo.index.add(filename)
            repo.index.write()

            # Use the current HEAD as parent, if HEAD can be resolved
            parents = []
            commit = None
            try:
                commit = repo.revparse_single('HEAD')
            except KeyError:
                pass
            if commit:
                parents = [commit.oid.hex]

            # Commits the files added
            tree = repo.index.write_tree()
            author = pygit2.Signature(
                'Alice Author', 'alice@authors.tld')
            committer = pygit2.Signature(
                'Cecil Committer', 'cecil@committers.tld')
            repo.create_commit(
                'refs/heads/master',
                author,
                committer,
                'Add row %s to %s file' % (index, filename),
                # binary string representing the tree object ID
                tree,
                # list of binary strings representing parents of the commit
                parents,
            )

        # Push to origin
        ori_remote = repo.remotes[0]
        PagureRepo.push(ori_remote, 'HEAD:refs/heads/%s' % branch)
    finally:
        shutil.rmtree(newfolder)
def add_content_to_git(folder, filename='sources', content='foo'):
    """ Append some content to a file of the specified git repo.

    The repo at ``folder`` is initialised as a bare repo and cloned into a
    temporary directory, where ``content`` followed by a newline is appended
    to ``filename`` and committed on ``refs/heads/master``. The reference
    HEAD resolves to is then pushed back to ``folder``.

    :arg folder: path of the bare git repository to add content to, it is
        created if it does not exist.
    :kwarg filename: name of the file the content is appended to.
    :kwarg content: the text appended to the file.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    pygit2.init_repository(folder, bare=True)

    newfolder = tempfile.mkdtemp(prefix='pagure-tests')
    # The clone is only scratch space: remove it even if a step fails
    try:
        repo = pygit2.clone_repository(folder, newfolder)

        # Create a file in that git repo
        with open(os.path.join(newfolder, filename), 'a') as stream:
            stream.write('%s\n' % content)
        repo.index.add(filename)
        repo.index.write()

        # Use the current HEAD as parent, if HEAD can be resolved
        parents = []
        commit = None
        try:
            commit = repo.revparse_single('HEAD')
        except KeyError:
            pass
        if commit:
            parents = [commit.oid.hex]

        # Commits the files added
        tree = repo.index.write_tree()
        author = pygit2.Signature(
            'Alice Author', 'alice@authors.tld')
        committer = pygit2.Signature(
            'Cecil Committer', 'cecil@committers.tld')
        repo.create_commit(
            'refs/heads/master',  # the name of the reference to update
            author,
            committer,
            'Add content to file %s' % (filename),
            # binary string representing the tree object ID
            tree,
            # list of binary strings representing parents of the new commit
            parents,
        )

        # Push to origin
        ori_remote = repo.remotes[0]
        master_ref = repo.lookup_reference('HEAD').resolve()
        refname = '%s:%s' % (master_ref.name, master_ref.name)

        PagureRepo.push(ori_remote, refname)
    finally:
        shutil.rmtree(newfolder)
def add_binary_git_repo(folder, filename):
    """ Create a fake image file for the specified git repo.

    The repo at ``folder`` is initialised as a bare repo and cloned into a
    temporary directory, where a file of binary content is written as
    ``filename`` and committed on ``refs/heads/master``. The reference HEAD
    resolves to is then pushed back to ``folder``.

    :arg folder: path of the bare git repository to add the file to, it is
        created if it does not exist.
    :arg filename: name of the binary file to create.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    pygit2.init_repository(folder, bare=True)

    content = b"""\x00\x00\x01\x00\x01\x00\x18\x18\x00\x00\x01\x00 \x00\x88
\t\x00\x00\x16\x00\x00\x00(\x00\x00\x00\x18\x00x00\x00\x01\x00 \x00\x00\x00
\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa7lM\x01\xa6kM\t\xa6kM\x01
\xa4fF\x04\xa2dE\x95\xa2cD8\xa1a
"""

    newfolder = tempfile.mkdtemp(prefix='pagure-tests')
    # The clone is only scratch space: remove it even if a step fails
    try:
        repo = pygit2.clone_repository(folder, newfolder)

        # Use the current HEAD as parent, if HEAD can be resolved
        parents = []
        commit = None
        try:
            commit = repo.revparse_single('HEAD')
        except KeyError:
            pass
        if commit:
            parents = [commit.oid.hex]

        # Create a file in that git repo
        with open(os.path.join(newfolder, filename), 'wb') as stream:
            stream.write(content)
        repo.index.add(filename)
        repo.index.write()

        # Commits the files added
        tree = repo.index.write_tree()
        author = pygit2.Signature(
            'Alice Author', 'alice@authors.tld')
        committer = pygit2.Signature(
            'Cecil Committer', 'cecil@committers.tld')
        repo.create_commit(
            'refs/heads/master',  # the name of the reference to update
            author,
            committer,
            'Add a fake image file',
            # binary string representing the tree object ID
            tree,
            # list of binary strings representing parents of the new commit
            parents
        )

        # Push to origin
        ori_remote = repo.remotes[0]
        master_ref = repo.lookup_reference('HEAD').resolve()
        refname = '%s:%s' % (master_ref.name, master_ref.name)

        PagureRepo.push(ori_remote, refname)
    finally:
        shutil.rmtree(newfolder)
if __name__ == '__main__':
    # Run the tests of the Modeltests class verbosely when this file is
    # executed directly.
    SUITE = unittest.TestLoader().loadTestsFromTestCase(Modeltests)
    RUNNER = unittest.TextTestRunner(verbosity=2)
    RUNNER.run(SUITE)
|