# -*- coding: utf-8 -*- """ (c) 2017 - Copyright Red Hat Inc Authors: Pierre-Yves Chibon """ from __future__ import unicode_literals __requires__ = ['SQLAlchemy >= 0.8'] import pkg_resources import datetime import os import shutil import sys import tempfile import time import unittest from io import open import pygit2 from mock import patch, MagicMock sys.path.insert(0, os.path.join(os.path.dirname( os.path.abspath(__file__)), '..')) import pagure.lib.git import pagure.lib.query import tests from pagure.lib.repo import PagureRepo CORE_CONFIG = """repo test R = @all RW+ = pingou repo docs/test R = @all RW+ = pingou repo tickets/test RW+ = pingou repo requests/test RW+ = pingou repo test2 R = @all RW+ = pingou repo docs/test2 R = @all RW+ = pingou repo tickets/test2 RW+ = pingou repo requests/test2 RW+ = pingou repo somenamespace/test3 R = @all RW+ = pingou repo docs/somenamespace/test3 R = @all RW+ = pingou repo tickets/somenamespace/test3 RW+ = pingou repo requests/somenamespace/test3 RW+ = pingou""" class PagureLibGitoliteConfigtests(tests.Modeltests): """ Tests for pagure.lib.git """ maxDiff = None def setUp(self): """ Set up the environnment, ran before every tests. """ super(PagureLibGitoliteConfigtests, self).setUp() tests.create_projects(self.session) self.outputconf = os.path.join(self.path, 'test_gitolite.conf') self.preconf = os.path.join(self.path, 'header_gitolite') with open(self.preconf, 'w') as stream: stream.write('# this is a header that is manually added\n') stream.write('\n') stream.write('@group1 = foo bar baz\n') stream.write('@group2 = threebean puiterwijk kevin pingou\n') self.postconf = os.path.join(self.path, 'footer_gitolite') with open(self.postconf, 'w', encoding="utf-8") as stream: stream.write('# end of generated configuration\n') stream.write(r'# \ó/') stream.write('\n# end of footer\n') def tearDown(self): """ Tearn down the environnment, ran before every tests. """ super(PagureLibGitoliteConfigtests, self).tearDown() if os.path.exists(self.outputconf): os.unlink(self.outputconf) self.assertFalse(os.path.exists(self.outputconf)) def test_write_gitolite_pre_post_projectNone(self): """ Test the write_gitolite_acls function of pagure.lib.git with a postconf set """ helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=None, preconf=self.preconf, postconf=self.postconf ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf, 'r') as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header %s # end of body # end of generated configuration # \ó/ # end of footer """ % CORE_CONFIG #print data self.assertEqual(data, exp) def test_write_gitolite_pre_post_projectNone_to_existing_file(self): """ Test the write_gitolite_acls function of pagure.lib.git with a postconf set with existing output file """ with open(self.outputconf, 'w') as stream: pass helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=None, preconf=self.preconf, postconf=self.postconf ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() self.assertEqual(data, '') def test_write_gitolite_pre_post_project_1(self): """ Test the write_gitolite_acls function of pagure.lib.git with a postconf set """ with open(self.outputconf, 'w') as stream: pass helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=-1, preconf=self.preconf, postconf=self.postconf ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header %s # end of body # end of generated configuration # \ó/ # end of footer """ % CORE_CONFIG #print data self.assertEqual(data, exp) def test_write_gitolite_pre_post_project_test(self): """ Test the write_gitolite_acls function of pagure.lib.git with a postconf set """ with open(self.outputconf, 'w') as stream: pass project = pagure.lib.query._get_project(self.session, 'test') helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=project, preconf=self.preconf, postconf=self.postconf ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header repo test R = @all RW+ = pingou repo docs/test R = @all RW+ = pingou repo tickets/test RW+ = pingou repo requests/test RW+ = pingou # end of body # end of generated configuration # \ó/ # end of footer """ #print data self.assertEqual(data, exp) def test_write_gitolite_pre_post_project_test_full_file(self): """ Test the write_gitolite_acls function of pagure.lib.git with a postconf set """ # Re-generate the gitolite config for all the projects self.test_write_gitolite_pre_post_project_1() self.assertTrue(os.path.exists(self.outputconf)) project = pagure.lib.query._get_project(self.session, 'test') project.user_id = 2 self.session.add(project) self.session.commit() project = pagure.lib.query._get_project(self.session, 'test') msg = pagure.lib.query.add_user_to_project( self.session, project=project, new_user='pingou', user='foo', access='commit' ) self.assertEqual(msg, 'User added') self.session.commit() project = pagure.lib.query._get_project(self.session, 'test') helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=project, preconf=self.preconf, postconf=self.postconf ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header repo test2 R = @all RW+ = pingou repo docs/test2 R = @all RW+ = pingou repo tickets/test2 RW+ = pingou repo requests/test2 RW+ = pingou repo somenamespace/test3 R = @all RW+ = pingou repo docs/somenamespace/test3 R = @all RW+ = pingou repo tickets/somenamespace/test3 RW+ = pingou repo requests/somenamespace/test3 RW+ = pingou repo test R = @all RW+ = foo RW+ = pingou repo docs/test R = @all RW+ = foo RW+ = pingou repo tickets/test RW+ = foo RW+ = pingou repo requests/test RW+ = foo RW+ = pingou # end of body # end of generated configuration # \ó/ # end of footer """ #print data self.assertEqual(data, exp) @patch.dict('pagure.config.config', {'ENABLE_DOCS': False, 'ENABLE_TICKETS': False}) def test_write_gitolite_disabled_docs_tickets(self): """ Test the write_gitolite_acls function when docs and tickets are disabled """ # Re-generate the gitolite config for all the projects project = pagure.lib.query._get_project(self.session, 'test') project.user_id = 2 self.session.add(project) self.session.commit() project = pagure.lib.query._get_project(self.session, 'test') msg = pagure.lib.query.add_user_to_project( self.session, project=project, new_user='pingou', user='foo', access='commit' ) self.assertEqual(msg, 'User added') self.session.commit() project = pagure.lib.query._get_project(self.session, 'test') helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=project, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = """repo test R = @all RW+ = foo RW+ = pingou repo requests/test RW+ = foo RW+ = pingou repo test2 R = @all RW+ = pingou repo requests/test2 RW+ = pingou repo somenamespace/test3 R = @all RW+ = pingou repo requests/somenamespace/test3 RW+ = pingou # end of body """ self.assertEqual(data, exp) class PagureLibGitoliteGroupConfigtests(tests.Modeltests): """ Tests for generating the gitolite configuration file for a group change """ maxDiff = None def setUp(self): """ Set up the environnment, ran before every tests. """ super(PagureLibGitoliteGroupConfigtests, self).setUp() tests.create_projects(self.session) pagure.lib.query.add_group( self.session, group_name='grp', display_name='grp group', description=None, group_type='user', user='pingou', is_admin=False, blacklist=[], ) pagure.lib.query.add_group( self.session, group_name='grp2', display_name='grp2 group', description=None, group_type='user', user='foo', is_admin=False, blacklist=[], ) self.session.commit() self.outputconf = os.path.join(self.path, 'test_gitolite.conf') self.preconf = os.path.join(self.path, 'header_gitolite') with open(self.preconf, 'w') as stream: stream.write('# this is a header that is manually added\n') stream.write('\n') stream.write('@group1 = foo bar baz\n') stream.write('@group2 = threebean puiterwijk kevin pingou\n') self.postconf = os.path.join(self.path, 'footer_gitolite') with open(self.postconf, 'w') as stream: stream.write('# end of generated configuration\n') stream.write(r'# \ó/') stream.write('\n# end of footer\n') def tearDown(self): """ Tearn down the environnment, ran before every tests. """ super(PagureLibGitoliteGroupConfigtests, self).tearDown() if os.path.exists(self.outputconf): os.unlink(self.outputconf) self.assertFalse(os.path.exists(self.outputconf)) def test_write_gitolite_project_test_group(self): """ Test the write_gitolite_acls when updating a single group. """ with open(self.outputconf, 'w') as stream: pass project = pagure.lib.query._get_project(self.session, 'test') group = pagure.lib.query.search_groups(self.session, group_name='grp') helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=project, preconf=self.preconf, postconf=self.postconf, group=group, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header @grp = pingou repo test R = @all RW+ = pingou repo docs/test R = @all RW+ = pingou repo tickets/test RW+ = pingou repo requests/test RW+ = pingou # end of body # end of generated configuration # \ó/ # end of footer """ #print data self.assertEqual(data, exp) def test_write_gitolite_project_test_all_groups(self): """ Test the write_gitolite_acls when updating all groups. """ with open(self.outputconf, 'w') as stream: pass project = pagure.lib.query._get_project(self.session, 'test') helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=project, preconf=self.preconf, postconf=self.postconf, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header @grp = pingou @grp2 = foo # end of groups repo test R = @all RW+ = pingou repo docs/test R = @all RW+ = pingou repo tickets/test RW+ = pingou repo requests/test RW+ = pingou # end of body # end of generated configuration # \ó/ # end of footer """ #print data self.assertEqual(data, exp) def test_write_gitolite_project_all_projects_groups(self): """ Test the generating the entire gitolite config. """ with open(self.outputconf, 'w') as stream: pass helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=-1, preconf=self.preconf, postconf=self.postconf, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header @grp = pingou @grp2 = foo # end of groups %s # end of body # end of generated configuration # \ó/ # end of footer """ % CORE_CONFIG #print data self.assertEqual(data, exp) def test_write_gitolite_project_all_projects_one_group(self): """ Test the generating the entire gitolite config. """ # Generate the full gitolite config that we will update self.test_write_gitolite_project_all_projects_groups() project = pagure.lib.query._get_project(self.session, 'test') group = pagure.lib.query.search_groups(self.session, group_name='grp') # Let's add `foo` to `grp` so something changes msg = pagure.lib.query.add_user_to_group( self.session, username='foo', group=group, user='pingou', is_admin=False, ) self.session.commit() self.assertEqual(msg, 'User `foo` added to the group `grp`.') # Let's add `foo` to `test` so the project changes as well msg = pagure.lib.query.add_user_to_project( self.session, project=project, new_user='foo', user='pingou', access='commit' ) self.assertEqual(msg, 'User added') self.session.commit() helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=project, group=group, preconf=self.preconf, postconf=self.postconf, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header @grp = foo pingou @grp2 = foo # end of groups repo test2 R = @all RW+ = pingou repo docs/test2 R = @all RW+ = pingou repo tickets/test2 RW+ = pingou repo requests/test2 RW+ = pingou repo somenamespace/test3 R = @all RW+ = pingou repo docs/somenamespace/test3 R = @all RW+ = pingou repo tickets/somenamespace/test3 RW+ = pingou repo requests/somenamespace/test3 RW+ = pingou repo test R = @all RW+ = pingou RW+ = foo repo docs/test R = @all RW+ = pingou RW+ = foo repo tickets/test RW+ = pingou RW+ = foo repo requests/test RW+ = pingou RW+ = foo # end of body # end of generated configuration # \ó/ # end of footer """ #print data self.assertEqual(data, exp) def test_write_gitolite_delete_group(self): """ Test the updating the gitolite config after having deleted a group. """ # Generate the full gitolite config that we will update self.test_write_gitolite_project_all_projects_groups() # Delete the group `grp` self.assertEqual(len(pagure.lib.query.search_groups(self.session)), 2) group = pagure.lib.query.search_groups(self.session, group_name='grp') self.session.delete(group) self.session.commit() self.assertEqual(len(pagure.lib.query.search_groups(self.session)), 1) helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=None, preconf=self.preconf, postconf=self.postconf, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = r"""# this is a header that is manually added @group1 = foo bar baz @group2 = threebean puiterwijk kevin pingou # end of header @grp2 = foo # end of groups %s # end of body # end of generated configuration # \ó/ # end of footer """ % CORE_CONFIG #print data self.assertEqual(data, exp) @patch('pagure.lib.git_auth.get_git_auth_helper') def test_task_generate_gitolite_acls_one_group(self, get_helper): """ Test the generate_gitolite_acls task to ensure if group is None then None is passed to the helper. """ helper = MagicMock() get_helper.return_value = helper pagure.lib.query.SESSIONMAKER = self.session.session_factory pagure.lib.tasks.generate_gitolite_acls( namespace=None, name='test', user=None, group=None) get_helper.assert_called_with() args = helper.generate_acls.call_args self.assertIsNone(args[1].get('group')) self.assertIsNotNone(args[1].get('project')) def test_write_gitolite_project_test_private(self): """ Test the write_gitolite_acls function of pagure.lib.git with a postconf set """ # Make the test project private project = pagure.lib.query._get_project(self.session, 'test') project.private = True self.session.add(project) self.session.commit() # Re-generate the gitolite config just for this project helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=None, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = """@grp = pingou @grp2 = foo # end of groups repo test RW+ = pingou repo docs/test RW+ = pingou repo tickets/test RW+ = pingou repo requests/test RW+ = pingou repo test2 R = @all RW+ = pingou repo docs/test2 R = @all RW+ = pingou repo tickets/test2 RW+ = pingou repo requests/test2 RW+ = pingou repo somenamespace/test3 R = @all RW+ = pingou repo docs/somenamespace/test3 R = @all RW+ = pingou repo tickets/somenamespace/test3 RW+ = pingou repo requests/somenamespace/test3 RW+ = pingou # end of body """ #print data self.assertEqual(data, exp) def test_remove_acls(self): """ Test the remove_acls function of pagure.lib.git when deleting a project """ pagure.config.config['GITOLITE_CONFIG'] = self.outputconf with open(self.outputconf, 'w') as stream: pass helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=-1, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = """@grp = pingou @grp2 = foo # end of groups %s # end of body """ % CORE_CONFIG #print data self.assertEqual(data, exp) # Test removing a project from the existing config project = pagure.lib.query.get_authorized_project( self.session, project_name='test') helper.remove_acls(self.session, project=project) with open(self.outputconf) as stream: data = stream.read() exp = """@grp = pingou @grp2 = foo # end of groups repo test2 R = @all RW+ = pingou repo docs/test2 R = @all RW+ = pingou repo tickets/test2 RW+ = pingou repo requests/test2 RW+ = pingou repo somenamespace/test3 R = @all RW+ = pingou repo docs/somenamespace/test3 R = @all RW+ = pingou repo tickets/somenamespace/test3 RW+ = pingou repo requests/somenamespace/test3 RW+ = pingou # end of body """ #print data self.assertEqual(data, exp) def test_remove_acls_no_project(self): """ Test the remove_acls function of pagure.lib.git when no project is specified """ pagure.config.config['GITOLITE_CONFIG'] = self.outputconf with open(self.outputconf, 'w') as stream: pass helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3') helper.write_gitolite_acls( self.session, self.outputconf, project=-1, ) self.assertTrue(os.path.exists(self.outputconf)) with open(self.outputconf) as stream: data = stream.read() exp = """@grp = pingou @grp2 = foo # end of groups %s # end of body """ % CORE_CONFIG #print data self.assertEqual(data, exp) # Test nothing changes if no project is specified self.assertRaises( RuntimeError, helper.remove_acls, self.session, project=None ) with open(self.outputconf) as stream: data = stream.read() self.assertEqual(data, exp) if __name__ == '__main__': unittest.main(verbosity=2)