Browse Source

Block all POST requests on the UI and the API for blocked users

Expand the test on blocked users ensure POST actions are not accessible
via neither the UI nor the API.

Signed-off-by: Pierre-Yves Chibon <pingou@pingoured.fr>
Pierre-Yves Chibon 5 years ago
parent
commit
4cfa1e4e36

+ 32 - 0
pagure/api/__init__.py

@@ -121,6 +121,7 @@ class APIERROR(enum.Enum):
     )
     ETRACKERREADONLY = "The issue tracker of this project is read-only"
     ENOPRSTATS = "No statistics could be computed for this PR"
+    EUBLOCKED = "You have been blocked from this project"
 
 
 def get_authorized_api_project(session, repo, user=None, namespace=None):
@@ -151,6 +152,37 @@ def api_login_required(acls=None):
             response = check_api_acls(acls)
             if response:
                 return response
+
+            # Block all POST request from blocked users
+            if flask.request.method == "POST":
+                # Retrieve the variables in the URL
+                url_args = flask.request.view_args or {}
+                # Check if there is a `repo` and an `username`
+                repo = url_args.get("repo")
+                username = url_args.get("username")
+                namespace = url_args.get("namespace")
+
+                if repo:
+                    flask.g.repo = pagure.lib.query.get_authorized_project(
+                        flask.g.session,
+                        repo,
+                        user=username,
+                        namespace=namespace,
+                    )
+
+                    if (
+                        flask.g.repo
+                        and flask.g.fas_user.username
+                        in flask.g.repo.block_users
+                    ):
+                        output = {
+                            "error": APIERROR.EUBLOCKED.value,
+                            "error_code": APIERROR.EUBLOCKED.name,
+                        }
+                        response = flask.jsonify(output)
+                        response.status_code = 403
+                        return response
+
             return function(*args, **kwargs)
 
         return decorated_function

+ 5 - 0
pagure/flask_app.py

@@ -305,6 +305,11 @@ def set_request():
                 flask.g.session, flask.g.repo, user=flask.g.fas_user.username
             )
 
+            # Block all POST request from blocked users
+            if flask.g.repo and flask.request.method != "GET":
+                if flask.g.fas_user.username in flask.g.repo.block_users:
+                    flask.abort(403, "You have been blocked from this project")
+
         if (
             not flask.g.repo
             and namespace

+ 4 - 3
tests/test_pagure_flask_api.py

@@ -237,10 +237,10 @@ class PagureFlaskApitests(tests.SimplePagureTest):
         output = self.app.get('/api/0/-/error_codes')
         self.assertEqual(output.status_code, 200)
         data = json.loads(output.get_data(as_text=True))
-        self.assertEqual(len(data), 35)
+        self.assertEqual(len(data), 36)
         self.assertEqual(
             sorted(data.keys()),
-            [
+            sorted([
                 'EDATETIME',
                 'EDBERROR',
                 'EGITERROR',
@@ -276,7 +276,8 @@ class PagureFlaskApitests(tests.SimplePagureTest):
                 'ETIMESTAMP',
                 'ETRACKERDISABLED',
                 'ETRACKERREADONLY',
-            ]
+                'EUBLOCKED',
+            ])
         )
 
     @patch("pagure.lib.tasks.get_result")

+ 82 - 26
tests/test_pagure_flask_api_project_blockuser.py

@@ -45,6 +45,8 @@ class PagureFlaskApiProjectBlockuserTests(tests.SimplePagureTest):
         super(PagureFlaskApiProjectBlockuserTests, self).setUp()
 
         tests.create_projects(self.session)
+        tests.create_projects_git(
+            os.path.join(self.path, 'repos'), bare=True)
         tests.create_tokens(self.session)
         tests.create_tokens_acl(self.session)
 
@@ -61,11 +63,17 @@ class PagureFlaskApiProjectBlockuserTests(tests.SimplePagureTest):
 
         project = pagure.lib.query.get_authorized_project(self.session, "test")
         self.assertEqual(project.block_users, [])
+        self.blocked_users = []
+
+        project = pagure.lib.query.get_authorized_project(self.session, "test2")
+        project.block_users = ["foo"]
+        self.session.add(project)
+        self.session.commit()
 
     def tearDown(self):
         """ Tears down the environment at the end of the tests. """
         project = pagure.lib.query.get_authorized_project(self.session, "test")
-        self.assertEqual(project.block_users, [])
+        self.assertEqual(project.block_users, self.blocked_users)
 
         super(PagureFlaskApiProjectBlockuserTests, self).tearDown()
 
@@ -162,40 +170,42 @@ class PagureFlaskApiProjectBlockuserTests(tests.SimplePagureTest):
             },
         )
 
+    def test_api_blockuser_with_data(self):
+        """ Test api_pull_request_assign method when the project doesn't exist.
+        """
+        self.blocked_users = ["foo"]
 
-class PagureFlaskApiProjectBlockuserFilledTests(tests.SimplePagureTest):
-    """ Tests for the flask API of pagure for assigning a PR """
-
-    maxDiff = None
-
-    @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
-    @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
-    def setUp(self):
-        """ Set up the environnment, ran before every tests. """
-        super(PagureFlaskApiProjectBlockuserFilledTests, self).setUp()
-
-        tests.create_projects(self.session)
-        tests.create_tokens(self.session)
-        tests.create_tokens_acl(self.session)
+        headers = {"Authorization": "token aaabbbcccddd"}
+        data = {"username": ["foo"]}
 
-        project = pagure.lib.query.get_authorized_project(self.session, "test")
-        self.assertEqual(project.block_users, [])
+        # user blocked
+        output = self.app.post(
+            "/api/0/test/blockuser", headers=headers, data=data
+        )
+        self.assertEqual(output.status_code, 200)
+        data = json.loads(output.get_data(as_text=True))
+        self.assertDictEqual(data, {"message": "User(s) blocked"})
 
-    def tearDown(self):
-        """ Tears down the environment at the end of the tests. """
-        project = pagure.lib.query.get_authorized_project(self.session, "test")
-        self.assertEqual(project.block_users, ["foo"])
+        # Second request, no changes
+        headers = {"Authorization": "token aaabbbcccddd"}
+        data = {"username": ["foo"]}
 
-        super(PagureFlaskApiProjectBlockuserFilledTests, self).tearDown()
+        output = self.app.post(
+            "/api/0/test/blockuser", headers=headers, data=data
+        )
+        self.assertEqual(output.status_code, 200)
+        data = json.loads(output.get_data(as_text=True))
+        self.assertDictEqual(data, {"message": "User(s) blocked"})
 
-    def test_api_blockuser_with_data(self):
-        """ Test api_project_block_user method to block users.
+    def test_api_blockeduser_api(self):
+        """ Test doing a POST request to the API when the user is blocked.
         """
+        self.blocked_users = ["pingou"]
 
         headers = {"Authorization": "token aaabbbcccddd"}
-        data = {"username": ["foo"]}
+        data = {"username": ["pingou"]}
 
-        # No user blocked
+        # user blocked
         output = self.app.post(
             "/api/0/test/blockuser", headers=headers, data=data
         )
@@ -203,6 +213,52 @@ class PagureFlaskApiProjectBlockuserFilledTests(tests.SimplePagureTest):
         data = json.loads(output.get_data(as_text=True))
         self.assertDictEqual(data, {"message": "User(s) blocked"})
 
+        # Second request, but user is blocked
+        headers = {"Authorization": "token aaabbbcccddd"}
+        data = {"username": ["foo"]}
+
+        output = self.app.post(
+            "/api/0/test/blockuser", headers=headers, data=data
+        )
+        self.assertEqual(output.status_code, 403)
+        data = json.loads(output.get_data(as_text=True))
+        self.assertDictEqual(
+            data,
+            {
+                "error":"You have been blocked from this project",
+                "error_code":"EUBLOCKED"
+            }
+        )
+
+    def test_ui_new_issue_user_blocked(self):
+        """ Test doing a POST request to the UI when the user is blocked.
+        """
+
+        user = tests.FakeUser(username="foo")
+        with tests.user_set(self.app.application, user):
+
+            output = self.app.get('/test2/new_issue')
+            self.assertEqual(output.status_code, 200)
+            self.assertIn(
+                'New Issue',
+                output.get_data(as_text=True))
+
+            csrf_token = self.get_csrf(output=output)
+
+            data = {
+                'title': 'Test issue',
+                'issue_content': 'We really should improve on this issue',
+                'status': 'Open',
+                'csrf_token': csrf_token,
+            }
+
+            output = self.app.post('/test2/new_issue', data=data)
+            self.assertEqual(output.status_code, 403)
+            output_text = output.get_data(as_text=True)
+            self.assertIn(
+                '<p>You have been blocked from this project</p>',
+                output_text)
+
 
 if __name__ == "__main__":
     unittest.main(verbosity=2)