123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982 |
- # -*- coding: utf-8 -*-
- """
- (c) 2015-2017 - Copyright Red Hat Inc
- Authors:
- Pierre-Yves Chibon <pingou@pingoured.fr>
- """
- from __future__ import absolute_import, print_function, unicode_literals
- import abc
- import json
- import logging
- import os
- import subprocess
- import tempfile
- from io import open
- import pkg_resources
- import werkzeug.utils
- from six import with_metaclass
- from six.moves import dbm_gnu
- import pagure.exceptions
- import pagure.lib.model_base
- import pagure.lib.query
- from pagure.config import config as pagure_config
- from pagure.lib import model
- from pagure.utils import is_repo_collaborator, lookup_deploykey
- # logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})
- _log = logging.getLogger(__name__)
- GIT_AUTH_BACKEND_NAME = None
- GIT_AUTH_BACKEND_INSTANCE = None
- def get_git_auth_helper(backend=None):
- """Instantiate and return the appropriate git auth helper backend.
- :arg backend: The name of the backend to find on the system (declared via
- the entry_points in setup.py).
- Pagure comes by default with the following backends:
- test_auth, gitolite2, gitolite3
- :type backend: str
- """
- global GIT_AUTH_BACKEND_NAME
- global GIT_AUTH_BACKEND_INSTANCE
- if backend is None:
- backend = pagure_config["GIT_AUTH_BACKEND"]
- if (
- GIT_AUTH_BACKEND_NAME
- and GIT_AUTH_BACKEND_INSTANCE
- and backend == GIT_AUTH_BACKEND_NAME
- ):
- # This got previously instantiated, return that instance to avoid
- # having to instantiate it multiple times as long as the same backend
- # is used.
- return GIT_AUTH_BACKEND_INSTANCE
- _log.info("Looking for backend: %s", backend)
- points = pkg_resources.iter_entry_points("pagure.git_auth.helpers")
- classes = dict([(point.name, point) for point in points])
- _log.debug("Found the following installed helpers %r" % classes)
- if len(classes) == 0:
- _log.debug("Was unable to find any helpers, registering built-in")
- cls = {
- "test_auth": GitAuthTestHelper,
- "gitolite2": Gitolite2Auth,
- "gitolite3": Gitolite3Auth,
- "pagure": PagureGitAuth,
- "pagure_authorized_keys": PagureGitAuth,
- }[backend]
- else:
- cls = classes[backend].load()
- _log.debug("Returning helper %r from backend key %r" % (cls, backend))
- GIT_AUTH_BACKEND_NAME = backend
- GIT_AUTH_BACKEND_INSTANCE = cls()
- return GIT_AUTH_BACKEND_INSTANCE
- class GitAuthHelper(with_metaclass(abc.ABCMeta, object)):
- """The class to inherit from when creating your own git authentication
- helper.
- """
- is_dynamic = False
- @classmethod
- @abc.abstractmethod
- def generate_acls(self, project, group=None):
- """This is the method that is called by pagure to generate the
- configuration file.
- :arg project: the project of which to update the ACLs. This argument
- can take three values: ``-1``, ``None`` and a project.
- If project is ``-1``, the configuration should be refreshed for
- *all* projects.
- If project is ``None``, there no specific project to refresh
- but the ssh key of an user was added and updated or a group
- was removed.
- If project is a pagure.lib.model.Project, the configuration of
- this project should be updated.
- :type project: None, int or pagure.lib.model.Project
- :kwarg group: the group to refresh the members of
- :type group: None or pagure.lib.model.PagureGroup
- (This behaviour is based on the workflow of gitolite, if you are
- implementing a different auth backend and need more granularity,
- feel free to let us know.)
- """
- pass
- @classmethod
- @abc.abstractmethod
- def remove_acls(self, session, project):
- """This is the method that is called by pagure to remove a project
- from the configuration file.
- :arg cls: the current class
- :type: GitAuthHelper
- :arg session: the session with which to connect to the database
- :arg project: the project to remove from the gitolite configuration
- file.
- :type project: pagure.lib.model.Project
- """
- pass
- @classmethod
- # This method can't be marked as abstract, since it's new and that would
- # break backwards compatibility
- def check_acl(cls, session, project, username, refname, **info):
- """This method is used in Dynamic Git Auth helpers to check acls.
- It is acceptable for implementations to print things, which will be
- returned to the user.
- Please make sure to add a **kwarg in any implementation, even if
- specific keyword arguments are added for the known fields, to make
- sure your implementation remains working if new items are added.
- Args:
- session (sqlalchemy.Session): Database session
- project (model.Project): Project instance push is for
- username (string): The name of the user trying to push
- refname (string): The name of the ref being pushed to
- Kwargs:
- Extra arguments to help in deciding whether to approve or deny a
- push. This may get additional possible values later on, but will
- have at least:
- - is_update (bool): Whether this is being run at the "update" hook
- moment. See the return type notes to see the differences.
- - revfrom (string): The commit hash the update is happening from.
- - revto (string): The commit hash the update is happening to.
- - pull_request (model.PullRequest or None): The PR that is trying
- to be merged.
- - repotype (string): The pagure.lib.query.get_repotypes() value
- for the repo being pushed to.
- - repodir (string): A directory containing the current
- repository, including the new objects to be approved.
- Note that this might or might not be directly writable, and any
- writes might or might not be accepted. ACL checks MUST not make
- any changes in this repository. (added after 5.0.1)
- Returns (bool): Whether to allow this push.
- If is_update is False and the ACL returns False, the entire push
- is aborted. If is_update is True and the ACL returns True, only
- a single ref update is blocked. So if you want to block just a
- single ref from being updated, only return False if is_update
- is True.
- """
- raise NotImplementedError(
- "check_acl on static Git Auth Backend called"
- )
- def _read_file(filename):
- """Reads the specified file and return its content.
- Returns None if it could not read the file for any reason.
- """
- if not os.path.exists(filename):
- _log.info("Could not find file: %s", filename)
- else:
- with open(filename) as stream:
- return stream.read()
- class Gitolite2Auth(GitAuthHelper):
- """A gitolite 2 authentication module."""
- @classmethod
- def _process_project(cls, project, config, global_pr_only):
- """Generate the gitolite configuration for the specified project.
- :arg project: the project to generate the configuration for
- :type project: pagure.lib.model.Project
- :arg config: a list containing the different lines of the
- configuration file
- :type config: list
- :arg groups: a dictionary containing the group name as key and the
- users member of the group as values
- :type groups: dict(str: list)
- :arg global_pr_only: boolean on whether the pagure instance enforces
- the PR workflow only or not
- :type global_pr_only: bool
- :return: the updated config
- :return type: list
- """
- _log.debug(" Processing project: %s", project.fullname)
- # Check if the project or the pagure instance enforce the PR only
- # development model.
- pr_only = project.settings.get("pull_request_access_only", False)
- repos_to_create = ["repos"]
- if pagure_config.get("ENABLE_DOCS", True):
- repos_to_create.append("docs/")
- if pagure_config.get("ENABLE_TICKETS", True):
- repos_to_create.append("tickets/")
- # no setting yet to disable pull-requests
- repos_to_create.append("requests/")
- for repos in repos_to_create:
- if repos == "repos":
- # Do not grant access to project enforcing the PR model
- if pr_only or (global_pr_only and not project.is_fork):
- continue
- repos = ""
- config.append("repo %s%s" % (repos, project.fullname))
- if not project.private and repos not in ["tickets/", "requests/"]:
- config.append(" R = @all")
- if project.committer_groups:
- config.append(
- " RW+ = @%s"
- % " @".join(
- [
- group.group_name
- for group in project.committer_groups
- ]
- )
- )
- config.append(" RW+ = %s" % project.user.user)
- for user in project.committers:
- # This should never be the case (that the project.user
- # is in the committers) but better safe than sorry
- if user.user != project.user.user:
- config.append(" RW+ = %s" % user.user)
- for deploykey in project.deploykeys:
- access = "R"
- if deploykey.pushaccess:
- access = "RW+"
- # Note: the replace of / with _ is because gitolite
- # users can't contain a /. At first, this might look
- # like deploy keys in a project called
- # $namespace_$project would give access to the repos of
- # a project $namespace/$project or vica versa, however
- # this is NOT the case because we add the deploykey.id
- # to the end of the deploykey name, which means it is
- # unique. The project name is solely there to make it
- # easier to determine what project created the deploykey
- # for admins.
- config.append(
- " %s = deploykey_%s_%s"
- % (
- access,
- werkzeug.utils.secure_filename(project.fullname),
- deploykey.id,
- )
- )
- config.append("")
- return config
- @classmethod
- def _clean_current_config(cls, current_config, project):
- """Remove the specified project from the current configuration file
- :arg current_config: the content of the current/actual gitolite
- configuration file read from the disk
- :type current_config: list
- :arg project: the project to update in the configuration file
- :type project: pagure.lib.model.Project
- """
- keys = [
- "repo %s%s" % (repos, project.fullname)
- for repos in ["", "docs/", "tickets/", "requests/"]
- ]
- keep = True
- config = []
- for line in current_config:
- line = line.rstrip()
- if line in keys:
- keep = False
- continue
- if keep is False and line == "":
- keep = True
- if keep:
- config.append(line)
- return config
- @classmethod
- def _clean_groups(cls, config, group=None):
- """Removes the groups in the given configuration file.
- :arg config: the current configuration
- :type config: list
- :kwarg group: the group to refresh the members of
- :type group: None or pagure.lib.model.PagureGroup
- :return: the configuration without the groups
- :return type: list
- """
- if group is None:
- output = [
- row.rstrip()
- for row in config
- if not row.startswith("@") and row.strip() != "# end of groups"
- ]
- else:
- end_grp = None
- seen = False
- output = []
- for idx, row in enumerate(config):
- if end_grp is None and row.startswith("repo "):
- end_grp = idx
- if row.startswith("@%s " % group.group_name):
- seen = True
- row = "@%s = %s" % (
- group.group_name,
- " ".join(
- sorted([user.username for user in group.users])
- ),
- )
- output.append(row)
- if not seen:
- row = "@%s = %s" % (
- group.group_name,
- " ".join(sorted([user.username for user in group.users])),
- )
- output.insert(end_grp, "")
- output.insert(end_grp, row)
- return output
- @classmethod
- def _generate_groups_config(cls, session):
- """Generate the gitolite configuration for all of the groups.
- :arg session: the session with which to connect to the database
- :return: the gitolite configuration for the groups
- :return type: list
- """
- query = session.query(model.PagureGroup).order_by(
- model.PagureGroup.group_name
- )
- groups = {}
- for grp in query.all():
- groups[grp.group_name] = [user.username for user in grp.users]
- return groups
- @classmethod
- def _get_current_config(cls, configfile, preconfig=None, postconfig=None):
- """Load the current gitolite configuration file from the disk.
- :arg configfile: the name of the configuration file to load
- :type configfile: str
- :kwarg preconf: the content of the file to include at the top of the
- gitolite configuration file, used here to determine that a part of
- the configuration file should be cleaned at the top.
- :type preconf: None or str
- :kwarg postconf: the content of the file to include at the bottom of
- the gitolite configuration file, used here to determine that a part
- of the configuration file should be cleaned at the bottom.
- :type postconf: None or str
- """
- _log.info("Reading in the current configuration: %s", configfile)
- with open(configfile) as stream:
- current_config = [line.rstrip() for line in stream]
- if current_config and current_config[-1] == "# end of body":
- current_config = current_config[:-1]
- if preconfig:
- idx = None
- for idx, row in enumerate(current_config):
- if row.strip() == "# end of header":
- break
- if idx is not None:
- idx = idx + 1
- _log.info("Removing the first %s lines", idx)
- current_config = current_config[idx:]
- if postconfig:
- idx = None
- for idx, row in enumerate(current_config):
- if row.strip() == "# end of body":
- break
- if idx is not None:
- _log.info(
- "Keeping the first %s lines out of %s",
- idx,
- len(current_config),
- )
- current_config = current_config[:idx]
- return current_config
- @classmethod
- def write_gitolite_acls(
- cls,
- session,
- configfile,
- project,
- preconf=None,
- postconf=None,
- group=None,
- ):
- """Generate the configuration file for gitolite for all projects
- on the forge.
- :arg cls: the current class
- :type: Gitolite2Auth
- :arg session: a session to connect to the database with
- :arg configfile: the name of the configuration file to generate/write
- :type configfile: str
- :arg project: the project to update in the gitolite configuration
- file. It can be of three types/values.
- If it is ``-1`` or if the file does not exist on disk, the
- entire gitolite configuration will be re-generated.
- If it is ``None``, the gitolite configuration will have its
- groups information updated but not the projects and will be
- re-compiled.
- If it is a ``pagure.lib.model.Project``, the gitolite
- configuration will be updated for just this project.
- :type project: None, int or spagure.lib.model.Project
- :kwarg preconf: a file to include at the top of the configuration
- file
- :type preconf: None or str
- :kwarg postconf: a file to include at the bottom of the
- configuration file
- :type postconf: None or str
- :kwarg group: the group to refresh the members of
- :type group: None or pagure.lib.model.PagureGroup
- """
- _log.info("Write down the gitolite configuration file")
- preconfig = None
- if preconf:
- _log.info(
- "Loading the file to include at the top of the generated one"
- )
- preconfig = _read_file(preconf)
- postconfig = None
- if postconf:
- _log.info(
- "Loading the file to include at the end of the generated one"
- )
- postconfig = _read_file(postconf)
- global_pr_only = pagure_config.get("PR_ONLY", False)
- config = []
- groups = {}
- if group is None:
- groups = cls._generate_groups_config(session)
- if project == -1 or not os.path.exists(configfile):
- _log.info("Refreshing the configuration for all projects")
- query = session.query(model.Project).order_by(model.Project.id)
- for project in query.all():
- config = cls._process_project(project, config, global_pr_only)
- elif project:
- _log.info("Refreshing the configuration for one project")
- config = cls._process_project(project, config, global_pr_only)
- current_config = cls._get_current_config(
- configfile, preconfig, postconfig
- )
- current_config = cls._clean_current_config(current_config, project)
- config = current_config + config
- if config:
- _log.info("Cleaning the group %s from the loaded config", group)
- config = cls._clean_groups(config, group=group)
- else:
- current_config = cls._get_current_config(
- configfile, preconfig, postconfig
- )
- _log.info("Cleaning the group %s from the config on disk", group)
- config = cls._clean_groups(current_config, group=group)
- if not config:
- return
- _log.info("Writing the configuration to: %s", configfile)
- with open(configfile, "w", encoding="utf-8") as stream:
- if preconfig:
- stream.write(preconfig + "\n")
- stream.write("# end of header\n")
- if groups:
- for key in sorted(groups):
- stream.write("@%s = %s\n" % (key, " ".join(groups[key])))
- stream.write("# end of groups\n\n")
- prev = None
- for row in config:
- if prev is None:
- prev = row
- if prev == row == "":
- continue
- stream.write(row + "\n")
- prev = row
- stream.write("# end of body\n")
- if postconfig:
- stream.write(postconfig + "\n")
- @classmethod
- def _remove_from_gitolite_cache(cls, cache_file, project):
- """Removes project from gitolite cache file (gl-conf.cache)
- Gitolite has no notion of "deleting" a project and it can only
- add values to gl-conf.cache. Therefore we must manually wipe all
- entries related to a project when deleting it.
- If this method is not executed and if someone creates a project
- with the same fullname again then its `gl-conf` file won't get
- created (see link to commit below) and any subsequent invocation of
- `gitolite trigger POST_COMPILE` will fail, thus preventing creation
- of new repos/forks at the whole pagure instance.
- See https://github.com/sitaramc/gitolite/commit/41b7885b77c
- (later reverted upstream, but still used in most Pagure deployments)
- :arg cls: the current class
- :type: Gitolite2Auth
- :arg cache_file: path to the cache file
- :type cache_file: str
- :arg project: the project to remove from gitolite cache file
- :type project: pagure.lib.model.Project
- """
- _log.info("Remove project from the gitolite cache file")
- cf = None
- try:
- # unfortunately dbm_gnu.open isn't a context manager in Python 2 :(
- cf = dbm_gnu.open(cache_file, "ws")
- for repo in ["", "docs/", "tickets/", "requests/"]:
- to_remove = repo + project.fullname
- if to_remove.encode("ascii") in cf:
- del cf[to_remove]
- except dbm_gnu.error as e:
- msg = "Failed to remove project from gitolite cache: {msg}".format(
- msg=e[1]
- )
- raise pagure.exceptions.PagureException(msg)
- finally:
- if cf:
- cf.close()
- @classmethod
- def remove_acls(cls, session, project):
- """Remove a project from the configuration file for gitolite.
- :arg cls: the current class
- :type: Gitolite2Auth
- :arg session: the session with which to connect to the database
- :arg project: the project to remove from the gitolite configuration
- file.
- :type project: pagure.lib.model.Project
- """
- _log.info("Remove project from the gitolite configuration file")
- if not project:
- raise RuntimeError("Project undefined")
- configfile = pagure_config["GITOLITE_CONFIG"]
- preconf = pagure_config.get("GITOLITE_PRE_CONFIG") or None
- postconf = pagure_config.get("GITOLITE_POST_CONFIG") or None
- if not os.path.exists(configfile):
- _log.info(
- "Not configuration file found at: %s... bailing" % configfile
- )
- return
- preconfig = None
- if preconf:
- _log.info(
- "Loading the file to include at the top of the generated one"
- )
- preconfig = _read_file(preconf)
- postconfig = None
- if postconf:
- _log.info(
- "Loading the file to include at the end of the generated one"
- )
- postconfig = _read_file(postconf)
- config = []
- groups = cls._generate_groups_config(session)
- _log.info("Removing the project from the configuration")
- current_config = cls._get_current_config(
- configfile, preconfig, postconfig
- )
- current_config = cls._clean_current_config(current_config, project)
- config = current_config + config
- if config:
- _log.info("Cleaning the groups from the loaded config")
- config = cls._clean_groups(config)
- else:
- current_config = cls._get_current_config(
- configfile, preconfig, postconfig
- )
- _log.info("Cleaning the groups from the config on disk")
- config = cls._clean_groups(config)
- if not config:
- return
- _log.info("Writing the configuration to: %s", configfile)
- with open(configfile, "w", encoding="utf-8") as stream:
- if preconfig:
- stream.write(preconfig + "\n")
- stream.write("# end of header\n")
- if groups:
- for key in sorted(groups):
- stream.write("@%s = %s\n" % (key, " ".join(groups[key])))
- stream.write("# end of groups\n\n")
- prev = None
- for row in config:
- if prev is None:
- prev = row
- if prev == row == "":
- continue
- stream.write(row + "\n")
- prev = row
- stream.write("# end of body\n")
- if postconfig:
- stream.write(postconfig + "\n")
- gl_cache_path = os.path.join(
- os.path.dirname(configfile), "..", "gl-conf.cache"
- )
- if os.path.exists(gl_cache_path):
- cls._remove_from_gitolite_cache(gl_cache_path, project)
- @staticmethod
- def _get_gitolite_command():
- """Return the gitolite command to run based on the info in the
- configuration file.
- """
- _log.info("Compiling the gitolite configuration")
- gitolite_folder = pagure_config.get("GITOLITE_HOME", None)
- if gitolite_folder:
- cmd = "GL_RC=%s GL_BINDIR=%s gl-compile-conf" % (
- pagure_config.get("GL_RC"),
- pagure_config.get("GL_BINDIR"),
- )
- _log.debug("Command: %s", cmd)
- return cmd
- @classmethod
- def _repos_from_lines(cls, lines):
- """Return list of strings representing complete repo entries from list
- of lines as returned by _process_project.
- """
- repos = []
- for line in lines:
- if line.startswith("repo "):
- repos.append([line])
- else:
- repos[-1].append(line)
- for i, repo_lines in enumerate(repos):
- repos[i] = "\n".join(repo_lines)
- return repos
- @classmethod
- def _run_gitolite_cmd(cls, cmd):
- """Run gitolite command as subprocess, raise PagureException
- if it fails.
- """
- if cmd:
- proc = subprocess.Popen(
- cmd,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=pagure_config["GITOLITE_HOME"],
- )
- stdout, stderr = proc.communicate()
- if proc.returncode != 0:
- error_msg = (
- 'The command "{0}" failed with'
- '\n\n out: "{1}\n\n err:"{2}"'.format(
- cmd, stdout, stderr
- )
- )
- raise pagure.exceptions.PagureException(error_msg)
- @classmethod
- def generate_acls(cls, project, group=None):
- """Generate the gitolite configuration file for all repos
- :arg project: the project to update in the gitolite configuration
- file. It can be of three types/values.
- If it is ``-1`` or if the file does not exist on disk, the
- entire gitolite configuration will be re-generated.
- If it is ``None``, the gitolite configuration will not be
- changed but will be re-compiled.
- If it is a ``pagure.lib.model.Project``, the gitolite
- configuration will be updated for just this project.
- :type project: None, int or pagure.lib.model.Project
- :kwarg group: the group to refresh the members of
- :type group: None or pagure.lib.model.PagureGroup
- """
- _log.info("Refresh gitolite configuration")
- if project is not None or group is not None:
- session = pagure.lib.model_base.create_session(
- pagure_config["DB_URL"]
- )
- cls.write_gitolite_acls(
- session,
- project=project,
- configfile=pagure_config["GITOLITE_CONFIG"],
- preconf=pagure_config.get("GITOLITE_PRE_CONFIG") or None,
- postconf=pagure_config.get("GITOLITE_POST_CONFIG") or None,
- group=group,
- )
- session.remove()
- if (
- not group
- and project not in [None, -1]
- and hasattr(cls, "_individual_repos_command")
- and pagure_config.get("GITOLITE_HAS_COMPILE_1", False)
- ):
- # optimization for adding single repo - we don't want to recompile
- # whole gitolite.conf
- repos_config = []
- cls._process_project(
- project, repos_config, pagure_config.get("PR_ONLY", False)
- )
- # repos_config will contain lines for repo itself as well as
- # docs, requests, tickets; compile-1 only accepts one repo,
- # so we have to run it separately for all of them
- for repo in cls._repos_from_lines(repos_config):
- repopath = repo.splitlines()[0][len("repo ") :].strip()
- repotype = repopath.split("/")[0]
- if (
- repotype == "docs" and not pagure_config.get("ENABLE_DOCS")
- ) or (
- repotype == "tickets"
- and not pagure_config.get("ENABLE_TICKETS")
- ):
- continue
- with tempfile.NamedTemporaryFile() as f:
- f.write(repo)
- f.flush()
- cmd = cls._individual_repos_command(f.name)
- cls._run_gitolite_cmd(cmd)
- else:
- cmd = cls._get_gitolite_command()
- cls._run_gitolite_cmd(cmd)
- class Gitolite3Auth(Gitolite2Auth):
- """A gitolite 3 authentication module."""
- @staticmethod
- def _individual_repos_command(config_file):
- _log.info(
- "Compiling gitolite configuration %s for single repository",
- config_file,
- )
- gitolite_folder = pagure_config.get("GITOLITE_HOME", None)
- if gitolite_folder:
- cmd = "HOME=%s gitolite compile-1 %s" % (
- gitolite_folder,
- config_file,
- )
- _log.debug("Command: %s", cmd)
- return cmd
- @staticmethod
- def _get_gitolite_command():
- """Return the gitolite command to run based on the info in the
- configuration file.
- """
- _log.info("Compiling the gitolite configuration")
- gitolite_folder = pagure_config.get("GITOLITE_HOME", None)
- if gitolite_folder:
- cmd = (
- "HOME=%s gitolite compile && HOME=%s gitolite trigger "
- "POST_COMPILE" % (gitolite_folder, gitolite_folder)
- )
- _log.debug("Command: %s", cmd)
- return cmd
- @classmethod
- def post_compile_only(cls):
- """This method runs `gitolite trigger POST_COMPILE` without touching
- any other gitolite configuration. Most importantly, this will process
- SSH keys used by gitolite.
- """
- _log.info("Triggering gitolite POST_COMPILE")
- gitolite_folder = pagure_config.get("GITOLITE_HOME", None)
- if gitolite_folder:
- cmd = "HOME=%s gitolite trigger POST_COMPILE" % gitolite_folder
- _log.debug("Command: %s", cmd)
- cls._run_gitolite_cmd(cmd)
- class PagureGitAuth(GitAuthHelper):
- """Standard Pagure git auth implementation."""
- is_dynamic = True
- @classmethod
- def generate_acls(self, project, group=None):
- """This function is required but not used."""
- pass
- @classmethod
- def remove_acls(self, session, project):
- """This function is required but not used."""
- pass
- def info(self, msg):
- """Function that prints info about decisions to clients.
- This is a function to make it possible to override for test suite."""
- print(msg)
- def check_acl(
- self,
- session,
- project,
- username,
- refname,
- pull_request,
- repotype,
- is_internal,
- **info
- ):
- if is_internal:
- self.info("Internal push allowed")
- return True
- # Check whether a PR is required for this repo or in general
- global_pr_only = pagure_config.get("PR_ONLY", False)
- pr_only = project.settings.get("pull_request_access_only", False)
- if repotype == "main":
- if (
- pr_only or (global_pr_only and not project.is_fork)
- ) and not pull_request:
- self.info("Pull request required")
- return False
- if username is None:
- return False
- # Determine whether the current user is allowed to push
- is_committer = is_repo_collaborator(
- project, refname, username, session
- )
- deploykey = lookup_deploykey(project, username)
- if deploykey is not None:
- self.info("Deploykey used. Push access: %s" % deploykey.pushaccess)
- is_committer = deploykey.pushaccess
- self.info("Has commit access: %s" % is_committer)
- return is_committer
- class GitAuthTestHelper(GitAuthHelper):
- """Simple test auth module to check the auth customization system."""
- is_dynamic = True
- @classmethod
- def generate_acls(cls, project, group=None):
- """Print a statement when called, useful for debugging, only.
- :arg project: this variable is just printed out but not used
- in any real place.
- :type project: None, int or spagure.lib.model.Project
- :kwarg group: the group to refresh the members of
- :type group: None or pagure.lib.model.PagureGroup
- """
- out = (
- "Called GitAuthTestHelper.generate_acls() "
- "with args: project=%s, group=%s" % (project, group)
- )
- print(out)
- return out
- @classmethod
- def remove_acls(cls, session, project):
- """Print a statement about which a project would be removed from
- the configuration file for gitolite.
- :arg cls: the current class
- :type: GitAuthHelper
- :arg session: the session with which to connect to the database
- :arg project: the project to remove from the gitolite configuration
- file.
- :type project: pagure.lib.model.Project
- """
- out = (
- "Called GitAuthTestHelper.remove_acls() "
- "with args: project=%s" % (project.fullname)
- )
- print(out)
- return out
- @classmethod
- def check_acl(
- cls, session, project, username, refname, pull_request, **info
- ):
- testfile = pagure_config.get("TEST_AUTH_STATUS", None)
- if not testfile or not os.path.exists(testfile):
- # If we are not configured, we will assume allowed
- return True
- with open(testfile, "r") as statusfile:
- status = json.loads(statusfile.read())
- if status is True or status is False:
- return status
- # Other option would be a dict with ref->allow
- # (with allow True, pronly), missing means False)
- if refname not in status:
- print("ref '%s' not in status" % refname)
- return False
- elif status[refname] is True:
- return True
- elif status[refname] == "pronly":
- return pull_request is not None
|