test_pagure_lib.py 211 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. (c) 2015-2018 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. """
  7. from __future__ import unicode_literals, absolute_import
  8. import datetime
  9. import unittest
  10. import shutil
  11. import sys
  12. import os
  13. import pygit2
  14. import markdown
  15. from mock import patch, MagicMock, Mock
  16. sys.path.insert(
  17. 0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
  18. )
  19. import pagure
  20. import pagure.lib.query
  21. import pagure.lib.model
  22. import tests
  23. class PagureLibtests_search_user(tests.Modeltests):
  24. """
  25. Test the pagure.lib search_user() method
  26. """
  27. maxDiff = None
  28. def test_search_user_all(self):
  29. """
  30. Test the method returns all the users for the given session
  31. """
  32. # Retrieve all users
  33. items = pagure.lib.query.search_user(self.session)
  34. self.assertEqual(2, len(items))
  35. self.assertEqual(2, items[0].id)
  36. self.assertEqual("foo", items[0].user)
  37. self.assertEqual("foo", items[0].username)
  38. self.assertEqual([], items[1].groups)
  39. self.assertEqual(1, items[1].id)
  40. self.assertEqual("pingou", items[1].user)
  41. self.assertEqual("pingou", items[1].username)
  42. self.assertEqual([], items[1].groups)
  43. def test_search_user_username(self):
  44. """
  45. Test the method returns the user for a given username
  46. """
  47. # Retrieve user by username
  48. item = pagure.lib.query.search_user(self.session, username="foo")
  49. self.assertEqual("foo", item.user)
  50. self.assertEqual("foo", item.username)
  51. self.assertEqual([], item.groups)
  52. item = pagure.lib.query.search_user(self.session, username="bar")
  53. self.assertEqual(None, item)
  54. def test_search_user_email(self):
  55. """
  56. Test the method returns a user for a given email address
  57. """
  58. # Retrieve user by email
  59. item = pagure.lib.query.search_user(self.session, email="foo@foo.com")
  60. self.assertEqual(None, item)
  61. item = pagure.lib.query.search_user(self.session, email="foo@bar.com")
  62. self.assertEqual("foo", item.user)
  63. self.assertEqual("foo", item.username)
  64. self.assertEqual([], item.groups)
  65. self.assertEqual(
  66. ["foo@bar.com"], [email.email for email in item.emails]
  67. )
  68. item = pagure.lib.query.search_user(
  69. self.session, email="foo@pingou.com"
  70. )
  71. self.assertEqual("pingou", item.user)
  72. self.assertEqual(
  73. sorted(["bar@pingou.com", "foo@pingou.com"]),
  74. sorted([email.email for email in item.emails]),
  75. )
  76. def test_search_user_token(self):
  77. """
  78. Test the method returns a user for a given token
  79. """
  80. # Retrieve user by token
  81. item = pagure.lib.query.search_user(self.session, token="aaa")
  82. self.assertEqual(None, item)
  83. item = pagure.lib.model.User(
  84. user="pingou2",
  85. fullname="PY C",
  86. token="aaabbb",
  87. default_email="bar@pingou.com",
  88. )
  89. self.session.add(item)
  90. self.session.commit()
  91. item = pagure.lib.query.search_user(self.session, token="aaabbb")
  92. self.assertEqual("pingou2", item.user)
  93. self.assertEqual("PY C", item.fullname)
  94. def test_search_user_pattern(self):
  95. """
  96. Test the method returns a user for a given pattern
  97. """
  98. # Retrieve user by pattern
  99. item = pagure.lib.query.search_user(self.session, pattern="a*")
  100. self.assertEqual([], item)
  101. item = pagure.lib.model.User(
  102. user="pingou2",
  103. fullname="PY C",
  104. token="aaabbb",
  105. default_email="bar@pingou.com",
  106. )
  107. self.session.add(item)
  108. self.session.commit()
  109. items = pagure.lib.query.search_user(self.session, pattern="p*")
  110. self.assertEqual(2, len(items))
  111. self.assertEqual(1, items[0].id)
  112. self.assertEqual("pingou", items[0].user)
  113. self.assertEqual("pingou", items[0].username)
  114. self.assertEqual([], items[0].groups)
  115. self.assertEqual(
  116. sorted(["bar@pingou.com", "foo@pingou.com"]),
  117. sorted([email.email for email in items[0].emails]),
  118. )
  119. self.assertEqual(3, items[1].id)
  120. self.assertEqual("pingou2", items[1].user)
  121. self.assertEqual("pingou2", items[1].username)
  122. self.assertEqual([], items[1].groups)
  123. self.assertEqual([], [email.email for email in items[1].emails])
  124. class PagureLibtests_search_projects(tests.Modeltests):
  125. """
  126. Test the pagure.lib search_projects() method
  127. """
  128. def setUp(self):
  129. super(PagureLibtests_search_projects, self).setUp()
  130. tests.create_projects(self.session)
  131. def test_search_projects_all(self):
  132. """
  133. Test the method returns all the projects for the given session
  134. """
  135. projects = pagure.lib.query.search_projects(self.session)
  136. self.assertEqual(len(projects), 3)
  137. self.assertEqual(projects[0].id, 1)
  138. self.assertEqual(projects[1].id, 2)
  139. def test_search_projects_username(self):
  140. """
  141. Test the method returns all the projects for the given username
  142. """
  143. projects = pagure.lib.query.search_projects(
  144. self.session, username="foo"
  145. )
  146. self.assertEqual(len(projects), 0)
  147. projects = pagure.lib.query.search_projects(
  148. self.session, username="pingou"
  149. )
  150. self.assertEqual(len(projects), 3)
  151. self.assertEqual(projects[0].id, 1)
  152. self.assertEqual(projects[1].id, 2)
  153. def test_search_projects_start(self):
  154. """
  155. Test the method returns all the projects for the given start
  156. """
  157. projects = pagure.lib.query.search_projects(self.session, start=1)
  158. self.assertEqual(len(projects), 2)
  159. self.assertEqual(projects[0].id, 2)
  160. def test_search_projects_limit(self):
  161. """
  162. Test the method returns all the projects for the given limit
  163. """
  164. projects = pagure.lib.query.search_projects(self.session, limit=1)
  165. self.assertEqual(len(projects), 1)
  166. self.assertEqual(projects[0].id, 1)
  167. def test_search_projects_count(self):
  168. """
  169. Test the method returns the count of the projects
  170. """
  171. projects = pagure.lib.query.search_projects(self.session, count=True)
  172. self.assertEqual(projects, 3)
  173. def test_search_projects_commit_access(self):
  174. """
  175. Test the method returns the project of user with only commit access
  176. """
  177. # Also check if the project shows up if a user doesn't
  178. # have admin access in the project
  179. # Check with commit access first
  180. project = pagure.lib.query.get_authorized_project(
  181. self.session, project_name="test"
  182. )
  183. msg = pagure.lib.query.add_user_to_project(
  184. self.session,
  185. project=project,
  186. new_user="foo",
  187. user="pingou",
  188. access="commit",
  189. )
  190. self.assertEqual(msg, "User added")
  191. self.session.commit()
  192. projects = pagure.lib.query.search_projects(
  193. self.session, username="foo"
  194. )
  195. self.assertEqual(len(projects), 1)
  196. def test_search_projects_ticket_access(self):
  197. """
  198. Test the method does not returns the project of user with only ticket access
  199. """
  200. # Now check with only ticket access
  201. project = pagure.lib.query.get_authorized_project(
  202. self.session, project_name="test"
  203. )
  204. msg = pagure.lib.query.add_user_to_project(
  205. self.session,
  206. project=project,
  207. new_user="foo",
  208. user="pingou",
  209. access="ticket",
  210. )
  211. self.assertEqual(msg, "User added")
  212. self.session.commit()
  213. projects = pagure.lib.query.search_projects(
  214. self.session, username="foo"
  215. )
  216. self.assertEqual(len(projects), 0)
  217. def test_search_project_forked(self):
  218. """
  219. Test the search_project for forked projects in pagure.lib.query.
  220. """
  221. # Create two forked repo
  222. item = pagure.lib.model.Project(
  223. user_id=2, # foo
  224. name="test",
  225. description="test project #1",
  226. is_fork=True,
  227. parent_id=1,
  228. hook_token="aaabbbttt",
  229. )
  230. self.session.add(item)
  231. item = pagure.lib.model.Project(
  232. user_id=2, # foo
  233. name="test2",
  234. description="test project #2",
  235. is_fork=True,
  236. parent_id=2,
  237. hook_token="aaabbbuuu",
  238. )
  239. self.session.add(item)
  240. # Since we have two forks, let's search them
  241. projects = pagure.lib.query.search_projects(self.session, fork=True)
  242. self.assertEqual(len(projects), 2)
  243. projects = pagure.lib.query.search_projects(self.session, fork=False)
  244. self.assertEqual(len(projects), 3)
  245. def test_search_projects_private(self):
  246. """
  247. Test the method for private projects
  248. """
  249. item = pagure.lib.model.Project(
  250. user_id=1, # pingou
  251. name="private_test",
  252. description="Private test project #1",
  253. hook_token="aaabbbcccpp",
  254. private=True,
  255. )
  256. self.session.add(item)
  257. self.session.commit()
  258. # non authenticated
  259. projects = pagure.lib.query.search_projects(self.session)
  260. self.assertEqual(len(projects), 3)
  261. self.assertEqual(
  262. [p.path for p in projects],
  263. ["test.git", "test2.git", "somenamespace/test3.git"],
  264. )
  265. # non authenticated
  266. projects = pagure.lib.query.search_projects(
  267. self.session, username="pingou"
  268. )
  269. self.assertEqual(len(projects), 3)
  270. self.assertEqual(
  271. [p.path for p in projects],
  272. ["test.git", "test2.git", "somenamespace/test3.git"],
  273. )
  274. # authenticated as pingou
  275. projects = pagure.lib.query.search_projects(
  276. self.session, username="pingou", private="pingou"
  277. )
  278. self.assertEqual(len(projects), 4)
  279. self.assertEqual(
  280. [p.path for p in projects],
  281. [
  282. "private_test.git",
  283. "test.git",
  284. "test2.git",
  285. "somenamespace/test3.git",
  286. ],
  287. )
  288. # authenticated as foo
  289. projects = pagure.lib.query.search_projects(
  290. self.session, username="pingou", private="foo"
  291. )
  292. self.assertEqual(len(projects), 3)
  293. self.assertEqual(
  294. [p.path for p in projects],
  295. ["test.git", "test2.git", "somenamespace/test3.git"],
  296. )
  297. def test_search_projects_tags(self):
  298. """
  299. Test the method returns all the projects for the given tags
  300. """
  301. # Add tags to the project
  302. project = pagure.lib.query._get_project(self.session, "test")
  303. tag = pagure.lib.model.Tag(tag="fedora")
  304. self.session.add(tag)
  305. self.session.commit()
  306. tp = pagure.lib.model.TagProject(project_id=project.id, tag="fedora")
  307. self.session.add(tp)
  308. self.session.commit()
  309. projects = pagure.lib.query.search_projects(
  310. self.session, tags="fedora"
  311. )
  312. self.assertEqual(len(projects), 1)
  313. self.assertEqual(projects[0].path, "test.git")
  314. def test_search_projects_pattern(self):
  315. """
  316. Test the method returns all the projects for the given pattern
  317. """
  318. projects = pagure.lib.query.search_projects(
  319. self.session, pattern="test*"
  320. )
  321. self.assertEqual(len(projects), 3)
  322. self.assertEqual(
  323. [p.path for p in projects],
  324. ["test.git", "test2.git", "somenamespace/test3.git"],
  325. )
  326. def test_search_projects_sort(self):
  327. """
  328. Test the method returns all the projects sorted by lastest and oldest
  329. """
  330. projects = pagure.lib.query.search_projects(
  331. self.session, pattern="*", sort="latest"
  332. )
  333. self.assertEqual(len(projects), 3)
  334. self.assertEqual(
  335. [p.path for p in projects],
  336. ["somenamespace/test3.git", "test2.git", "test.git"],
  337. )
  338. projects = pagure.lib.query.search_projects(
  339. self.session, pattern="*", sort="oldest"
  340. )
  341. self.assertEqual(len(projects), 3)
  342. self.assertEqual(
  343. [p.path for p in projects],
  344. ["test.git", "test2.git", "somenamespace/test3.git"],
  345. )
  346. class PagureLibtests(tests.Modeltests):
  347. """ Tests for pagure.lib """
  348. maxDiff = None
  349. def test_get_repotypes(self):
  350. """ Test the get_repotypes function of pagure.lib.query. """
  351. with patch.dict(
  352. pagure.config.config,
  353. {"ENABLE_TICKETS": False, "ENABLE_DOCS": False},
  354. ):
  355. self.assertEqual(
  356. set(("main", "requests")),
  357. set(pagure.lib.query.get_repotypes()),
  358. )
  359. with patch.dict(
  360. pagure.config.config,
  361. {"ENABLE_TICKETS": True, "ENABLE_DOCS": False},
  362. ):
  363. self.assertEqual(
  364. set(("main", "requests", "tickets")),
  365. set(pagure.lib.query.get_repotypes()),
  366. )
  367. with patch.dict(
  368. pagure.config.config,
  369. {"ENABLE_TICKETS": False, "ENABLE_DOCS": True},
  370. ):
  371. self.assertEqual(
  372. set(("main", "requests", "docs")),
  373. set(pagure.lib.query.get_repotypes()),
  374. )
  375. with patch.dict(
  376. pagure.config.config, {"ENABLE_TICKETS": True, "ENABLE_DOCS": True}
  377. ):
  378. self.assertEqual(
  379. set(("main", "requests", "tickets", "docs")),
  380. set(pagure.lib.query.get_repotypes()),
  381. )
  382. def test_get_next_id(self):
  383. """ Test the get_next_id function of pagure.lib.query. """
  384. tests.create_projects(self.session)
  385. self.assertEqual(1, pagure.lib.query.get_next_id(self.session, 1))
  386. @patch("pagure.lib.git.update_git")
  387. @patch("pagure.lib.notify.send_email")
  388. def test_new_issue(self, p_send_email, p_ugt):
  389. """ Test the new_issue of pagure.lib.query. """
  390. p_send_email.return_value = True
  391. p_ugt.return_value = True
  392. tests.create_projects(self.session)
  393. repo = pagure.lib.query._get_project(self.session, "test")
  394. # Set some priorities to the project
  395. repo.priorities = {"1": "High", "2": "Normal"}
  396. self.session.add(repo)
  397. self.session.commit()
  398. # Before
  399. issues = pagure.lib.query.search_issues(self.session, repo)
  400. self.assertEqual(len(issues), 0)
  401. self.assertEqual(repo.open_tickets, 0)
  402. self.assertEqual(repo.open_tickets_public, 0)
  403. # See where it fails
  404. self.assertRaises(
  405. pagure.exceptions.PagureException,
  406. pagure.lib.query.new_issue,
  407. session=self.session,
  408. repo=repo,
  409. title="Test issue",
  410. content="We should work on this",
  411. user="blah",
  412. )
  413. # Fails since we're trying to give a non-existant priority
  414. self.assertRaises(
  415. pagure.exceptions.PagureException,
  416. pagure.lib.query.new_issue,
  417. session=self.session,
  418. repo=repo,
  419. title="Test issue",
  420. content="We should work on this",
  421. user="pingou",
  422. priority=0,
  423. )
  424. # Add an extra user to project `foo`
  425. repo = pagure.lib.query._get_project(self.session, "test")
  426. msg = pagure.lib.query.add_user_to_project(
  427. session=self.session, project=repo, new_user="foo", user="pingou"
  428. )
  429. self.session.commit()
  430. self.assertEqual(msg, "User added")
  431. # Try adding again this extra user to project `foo`
  432. self.assertRaises(
  433. pagure.exceptions.PagureException,
  434. pagure.lib.query.add_user_to_project,
  435. session=self.session,
  436. project=repo,
  437. new_user="foo",
  438. user="pingou",
  439. )
  440. self.session.commit()
  441. self.assertEqual(msg, "User added")
  442. # Create issues to play with
  443. msg = pagure.lib.query.new_issue(
  444. session=self.session,
  445. repo=repo,
  446. title="Test issue",
  447. content="We should work on this",
  448. user="pingou",
  449. )
  450. self.session.commit()
  451. self.assertEqual(msg.title, "Test issue")
  452. self.assertEqual(repo.open_tickets, 1)
  453. self.assertEqual(repo.open_tickets_public, 1)
  454. msg = pagure.lib.query.new_issue(
  455. session=self.session,
  456. repo=repo,
  457. title="Test issue #2",
  458. content="We should work on this for the second time",
  459. user="foo",
  460. status="Open",
  461. )
  462. self.session.commit()
  463. self.assertEqual(msg.title, "Test issue #2")
  464. self.assertEqual(repo.open_tickets, 2)
  465. self.assertEqual(repo.open_tickets_public, 2)
  466. # After
  467. issues = pagure.lib.query.search_issues(self.session, repo)
  468. self.assertEqual(len(issues), 2)
  469. @patch("pagure.lib.git.update_git")
  470. @patch("pagure.lib.notify.send_email")
  471. def test_edit_issue(self, p_send_email, p_ugt):
  472. """ Test the edit_issue of pagure.lib.query. """
  473. p_send_email.return_value = True
  474. p_ugt.return_value = True
  475. self.test_new_issue()
  476. repo = pagure.lib.query._get_project(self.session, "test")
  477. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  478. self.assertEqual(repo.open_tickets, 2)
  479. self.assertEqual(repo.open_tickets_public, 2)
  480. # Edit the issue
  481. msg = pagure.lib.query.edit_issue(
  482. session=self.session, issue=issue, user="pingou"
  483. )
  484. self.session.commit()
  485. self.assertEqual(msg, None)
  486. msg = pagure.lib.query.edit_issue(
  487. session=self.session,
  488. issue=issue,
  489. user="pingou",
  490. title="Test issue #2",
  491. content="We should work on this for the second time",
  492. status="Open",
  493. )
  494. self.session.commit()
  495. self.assertEqual(msg, None)
  496. msg = pagure.lib.query.edit_issue(
  497. session=self.session,
  498. issue=issue,
  499. user="pingou",
  500. title="Foo issue #2",
  501. content="We should work on this period",
  502. status="Closed",
  503. close_status="Invalid",
  504. private=True,
  505. )
  506. self.session.commit()
  507. self.assertEqual(
  508. msg,
  509. [
  510. "Issue status updated to: Closed (was: Open)",
  511. "Issue close_status updated to: Invalid",
  512. "Issue private status set to: True",
  513. ],
  514. )
  515. msg = pagure.lib.query.edit_issue(
  516. session=self.session,
  517. issue=issue,
  518. user="pingou",
  519. title="Foo issue #2",
  520. content="Fixed!",
  521. status="Closed",
  522. close_status="Fixed",
  523. private=False,
  524. )
  525. self.session.commit()
  526. self.assertEqual(
  527. msg,
  528. [
  529. "Issue close_status updated to: Fixed (was: Invalid)",
  530. "Issue private status set to: False (was: True)",
  531. ],
  532. )
  533. repo = pagure.lib.query._get_project(self.session, "test")
  534. self.assertEqual(repo.open_tickets, 1)
  535. self.assertEqual(repo.open_tickets_public, 1)
  536. self.assertEqual(repo.issues[1].status, "Closed")
  537. self.assertEqual(repo.issues[1].close_status, "Fixed")
  538. # Edit the status: re-open the ticket
  539. msg = pagure.lib.query.edit_issue(
  540. session=self.session,
  541. issue=issue,
  542. user="pingou",
  543. status="Open",
  544. private=True,
  545. )
  546. self.session.commit()
  547. self.assertEqual(
  548. msg,
  549. [
  550. "Issue status updated to: Open (was: Closed)",
  551. "Issue private status set to: True",
  552. ],
  553. )
  554. repo = pagure.lib.query._get_project(self.session, "test")
  555. for issue in repo.issues:
  556. self.assertEqual(issue.status, "Open")
  557. self.assertEqual(issue.close_status, None)
  558. # 2 open but one of them is private
  559. self.assertEqual(repo.open_tickets, 2)
  560. self.assertEqual(repo.open_tickets_public, 1)
  561. # Edit the status: re-close the ticket
  562. msg = pagure.lib.query.edit_issue(
  563. session=self.session,
  564. issue=issue,
  565. user="pingou",
  566. status="Closed",
  567. close_status="Invalid",
  568. private=True,
  569. )
  570. self.session.commit()
  571. self.assertEqual(
  572. msg,
  573. [
  574. "Issue status updated to: Closed (was: Open)",
  575. "Issue close_status updated to: Invalid",
  576. ],
  577. )
  578. repo = pagure.lib.query._get_project(self.session, "test")
  579. self.assertEqual(repo.open_tickets, 1)
  580. self.assertEqual(repo.open_tickets_public, 1)
  581. self.assertEqual(repo.issues[1].status, "Closed")
  582. self.assertEqual(repo.issues[1].close_status, "Invalid")
  583. @patch("pagure.lib.git.update_git")
  584. @patch("pagure.lib.notify.send_email")
  585. def test_edit_issue_close_status(self, p_send_email, p_ugt):
  586. """ Test the edit_issue of pagure.lib.query. """
  587. p_send_email.return_value = True
  588. p_ugt.return_value = True
  589. self.test_new_issue()
  590. repo = pagure.lib.query._get_project(self.session, "test")
  591. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  592. self.assertEqual(issue.status, "Open")
  593. self.assertEqual(issue.close_status, None)
  594. repo = pagure.lib.query._get_project(self.session, "test")
  595. self.assertEqual(repo.open_tickets, 2)
  596. self.assertEqual(repo.open_tickets_public, 2)
  597. # Edit the issue, providing just a close_status should also close
  598. # the ticket
  599. msg = pagure.lib.query.edit_issue(
  600. session=self.session,
  601. issue=issue,
  602. user="pingou",
  603. close_status="Fixed",
  604. )
  605. self.session.commit()
  606. self.assertEqual(msg, ["Issue close_status updated to: Fixed"])
  607. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  608. self.assertEqual(issue.status, "Closed")
  609. self.assertEqual(issue.close_status, "Fixed")
  610. repo = pagure.lib.query._get_project(self.session, "test")
  611. self.assertEqual(repo.open_tickets, 1)
  612. self.assertEqual(repo.open_tickets_public, 1)
  613. # Edit the issue, editing the status to open, should reset the
  614. # close_status
  615. msg = pagure.lib.query.edit_issue(
  616. session=self.session, issue=issue, user="pingou", status="Open"
  617. )
  618. self.session.commit()
  619. self.assertEqual(msg, ["Issue status updated to: Open (was: Closed)"])
  620. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  621. self.assertEqual(issue.status, "Open")
  622. self.assertEqual(issue.close_status, None)
  623. repo = pagure.lib.query._get_project(self.session, "test")
  624. self.assertEqual(repo.open_tickets, 2)
  625. self.assertEqual(repo.open_tickets_public, 2)
  626. @patch("pagure.lib.git.update_git")
  627. @patch("pagure.lib.notify.send_email")
  628. def test_edit_issue_priority(self, p_send_email, p_ugt):
  629. """Test the edit_issue of pagure.lib when changing the priority."""
  630. p_send_email.return_value = True
  631. p_ugt.return_value = True
  632. self.test_new_issue()
  633. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  634. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  635. # Set some priorities to the repo
  636. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  637. repo.priorities = {"1": "High", "2": "Normal"}
  638. self.session.add(repo)
  639. self.session.commit()
  640. self.assertEqual(repo.open_tickets, 2)
  641. self.assertEqual(repo.open_tickets_public, 2)
  642. # Edit the issue -- Wrong priority value: No changes
  643. msg = pagure.lib.query.edit_issue(
  644. session=self.session, issue=issue, user="pingou", priority=3
  645. )
  646. self.session.commit()
  647. self.assertEqual(msg, None)
  648. # Edit the issue -- Good priority
  649. msg = pagure.lib.query.edit_issue(
  650. session=self.session, issue=issue, user="pingou", priority=2
  651. )
  652. self.session.commit()
  653. self.assertEqual(msg, ["Issue priority set to: Normal"])
  654. # Edit the issue -- Update priority
  655. msg = pagure.lib.query.edit_issue(
  656. session=self.session, issue=issue, user="pingou", priority=1
  657. )
  658. self.session.commit()
  659. self.assertEqual(msg, ["Issue priority set to: High (was: Normal)"])
  660. @patch("pagure.lib.git.update_git")
  661. @patch("pagure.lib.notify.send_email")
  662. def test_edit_issue_depending(self, p_send_email, p_ugt):
  663. """Test the edit_issue of pagure.lib when the issue depends on
  664. another.
  665. """
  666. p_send_email.return_value = True
  667. p_ugt.return_value = True
  668. tests.create_projects(self.session)
  669. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  670. # Create 3 issues
  671. msg = pagure.lib.query.new_issue(
  672. session=self.session,
  673. repo=repo,
  674. title="Test issue #1",
  675. content="We should work on this for the second time",
  676. user="foo",
  677. status="Open",
  678. )
  679. self.session.commit()
  680. self.assertEqual(msg.title, "Test issue #1")
  681. msg = pagure.lib.query.new_issue(
  682. session=self.session,
  683. repo=repo,
  684. title="Test issue #2",
  685. content="We should work on this for the second time",
  686. user="foo",
  687. status="Open",
  688. )
  689. self.session.commit()
  690. self.assertEqual(msg.title, "Test issue #2")
  691. msg = pagure.lib.query.new_issue(
  692. session=self.session,
  693. repo=repo,
  694. title="Test issue #3",
  695. content="We should work on this for the second time",
  696. user="foo",
  697. status="Open",
  698. )
  699. self.session.commit()
  700. self.assertEqual(msg.title, "Test issue #3")
  701. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  702. self.assertEqual(repo.open_tickets, 3)
  703. self.assertEqual(repo.open_tickets_public, 3)
  704. # Make issue #2 blocking on issue #1
  705. msgs = pagure.lib.query.update_blocked_issue(
  706. self.session, repo, issue, blocks=["1"], username="pingou"
  707. )
  708. self.assertEqual(msgs, ["Issue marked as blocking: #1"])
  709. # Make issue #2 depend on issue #3
  710. msgs = pagure.lib.query.update_dependency_issue(
  711. self.session, repo, issue, depends=["3"], username="pingou"
  712. )
  713. self.assertEqual(msgs, ["Issue marked as depending on: #3"])
  714. # Edit the issue #3
  715. issue = pagure.lib.query.search_issues(self.session, repo, issueid=3)
  716. msg = pagure.lib.query.edit_issue(
  717. session=self.session, issue=issue, user="pingou"
  718. )
  719. self.session.commit()
  720. self.assertEqual(msg, None)
  721. msg = pagure.lib.query.edit_issue(
  722. session=self.session,
  723. issue=issue,
  724. user="pingou",
  725. title="Foo issue #2",
  726. content="We should work on this period",
  727. status="Closed",
  728. close_status="Invalid",
  729. private=True,
  730. )
  731. self.session.commit()
  732. self.assertEqual(
  733. msg,
  734. [
  735. "Issue status updated to: Closed (was: Open)",
  736. "Issue close_status updated to: Invalid",
  737. "Issue private status set to: True",
  738. ],
  739. )
  740. self.assertEqual(repo.open_tickets, 2)
  741. self.assertEqual(repo.open_tickets_public, 2)
  742. @patch("pagure.lib.query.REDIS", MagicMock(return_value=True))
  743. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  744. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  745. def test_add_issue_dependency(self):
  746. """ Test the add_issue_dependency of pagure.lib.query. """
  747. self.test_new_issue()
  748. repo = pagure.lib.query._get_project(self.session, "test")
  749. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  750. issue_blocked = pagure.lib.query.search_issues(
  751. self.session, repo, issueid=2
  752. )
  753. # Before
  754. self.assertEqual(issue.parents, [])
  755. self.assertEqual(issue.children, [])
  756. self.assertEqual(issue_blocked.parents, [])
  757. self.assertEqual(issue_blocked.children, [])
  758. self.assertRaises(
  759. pagure.exceptions.PagureException,
  760. pagure.lib.query.add_issue_dependency,
  761. session=self.session,
  762. issue=issue,
  763. issue_blocked=issue,
  764. user="pingou",
  765. )
  766. msg = pagure.lib.query.add_issue_dependency(
  767. session=self.session,
  768. issue=issue,
  769. issue_blocked=issue_blocked,
  770. user="pingou",
  771. )
  772. self.session.commit()
  773. self.assertEqual(msg, "Issue marked as depending on: #2")
  774. # After
  775. self.assertEqual(len(issue.parents), 0)
  776. self.assertEqual(issue.parents, [])
  777. self.assertEqual(len(issue.children), 1)
  778. self.assertEqual(issue.children[0].id, 2)
  779. self.assertEqual(issue.depending_text, [])
  780. self.assertEqual(issue.blocking_text, [2])
  781. self.assertEqual(len(issue_blocked.children), 0)
  782. self.assertEqual(issue_blocked.children, [])
  783. self.assertEqual(len(issue_blocked.parents), 1)
  784. self.assertEqual(issue_blocked.parents[0].id, 1)
  785. self.assertEqual(issue_blocked.depending_text, [1])
  786. self.assertEqual(issue_blocked.blocking_text, [])
  787. @patch("pagure.lib.query.REDIS")
  788. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  789. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  790. def test_edit_comment(self, mock_redis):
  791. """ Test the edit_issue of pagure.lib.query. """
  792. mock_redis.return_value = True
  793. self.test_add_issue_comment()
  794. repo = pagure.lib.query._get_project(self.session, "test")
  795. self.assertEqual(repo.open_tickets, 2)
  796. self.assertEqual(repo.open_tickets_public, 2)
  797. self.assertEqual(mock_redis.publish.call_count, 0)
  798. # Before
  799. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  800. self.assertEqual(len(issue.comments), 1)
  801. self.assertEqual(issue.comments[0].comment, "Hey look a comment!")
  802. # Edit one of the
  803. msg = pagure.lib.query.edit_comment(
  804. session=self.session,
  805. parent=issue,
  806. comment=issue.comments[0],
  807. user="pingou",
  808. updated_comment="Edited comment",
  809. )
  810. self.session.commit()
  811. self.assertEqual(msg, "Comment updated")
  812. self.assertEqual(mock_redis.publish.call_count, 1)
  813. # After
  814. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  815. self.assertEqual(len(issue.comments), 1)
  816. self.assertEqual(issue.comments[0].comment, "Edited comment")
  817. @patch("pagure.lib.query.REDIS")
  818. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  819. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  820. def test_edit_comment_private(self, mock_redis):
  821. """ Test the edit_issue of pagure.lib.query. """
  822. self.test_add_issue_comment_private()
  823. repo = pagure.lib.query._get_project(self.session, "test")
  824. self.assertEqual(repo.open_tickets, 1)
  825. self.assertEqual(repo.open_tickets_public, 0)
  826. self.assertEqual(mock_redis.publish.call_count, 0)
  827. # Before
  828. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  829. self.assertEqual(len(issue.comments), 1)
  830. self.assertEqual(issue.comments[0].comment, "Hey look a comment!")
  831. # Edit one of the
  832. msg = pagure.lib.query.edit_comment(
  833. session=self.session,
  834. parent=issue,
  835. comment=issue.comments[0],
  836. user="pingou",
  837. updated_comment="Edited comment",
  838. )
  839. self.session.commit()
  840. self.assertEqual(msg, "Comment updated")
  841. self.assertEqual(mock_redis.publish.call_count, 1)
  842. # After
  843. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  844. self.assertEqual(len(issue.comments), 1)
  845. self.assertEqual(issue.comments[0].comment, "Edited comment")
  846. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  847. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  848. @patch("pagure.lib.query.REDIS")
  849. def test_add_tag_obj(self, mock_redis):
  850. """ Test the add_tag_obj of pagure.lib.query. """
  851. mock_redis.return_value = True
  852. self.test_edit_issue()
  853. repo = pagure.lib.query._get_project(self.session, "test")
  854. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  855. self.assertFalse(issue.private)
  856. self.assertFalse(issue.project.private)
  857. args = mock_redis.publish.call_args_list
  858. self.assertEqual(len(args), 4)
  859. # Add a tag to the issue
  860. msg = pagure.lib.query.add_tag_obj(
  861. session=self.session, obj=issue, tags="tag1", user="pingou"
  862. )
  863. self.session.commit()
  864. self.assertEqual(msg, "Issue tagged with: tag1")
  865. args = mock_redis.publish.call_args_list
  866. self.assertEqual(len(args), 5)
  867. # Get the arguments of the last call and get the second of these
  868. # arguments (the first one changing for each test run)
  869. self.assertJSONEqual(
  870. args[-1:][0][0][1],
  871. '{"added_tags_color": ["DeepSkyBlue"], "added_tags": ["tag1"]}',
  872. )
  873. # Try a second time
  874. msg = pagure.lib.query.add_tag_obj(
  875. session=self.session, obj=issue, tags="tag1", user="pingou"
  876. )
  877. self.session.commit()
  878. self.assertEqual(msg, "Nothing to add")
  879. issues = pagure.lib.query.search_issues(
  880. self.session, repo, tags="tag1"
  881. )
  882. self.assertEqual(len(issues), 1)
  883. self.assertEqual(issues[0].id, 1)
  884. self.assertEqual(issues[0].project_id, 1)
  885. self.assertEqual(issues[0].status, "Open")
  886. self.assertEqual([tag.tag for tag in issues[0].tags], ["tag1"])
  887. @patch("pagure.lib.git.update_git")
  888. @patch("pagure.lib.notify.send_email")
  889. def test_remove_tags(self, p_send_email, p_ugt):
  890. """ Test the remove_tags of pagure.lib.query. """
  891. p_send_email.return_value = True
  892. p_ugt.return_value = True
  893. self.test_add_tag_obj()
  894. repo = pagure.lib.query._get_project(self.session, "test")
  895. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  896. self.assertRaises(
  897. pagure.exceptions.PagureException,
  898. pagure.lib.query.remove_tags,
  899. session=self.session,
  900. project=repo,
  901. tags="foo",
  902. user="pingou",
  903. )
  904. msgs = pagure.lib.query.remove_tags(
  905. session=self.session, project=repo, tags="tag1", user="pingou"
  906. )
  907. self.assertEqual(msgs, ["Tag: tag1 has been deleted"])
  908. @patch("pagure.lib.query.REDIS", MagicMock(return_value=True))
  909. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  910. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  911. def test_remove_tags_obj(self):
  912. """ Test the remove_tags_obj of pagure.lib.query. """
  913. self.test_add_tag_obj()
  914. repo = pagure.lib.query._get_project(self.session, "test")
  915. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  916. msgs = pagure.lib.query.remove_tags_obj(
  917. session=self.session, obj=issue, tags="tag1", user="pingou"
  918. )
  919. self.assertEqual(msgs, "Issue **un**tagged with: tag1")
  920. @patch("pagure.lib.git.update_git")
  921. @patch("pagure.lib.notify.send_email")
  922. def test_remove_tags_obj_from_project(self, p_send_email, p_ugt):
  923. """ Test the remove_tags_obj of pagure.lib from a project. """
  924. p_send_email.return_value = True
  925. p_ugt.return_value = True
  926. tests.create_projects(self.session)
  927. # Add a tag to the project
  928. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  929. msg = pagure.lib.query.add_tag_obj(
  930. self.session, repo, tags=["pagure", "test"], user="pingou"
  931. )
  932. self.assertEqual(msg, "Project tagged with: pagure, test")
  933. self.session.commit()
  934. # Check the tags
  935. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  936. self.assertEqual(repo.tags_text, ["pagure", "test"])
  937. # Remove one of the the tag
  938. msgs = pagure.lib.query.remove_tags_obj(
  939. session=self.session, obj=repo, tags="test", user="pingou"
  940. )
  941. self.assertEqual(msgs, "Project **un**tagged with: test")
  942. self.session.commit()
  943. # Check the tags
  944. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  945. self.assertEqual(repo.tags_text, ["pagure"])
  946. @patch("pagure.lib.git.update_git")
  947. @patch("pagure.lib.notify.send_email")
  948. def test_edit_issue_tags(self, p_send_email, p_ugt):
  949. """ Test the edit_issue_tags of pagure.lib.query. """
  950. p_send_email.return_value = True
  951. p_ugt.return_value = True
  952. self.test_add_tag_obj()
  953. repo = pagure.lib.query._get_project(self.session, "test")
  954. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  955. self.assertRaises(
  956. pagure.exceptions.PagureException,
  957. pagure.lib.query.edit_issue_tags,
  958. session=self.session,
  959. project=repo,
  960. old_tag="foo",
  961. new_tag="bar",
  962. new_tag_description="lorem ipsum",
  963. new_tag_color="black",
  964. user="pingou",
  965. )
  966. self.assertRaises(
  967. pagure.exceptions.PagureException,
  968. pagure.lib.query.edit_issue_tags,
  969. session=self.session,
  970. project=repo,
  971. old_tag=None,
  972. new_tag="bar",
  973. new_tag_description="lorem ipsum",
  974. new_tag_color="black",
  975. user="pingou",
  976. )
  977. msgs = pagure.lib.query.edit_issue_tags(
  978. session=self.session,
  979. project=repo,
  980. old_tag="tag1",
  981. new_tag="tag2",
  982. new_tag_description="lorem ipsum",
  983. new_tag_color="black",
  984. user="pingou",
  985. )
  986. self.session.commit()
  987. self.assertEqual(
  988. msgs,
  989. ["Edited tag: tag1()[DeepSkyBlue] to tag2(lorem ipsum)[black]"],
  990. )
  991. # Try editing the tag without changing anything
  992. self.assertRaises(
  993. pagure.exceptions.PagureException,
  994. pagure.lib.query.edit_issue_tags,
  995. session=self.session,
  996. project=repo,
  997. old_tag="tag2",
  998. new_tag="tag2",
  999. new_tag_description="lorem ipsum",
  1000. new_tag_color="black",
  1001. user="pingou",
  1002. )
  1003. # Add a new tag
  1004. msg = pagure.lib.query.add_tag_obj(
  1005. session=self.session, obj=issue, tags="tag3", user="pingou"
  1006. )
  1007. self.session.commit()
  1008. self.assertEqual(msg, "Issue tagged with: tag3")
  1009. self.assertEqual([tag.tag for tag in issue.tags], ["tag2", "tag3"])
  1010. # Attempt to rename an existing tag into another existing one
  1011. self.assertRaises(
  1012. pagure.exceptions.PagureException,
  1013. pagure.lib.query.edit_issue_tags,
  1014. session=self.session,
  1015. project=repo,
  1016. old_tag="tag2",
  1017. new_tag="tag3",
  1018. new_tag_description="lorem ipsum",
  1019. new_tag_color="red",
  1020. user="pingou",
  1021. )
  1022. # Rename an existing tag
  1023. msgs = pagure.lib.query.edit_issue_tags(
  1024. session=self.session,
  1025. project=repo,
  1026. old_tag="tag2",
  1027. new_tag="tag4",
  1028. new_tag_description="ipsum lorem",
  1029. new_tag_color="purple",
  1030. user="pingou",
  1031. )
  1032. self.session.commit()
  1033. self.assertEqual(
  1034. msgs,
  1035. [
  1036. "Edited tag: tag2(lorem ipsum)[black] to tag4(ipsum lorem)[purple]"
  1037. ],
  1038. )
  1039. @patch("pagure.lib.git.update_git")
  1040. @patch("pagure.lib.notify.send_email")
  1041. def test_search_issues(self, p_send_email, p_ugt):
  1042. """ Test the search_issues of pagure.lib.query. """
  1043. p_send_email.return_value = True
  1044. p_ugt.return_value = True
  1045. self.test_edit_issue()
  1046. repo = pagure.lib.query._get_project(self.session, "test")
  1047. # All issues
  1048. issues = pagure.lib.query.search_issues(self.session, repo)
  1049. self.assertEqual(len(issues), 2)
  1050. self.assertEqual(issues[1].id, 1)
  1051. self.assertEqual(issues[1].project_id, 1)
  1052. self.assertEqual(issues[1].status, "Open")
  1053. self.assertEqual(issues[1].tags, [])
  1054. self.assertEqual(issues[0].id, 2)
  1055. self.assertEqual(issues[0].project_id, 1)
  1056. self.assertEqual(issues[0].status, "Closed")
  1057. self.assertEqual(issues[0].close_status, "Invalid")
  1058. self.assertEqual(issues[0].tags, [])
  1059. # Issues by status
  1060. issues = pagure.lib.query.search_issues(
  1061. self.session, repo, status="Closed"
  1062. )
  1063. self.assertEqual(len(issues), 1)
  1064. self.assertEqual(issues[0].id, 2)
  1065. self.assertEqual(issues[0].project_id, 1)
  1066. self.assertEqual(issues[0].status, "Closed")
  1067. self.assertEqual(issues[0].close_status, "Invalid")
  1068. self.assertEqual(issues[0].tags, [])
  1069. # Issues closed
  1070. issues = pagure.lib.query.search_issues(
  1071. self.session, repo, closed=True
  1072. )
  1073. self.assertEqual(len(issues), 1)
  1074. self.assertEqual(issues[0].id, 2)
  1075. self.assertEqual(issues[0].project_id, 1)
  1076. self.assertEqual(issues[0].status, "Closed")
  1077. self.assertEqual(issues[0].close_status, "Invalid")
  1078. self.assertEqual(issues[0].tags, [])
  1079. # Issues by tag
  1080. issues = pagure.lib.query.search_issues(self.session, repo, tags="foo")
  1081. self.assertEqual(len(issues), 0)
  1082. issues = pagure.lib.query.search_issues(
  1083. self.session, repo, tags="!foo"
  1084. )
  1085. self.assertEqual(len(issues), 2)
  1086. # Issue by id
  1087. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  1088. self.assertEqual(issue.title, "Test issue")
  1089. self.assertEqual(issue.user.user, "pingou")
  1090. self.assertEqual(issue.tags, [])
  1091. # Issues by authors
  1092. issues = pagure.lib.query.search_issues(
  1093. self.session, repo, author="foo"
  1094. )
  1095. self.assertEqual(len(issues), 1)
  1096. self.assertEqual(issues[0].id, 2)
  1097. self.assertEqual(issues[0].project_id, 1)
  1098. self.assertEqual(issues[0].status, "Closed")
  1099. self.assertEqual(issues[0].close_status, "Invalid")
  1100. self.assertEqual(issues[0].tags, [])
  1101. # Issues by assignee
  1102. issues = pagure.lib.query.search_issues(
  1103. self.session, repo, assignee="foo"
  1104. )
  1105. self.assertEqual(len(issues), 0)
  1106. issues = pagure.lib.query.search_issues(
  1107. self.session, repo, assignee="!foo"
  1108. )
  1109. self.assertEqual(len(issues), 2)
  1110. issues = pagure.lib.query.search_issues(
  1111. self.session, repo, private="foo"
  1112. )
  1113. self.assertEqual(len(issues), 2)
  1114. @patch("pagure.lib.query.REDIS", MagicMock(return_value=True))
  1115. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  1116. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  1117. def test_add_issue_assignee(self):
  1118. """ Test the add_issue_assignee of pagure.lib.query. """
  1119. self.test_new_issue()
  1120. repo = pagure.lib.query._get_project(self.session, "test")
  1121. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  1122. # Before
  1123. issues = pagure.lib.query.search_issues(
  1124. self.session, repo, assignee="pingou"
  1125. )
  1126. self.assertEqual(len(issues), 0)
  1127. # Test when it fails
  1128. self.assertRaises(
  1129. pagure.exceptions.PagureException,
  1130. pagure.lib.query.add_issue_assignee,
  1131. session=self.session,
  1132. issue=issue,
  1133. assignee="foo@foobar.com",
  1134. user="foo@pingou.com",
  1135. )
  1136. self.assertRaises(
  1137. pagure.exceptions.PagureException,
  1138. pagure.lib.query.add_issue_assignee,
  1139. session=self.session,
  1140. issue=issue,
  1141. assignee="foo@bar.com",
  1142. user="foo@foopingou.com",
  1143. )
  1144. # Set the assignee by its email
  1145. msg = pagure.lib.query.add_issue_assignee(
  1146. session=self.session,
  1147. issue=issue,
  1148. assignee="foo@bar.com",
  1149. user="foo@pingou.com",
  1150. )
  1151. self.session.commit()
  1152. self.assertEqual(msg, "Issue assigned to foo@bar.com")
  1153. # Change the assignee to someone else by its username
  1154. msg = pagure.lib.query.add_issue_assignee(
  1155. session=self.session, issue=issue, assignee="pingou", user="pingou"
  1156. )
  1157. self.session.commit()
  1158. self.assertEqual(msg, "Issue assigned to pingou (was: foo)")
  1159. # After -- Searches by assignee
  1160. issues = pagure.lib.query.search_issues(
  1161. self.session, repo, assignee="pingou"
  1162. )
  1163. self.assertEqual(len(issues), 1)
  1164. self.assertEqual(issues[0].id, 2)
  1165. self.assertEqual(issues[0].project_id, 1)
  1166. self.assertEqual(issues[0].status, "Open")
  1167. self.assertEqual(issues[0].tags, [])
  1168. issues = pagure.lib.query.search_issues(
  1169. self.session, repo, assignee=True
  1170. )
  1171. self.assertEqual(len(issues), 1)
  1172. self.assertEqual(issues[0].id, 2)
  1173. self.assertEqual(issues[0].title, "Test issue #2")
  1174. self.assertEqual(issues[0].project_id, 1)
  1175. self.assertEqual(issues[0].status, "Open")
  1176. self.assertEqual(issues[0].tags, [])
  1177. issues = pagure.lib.query.search_issues(
  1178. self.session, repo, assignee=False
  1179. )
  1180. self.assertEqual(len(issues), 1)
  1181. self.assertEqual(issues[0].id, 1)
  1182. self.assertEqual(issues[0].title, "Test issue")
  1183. self.assertEqual(issues[0].project_id, 1)
  1184. self.assertEqual(issues[0].status, "Open")
  1185. self.assertEqual(issues[0].tags, [])
  1186. # Reset the assignee to no-one
  1187. msg = pagure.lib.query.add_issue_assignee(
  1188. session=self.session, issue=issue, assignee=None, user="pingou"
  1189. )
  1190. self.session.commit()
  1191. self.assertEqual(msg, "Assignee reset")
  1192. issues = pagure.lib.query.search_issues(
  1193. self.session, repo, assignee=False
  1194. )
  1195. self.assertEqual(len(issues), 2)
  1196. self.assertEqual(issues[0].id, 2)
  1197. self.assertEqual(issues[1].id, 1)
  1198. issues = pagure.lib.query.search_issues(
  1199. self.session, repo, assignee=True
  1200. )
  1201. self.assertEqual(len(issues), 0)
  1202. @patch("pagure.lib.query.REDIS", MagicMock(return_value=True))
  1203. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  1204. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  1205. def test_add_issue_comment(self):
  1206. """ Test the add_issue_comment of pagure.lib.query. """
  1207. self.test_new_issue()
  1208. repo = pagure.lib.query._get_project(self.session, "test")
  1209. # Before
  1210. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  1211. self.assertEqual(len(issue.comments), 0)
  1212. # Set the assignee by its email
  1213. msg = pagure.lib.query.add_issue_assignee(
  1214. session=self.session,
  1215. issue=issue,
  1216. assignee="foo@bar.com",
  1217. user="foo@pingou.com",
  1218. )
  1219. self.session.commit()
  1220. self.assertEqual(msg, "Issue assigned to foo@bar.com")
  1221. # Add a comment to that issue
  1222. msg = pagure.lib.query.add_issue_comment(
  1223. session=self.session,
  1224. issue=issue,
  1225. comment="Hey look a comment!",
  1226. user="foo",
  1227. )
  1228. self.session.commit()
  1229. self.assertEqual(msg, "Comment added")
  1230. # After
  1231. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  1232. self.assertEqual(len(issue.comments), 1)
  1233. self.assertEqual(issue.comments[0].comment, "Hey look a comment!")
  1234. self.assertEqual(issue.comments[0].user.user, "foo")
  1235. @patch("pagure.lib.query.REDIS", MagicMock(return_value=True))
  1236. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  1237. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  1238. def test_add_issue_comment_private(self):
  1239. """ Test the add_issue_comment of pagure.lib.query. """
  1240. tests.create_projects(self.session)
  1241. project = pagure.lib.query._get_project(self.session, "test")
  1242. msg = pagure.lib.query.new_issue(
  1243. session=self.session,
  1244. repo=project,
  1245. title="Test issue #1",
  1246. content="We should work on this for the second time",
  1247. user="foo",
  1248. status="Open",
  1249. private=True,
  1250. )
  1251. self.session.commit()
  1252. self.assertEqual(msg.title, "Test issue #1")
  1253. self.assertEqual(project.open_tickets, 1)
  1254. self.assertEqual(project.open_tickets_public, 0)
  1255. # Before
  1256. issue = pagure.lib.query.search_issues(
  1257. self.session, project, issueid=1
  1258. )
  1259. self.assertEqual(len(issue.comments), 0)
  1260. # Add a comment to that issue
  1261. msg = pagure.lib.query.add_issue_comment(
  1262. session=self.session,
  1263. issue=issue,
  1264. comment="Hey look a comment!",
  1265. user="foo",
  1266. )
  1267. self.session.commit()
  1268. self.assertEqual(msg, "Comment added")
  1269. # After
  1270. issue = pagure.lib.query.search_issues(
  1271. self.session, project, issueid=1
  1272. )
  1273. self.assertEqual(len(issue.comments), 1)
  1274. self.assertEqual(issue.comments[0].comment, "Hey look a comment!")
  1275. self.assertEqual(issue.comments[0].user.user, "foo")
  1276. @patch("pagure.lib.notify.send_email")
  1277. def test_add_user_to_project(self, p_send_email):
  1278. """ Test the add_user_to_project of pagure.lib.query. """
  1279. p_send_email.return_value = True
  1280. tests.create_projects(self.session)
  1281. # Before
  1282. repo = pagure.lib.query._get_project(self.session, "test")
  1283. self.assertEqual(len(repo.users), 0)
  1284. # Add an user to a project
  1285. self.assertRaises(
  1286. pagure.exceptions.PagureException,
  1287. pagure.lib.query.add_user_to_project,
  1288. session=self.session,
  1289. project=repo,
  1290. new_user="foobar",
  1291. user="pingou",
  1292. )
  1293. msg = pagure.lib.query.add_user_to_project(
  1294. session=self.session, project=repo, new_user="foo", user="pingou"
  1295. )
  1296. self.session.commit()
  1297. self.assertEqual(msg, "User added")
  1298. # After
  1299. repo = pagure.lib.query._get_project(self.session, "test")
  1300. self.assertEqual(len(repo.users), 1)
  1301. self.assertEqual(repo.users[0].user, "foo")
  1302. self.assertEqual(repo.admins[0].user, "foo")
  1303. # Try adding the same user with the same access
  1304. self.assertRaises(
  1305. pagure.exceptions.PagureException,
  1306. pagure.lib.query.add_user_to_project,
  1307. session=self.session,
  1308. project=repo,
  1309. new_user="foo",
  1310. user="pingou",
  1311. access="admin",
  1312. )
  1313. # Update the access of the user
  1314. msg = pagure.lib.query.add_user_to_project(
  1315. session=self.session,
  1316. project=repo,
  1317. new_user="foo",
  1318. user="pingou",
  1319. access="commit",
  1320. )
  1321. self.session.commit()
  1322. self.assertEqual(msg, "User access updated")
  1323. self.assertEqual(len(repo.users), 1)
  1324. self.assertEqual(repo.users[0].user, "foo")
  1325. self.assertEqual(repo.committers[0].user, "foo")
  1326. def test_new_project(self):
  1327. """ Test the new_project of pagure.lib.query. """
  1328. gitfolder = os.path.join(self.path, "repos")
  1329. docfolder = os.path.join(gitfolder, "docs")
  1330. ticketfolder = os.path.join(gitfolder, "tickets")
  1331. requestfolder = os.path.join(gitfolder, "requests")
  1332. # Try creating a blacklisted project
  1333. self.assertRaises(
  1334. pagure.exceptions.ProjectBlackListedException,
  1335. pagure.lib.query.new_project,
  1336. session=self.session,
  1337. user="pingou",
  1338. name="static",
  1339. repospanner_region=None,
  1340. blacklist=["static"],
  1341. allowed_prefix=[],
  1342. description="description for static",
  1343. parent_id=None,
  1344. )
  1345. # Try creating a project that's blacklisted by wildcard match
  1346. self.assertRaises(
  1347. pagure.exceptions.ProjectBlackListedException,
  1348. pagure.lib.query.new_project,
  1349. session=self.session,
  1350. user="pingou",
  1351. namespace="space",
  1352. name="static",
  1353. repospanner_region=None,
  1354. blacklist=["space/*"],
  1355. allowed_prefix=["space"],
  1356. description="description for static",
  1357. parent_id=None,
  1358. )
  1359. # Try creating a 40 chars project
  1360. self.assertRaises(
  1361. pagure.exceptions.PagureException,
  1362. pagure.lib.query.new_project,
  1363. session=self.session,
  1364. user="pingou",
  1365. name="s" * 40,
  1366. namespace="pingou",
  1367. repospanner_region=None,
  1368. blacklist=["static"],
  1369. allowed_prefix=["pingou"],
  1370. description="description for 40 chars length project",
  1371. parent_id=None,
  1372. prevent_40_chars=True,
  1373. )
  1374. # Create a new project
  1375. pagure.config.config["GIT_FOLDER"] = gitfolder
  1376. task = pagure.lib.query.new_project(
  1377. session=self.session,
  1378. user="pingou",
  1379. name="testproject",
  1380. repospanner_region=None,
  1381. blacklist=[],
  1382. allowed_prefix=[],
  1383. description="description for testproject",
  1384. parent_id=None,
  1385. )
  1386. self.session.commit()
  1387. self.assertEqual(
  1388. task.get(),
  1389. {
  1390. "endpoint": "ui_ns.view_repo",
  1391. "repo": "testproject",
  1392. "namespace": None,
  1393. },
  1394. )
  1395. # Try creating an existing project using a different case
  1396. self.assertRaises(
  1397. pagure.exceptions.PagureException,
  1398. pagure.lib.query.new_project,
  1399. session=self.session,
  1400. user="pingou",
  1401. name="TestProject",
  1402. repospanner_region=None,
  1403. blacklist=[],
  1404. allowed_prefix=[],
  1405. description="description for testproject",
  1406. parent_id=None,
  1407. )
  1408. # Now test that creation fails if ignore_existing_repo is False
  1409. self.session.commit()
  1410. repo = pagure.lib.query.get_authorized_project(
  1411. self.session, "testproject"
  1412. )
  1413. self.assertEqual(repo.path, "testproject.git")
  1414. gitrepo = os.path.join(gitfolder, repo.path)
  1415. docrepo = os.path.join(docfolder, repo.path)
  1416. ticketrepo = os.path.join(ticketfolder, repo.path)
  1417. requestrepo = os.path.join(requestfolder, repo.path)
  1418. self.assertTrue(os.path.exists(gitrepo))
  1419. self.assertTrue(os.path.exists(docrepo))
  1420. self.assertTrue(os.path.exists(ticketrepo))
  1421. self.assertTrue(os.path.exists(requestrepo))
  1422. # Try re-creating it but all repos are existing
  1423. self.assertRaises(
  1424. pagure.exceptions.PagureException,
  1425. pagure.lib.query.new_project,
  1426. session=self.session,
  1427. user="pingou",
  1428. name="testproject",
  1429. repospanner_region=None,
  1430. blacklist=[],
  1431. allowed_prefix=[],
  1432. description="description for testproject",
  1433. parent_id=None,
  1434. )
  1435. self.session.rollback()
  1436. self.assertTrue(os.path.exists(gitrepo))
  1437. self.assertTrue(os.path.exists(docrepo))
  1438. self.assertTrue(os.path.exists(ticketrepo))
  1439. self.assertTrue(os.path.exists(requestrepo))
  1440. # Try re-creating it ignoring the existing repos- but repo in the DB
  1441. self.assertRaises(
  1442. pagure.exceptions.PagureException,
  1443. pagure.lib.query.new_project,
  1444. session=self.session,
  1445. user="pingou",
  1446. name="testproject",
  1447. repospanner_region=None,
  1448. blacklist=[],
  1449. allowed_prefix=[],
  1450. description="description for testproject",
  1451. parent_id=None,
  1452. )
  1453. self.session.rollback()
  1454. # Re-create it, ignoring the existing repos on disk
  1455. repo = pagure.lib.query._get_project(self.session, "testproject")
  1456. self.session.delete(repo)
  1457. self.session.commit()
  1458. task = pagure.lib.query.new_project(
  1459. session=self.session,
  1460. user="pingou",
  1461. name="testproject",
  1462. repospanner_region=None,
  1463. blacklist=[],
  1464. allowed_prefix=[],
  1465. description="description for testproject",
  1466. parent_id=None,
  1467. ignore_existing_repo=True,
  1468. )
  1469. self.session.commit()
  1470. self.assertEqual(
  1471. task.get(),
  1472. {
  1473. "endpoint": "ui_ns.view_repo",
  1474. "repo": "testproject",
  1475. "namespace": None,
  1476. },
  1477. )
  1478. # Delete the repo from the DB so we can try again
  1479. repo = pagure.lib.query._get_project(self.session, "testproject")
  1480. self.session.delete(repo)
  1481. self.session.commit()
  1482. self.assertTrue(os.path.exists(gitrepo))
  1483. self.assertTrue(os.path.exists(docrepo))
  1484. self.assertTrue(os.path.exists(ticketrepo))
  1485. self.assertTrue(os.path.exists(requestrepo))
  1486. # Drop the main git repo and try again
  1487. shutil.rmtree(gitrepo)
  1488. task = pagure.lib.query.new_project(
  1489. session=self.session,
  1490. user="pingou",
  1491. name="testproject",
  1492. repospanner_region=None,
  1493. blacklist=[],
  1494. allowed_prefix=[],
  1495. description="description for testproject",
  1496. parent_id=None,
  1497. )
  1498. self.assertIn("already exists", str(task.get(propagate=False)))
  1499. self.assertFalse(os.path.exists(gitrepo))
  1500. self.assertTrue(os.path.exists(docrepo))
  1501. self.assertTrue(os.path.exists(ticketrepo))
  1502. self.assertTrue(os.path.exists(requestrepo))
  1503. # Drop the doc repo and try again
  1504. shutil.rmtree(docrepo)
  1505. with self.assertRaises(pagure.exceptions.RepoExistsException):
  1506. task = pagure.lib.query.new_project(
  1507. session=self.session,
  1508. user="pingou",
  1509. name="testproject",
  1510. repospanner_region=None,
  1511. blacklist=[],
  1512. allowed_prefix=[],
  1513. description="description for testproject",
  1514. parent_id=None,
  1515. )
  1516. task.get()
  1517. self.assertFalse(os.path.exists(gitrepo))
  1518. self.assertFalse(os.path.exists(docrepo))
  1519. self.assertTrue(os.path.exists(ticketrepo))
  1520. self.assertTrue(os.path.exists(requestrepo))
  1521. # Drop the request repo and try again
  1522. shutil.rmtree(ticketrepo)
  1523. with self.assertRaises(pagure.exceptions.RepoExistsException):
  1524. task = pagure.lib.query.new_project(
  1525. session=self.session,
  1526. user="pingou",
  1527. name="testproject",
  1528. repospanner_region=None,
  1529. blacklist=[],
  1530. allowed_prefix=[],
  1531. description="description for testproject",
  1532. parent_id=None,
  1533. )
  1534. task.get()
  1535. self.assertFalse(os.path.exists(gitrepo))
  1536. self.assertFalse(os.path.exists(docrepo))
  1537. self.assertFalse(os.path.exists(ticketrepo))
  1538. self.assertTrue(os.path.exists(requestrepo))
  1539. # Re-Try creating a 40 chars project this time allowing it
  1540. task = pagure.lib.query.new_project(
  1541. session=self.session,
  1542. user="pingou",
  1543. name="pingou/" + "s" * 40,
  1544. repospanner_region=None,
  1545. blacklist=["static"],
  1546. allowed_prefix=["pingou"],
  1547. description="description for 40 chars length project",
  1548. parent_id=None,
  1549. )
  1550. self.session.commit()
  1551. self.assertEqual(
  1552. task.get(),
  1553. {
  1554. "endpoint": "ui_ns.view_repo",
  1555. "repo": "pingou/ssssssssssssssssssssssssssssssssssssssss",
  1556. "namespace": None,
  1557. },
  1558. )
  1559. def test_new_project_user_ns(self):
  1560. """ Test the new_project of pagure.lib with user_ns on. """
  1561. gitfolder = os.path.join(self.path, "repos")
  1562. docfolder = os.path.join(gitfolder, "docs")
  1563. ticketfolder = os.path.join(gitfolder, "tickets")
  1564. requestfolder = os.path.join(gitfolder, "requests")
  1565. # Create a new project with user_ns as True
  1566. pagure.config.config["GIT_FOLDER"] = gitfolder
  1567. task = pagure.lib.query.new_project(
  1568. session=self.session,
  1569. user="pingou",
  1570. name="testproject",
  1571. repospanner_region=None,
  1572. blacklist=[],
  1573. allowed_prefix=[],
  1574. description="description for testproject",
  1575. parent_id=None,
  1576. user_ns=True,
  1577. )
  1578. self.session.commit()
  1579. self.assertEqual(
  1580. task.get(),
  1581. {
  1582. "endpoint": "ui_ns.view_repo",
  1583. "repo": "testproject",
  1584. "namespace": "pingou",
  1585. },
  1586. )
  1587. repo = pagure.lib.query._get_project(
  1588. self.session, "testproject", namespace="pingou"
  1589. )
  1590. self.assertEqual(repo.path, "pingou/testproject.git")
  1591. gitrepo = os.path.join(gitfolder, repo.path)
  1592. docrepo = os.path.join(docfolder, repo.path)
  1593. ticketrepo = os.path.join(ticketfolder, repo.path)
  1594. requestrepo = os.path.join(requestfolder, repo.path)
  1595. for path in [gitrepo, docrepo, ticketrepo, requestrepo]:
  1596. self.assertTrue(os.path.exists(path))
  1597. shutil.rmtree(path)
  1598. # Create a new project with a namespace and user_ns as True
  1599. pagure.config.config["GIT_FOLDER"] = gitfolder
  1600. task = pagure.lib.query.new_project(
  1601. session=self.session,
  1602. user="pingou",
  1603. name="testproject2",
  1604. namespace="testns",
  1605. repospanner_region=None,
  1606. blacklist=[],
  1607. allowed_prefix=["testns"],
  1608. description="description for testproject2",
  1609. parent_id=None,
  1610. user_ns=True,
  1611. )
  1612. self.session.commit()
  1613. self.assertEqual(
  1614. task.get(),
  1615. {
  1616. "endpoint": "ui_ns.view_repo",
  1617. "repo": "testproject2",
  1618. "namespace": "testns",
  1619. },
  1620. )
  1621. repo = pagure.lib.query._get_project(
  1622. self.session, "testproject2", namespace="testns"
  1623. )
  1624. self.assertEqual(repo.path, "testns/testproject2.git")
  1625. gitrepo = os.path.join(gitfolder, repo.path)
  1626. docrepo = os.path.join(docfolder, repo.path)
  1627. ticketrepo = os.path.join(ticketfolder, repo.path)
  1628. requestrepo = os.path.join(requestfolder, repo.path)
  1629. for path in [gitrepo, docrepo, ticketrepo, requestrepo]:
  1630. self.assertTrue(os.path.exists(path))
  1631. shutil.rmtree(path)
  1632. @patch("pagure.lib.notify.log")
  1633. def test_update_project_settings(self, mock_log):
  1634. """ Test the update_project_settings of pagure.lib.query. """
  1635. tests.create_projects(self.session)
  1636. # Before
  1637. repo = pagure.lib.query._get_project(self.session, "test2")
  1638. self.assertTrue(repo.settings["issue_tracker"])
  1639. self.assertFalse(repo.settings["project_documentation"])
  1640. msg = pagure.lib.query.update_project_settings(
  1641. session=self.session,
  1642. repo=repo,
  1643. settings={
  1644. "issue_tracker": True,
  1645. "project_documentation": False,
  1646. "pull_requests": True,
  1647. "Only_assignee_can_merge_pull-request": False,
  1648. "Minimum_score_to_merge_pull-request": -1,
  1649. "Web-hooks": None,
  1650. "Enforce_signed-off_commits_in_pull-request": False,
  1651. "always_merge": False,
  1652. "issues_default_to_private": False,
  1653. "fedmsg_notifications": True,
  1654. "stomp_notifications": True,
  1655. "pull_request_access_only": False,
  1656. "mqtt_notifications": True,
  1657. },
  1658. user="pingou",
  1659. )
  1660. self.assertEqual(msg, "No settings to change")
  1661. mock_log.assert_not_called()
  1662. # Invalid `Minimum_score_to_merge_pull-request`
  1663. self.assertRaises(
  1664. pagure.exceptions.PagureException,
  1665. pagure.lib.query.update_project_settings,
  1666. session=self.session,
  1667. repo=repo,
  1668. settings={
  1669. "issue_tracker": False,
  1670. "project_documentation": True,
  1671. "pull_requests": False,
  1672. "Only_assignee_can_merge_pull-request": None,
  1673. "Minimum_score_to_merge_pull-request": "foo",
  1674. "Web-hooks": "https://pagure.io/foobar",
  1675. "Enforce_signed-off_commits_in_pull-request": False,
  1676. "issues_default_to_private": False,
  1677. "fedmsg_notifications": True,
  1678. "stomp_notifications": True,
  1679. "pull_request_access_only": False,
  1680. },
  1681. user="pingou",
  1682. )
  1683. msg = pagure.lib.query.update_project_settings(
  1684. session=self.session,
  1685. repo=repo,
  1686. settings={
  1687. "issue_tracker": False,
  1688. "project_documentation": True,
  1689. "pull_requests": False,
  1690. "Only_assignee_can_merge_pull-request": None,
  1691. "Minimum_score_to_merge_pull-request": None,
  1692. "Web-hooks": "https://pagure.io/foobar",
  1693. "Enforce_signed-off_commits_in_pull-request": False,
  1694. "issues_default_to_private": False,
  1695. "fedmsg_notifications": True,
  1696. "stomp_notifications": True,
  1697. "pull_request_access_only": False,
  1698. "mqtt_notifications": True,
  1699. },
  1700. user="pingou",
  1701. )
  1702. self.assertEqual(msg, "Edited successfully settings of repo: test2")
  1703. self.assertEqual(mock_log.call_count, 1)
  1704. args = mock_log.call_args
  1705. self.assertEqual(len(args), 2)
  1706. self.assertEqual(args[0][0].fullname, "test2")
  1707. self.assertEqual(
  1708. sorted(args[1]["msg"]["fields"]),
  1709. sorted(
  1710. [
  1711. "Web-hooks",
  1712. "project_documentation",
  1713. "issue_tracker",
  1714. "pull_requests",
  1715. ]
  1716. ),
  1717. )
  1718. self.assertEqual(args[1]["topic"], "project.edit")
  1719. # After
  1720. repo = pagure.lib.query._get_project(self.session, "test2")
  1721. self.assertFalse(repo.settings["issue_tracker"])
  1722. self.assertTrue(repo.settings["project_documentation"])
  1723. self.assertFalse(repo.settings["pull_requests"])
  1724. def test_search_issues_milestones_invalid(self):
  1725. """ Test the search_issues of pagure.lib.query. """
  1726. self.test_edit_issue()
  1727. repo = pagure.lib.query._get_project(self.session, "test")
  1728. self.assertEqual(len(repo.issues), 2)
  1729. issues = pagure.lib.query.search_issues(
  1730. self.session, repo, milestones="foo"
  1731. )
  1732. self.assertEqual(len(issues), 0)
  1733. issues = pagure.lib.query.search_issues(
  1734. self.session, repo, milestones="foo", no_milestones=True
  1735. )
  1736. self.assertEqual(len(issues), 2)
  1737. def test_search_issues_custom_search(self):
  1738. """ Test the search_issues of pagure.lib.query. """
  1739. self.test_edit_issue()
  1740. repo = pagure.lib.query._get_project(self.session, "test")
  1741. self.assertEqual(len(repo.issues), 2)
  1742. issues = pagure.lib.query.search_issues(
  1743. self.session, repo, custom_search={"foo": "*"}
  1744. )
  1745. self.assertEqual(len(issues), 0)
  1746. def test_search_issues_offset(self):
  1747. """ Test the search_issues of pagure.lib.query. """
  1748. self.test_edit_issue()
  1749. repo = pagure.lib.query._get_project(self.session, "test")
  1750. issues = pagure.lib.query.search_issues(self.session, repo)
  1751. self.assertEqual(len(issues), 2)
  1752. self.assertEqual([i.id for i in issues], [2, 1])
  1753. issues = pagure.lib.query.search_issues(self.session, repo, offset=1)
  1754. self.assertEqual(len(issues), 1)
  1755. self.assertEqual([i.id for i in issues], [1])
  1756. def test_search_issues_tags(self):
  1757. """ Test the search_issues of pagure.lib.query. """
  1758. self.test_edit_issue()
  1759. repo = pagure.lib.query._get_project(self.session, "test")
  1760. self.assertEqual(len(repo.issues), 2)
  1761. # Add `tag1` to one issues and `tag2` only to the other one
  1762. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  1763. msg = pagure.lib.query.add_tag_obj(
  1764. session=self.session, obj=issue, tags="tag1", user="pingou"
  1765. )
  1766. self.session.commit()
  1767. self.assertEqual(msg, "Issue tagged with: tag1")
  1768. issue = pagure.lib.query.search_issues(self.session, repo, issueid=2)
  1769. msg = pagure.lib.query.add_tag_obj(
  1770. session=self.session, obj=issue, tags="tag2", user="pingou"
  1771. )
  1772. self.session.commit()
  1773. self.assertEqual(msg, "Issue tagged with: tag2")
  1774. # Search all issues tagged with `tag1`
  1775. issues = pagure.lib.query.search_issues(
  1776. self.session, repo, tags="tag1"
  1777. )
  1778. self.assertEqual(len(issues), 1)
  1779. self.assertEqual(issues[0].id, 1)
  1780. self.assertEqual(issues[0].project_id, 1)
  1781. self.assertEqual([tag.tag for tag in issues[0].tags], ["tag1"])
  1782. # Search all issues *not* tagged with `tag1`
  1783. issues = pagure.lib.query.search_issues(
  1784. self.session, repo, tags="!tag1"
  1785. )
  1786. self.assertEqual(len(issues), 1)
  1787. self.assertEqual(issues[0].id, 2)
  1788. self.assertEqual(issues[0].project_id, 1)
  1789. self.assertEqual([tag.tag for tag in issues[0].tags], ["tag2"])
  1790. # Search all issues *not* tagged with `tag1` but tagged with `tag2`
  1791. issues = pagure.lib.query.search_issues(
  1792. self.session, repo, tags=["!tag1", "tag2"]
  1793. )
  1794. self.assertEqual(len(issues), 1)
  1795. self.assertEqual(issues[0].id, 2)
  1796. self.assertEqual(issues[0].project_id, 1)
  1797. self.assertEqual([tag.tag for tag in issues[0].tags], ["tag2"])
  1798. def test_get_tags_of_project(self):
  1799. """ Test the get_tags_of_project of pagure.lib.query. """
  1800. self.test_add_tag_obj()
  1801. repo = pagure.lib.query._get_project(self.session, "test")
  1802. tags = pagure.lib.query.get_tags_of_project(self.session, repo)
  1803. self.assertEqual([tag.tag for tag in tags], ["tag1"])
  1804. tags = pagure.lib.query.get_tags_of_project(
  1805. self.session, repo, pattern="T*"
  1806. )
  1807. self.assertEqual([tag.tag for tag in tags], ["tag1"])
  1808. repo = pagure.lib.query._get_project(self.session, "test2")
  1809. tags = pagure.lib.query.get_tags_of_project(self.session, repo)
  1810. self.assertEqual([tag.tag for tag in tags], [])
  1811. def test_get_issue_statuses(self):
  1812. """ Test the get_issue_statuses of pagure.lib.query. """
  1813. statuses = pagure.lib.query.get_issue_statuses(self.session)
  1814. self.assertEqual(sorted(statuses), ["Closed", "Open"])
  1815. @patch.dict(
  1816. pagure.config.config, {"ALLOWED_EMAIL_DOMAINS": ["fp.o"]}, clear=True
  1817. )
  1818. def test_set_up_user(self):
  1819. """ Test the set_up_user of pagure.lib.query. """
  1820. items = pagure.lib.query.search_user(self.session)
  1821. self.assertEqual(2, len(items))
  1822. self.assertEqual(2, items[0].id)
  1823. self.assertEqual("foo", items[0].user)
  1824. self.assertEqual(1, items[1].id)
  1825. self.assertEqual("pingou", items[1].user)
  1826. pagure.lib.query.set_up_user(
  1827. session=self.session,
  1828. username="skvidal",
  1829. fullname="Seth",
  1830. default_email="skvidal@fp.o",
  1831. keydir=pagure.config.config.get("GITOLITE_KEYDIR", None),
  1832. ssh_key="ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDtgzSO9d1IrKdmyBFUvtAJPLgGOhp0lSySkWRSe+/+3KXYjSnsLnCJQlO5M7JfaXhtTHEow86rh4W9+FoJdzo5iocAwH5xPZ5ttHLy7VHgTzNMUeMgKpjy6bBOdPoGPPG4mo7QCMCRJdWBRDv4OSEMLU5jQAvC272YK2V8L918VQ== root@test",
  1833. )
  1834. self.session.commit()
  1835. items = pagure.lib.query.search_user(self.session)
  1836. self.assertEqual(3, len(items))
  1837. self.assertEqual(2, items[0].id)
  1838. self.assertEqual("foo", items[0].user)
  1839. self.assertEqual(1, items[1].id)
  1840. self.assertEqual("pingou", items[1].user)
  1841. self.assertEqual(3, items[2].id)
  1842. self.assertEqual("skvidal", items[2].user)
  1843. self.assertEqual("Seth", items[2].fullname)
  1844. self.assertEqual(
  1845. ["skvidal@fp.o"], [email.email for email in items[2].emails]
  1846. )
  1847. # Add the user a second time
  1848. pagure.lib.query.set_up_user(
  1849. session=self.session,
  1850. username="skvidal",
  1851. fullname="Seth V",
  1852. default_email="skvidal@fp.o",
  1853. keydir=pagure.config.config.get("GITOLITE_KEYDIR", None),
  1854. )
  1855. self.session.commit()
  1856. # Nothing changed
  1857. items = pagure.lib.query.search_user(self.session)
  1858. self.assertEqual(3, len(items))
  1859. self.assertEqual("skvidal", items[2].user)
  1860. self.assertEqual("Seth V", items[2].fullname)
  1861. self.assertEqual(
  1862. ["skvidal@fp.o"], [email.email for email in items[2].emails]
  1863. )
  1864. # Add the user a third time with a different email
  1865. pagure.lib.query.set_up_user(
  1866. session=self.session,
  1867. username="skvidal",
  1868. fullname="Seth",
  1869. default_email="svidal@fp.o",
  1870. keydir=pagure.config.config.get("GITOLITE_KEYDIR", None),
  1871. )
  1872. self.session.commit()
  1873. # Email added
  1874. items = pagure.lib.query.search_user(self.session)
  1875. self.assertEqual(3, len(items))
  1876. self.assertEqual("skvidal", items[2].user)
  1877. self.assertEqual(
  1878. sorted(["skvidal@fp.o", "svidal@fp.o"]),
  1879. sorted([email.email for email in items[2].emails]),
  1880. )
  1881. # add again with forbidden email domain
  1882. pagure.lib.query.set_up_user(
  1883. session=self.session,
  1884. username="skvidal",
  1885. fullname="Seth",
  1886. default_email="svidal@example.o",
  1887. keydir=pagure.config.config.get("GITOLITE_KEYDIR", None),
  1888. )
  1889. self.session.commit()
  1890. # Email should not be added
  1891. items = pagure.lib.query.search_user(self.session)
  1892. self.assertEqual(3, len(items))
  1893. self.assertEqual("skvidal", items[2].user)
  1894. self.assertEqual(
  1895. sorted(["skvidal@fp.o", "svidal@fp.o"]),
  1896. sorted([email.email for email in items[2].emails]),
  1897. )
  1898. @patch.dict(
  1899. pagure.config.config,
  1900. {"ALLOWED_EMAIL_DOMAINS": ["fp.o", "example.c"]},
  1901. clear=True,
  1902. )
  1903. def test_set_up_user_multiple_emaildomains(self):
  1904. """Test the set_up_user of pagure.lib when there are
  1905. multimple domains configured in ALLOWED_EMAIL_DOMAINS"""
  1906. items = pagure.lib.query.search_user(self.session)
  1907. self.assertEqual(2, len(items))
  1908. self.assertEqual(2, items[0].id)
  1909. self.assertEqual("foo", items[0].user)
  1910. self.assertEqual(1, items[1].id)
  1911. self.assertEqual("pingou", items[1].user)
  1912. pagure.lib.query.set_up_user(
  1913. session=self.session,
  1914. username="skvidal",
  1915. fullname="Seth",
  1916. default_email="skvidal@fp.o",
  1917. keydir=pagure.config.config.get("GITOLITE_KEYDIR", None),
  1918. )
  1919. self.session.commit()
  1920. # Add the user a second time with a different email
  1921. pagure.lib.query.set_up_user(
  1922. session=self.session,
  1923. username="skvidal",
  1924. fullname="Seth",
  1925. default_email="skvidal@example.c",
  1926. keydir=pagure.config.config.get("GITOLITE_KEYDIR", None),
  1927. )
  1928. self.session.commit()
  1929. # Email added
  1930. items = pagure.lib.query.search_user(self.session)
  1931. self.assertEqual(3, len(items))
  1932. self.assertEqual("skvidal", items[2].user)
  1933. self.assertEqual(
  1934. sorted(["skvidal@fp.o", "skvidal@example.c"]),
  1935. sorted([email.email for email in items[2].emails]),
  1936. )
  1937. # add again with forbidden email domain
  1938. pagure.lib.query.set_up_user(
  1939. session=self.session,
  1940. username="skvidal",
  1941. fullname="Seth",
  1942. default_email="svidal@example.o",
  1943. keydir=pagure.config.config.get("GITOLITE_KEYDIR", None),
  1944. )
  1945. self.session.commit()
  1946. # Email should not be added
  1947. items = pagure.lib.query.search_user(self.session)
  1948. self.assertEqual(3, len(items))
  1949. self.assertEqual("skvidal", items[2].user)
  1950. self.assertEqual(
  1951. sorted(["skvidal@fp.o", "skvidal@example.c"]),
  1952. sorted([email.email for email in items[2].emails]),
  1953. )
  1954. def avatar_url_from_email(self):
  1955. """ Test the avatar_url_from_openid of pagure.lib.query. """
  1956. output = pagure.lib.query.avatar_url_from_email(
  1957. "pingou@fedoraproject.org"
  1958. )
  1959. self.assertEqual(
  1960. output,
  1961. "https://seccdn.libravatar.org/avatar/"
  1962. "b3ee7bb4de70b6522c2478df3b4cd6322b5ec5d62ac7ceb1128e3d4ff42f6928"
  1963. "?s=64&d=retro",
  1964. )
  1965. output = pagure.lib.query.avatar_url_from_email("zoé@çëfò.org")
  1966. self.assertEqual(
  1967. output,
  1968. "https://seccdn.libravatar.org/avatar/"
  1969. "8fa6110d1f6a7a013969f012e1149ff89bf1252d4f15d25edee31d4662878656"
  1970. "?s=64&d=retro",
  1971. )
  1972. def test_fork_project_with_branch(self):
  1973. """ Test the fork_project of pagure.lib.query. """
  1974. gitfolder = os.path.join(self.path, "repos")
  1975. docfolder = os.path.join(gitfolder, "docs")
  1976. ticketfolder = os.path.join(gitfolder, "tickets")
  1977. requestfolder = os.path.join(gitfolder, "requests")
  1978. pagure.config.config["GIT_FOLDER"] = gitfolder
  1979. projects = pagure.lib.query.search_projects(self.session)
  1980. self.assertEqual(len(projects), 0)
  1981. # Create a new project
  1982. task = pagure.lib.query.new_project(
  1983. session=self.session,
  1984. user="pingou",
  1985. name="testproject",
  1986. repospanner_region=None,
  1987. blacklist=[],
  1988. allowed_prefix=[],
  1989. description="description for testproject",
  1990. parent_id=None,
  1991. )
  1992. self.session.commit()
  1993. self.assertEqual(
  1994. task.get(),
  1995. {
  1996. "endpoint": "ui_ns.view_repo",
  1997. "repo": "testproject",
  1998. "namespace": None,
  1999. },
  2000. )
  2001. projects = pagure.lib.query.search_projects(self.session)
  2002. self.assertEqual(len(projects), 1)
  2003. project = pagure.lib.query._get_project(self.session, "testproject")
  2004. gitrepo = os.path.join(gitfolder, project.path)
  2005. docrepo = os.path.join(docfolder, project.path)
  2006. ticketrepo = os.path.join(ticketfolder, project.path)
  2007. requestrepo = os.path.join(requestfolder, project.path)
  2008. # Add content to the main repo into three branches
  2009. tests.add_content_git_repo(gitrepo, "master")
  2010. tests.add_content_git_repo(gitrepo, "feature1")
  2011. tests.add_content_git_repo(gitrepo, "feature2")
  2012. # Check the branches of the main repo
  2013. self.assertEqual(
  2014. sorted(pagure.lib.git.get_git_branches(project)),
  2015. ["feature1", "feature2", "master"],
  2016. )
  2017. # Fork
  2018. task = pagure.lib.query.fork_project(
  2019. session=self.session, user="foo", repo=project
  2020. )
  2021. self.session.commit()
  2022. self.assertEqual(
  2023. task.get(),
  2024. {
  2025. "endpoint": "ui_ns.view_repo",
  2026. "repo": "testproject",
  2027. "namespace": None,
  2028. "username": "foo",
  2029. },
  2030. )
  2031. projects = pagure.lib.query.search_projects(self.session)
  2032. self.assertEqual(len(projects), 2)
  2033. project = pagure.lib.query._get_project(
  2034. self.session, "testproject", user="foo"
  2035. )
  2036. # Check the branches of the fork
  2037. self.assertEqual(
  2038. sorted(pagure.lib.git.get_git_branches(project)),
  2039. ["feature1", "feature2", "master"],
  2040. )
  2041. def test_fork_project_preserves_tags(self):
  2042. """ Test the fork_project of pagure.lib.query pushes tags to the fork. """
  2043. gitfolder = os.path.join(self.path, "repos")
  2044. docfolder = os.path.join(gitfolder, "docs")
  2045. ticketfolder = os.path.join(gitfolder, "tickets")
  2046. requestfolder = os.path.join(gitfolder, "requests")
  2047. pagure.config.config["GIT_FOLDER"] = gitfolder
  2048. projects = pagure.lib.query.search_projects(self.session)
  2049. self.assertEqual(len(projects), 0)
  2050. # Create a new project
  2051. task = pagure.lib.query.new_project(
  2052. session=self.session,
  2053. user="pingou",
  2054. name="testproject",
  2055. repospanner_region=None,
  2056. blacklist=[],
  2057. allowed_prefix=[],
  2058. description="description for testproject",
  2059. parent_id=None,
  2060. )
  2061. self.session.commit()
  2062. self.assertEqual(
  2063. task.get(),
  2064. {
  2065. "endpoint": "ui_ns.view_repo",
  2066. "repo": "testproject",
  2067. "namespace": None,
  2068. },
  2069. )
  2070. projects = pagure.lib.query.search_projects(self.session)
  2071. self.assertEqual(len(projects), 1)
  2072. project = pagure.lib.query._get_project(self.session, "testproject")
  2073. gitrepo = os.path.join(gitfolder, project.path)
  2074. docrepo = os.path.join(docfolder, project.path)
  2075. ticketrepo = os.path.join(ticketfolder, project.path)
  2076. requestrepo = os.path.join(requestfolder, project.path)
  2077. # Add content to the main repo into three branches
  2078. tests.add_content_git_repo(gitrepo, "master")
  2079. # Add a tag
  2080. tagged_commit = (
  2081. pygit2.Repository(gitrepo).revparse_single("master").hex
  2082. )
  2083. tag_sha = tests.add_tag_git_repo(
  2084. gitrepo, "1.2.3", tagged_commit, "release 1.2.3"
  2085. )
  2086. # Fork
  2087. task = pagure.lib.query.fork_project(
  2088. session=self.session, user="foo", repo=project
  2089. )
  2090. self.session.commit()
  2091. self.assertEqual(
  2092. task.get(),
  2093. {
  2094. "endpoint": "ui_ns.view_repo",
  2095. "repo": "testproject",
  2096. "namespace": None,
  2097. "username": "foo",
  2098. },
  2099. )
  2100. projects = pagure.lib.query.search_projects(self.session)
  2101. self.assertEqual(len(projects), 2)
  2102. project = pagure.lib.query._get_project(
  2103. self.session, "testproject", user="foo"
  2104. )
  2105. # Check the tag is there
  2106. fork_obj = pygit2.Repository(project.repopath("main"))
  2107. tag = fork_obj.get(tag_sha)
  2108. self.assertEqual(fork_obj[tag.target].hex, tagged_commit)
  2109. self.assertEqual(tag.message, "release 1.2.3")
  2110. def test_fork_project_namespaced(self):
  2111. """ Test the fork_project of pagure.lib on a namespaced project. """
  2112. gitfolder = os.path.join(self.path, "repos")
  2113. docfolder = os.path.join(gitfolder, "docs")
  2114. ticketfolder = os.path.join(gitfolder, "tickets")
  2115. requestfolder = os.path.join(gitfolder, "requests")
  2116. projects = pagure.lib.query.search_projects(self.session)
  2117. self.assertEqual(len(projects), 0)
  2118. # Create a new project
  2119. task = pagure.lib.query.new_project(
  2120. session=self.session,
  2121. user="pingou",
  2122. name="testproject",
  2123. namespace="foonamespace",
  2124. repospanner_region=None,
  2125. blacklist=[],
  2126. allowed_prefix=["foonamespace"],
  2127. description="description for testproject",
  2128. parent_id=None,
  2129. )
  2130. self.session.commit()
  2131. self.assertEqual(
  2132. task.get(),
  2133. {
  2134. "endpoint": "ui_ns.view_repo",
  2135. "repo": "testproject",
  2136. "namespace": "foonamespace",
  2137. },
  2138. )
  2139. projects = pagure.lib.query.search_projects(self.session)
  2140. self.assertEqual(len(projects), 1)
  2141. repo = pagure.lib.query._get_project(
  2142. self.session, "testproject", namespace="foonamespace"
  2143. )
  2144. gitrepo = os.path.join(gitfolder, repo.path)
  2145. docrepo = os.path.join(docfolder, repo.path)
  2146. ticketrepo = os.path.join(ticketfolder, repo.path)
  2147. requestrepo = os.path.join(requestfolder, repo.path)
  2148. self.assertTrue(os.path.exists(gitrepo))
  2149. self.assertTrue(os.path.exists(docrepo))
  2150. self.assertTrue(os.path.exists(ticketrepo))
  2151. self.assertTrue(os.path.exists(requestrepo))
  2152. # Fork worked
  2153. task = pagure.lib.query.fork_project(
  2154. session=self.session, user="foo", repo=repo
  2155. )
  2156. self.session.commit()
  2157. self.assertEqual(
  2158. task.get(),
  2159. {
  2160. "endpoint": "ui_ns.view_repo",
  2161. "repo": "testproject",
  2162. "namespace": "foonamespace",
  2163. "username": "foo",
  2164. },
  2165. )
  2166. # Fork a fork
  2167. repo = pagure.lib.query._get_project(
  2168. self.session, "testproject", user="foo", namespace="foonamespace"
  2169. )
  2170. task = pagure.lib.query.fork_project(
  2171. session=self.session, user="pingou", repo=repo
  2172. )
  2173. self.session.commit()
  2174. self.assertEqual(
  2175. task.get(),
  2176. {
  2177. "endpoint": "ui_ns.view_repo",
  2178. "repo": "testproject",
  2179. "namespace": "foonamespace",
  2180. "username": "pingou",
  2181. },
  2182. )
  2183. @patch("pagure.lib.notify.send_email")
  2184. def test_new_pull_request(self, mockemail):
  2185. """ test new_pull_request of pagure.lib.query. """
  2186. mockemail.return_value = True
  2187. tests.create_projects(self.session)
  2188. tests.create_projects_git(os.path.join(self.path, "repos"), bare=True)
  2189. # Create a forked repo
  2190. item = pagure.lib.model.Project(
  2191. user_id=1, # pingou
  2192. name="test",
  2193. description="test project #1",
  2194. is_fork=True,
  2195. parent_id=1,
  2196. hook_token="aaabbbrrr",
  2197. )
  2198. self.session.commit()
  2199. self.session.add(item)
  2200. # Add an extra user to project `foo`
  2201. repo = pagure.lib.query._get_project(self.session, "test")
  2202. self.assertEqual(repo.open_requests, 0)
  2203. msg = pagure.lib.query.add_user_to_project(
  2204. session=self.session, project=repo, new_user="foo", user="pingou"
  2205. )
  2206. self.session.commit()
  2207. self.assertEqual(msg, "User added")
  2208. repo = pagure.lib.query._get_project(self.session, "test")
  2209. forked_repo = pagure.lib.query._get_project(
  2210. self.session, "test", user="pingou"
  2211. )
  2212. # Fails for the lack of repo_from and remote_git
  2213. self.assertRaises(
  2214. pagure.exceptions.PagureException,
  2215. pagure.lib.query.new_pull_request,
  2216. session=self.session,
  2217. repo_from=None,
  2218. branch_from="master",
  2219. repo_to=repo,
  2220. branch_to="master",
  2221. title="test pull-request",
  2222. user="pingou",
  2223. )
  2224. # Let's pretend we turned on the CI hook for the project
  2225. project = pagure.lib.query._get_project(self.session, "test")
  2226. obj = pagure.hooks.pagure_ci.PagureCITable(
  2227. project_id=project.id, active=True
  2228. )
  2229. self.session.add(obj)
  2230. self.session.commit()
  2231. # Create the new PR
  2232. req = pagure.lib.query.new_pull_request(
  2233. session=self.session,
  2234. repo_from=forked_repo,
  2235. branch_from="master",
  2236. repo_to=repo,
  2237. branch_to="master",
  2238. title="test pull-request",
  2239. user="pingou",
  2240. )
  2241. self.session.commit()
  2242. self.assertEqual(req.id, 1)
  2243. self.assertEqual(req.title, "test pull-request")
  2244. self.assertEqual(repo.open_requests, 1)
  2245. @patch("pagure.lib.query.REDIS")
  2246. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  2247. def test_add_pull_request_comment(self, mock_redis):
  2248. """ Test add_pull_request_comment of pagure.lib.query. """
  2249. mock_redis.return_value = True
  2250. self.test_new_pull_request()
  2251. request = pagure.lib.query.search_pull_requests(
  2252. self.session, requestid=1
  2253. )
  2254. msg = pagure.lib.query.add_pull_request_comment(
  2255. session=self.session,
  2256. request=request,
  2257. commit="commithash",
  2258. tree_id=None,
  2259. filename="file",
  2260. row=None,
  2261. comment="This is awesome, I got to remember it!",
  2262. user="foo",
  2263. notification=True,
  2264. )
  2265. self.assertEqual(msg, "Comment added")
  2266. self.session.commit()
  2267. self.assertEqual(len(request.discussion), 0)
  2268. self.assertEqual(len(request.comments), 1)
  2269. self.assertEqual(request.score, 0)
  2270. self.assertEqual(mock_redis.publish.call_count, 1)
  2271. @patch("pagure.lib.query.REDIS")
  2272. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  2273. @patch("pagure.lib.query.PAGURE_CI", MagicMock(return_value=True))
  2274. def test_add_pull_request_comment_to_re_run_ci(self, mock_redis):
  2275. """ Test add_pull_request_comment of pagure.lib.query. """
  2276. mock_redis.return_value = True
  2277. self.test_new_pull_request()
  2278. self.assertEqual(mock_redis.publish.call_count, 0)
  2279. # Let's pretend we turned on the CI hook for the project
  2280. project = pagure.lib.query._get_project(self.session, "test")
  2281. if not project.ci_hook or not project.ci_hook.active:
  2282. obj = pagure.hooks.pagure_ci.PagureCITable(
  2283. project_id=project.id, active=True
  2284. )
  2285. self.session.add(obj)
  2286. self.session.commit()
  2287. request = pagure.lib.query.search_pull_requests(
  2288. self.session, requestid=1
  2289. )
  2290. msg = pagure.lib.query.add_pull_request_comment(
  2291. session=self.session,
  2292. request=request,
  2293. commit="commithash",
  2294. tree_id=None,
  2295. filename="file",
  2296. row=None,
  2297. comment="Pretty please pagure-ci rebuild",
  2298. user="foo",
  2299. notification=True,
  2300. trigger_ci=["pretty please pagure-ci rebuild"],
  2301. )
  2302. self.assertEqual(msg, "Comment added")
  2303. self.session.commit()
  2304. self.assertEqual(len(request.discussion), 0)
  2305. self.assertEqual(len(request.comments), 1)
  2306. self.assertEqual(request.score, 0)
  2307. self.assertEqual(mock_redis.publish.call_count, 1)
  2308. @patch("pagure.lib.notify.send_email")
  2309. def test_add_pull_request_flag(self, mockemail):
  2310. """ Test add_pull_request_flag of pagure.lib.query. """
  2311. mockemail.return_value = True
  2312. self.test_new_pull_request()
  2313. tests.create_tokens(self.session)
  2314. request = pagure.lib.query.search_pull_requests(
  2315. self.session, requestid=1
  2316. )
  2317. request.commit_stop = "hash_commit_stop"
  2318. self.session.add(request)
  2319. self.session.commit()
  2320. request = pagure.lib.query.search_pull_requests(
  2321. self.session, requestid=1
  2322. )
  2323. self.assertEqual(len(request.flags), 0)
  2324. msg = pagure.lib.query.add_pull_request_flag(
  2325. session=self.session,
  2326. request=request,
  2327. username="jenkins",
  2328. percent=100,
  2329. comment="Build passes",
  2330. status="success",
  2331. url="http://jenkins.cloud.fedoraproject.org",
  2332. uid="jenkins_build_pagure_34",
  2333. user="foo",
  2334. token="aaabbbcccddd",
  2335. )
  2336. self.assertEqual(msg, ("Flag added", "jenkins_build_pagure_34"))
  2337. self.session.commit()
  2338. # We're no longer adding a PR flag but instead a commit flag to the
  2339. # latest commit of the PR
  2340. self.assertEqual(len(request.flags), 0)
  2341. flags = pagure.lib.query.get_commit_flag(
  2342. self.session, request.project, "hash_commit_stop"
  2343. )
  2344. self.assertEqual(flags[0].comment, "Build passes")
  2345. self.assertEqual(flags[0].token_id, "aaabbbcccddd")
  2346. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  2347. def test_search_pull_requests(self):
  2348. """ Test search_pull_requests of pagure.lib.query. """
  2349. self.test_new_pull_request()
  2350. prs = pagure.lib.query.search_pull_requests(session=self.session)
  2351. self.assertEqual(len(prs), 1)
  2352. prs = pagure.lib.query.search_pull_requests(
  2353. session=self.session, project_id=1
  2354. )
  2355. self.assertEqual(len(prs), 1)
  2356. prs = pagure.lib.query.search_pull_requests(
  2357. session=self.session, project_id_from=4
  2358. )
  2359. self.assertEqual(len(prs), 1)
  2360. prs = pagure.lib.query.search_pull_requests(
  2361. session=self.session, status=False
  2362. )
  2363. self.assertEqual(len(prs), 0)
  2364. # All non-assigned PR
  2365. prs = pagure.lib.query.search_pull_requests(
  2366. session=self.session, assignee=False
  2367. )
  2368. self.assertEqual(len(prs), 1)
  2369. prs[0].assignee_id = 1
  2370. self.session.add(prs[0])
  2371. self.session.commit()
  2372. # All the PR assigned
  2373. prs = pagure.lib.query.search_pull_requests(
  2374. session=self.session, assignee=True
  2375. )
  2376. self.assertEqual(len(prs), 1)
  2377. # Basically the same as above but then for a specific user
  2378. prs = pagure.lib.query.search_pull_requests(
  2379. session=self.session, assignee="pingou"
  2380. )
  2381. self.assertEqual(len(prs), 1)
  2382. # All PR except those assigned to pingou
  2383. prs = pagure.lib.query.search_pull_requests(
  2384. session=self.session, assignee="!pingou"
  2385. )
  2386. self.assertEqual(len(prs), 0)
  2387. # All PR created by the specified author
  2388. prs = pagure.lib.query.search_pull_requests(
  2389. session=self.session, author="pingou"
  2390. )
  2391. self.assertEqual(len(prs), 1)
  2392. # Count the PR instead of listing them
  2393. prs = pagure.lib.query.search_pull_requests(
  2394. session=self.session, author="pingou", count=True
  2395. )
  2396. self.assertEqual(prs, 1)
  2397. dt = datetime.datetime.utcnow()
  2398. # Create the second PR
  2399. repo = pagure.lib.query._get_project(self.session, "test")
  2400. req = pagure.lib.query.new_pull_request(
  2401. session=self.session,
  2402. repo_from=repo,
  2403. branch_from="feature",
  2404. repo_to=repo,
  2405. branch_to="master",
  2406. title="test pull-request #2",
  2407. user="pingou",
  2408. )
  2409. self.session.commit()
  2410. self.assertEqual(req.id, 2)
  2411. self.assertEqual(req.title, "test pull-request #2")
  2412. self.assertEqual(repo.open_requests, 2)
  2413. # Ensure we have 2 PRs
  2414. prs = pagure.lib.query.search_pull_requests(
  2415. session=self.session, author="pingou"
  2416. )
  2417. self.assertEqual(len(prs), 2)
  2418. # Test the offset
  2419. prs = pagure.lib.query.search_pull_requests(
  2420. session=self.session, author="pingou", offset=1
  2421. )
  2422. self.assertEqual(len(prs), 1)
  2423. # Test the updated_after
  2424. # Test updated after before the second PR was created
  2425. prs = pagure.lib.query.search_pull_requests(
  2426. session=self.session, author="pingou", updated_after=dt
  2427. )
  2428. self.assertEqual(len(prs), 1)
  2429. # Test updated after, 1h ago
  2430. prs = pagure.lib.query.search_pull_requests(
  2431. session=self.session,
  2432. author="pingou",
  2433. updated_after=dt - datetime.timedelta(hours=1),
  2434. )
  2435. self.assertEqual(len(prs), 2)
  2436. @patch("pagure.lib.notify.send_email")
  2437. def test_close_pull_request(self, send_email):
  2438. """ Test close_pull_request of pagure.lib.query. """
  2439. send_email.return_value = True
  2440. self.test_new_pull_request()
  2441. repo = pagure.lib.query._get_project(self.session, "test")
  2442. self.assertEqual(repo.open_requests, 1)
  2443. request = pagure.lib.query.search_pull_requests(
  2444. self.session, requestid=1
  2445. )
  2446. pagure.lib.query.close_pull_request(
  2447. session=self.session, request=request, user="pingou", merged=True
  2448. )
  2449. self.session.commit()
  2450. repo = pagure.lib.query._get_project(self.session, "test")
  2451. self.assertEqual(repo.open_requests, 0)
  2452. prs = pagure.lib.query.search_pull_requests(
  2453. session=self.session, status=False
  2454. )
  2455. self.assertEqual(len(prs), 1)
  2456. # Does not change much, just the notification sent
  2457. pagure.lib.query.close_pull_request(
  2458. session=self.session, request=request, user="pingou", merged=False
  2459. )
  2460. self.session.commit()
  2461. prs = pagure.lib.query.search_pull_requests(
  2462. session=self.session, status=False
  2463. )
  2464. self.assertEqual(len(prs), 1)
  2465. @patch("pagure.lib.query.REDIS", MagicMock(return_value=True))
  2466. @patch("pagure.lib.git.update_git", MagicMock(return_value=True))
  2467. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  2468. def test_remove_issue_dependency(self):
  2469. """ Test remove_issue_dependency of pagure.lib.query. """
  2470. self.test_add_issue_dependency()
  2471. repo = pagure.lib.query._get_project(self.session, "test")
  2472. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2473. issue_blocked = pagure.lib.query.search_issues(
  2474. self.session, repo, issueid=2
  2475. )
  2476. # Before
  2477. self.assertEqual(len(issue.children), 1)
  2478. self.assertEqual(issue.children[0].id, 2)
  2479. self.assertEqual(len(issue.parents), 0)
  2480. self.assertEqual(issue.parents, [])
  2481. self.assertEqual(len(issue_blocked.children), 0)
  2482. self.assertEqual(issue_blocked.children, [])
  2483. self.assertEqual(len(issue_blocked.parents), 1)
  2484. self.assertEqual(issue_blocked.parents[0].id, 1)
  2485. self.assertRaises(
  2486. pagure.exceptions.PagureException,
  2487. pagure.lib.query.remove_issue_dependency,
  2488. session=self.session,
  2489. issue=issue,
  2490. issue_blocked=issue,
  2491. user="pingou",
  2492. )
  2493. # Wrong order of issues
  2494. msg = pagure.lib.query.remove_issue_dependency(
  2495. session=self.session,
  2496. issue=issue,
  2497. issue_blocked=issue_blocked,
  2498. user="pingou",
  2499. )
  2500. self.session.commit()
  2501. self.assertEqual(msg, None)
  2502. # Drop deps
  2503. msg = pagure.lib.query.remove_issue_dependency(
  2504. session=self.session,
  2505. issue=issue_blocked,
  2506. issue_blocked=issue,
  2507. user="pingou",
  2508. )
  2509. self.session.commit()
  2510. self.assertEqual(msg, "Issue **un**marked as depending on: #1")
  2511. # After
  2512. self.assertEqual(issue.parents, [])
  2513. self.assertEqual(issue.children, [])
  2514. self.assertEqual(issue_blocked.parents, [])
  2515. self.assertEqual(issue_blocked.children, [])
  2516. @patch("pagure.lib.git.update_git")
  2517. @patch("pagure.lib.notify.send_email")
  2518. def test_get_issue_comment(self, p_send_email, p_ugt):
  2519. """ Test the get_issue_comment of pagure.lib.query. """
  2520. p_send_email.return_value = True
  2521. p_ugt.return_value = True
  2522. self.test_add_issue_comment()
  2523. repo = pagure.lib.query._get_project(self.session, "test")
  2524. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2525. self.assertEqual(
  2526. pagure.lib.query.get_issue_comment(self.session, issue.uid, 10),
  2527. None,
  2528. )
  2529. comment = pagure.lib.query.get_issue_comment(
  2530. self.session, issue.uid, 1
  2531. )
  2532. self.assertEqual(comment.comment, "Hey look a comment!")
  2533. @patch("pagure.lib.git.update_git")
  2534. @patch("pagure.lib.notify.send_email")
  2535. def test_get_issue_by_uid(self, p_send_email, p_ugt):
  2536. """ Test the get_issue_by_uid of pagure.lib.query. """
  2537. p_send_email.return_value = True
  2538. p_ugt.return_value = True
  2539. self.test_new_issue()
  2540. repo = pagure.lib.query._get_project(self.session, "test")
  2541. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2542. self.assertEqual(
  2543. pagure.lib.query.get_issue_by_uid(self.session, "foobar"), None
  2544. )
  2545. new_issue = pagure.lib.query.get_issue_by_uid(self.session, issue.uid)
  2546. self.assertEqual(issue, new_issue)
  2547. @patch("pagure.lib.git.update_git")
  2548. @patch("pagure.lib.notify.send_email")
  2549. def test_update_tags(self, p_send_email, p_ugt):
  2550. """ Test the update_tags of pagure.lib.query. """
  2551. p_send_email.return_value = True
  2552. p_ugt.return_value = True
  2553. self.test_new_issue()
  2554. repo = pagure.lib.query._get_project(self.session, "test")
  2555. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2556. # before
  2557. self.assertEqual(repo.tags_colored, [])
  2558. self.assertEqual(issue.tags_text, [])
  2559. messages = pagure.lib.query.update_tags(
  2560. self.session, issue, "tag", "pingou"
  2561. )
  2562. self.assertEqual(messages, ["Issue tagged with: tag"])
  2563. # after
  2564. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  2565. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2566. self.assertEqual([t.tag for t in repo.tags_colored], ["tag"])
  2567. self.assertEqual(issue.tags_text, ["tag"])
  2568. # Replace the tag by two others
  2569. messages = pagure.lib.query.update_tags(
  2570. self.session, issue, ["tag2", "tag3"], "pingou"
  2571. )
  2572. self.assertEqual(
  2573. messages,
  2574. ["Issue tagged with: tag2, tag3", "Issue **un**tagged with: tag"],
  2575. )
  2576. # after
  2577. repo = pagure.lib.query.get_authorized_project(self.session, "test")
  2578. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2579. self.assertEqual(
  2580. sorted([t.tag for t in repo.tags_colored]), ["tag", "tag2", "tag3"]
  2581. )
  2582. self.assertEqual(sorted(issue.tags_text), ["tag2", "tag3"])
  2583. @patch("pagure.lib.git.update_git")
  2584. @patch("pagure.lib.notify.send_email")
  2585. def test_update_dependency_issue(self, p_send_email, p_ugt):
  2586. """ Test the update_dependency_issue of pagure.lib.query. """
  2587. p_send_email.return_value = True
  2588. p_ugt.return_value = True
  2589. self.test_new_issue()
  2590. repo = pagure.lib.query._get_project(self.session, "test")
  2591. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2592. self.assertEqual(repo.open_tickets, 2)
  2593. self.assertEqual(repo.open_tickets_public, 2)
  2594. # Create issues to play with
  2595. msg = pagure.lib.query.new_issue(
  2596. session=self.session,
  2597. repo=repo,
  2598. title="Test issue #3",
  2599. content="We should work on this (3rd time!)",
  2600. user="pingou",
  2601. private=True,
  2602. )
  2603. self.session.commit()
  2604. self.assertEqual(msg.title, "Test issue #3")
  2605. self.assertEqual(repo.open_tickets, 3)
  2606. self.assertEqual(repo.open_tickets_public, 2)
  2607. # before
  2608. self.assertEqual(issue.tags_text, [])
  2609. self.assertEqual(issue.depending_text, [])
  2610. self.assertEqual(issue.blocking_text, [])
  2611. messages = pagure.lib.query.update_dependency_issue(
  2612. self.session, repo, issue, "2", "pingou"
  2613. )
  2614. self.assertEqual(messages, ["Issue marked as depending on: #2"])
  2615. messages = pagure.lib.query.update_dependency_issue(
  2616. self.session, repo, issue, ["3", "4", 5], "pingou"
  2617. )
  2618. self.assertEqual(
  2619. messages,
  2620. [
  2621. "Issue marked as depending on: #3",
  2622. "Issue marked as depending on: #4",
  2623. "Issue marked as depending on: #5",
  2624. "Issue **un**marked as depending on: #2",
  2625. ],
  2626. )
  2627. # after
  2628. self.assertEqual(issue.tags_text, [])
  2629. self.assertEqual(issue.depending_text, [3])
  2630. self.assertEqual(issue.blocking_text, [])
  2631. @patch("pagure.lib.git.update_git")
  2632. @patch("pagure.lib.notify.send_email")
  2633. def test_update_blocked_issue(self, p_send_email, p_ugt):
  2634. """ Test the update_blocked_issue of pagure.lib.query. """
  2635. p_send_email.return_value = True
  2636. p_ugt.return_value = True
  2637. self.test_new_issue()
  2638. repo = pagure.lib.query._get_project(self.session, "test")
  2639. issue = pagure.lib.query.search_issues(self.session, repo, issueid=1)
  2640. # Create issues to play with
  2641. msg = pagure.lib.query.new_issue(
  2642. session=self.session,
  2643. repo=repo,
  2644. title="Test issue #3",
  2645. content="We should work on this (3rd time!)",
  2646. user="pingou",
  2647. private=True,
  2648. )
  2649. self.session.commit()
  2650. self.assertEqual(msg.title, "Test issue #3")
  2651. # before
  2652. self.assertEqual(issue.tags_text, [])
  2653. self.assertEqual(issue.depending_text, [])
  2654. self.assertEqual(issue.blocking_text, [])
  2655. messages = pagure.lib.query.update_blocked_issue(
  2656. self.session, repo, issue, "2", "pingou"
  2657. )
  2658. self.assertEqual(messages, ["Issue marked as blocking: #2"])
  2659. messages = pagure.lib.query.update_blocked_issue(
  2660. self.session, repo, issue, ["3", "4", 5], "pingou"
  2661. )
  2662. self.assertEqual(
  2663. messages,
  2664. [
  2665. "Issue marked as blocking: #3",
  2666. "Issue marked as blocking: #4",
  2667. "Issue marked as blocking: #5",
  2668. "Issue **un**marked as blocking: #2",
  2669. ],
  2670. )
  2671. # after
  2672. self.assertEqual(issue.tags_text, [])
  2673. self.assertEqual(issue.depending_text, [])
  2674. self.assertEqual(issue.blocking_text, [3])
  2675. @patch("pagure.lib.notify.send_email")
  2676. def test_add_pull_request_assignee(self, mockemail):
  2677. """ Test add_pull_request_assignee of pagure.lib.query. """
  2678. mockemail.return_value = True
  2679. self.test_new_pull_request()
  2680. request = pagure.lib.query.search_pull_requests(
  2681. self.session, requestid=1
  2682. )
  2683. self.assertRaises(
  2684. pagure.exceptions.PagureException,
  2685. pagure.lib.query.add_pull_request_assignee,
  2686. session=self.session,
  2687. request=request,
  2688. assignee="bar",
  2689. user="foo",
  2690. )
  2691. # Assign
  2692. msg = pagure.lib.query.add_pull_request_assignee(
  2693. session=self.session,
  2694. request=request,
  2695. assignee="pingou",
  2696. user="foo",
  2697. )
  2698. self.assertEqual(msg, "Request assigned")
  2699. # Reset
  2700. msg = pagure.lib.query.add_pull_request_assignee(
  2701. session=self.session, request=request, assignee=None, user="foo"
  2702. )
  2703. self.assertEqual(msg, "Request assignee reset")
  2704. # Try resetting again
  2705. msg = pagure.lib.query.add_pull_request_assignee(
  2706. session=self.session, request=request, assignee=None, user="foo"
  2707. )
  2708. self.assertEqual(msg, None)
  2709. def test_search_pending_email(self):
  2710. """ Test search_pending_email of pagure.lib.query. """
  2711. self.assertEqual(
  2712. pagure.lib.query.search_pending_email(self.session), None
  2713. )
  2714. user = pagure.lib.query.search_user(self.session, username="pingou")
  2715. email_pend = pagure.lib.model.UserEmailPending(
  2716. user_id=user.id, email="foo@fp.o", token="abcdef"
  2717. )
  2718. self.session.add(email_pend)
  2719. self.session.commit()
  2720. self.assertNotEqual(
  2721. pagure.lib.query.search_pending_email(self.session), None
  2722. )
  2723. self.assertNotEqual(
  2724. pagure.lib.query.search_pending_email(
  2725. self.session, token="abcdef"
  2726. ),
  2727. None,
  2728. )
  2729. pend = pagure.lib.query.search_pending_email(
  2730. self.session, token="abcdef"
  2731. )
  2732. self.assertEqual(pend.user.username, "pingou")
  2733. self.assertEqual(pend.email, "foo@fp.o")
  2734. self.assertEqual(pend.token, "abcdef")
  2735. pend = pagure.lib.query.search_pending_email(
  2736. self.session, email="foo@fp.o"
  2737. )
  2738. self.assertEqual(pend.user.username, "pingou")
  2739. self.assertEqual(pend.email, "foo@fp.o")
  2740. self.assertEqual(pend.token, "abcdef")
  2741. def test_generate_hook_token(self):
  2742. """ Test generate_hook_token of pagure.lib.query. """
  2743. tests.create_projects(self.session)
  2744. projects = pagure.lib.query.search_projects(self.session)
  2745. for proj in projects:
  2746. self.assertIn(
  2747. proj.hook_token, ["aaabbbccc", "aaabbbddd", "aaabbbeee"]
  2748. )
  2749. pagure.lib.query.generate_hook_token(self.session)
  2750. projects = pagure.lib.query.search_projects(self.session)
  2751. for proj in projects:
  2752. self.assertNotIn(
  2753. proj.hook_token, ["aaabbbccc", "aaabbbddd", "aaabbbeee"]
  2754. )
  2755. @patch("pagure.lib.notify.send_email")
  2756. def test_pull_request_score(self, mockemail):
  2757. """ Test PullRequest.score of pagure.lib.model. """
  2758. mockemail.return_value = True
  2759. self.test_new_pull_request()
  2760. request = pagure.lib.query.search_pull_requests(
  2761. self.session, requestid=1
  2762. )
  2763. msg = pagure.lib.query.add_pull_request_comment(
  2764. session=self.session,
  2765. request=request,
  2766. commit=None,
  2767. tree_id=None,
  2768. filename=None,
  2769. row=None,
  2770. comment="This looks great :thumbsup:",
  2771. user="foo",
  2772. )
  2773. self.session.commit()
  2774. self.assertEqual(msg, "Comment added")
  2775. msg = pagure.lib.query.add_pull_request_comment(
  2776. session=self.session,
  2777. request=request,
  2778. commit=None,
  2779. tree_id=None,
  2780. filename=None,
  2781. row=None,
  2782. comment="I disagree -1",
  2783. user="pingou",
  2784. )
  2785. self.session.commit()
  2786. self.assertEqual(msg, "Comment added")
  2787. msg = pagure.lib.query.add_pull_request_comment(
  2788. session=self.session,
  2789. request=request,
  2790. commit=None,
  2791. tree_id=None,
  2792. filename=None,
  2793. row=None,
  2794. comment="NM this looks great now +1000",
  2795. user="pingou",
  2796. )
  2797. self.session.commit()
  2798. self.assertEqual(msg, "Comment added")
  2799. self.assertEqual(len(request.discussion), 3)
  2800. self.assertEqual(request.score, 2)
  2801. def test_add_group(self):
  2802. """ Test the add_group method of pagure.lib.query. """
  2803. groups = pagure.lib.query.search_groups(self.session)
  2804. self.assertEqual(len(groups), 0)
  2805. self.assertEqual(groups, [])
  2806. # Invalid type
  2807. self.assertRaises(
  2808. pagure.exceptions.PagureException,
  2809. pagure.lib.query.add_group,
  2810. self.session,
  2811. group_name="foo",
  2812. display_name="foo group",
  2813. description=None,
  2814. group_type="bar",
  2815. user="pingou",
  2816. is_admin=True,
  2817. blacklist=[],
  2818. )
  2819. groups = pagure.lib.query.search_groups(self.session)
  2820. self.assertEqual(len(groups), 0)
  2821. self.assertEqual(groups, [])
  2822. # Invalid user
  2823. self.assertRaises(
  2824. pagure.exceptions.PagureException,
  2825. pagure.lib.query.add_group,
  2826. self.session,
  2827. group_name="foo",
  2828. display_name="foo group",
  2829. description=None,
  2830. group_type="user",
  2831. user="test",
  2832. is_admin=False,
  2833. blacklist=[],
  2834. )
  2835. groups = pagure.lib.query.search_groups(self.session)
  2836. self.assertEqual(len(groups), 0)
  2837. self.assertEqual(groups, [])
  2838. # Invalid group name
  2839. self.assertRaises(
  2840. pagure.exceptions.PagureException,
  2841. pagure.lib.query.add_group,
  2842. self.session,
  2843. group_name="foo group",
  2844. display_name="foo group",
  2845. description=None,
  2846. group_type="user",
  2847. user="test",
  2848. is_admin=False,
  2849. blacklist=[],
  2850. )
  2851. groups = pagure.lib.query.search_groups(self.session)
  2852. self.assertEqual(len(groups), 0)
  2853. self.assertEqual(groups, [])
  2854. msg = pagure.lib.query.add_group(
  2855. self.session,
  2856. group_name="foo",
  2857. display_name="foo group",
  2858. description=None,
  2859. group_type="bar",
  2860. user="pingou",
  2861. is_admin=False,
  2862. blacklist=[],
  2863. )
  2864. self.session.commit()
  2865. self.assertEqual(msg, "User `pingou` added to the group `foo`.")
  2866. groups = pagure.lib.query.search_groups(self.session)
  2867. self.assertEqual(len(groups), 1)
  2868. self.assertEqual(groups[0].group_name, "foo")
  2869. # Group with this name already exists
  2870. self.assertRaises(
  2871. pagure.exceptions.PagureException,
  2872. pagure.lib.query.add_group,
  2873. self.session,
  2874. group_name="foo",
  2875. display_name="foo group",
  2876. description=None,
  2877. group_type="bar",
  2878. user="pingou",
  2879. is_admin=False,
  2880. blacklist=[],
  2881. )
  2882. # Group with this display name already exists
  2883. self.assertRaises(
  2884. pagure.exceptions.PagureException,
  2885. pagure.lib.query.add_group,
  2886. self.session,
  2887. group_name="foo1",
  2888. display_name="foo group",
  2889. description=None,
  2890. group_type="bar",
  2891. user="pingou",
  2892. is_admin=False,
  2893. blacklist=[],
  2894. )
  2895. # Group with a blacklisted prefix
  2896. self.assertRaises(
  2897. pagure.exceptions.PagureException,
  2898. pagure.lib.query.add_group,
  2899. self.session,
  2900. group_name="forks",
  2901. display_name="foo group",
  2902. description=None,
  2903. group_type="bar",
  2904. user="pingou",
  2905. is_admin=False,
  2906. blacklist=["forks"],
  2907. )
  2908. def test_add_user_to_group(self):
  2909. """ Test the add_user_to_group method of pagure.lib.query. """
  2910. self.test_add_group()
  2911. group = pagure.lib.query.search_groups(self.session, group_name="foo")
  2912. self.assertNotEqual(group, None)
  2913. self.assertEqual(group.group_name, "foo")
  2914. # Invalid new user
  2915. self.assertRaises(
  2916. pagure.exceptions.PagureException,
  2917. pagure.lib.query.add_user_to_group,
  2918. self.session,
  2919. username="foobar",
  2920. group=group,
  2921. user="foo",
  2922. is_admin=False,
  2923. )
  2924. # Invalid user
  2925. self.assertRaises(
  2926. pagure.exceptions.PagureException,
  2927. pagure.lib.query.add_user_to_group,
  2928. self.session,
  2929. username="foo",
  2930. group=group,
  2931. user="foobar",
  2932. is_admin=False,
  2933. )
  2934. # User not allowed
  2935. self.assertRaises(
  2936. pagure.exceptions.PagureException,
  2937. pagure.lib.query.add_user_to_group,
  2938. self.session,
  2939. username="foo",
  2940. group=group,
  2941. user="foo",
  2942. is_admin=False,
  2943. )
  2944. msg = pagure.lib.query.add_user_to_group(
  2945. self.session,
  2946. username="foo",
  2947. group=group,
  2948. user="pingou",
  2949. is_admin=False,
  2950. )
  2951. self.session.commit()
  2952. self.assertEqual(msg, "User `foo` added to the group `foo`.")
  2953. msg = pagure.lib.query.add_user_to_group(
  2954. self.session,
  2955. username="foo",
  2956. group=group,
  2957. user="pingou",
  2958. is_admin=False,
  2959. )
  2960. self.session.commit()
  2961. self.assertEqual(
  2962. msg, "User `foo` already in the group, nothing to change."
  2963. )
  2964. def test_is_group_member(self):
  2965. """ Test the is_group_member method of pagure.lib.query. """
  2966. self.test_add_group()
  2967. self.assertFalse(
  2968. pagure.lib.query.is_group_member(self.session, None, "foo")
  2969. )
  2970. self.assertFalse(
  2971. pagure.lib.query.is_group_member(self.session, "bar", "foo")
  2972. )
  2973. self.assertFalse(
  2974. pagure.lib.query.is_group_member(self.session, "foo", "foo")
  2975. )
  2976. self.assertTrue(
  2977. pagure.lib.query.is_group_member(self.session, "pingou", "foo")
  2978. )
  2979. def test_get_user_group(self):
  2980. """ Test the get_user_group method of pagure.lib.query. """
  2981. self.test_add_group()
  2982. item = pagure.lib.query.get_user_group(self.session, 1, 1)
  2983. self.assertEqual(item.user_id, 1)
  2984. self.assertEqual(item.group_id, 1)
  2985. item = pagure.lib.query.get_user_group(self.session, 1, 2)
  2986. self.assertEqual(item, None)
  2987. item = pagure.lib.query.get_user_group(self.session, 2, 1)
  2988. self.assertEqual(item, None)
  2989. def test_get_group_types(self):
  2990. """ Test the get_group_types method of pagure.lib.query. """
  2991. self.test_add_group()
  2992. groups = pagure.lib.query.get_group_types(self.session, "user")
  2993. self.assertEqual(len(groups), 1)
  2994. self.assertEqual(groups[0].group_type, "user")
  2995. groups = pagure.lib.query.get_group_types(self.session)
  2996. self.assertEqual(len(groups), 2)
  2997. self.assertEqual(groups[0].group_type, "admin")
  2998. self.assertEqual(groups[1].group_type, "user")
  2999. def test_search_groups(self):
  3000. """ Test the search_groups method of pagure.lib.query. """
  3001. self.assertEqual(pagure.lib.query.search_groups(self.session), [])
  3002. msg = pagure.lib.query.add_group(
  3003. self.session,
  3004. group_name="foo",
  3005. display_name="foo group",
  3006. description=None,
  3007. group_type="bar",
  3008. user="pingou",
  3009. is_admin=False,
  3010. blacklist=[],
  3011. )
  3012. self.session.commit()
  3013. self.assertEqual(msg, "User `pingou` added to the group `foo`.")
  3014. groups = pagure.lib.query.search_groups(self.session)
  3015. self.assertEqual(len(groups), 1)
  3016. self.assertEqual(groups[0].group_name, "foo")
  3017. msg = pagure.lib.query.add_group(
  3018. self.session,
  3019. group_name="bar",
  3020. display_name="bar group",
  3021. description=None,
  3022. group_type="admin",
  3023. user="pingou",
  3024. is_admin=True,
  3025. blacklist=[],
  3026. )
  3027. self.session.commit()
  3028. self.assertEqual(msg, "User `pingou` added to the group `bar`.")
  3029. groups = pagure.lib.query.search_groups(self.session)
  3030. self.assertEqual(len(groups), 2)
  3031. self.assertEqual(groups[0].group_name, "bar")
  3032. self.assertEqual(groups[1].group_name, "foo")
  3033. groups = pagure.lib.query.search_groups(
  3034. self.session, group_type="user"
  3035. )
  3036. self.assertEqual(len(groups), 1)
  3037. self.assertEqual(groups[0].group_name, "foo")
  3038. groups = pagure.lib.query.search_groups(
  3039. self.session, group_type="admin"
  3040. )
  3041. self.assertEqual(len(groups), 1)
  3042. self.assertEqual(groups[0].group_name, "bar")
  3043. groups = pagure.lib.query.search_groups(self.session, group_name="foo")
  3044. self.assertEqual(groups.group_name, "foo")
  3045. def test_delete_user_of_group(self):
  3046. """ Test the delete_user_of_group method of pagure.lib.query. """
  3047. self.test_add_user_to_group()
  3048. groups = pagure.lib.query.search_groups(self.session)
  3049. self.assertEqual(len(groups), 1)
  3050. self.assertEqual(groups[0].group_name, "foo")
  3051. # Invalid username
  3052. self.assertRaises(
  3053. pagure.exceptions.PagureException,
  3054. pagure.lib.query.delete_user_of_group,
  3055. self.session,
  3056. username="bar",
  3057. groupname="foo",
  3058. user="pingou",
  3059. is_admin=False,
  3060. )
  3061. # Invalid groupname
  3062. self.assertRaises(
  3063. pagure.exceptions.PagureException,
  3064. pagure.lib.query.delete_user_of_group,
  3065. self.session,
  3066. username="foo",
  3067. groupname="bar",
  3068. user="pingou",
  3069. is_admin=False,
  3070. )
  3071. # Invalid user
  3072. self.assertRaises(
  3073. pagure.exceptions.PagureException,
  3074. pagure.lib.query.delete_user_of_group,
  3075. self.session,
  3076. username="foo",
  3077. groupname="foo",
  3078. user="test",
  3079. is_admin=False,
  3080. )
  3081. # User not in the group
  3082. item = pagure.lib.model.User(
  3083. user="bar",
  3084. fullname="bar",
  3085. password="foo",
  3086. default_email="bar@bar.com",
  3087. )
  3088. self.session.add(item)
  3089. item = pagure.lib.model.UserEmail(user_id=3, email="bar@bar.com")
  3090. self.session.add(item)
  3091. self.session.commit()
  3092. self.assertRaises(
  3093. pagure.exceptions.PagureException,
  3094. pagure.lib.query.delete_user_of_group,
  3095. self.session,
  3096. username="bar",
  3097. groupname="foo",
  3098. user="pingou",
  3099. is_admin=False,
  3100. )
  3101. # User is not allowed to remove the username
  3102. self.assertRaises(
  3103. pagure.exceptions.PagureException,
  3104. pagure.lib.query.delete_user_of_group,
  3105. self.session,
  3106. username="foo",
  3107. groupname="foo",
  3108. user="bar",
  3109. is_admin=False,
  3110. )
  3111. # Username is the creator of the group
  3112. self.assertRaises(
  3113. pagure.exceptions.PagureException,
  3114. pagure.lib.query.delete_user_of_group,
  3115. self.session,
  3116. username="pingou",
  3117. groupname="foo",
  3118. user="pingou",
  3119. is_admin=False,
  3120. )
  3121. # All good
  3122. group = pagure.lib.query.search_groups(self.session, group_name="foo")
  3123. self.assertEqual(len(group.users), 2)
  3124. pagure.lib.query.delete_user_of_group(
  3125. self.session,
  3126. username="foo",
  3127. groupname="foo",
  3128. user="pingou",
  3129. is_admin=False,
  3130. )
  3131. self.session.commit()
  3132. group = pagure.lib.query.search_groups(self.session, group_name="foo")
  3133. self.assertEqual(len(group.users), 1)
  3134. def test_edit_group_info(self):
  3135. """ Test the edit_group_info method of pagure.lib.query. """
  3136. self.test_add_group()
  3137. group = pagure.lib.query.search_groups(self.session, group_name="foo")
  3138. self.assertNotEqual(group, None)
  3139. self.assertEqual(group.group_name, "foo")
  3140. # Invalid new user
  3141. self.assertRaises(
  3142. pagure.exceptions.PagureException,
  3143. pagure.lib.query.edit_group_info,
  3144. self.session,
  3145. group=group,
  3146. display_name="edited name",
  3147. description=None,
  3148. user="foo",
  3149. is_admin=False,
  3150. )
  3151. # Invalid user
  3152. self.assertRaises(
  3153. pagure.exceptions.PagureException,
  3154. pagure.lib.query.edit_group_info,
  3155. self.session,
  3156. group=group,
  3157. display_name="edited name",
  3158. description=None,
  3159. user="foobar",
  3160. is_admin=False,
  3161. )
  3162. # User not allowed
  3163. self.assertRaises(
  3164. pagure.exceptions.PagureException,
  3165. pagure.lib.query.edit_group_info,
  3166. self.session,
  3167. group=group,
  3168. display_name="edited name",
  3169. description=None,
  3170. user="bar",
  3171. is_admin=False,
  3172. )
  3173. msg = pagure.lib.query.edit_group_info(
  3174. self.session,
  3175. group=group,
  3176. display_name="edited name",
  3177. description=None,
  3178. user="pingou",
  3179. is_admin=False,
  3180. )
  3181. self.session.commit()
  3182. self.assertEqual(msg, 'Group "edited name" (foo) edited')
  3183. msg = pagure.lib.query.edit_group_info(
  3184. self.session,
  3185. group=group,
  3186. display_name="edited name",
  3187. description=None,
  3188. user="pingou",
  3189. is_admin=False,
  3190. )
  3191. self.session.commit()
  3192. self.assertEqual(msg, "Nothing changed")
  3193. def test_add_group_to_project(self):
  3194. """ Test the add_group_to_project method of pagure.lib.query. """
  3195. tests.create_projects(self.session)
  3196. self.test_add_group()
  3197. project = pagure.lib.query._get_project(self.session, "test2")
  3198. # Group does not exist
  3199. self.assertRaises(
  3200. pagure.exceptions.PagureException,
  3201. pagure.lib.query.add_group_to_project,
  3202. session=self.session,
  3203. project=project,
  3204. new_group="bar",
  3205. user="foo",
  3206. )
  3207. # Group does not exist, but allow creating it
  3208. msg = pagure.lib.query.add_group_to_project(
  3209. session=self.session,
  3210. project=project,
  3211. new_group="bar",
  3212. user="pingou",
  3213. create=True,
  3214. )
  3215. self.session.commit()
  3216. self.assertEqual(msg, "Group added")
  3217. self.assertEqual(project.groups[0].group_name, "bar")
  3218. self.assertEqual(len(project.admin_groups), 1)
  3219. self.assertEqual(project.admin_groups[0].group_name, "bar")
  3220. # User does not exist
  3221. self.assertRaises(
  3222. pagure.exceptions.PagureException,
  3223. pagure.lib.query.add_group_to_project,
  3224. session=self.session,
  3225. project=project,
  3226. new_group="foo",
  3227. user="bar",
  3228. )
  3229. # User not allowed
  3230. self.assertRaises(
  3231. pagure.exceptions.PagureException,
  3232. pagure.lib.query.add_group_to_project,
  3233. session=self.session,
  3234. project=project,
  3235. new_group="foo",
  3236. user="foo",
  3237. )
  3238. # All good
  3239. msg = pagure.lib.query.add_group_to_project(
  3240. session=self.session,
  3241. project=project,
  3242. new_group="foo",
  3243. user="pingou",
  3244. )
  3245. self.session.commit()
  3246. self.assertEqual(msg, "Group added")
  3247. self.assertEqual(project.groups[0].group_name, "bar")
  3248. self.assertEqual(project.groups[1].group_name, "foo")
  3249. self.assertEqual(len(project.admin_groups), 2)
  3250. self.assertEqual(project.admin_groups[0].group_name, "bar")
  3251. self.assertEqual(project.admin_groups[1].group_name, "foo")
  3252. self.assertEqual(len(project.committer_groups), 2)
  3253. # Group already associated with the project
  3254. self.assertRaises(
  3255. pagure.exceptions.PagureException,
  3256. pagure.lib.query.add_group_to_project,
  3257. session=self.session,
  3258. project=project,
  3259. new_group="foo",
  3260. user="pingou",
  3261. )
  3262. # Update the access of group in the project
  3263. msg = pagure.lib.query.add_group_to_project(
  3264. session=self.session,
  3265. project=project,
  3266. new_group="foo",
  3267. user="pingou",
  3268. access="commit",
  3269. )
  3270. self.session.commit()
  3271. self.assertEqual(msg, "Group access updated")
  3272. self.assertEqual(project.groups[0].group_name, "bar")
  3273. self.assertEqual(project.groups[1].group_name, "foo")
  3274. self.assertEqual(len(project.admin_groups), 1)
  3275. self.assertEqual(project.admin_groups[0].group_name, "bar")
  3276. self.assertEqual(len(project.committer_groups), 2)
  3277. # Update the access of group in the project
  3278. msg = pagure.lib.query.add_group_to_project(
  3279. session=self.session,
  3280. project=project,
  3281. new_group="foo",
  3282. user="pingou",
  3283. access="ticket",
  3284. )
  3285. self.session.commit()
  3286. self.assertEqual(msg, "Group access updated")
  3287. self.assertEqual(project.groups[0].group_name, "bar")
  3288. self.assertEqual(project.groups[1].group_name, "foo")
  3289. self.assertEqual(len(project.admin_groups), 1)
  3290. self.assertEqual(project.admin_groups[0].group_name, "bar")
  3291. self.assertEqual(len(project.committer_groups), 1)
  3292. self.assertEqual(project.committer_groups[0].group_name, "bar")
  3293. def test_update_watch_status(self):
  3294. """ Test the update_watch_status method of pagure.lib.query. """
  3295. tests.create_projects(self.session)
  3296. project = pagure.lib.query._get_project(self.session, "test")
  3297. # User does not exist
  3298. self.assertRaises(
  3299. pagure.exceptions.PagureException,
  3300. pagure.lib.query.update_watch_status,
  3301. session=self.session,
  3302. project=project,
  3303. user="aavrug",
  3304. watch="1",
  3305. )
  3306. # Invalid watch status
  3307. self.assertRaises(
  3308. pagure.exceptions.PagureException,
  3309. pagure.lib.query.update_watch_status,
  3310. session=self.session,
  3311. project=project,
  3312. user="pingou",
  3313. watch="me fail",
  3314. )
  3315. # All good and when user selected reset watch option.
  3316. msg = pagure.lib.query.update_watch_status(
  3317. session=self.session, project=project, user="pingou", watch="-1"
  3318. )
  3319. self.session.commit()
  3320. self.assertEqual(msg, "Watch status is already reset")
  3321. # All good and when user selected watch issues option.
  3322. msg = pagure.lib.query.update_watch_status(
  3323. session=self.session, project=project, user="pingou", watch="1"
  3324. )
  3325. self.session.commit()
  3326. self.assertEqual(
  3327. msg, "You are now watching issues and PRs on this project"
  3328. )
  3329. # All good and when user selected unwatch option.
  3330. msg = pagure.lib.query.update_watch_status(
  3331. session=self.session, project=project, user="pingou", watch="0"
  3332. )
  3333. self.session.commit()
  3334. self.assertEqual(msg, "You are no longer watching this project")
  3335. # All good and when user seleted reset watch option.
  3336. msg = pagure.lib.query.update_watch_status(
  3337. session=self.session, project=project, user="pingou", watch="-1"
  3338. )
  3339. self.session.commit()
  3340. self.assertEqual(msg, "Watch status reset")
  3341. def test_get_watch_level_on_repo_invalid(self):
  3342. """ test the get_watch_level_on_repo method of pagure.lib.query. """
  3343. self.assertRaises(
  3344. RuntimeError,
  3345. pagure.lib.query.get_watch_level_on_repo,
  3346. session=self.session,
  3347. user="pingou",
  3348. repo=None,
  3349. repouser=None,
  3350. namespace=None,
  3351. )
  3352. def test_get_watch_level_on_repo(self):
  3353. """ Test the get_watch_level_on_repo method of pagure.lib.query. """
  3354. tests.create_projects(self.session)
  3355. self.test_add_group()
  3356. project = pagure.lib.query._get_project(self.session, "test")
  3357. project2 = pagure.lib.query._get_project(self.session, "test2")
  3358. # If user not logged in
  3359. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3360. session=self.session, user=None, repo="test"
  3361. )
  3362. self.assertEqual(watch_level, [])
  3363. # User does not exist
  3364. user = tests.FakeUser()
  3365. user.username = "aavrug"
  3366. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3367. session=self.session, user=user, repo="test"
  3368. )
  3369. self.assertEqual(watch_level, [])
  3370. # Invalid project
  3371. watch = pagure.lib.query.get_watch_level_on_repo(
  3372. session=self.session, user=user, repo="invalid"
  3373. )
  3374. self.assertFalse(watch)
  3375. pagure.lib.query.add_group_to_project(
  3376. session=self.session,
  3377. project=project,
  3378. new_group="foo",
  3379. user="pingou",
  3380. )
  3381. self.session.commit()
  3382. group = pagure.lib.query.search_groups(self.session, group_name="foo")
  3383. pagure.lib.query.add_user_to_group(
  3384. self.session,
  3385. username="foo",
  3386. group=group,
  3387. user="pingou",
  3388. is_admin=False,
  3389. )
  3390. self.session.commit()
  3391. group = pagure.lib.query.search_groups(self.session, group_name="foo")
  3392. # If user belongs to any group of that project
  3393. user.username = "foo"
  3394. msg = watch_level = pagure.lib.query.get_watch_level_on_repo(
  3395. session=self.session, user=user, repo="test"
  3396. )
  3397. self.assertEqual(watch_level, ["issues"])
  3398. # If user is the creator
  3399. user.username = "pingou"
  3400. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3401. session=self.session, user=user, repo="test"
  3402. )
  3403. self.assertEqual(watch_level, ["issues"])
  3404. # Entry into watchers table for issues and commits
  3405. msg = pagure.lib.query.update_watch_status(
  3406. session=self.session, project=project, user="pingou", watch="3"
  3407. )
  3408. self.session.commit()
  3409. self.assertEqual(
  3410. msg,
  3411. "You are now watching issues, PRs, and commits on this project",
  3412. )
  3413. # From watchers table
  3414. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3415. session=self.session, user=user, repo="test"
  3416. )
  3417. self.assertEqual(["issues", "commits"], watch_level)
  3418. # Make sure that when a user watches more than one repo explicitly
  3419. # they get the correct watch status
  3420. msg = pagure.lib.query.update_watch_status(
  3421. session=self.session, project=project2, user="pingou", watch="1"
  3422. )
  3423. self.session.commit()
  3424. self.assertEqual(
  3425. msg, "You are now watching issues and PRs on this project"
  3426. )
  3427. # From watchers table
  3428. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3429. session=self.session, user=user, repo="test2"
  3430. )
  3431. self.assertEqual(["issues"], watch_level)
  3432. # Entry into watchers table for just commits
  3433. msg = pagure.lib.query.update_watch_status(
  3434. session=self.session, project=project, user="pingou", watch="2"
  3435. )
  3436. self.session.commit()
  3437. self.assertEqual(msg, "You are now watching commits on this project")
  3438. # From watchers table
  3439. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3440. session=self.session, user=user, repo="test"
  3441. )
  3442. self.assertEqual(["commits"], watch_level)
  3443. # Entry into watchers table for issues
  3444. msg = pagure.lib.query.update_watch_status(
  3445. session=self.session, project=project, user="pingou", watch="1"
  3446. )
  3447. self.session.commit()
  3448. self.assertEqual(
  3449. msg, "You are now watching issues and PRs on this project"
  3450. )
  3451. # From watchers table
  3452. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3453. session=self.session, user=user, repo="test"
  3454. )
  3455. self.assertEqual(["issues"], watch_level)
  3456. # Entry into watchers table for no watching
  3457. msg = pagure.lib.query.update_watch_status(
  3458. session=self.session, project=project, user="pingou", watch="0"
  3459. )
  3460. self.session.commit()
  3461. self.assertEqual(msg, "You are no longer watching this project")
  3462. # From watchers table
  3463. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3464. session=self.session, user=user, repo="test"
  3465. )
  3466. self.assertEqual(watch_level, [])
  3467. # Add a contributor to the project
  3468. item = pagure.lib.model.User(
  3469. user="bar",
  3470. fullname="bar foo",
  3471. password="foo",
  3472. default_email="bar@bar.com",
  3473. )
  3474. self.session.add(item)
  3475. item = pagure.lib.model.UserEmail(user_id=3, email="bar@bar.com")
  3476. self.session.add(item)
  3477. msg = pagure.lib.query.add_user_to_project(
  3478. session=self.session,
  3479. project=project,
  3480. new_user="bar",
  3481. user="pingou",
  3482. )
  3483. self.session.commit()
  3484. self.assertEqual(msg, "User added")
  3485. # Check if the new contributor is watching
  3486. user.username = "bar"
  3487. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3488. session=self.session, user=user, repo="test"
  3489. )
  3490. self.assertEqual(watch_level, ["issues"])
  3491. # wrong project
  3492. user.username = "bar"
  3493. watch_level = pagure.lib.query.get_watch_level_on_repo(
  3494. session=self.session,
  3495. user=user,
  3496. repo="test",
  3497. namespace="somenamespace",
  3498. )
  3499. self.assertEqual(watch_level, [])
  3500. def test_user_watch_list(self):
  3501. """ test user watch list method of pagure.lib """
  3502. tests.create_projects(self.session)
  3503. # He should be watching
  3504. user = tests.FakeUser()
  3505. user.username = "pingou"
  3506. watch_list_objs = pagure.lib.query.user_watch_list(
  3507. session=self.session, user="pingou"
  3508. )
  3509. watch_list = [obj.name for obj in watch_list_objs]
  3510. self.assertEqual(watch_list, ["test", "test2", "test3"])
  3511. # Make pingou unwatch the test3 project
  3512. project = pagure.lib.query._get_project(
  3513. self.session, "test3", namespace="somenamespace"
  3514. )
  3515. msg = pagure.lib.query.update_watch_status(
  3516. session=self.session, project=project, user="pingou", watch="0"
  3517. )
  3518. self.session.commit()
  3519. self.assertEqual(msg, "You are no longer watching this project")
  3520. # Re-check the watch list
  3521. watch_list_objs = pagure.lib.query.user_watch_list(
  3522. session=self.session, user="pingou"
  3523. )
  3524. watch_list = [obj.name for obj in watch_list_objs]
  3525. self.assertEqual(watch_list, ["test", "test2"])
  3526. # He isn't in the db, thus not watching anything
  3527. user.username = "vivek"
  3528. watch_list_objs = pagure.lib.query.user_watch_list(
  3529. session=self.session, user="vivek"
  3530. )
  3531. watch_list = [obj.name for obj in watch_list_objs]
  3532. self.assertEqual(watch_list, [])
  3533. # He shouldn't be watching anything
  3534. user.username = "foo"
  3535. watch_list_objs = pagure.lib.query.user_watch_list(
  3536. session=self.session, user="foo"
  3537. )
  3538. watch_list = [obj.name for obj in watch_list_objs]
  3539. self.assertEqual(watch_list, [])
  3540. @patch("pagure.lib.tasks.update_git", MagicMock(return_value=True))
  3541. @patch("pagure.lib.tasks.sync_pull_ref", MagicMock(return_value=True))
  3542. @patch("pagure.lib.notify.send_email", MagicMock(return_value=True))
  3543. @patch("pagure.pfmarkdown._commit_exists", MagicMock(return_value=True))
  3544. def test_text2markdown(self):
  3545. """ Test the text2markdown method in pagure.lib.query. """
  3546. pagure.config.config["TESTING"] = True
  3547. pagure.config.config["SERVER_NAME"] = "localhost.localdomain"
  3548. # This creates:
  3549. # project: test
  3550. # fork: pingou/test
  3551. # PR#1 to project test
  3552. self.test_new_pull_request()
  3553. # create an issue (will be #2) in 'test' project
  3554. repo = pagure.lib.query._get_project(self.session, "test")
  3555. iss = pagure.lib.query.new_issue(
  3556. issue_id=2,
  3557. session=self.session,
  3558. repo=repo,
  3559. title="test issue",
  3560. content="content test issue",
  3561. user="pingou",
  3562. )
  3563. self.session.commit()
  3564. self.assertEqual(iss.id, 2)
  3565. self.assertEqual(iss.title, "test issue")
  3566. # create PR#2 to project pingou/test
  3567. forked_repo = pagure.lib.query._get_project(
  3568. self.session, "test", user="pingou"
  3569. )
  3570. req = pagure.lib.query.new_pull_request(
  3571. requestid=2,
  3572. session=self.session,
  3573. repo_from=forked_repo,
  3574. branch_from="master",
  3575. repo_to=forked_repo,
  3576. branch_to="master",
  3577. title="test pull-request in fork",
  3578. user="pingou",
  3579. )
  3580. self.session.commit()
  3581. self.assertEqual(req.id, 2)
  3582. self.assertEqual(req.title, "test pull-request in fork")
  3583. # Create the project ns/test
  3584. item = pagure.lib.model.Project(
  3585. user_id=1, # pingou
  3586. name="test3",
  3587. namespace="ns",
  3588. description="test project #1",
  3589. hook_token="aaabbbcccdd",
  3590. )
  3591. item.close_status = [
  3592. "Invalid",
  3593. "Insufficient data",
  3594. "Fixed",
  3595. "Duplicate",
  3596. ]
  3597. self.session.add(item)
  3598. self.session.commit()
  3599. iss = pagure.lib.query.new_issue(
  3600. issue_id=4,
  3601. session=self.session,
  3602. repo=item,
  3603. title="test issue",
  3604. content="content test issue",
  3605. user="pingou",
  3606. )
  3607. self.session.commit()
  3608. self.assertEqual(iss.id, 4)
  3609. self.assertEqual(iss.title, "test issue")
  3610. # Fork ns/test to pingou
  3611. item = pagure.lib.model.Project(
  3612. user_id=1, # pingou
  3613. name="test",
  3614. namespace="ns",
  3615. description="Forked namespaced test project #1",
  3616. is_fork=True,
  3617. parent_id=item.id,
  3618. hook_token="aaabbbrrrbb",
  3619. )
  3620. self.session.add(item)
  3621. self.session.commit()
  3622. iss = pagure.lib.query.new_issue(
  3623. issue_id=7,
  3624. session=self.session,
  3625. repo=item,
  3626. title="test issue #7",
  3627. content="content test issue #7 in forked repo",
  3628. user="pingou",
  3629. )
  3630. self.session.commit()
  3631. self.assertEqual(iss.id, 7)
  3632. self.assertEqual(iss.title, "test issue #7")
  3633. iss = pagure.lib.query.new_issue(
  3634. issue_id=8,
  3635. session=self.session,
  3636. repo=item,
  3637. title="private issue #8",
  3638. content="Private content test issue #8 in forked repo",
  3639. user="pingou",
  3640. private=True,
  3641. )
  3642. self.session.commit()
  3643. self.assertEqual(iss.id, 8)
  3644. self.assertEqual(iss.title, "private issue #8")
  3645. # newer bleach allow to customize the protocol supported
  3646. import bleach
  3647. bleach_v = bleach.__version__.split(".")
  3648. for idx, val in enumerate(bleach_v):
  3649. try:
  3650. val = int(val)
  3651. except ValueError:
  3652. pass
  3653. bleach_v[idx] = val
  3654. # old markdown generate other html
  3655. import markdown
  3656. try:
  3657. markdown_v = markdown.__version__.version_info
  3658. except AttributeError: # pragma: no cover
  3659. markdown_v = markdown.__version_info__
  3660. # python-markdown >= 3.2.0 returns to the old behavior on img tag trailing slash
  3661. old_markdown = markdown_v < (2, 6, 0) or markdown_v >= (3, 2, 0)
  3662. mk_321 = markdown_v >= (3, 2, 0)
  3663. texts = [
  3664. "foo bar test#1 see?",
  3665. "foo bar pingou/test#2 I mean, really",
  3666. "foo bar fork/pingou/test#2 bouza!",
  3667. "foo bar forks/pingou/test#2 bouza!",
  3668. "foo bar ns/test3#4 bouza!",
  3669. "foo bar fork/user/ns/test#5 bouza!",
  3670. "foo bar fork/pingou/ns/test#7 bouza!",
  3671. "test#1 bazinga!",
  3672. "pingou opened the PR forks/pingou/test#2",
  3673. "fork/pingou/ns/test#8 is private",
  3674. "implicit link to #1",
  3675. "implicit link .#1. with non-whitespace, non-word characters",
  3676. "#2 - implicit link at start of line",
  3677. "#2. implicit link at start of line with no whitespace after",
  3678. "#regular header",
  3679. "#34 looks like an implicit link, but no issue 34",
  3680. "pingou committed on test#9364354a4555ba17aa60f0dc844d70b74eb1aecd",
  3681. "irc://pagure.io",
  3682. "ircs://pagure.io",
  3683. "http://pagure.io",
  3684. "https://pagure.io",
  3685. "<https://pagure.io/pagure>",
  3686. "~~foo~~",
  3687. "~~foo bar~~",
  3688. "~~[BZ#1435310](https://bugzilla.redhat.com/1435310)~~",
  3689. "~~[BZ#1435310](https://bugzilla.redhat.com/1435310) avc denial "
  3690. "during F26AH boot 'error_name=org.freedesktop.systemd1."
  3691. "NoSuchDynamicUser'~~",
  3692. "``~~foo bar~~``",
  3693. "~~foo bar~~ and ~~another ~~",
  3694. "lets mention @pingou",
  3695. "@pingou at start of line",
  3696. "but not someone@pingou.com",
  3697. "[![Fedora_infinity_small.png]"
  3698. "(/test/issue/raw/Fedora_infinity_small.png)]"
  3699. "(/test/issue/raw/Fedora_infinity_small.png)",
  3700. ]
  3701. expected = [
  3702. # 'foo bar test#1 see?',
  3703. '<div class="markdown"><p>foo bar <a href="/test/pull-request/1"'
  3704. ' title="[Open] test pull-request">test#1</a> see?</p></div>',
  3705. # 'foo bar pingou/test#2 I mean, really', -- unknown namespace
  3706. '<div class="markdown"><p>foo bar pingou/test#2 I mean, really</p></div>',
  3707. # 'foo bar fork/pingou/test#2 bouza!',
  3708. '<div class="markdown"><p>foo bar <a href="/fork/'
  3709. 'pingou/test/pull-request/2" title="[Open] test pull-request in fork">'
  3710. "pingou/test#2</a> bouza!</p></div>",
  3711. # 'foo bar forks/pingou/test#2 bouza!', -- the 's' doesn't matter
  3712. '<div class="markdown"><p>foo bar <a href="/fork/'
  3713. 'pingou/test/pull-request/2" title="[Open] test pull-request in fork">'
  3714. "pingou/test#2</a> bouza!</p></div>",
  3715. # 'foo bar ns/test3#4 bouza!',
  3716. '<div class="markdown"><p>foo bar <a href="/ns/test3/issue/4"'
  3717. ' title="[Open] test issue">ns/test3#4</a> bouza!</p></div>',
  3718. # 'foo bar fork/user/ns/test#5 bouza!', -- unknown fork
  3719. '<div class="markdown"><p>foo bar user/ns/test#5 bouza!</p></div>',
  3720. # 'foo bar fork/pingou/ns/test#7 bouza!',
  3721. '<div class="markdown"><p>foo bar <a href="/fork/pingou/ns/test/issue/7"'
  3722. ' title="[Open] test issue #7">'
  3723. "pingou/ns/test#7</a> bouza!</p></div>",
  3724. # 'test#1 bazinga!',
  3725. '<div class="markdown"><p><a href="/test/pull-request/1" '
  3726. 'title="[Open] test pull-request">test#1</a> bazinga!</p></div>',
  3727. # 'pingou opened the PR forks/pingou/test#2'
  3728. '<div class="markdown"><p>pingou opened the PR <a href="/fork/pingou/test/pull-request/2" '
  3729. 'title="[Open] test pull-request in fork">pingou/test#2</a></p></div>',
  3730. # 'fork/pingou/ns/test#8 is private',
  3731. '<div class="markdown"><p><a href="/fork/pingou/ns/test/issue/8" '
  3732. 'title="Private issue">pingou/ns/test#8</a> is private</p></div>',
  3733. # 'implicit link to #1',
  3734. '<div class="markdown"><p>implicit link to <a href="/test/pull-request/1" title="[Open] test pull-request">#1</a></p></div>',
  3735. # 'implicit link .#1. with non-whitespace, non-word characters',
  3736. '<div class="markdown"><p>implicit link .<a href="/test/pull-request/1" title="[Open] test pull-request">#1</a>. with non-whitespace, non-word characters</p></div>',
  3737. # '#2 - implicit link at start of line',
  3738. '<div class="markdown"><p><a href="/test/issue/2" title="[Open] test issue">#2</a> - implicit link at start of line</p></div>',
  3739. # '#2. implicit link at start of line with no whitespace after',
  3740. '<div class="markdown"><p><a href="/test/issue/2" title="[Open] test issue">#2</a>. implicit link at start of line with no whitespace after</p></div>',
  3741. # '#regular header',
  3742. '<div class="markdown"><h1>regular header</h1></div>',
  3743. # '#34 looks like an implicit link, but no issue 34',
  3744. '<div class="markdown"><h1>34 looks like an implicit link, but no issue 34</h1></div>',
  3745. # 'pingou committed on test#9364354a4555ba17aa60f0dc844d70b74eb1aecd',
  3746. '<div class="markdown"><p>pingou committed on <a href="/test/c/'
  3747. '9364354a4555ba17aa60f0dc844d70b74eb1aecd" '
  3748. 'title="Commit 9364354a4555ba17aa60f0dc844d70b74eb1aecd"'
  3749. ">test#9364354a4555ba17aa60f0dc844d70b74eb1aecd</a></p></div>",
  3750. # 'irc://pagure.io'
  3751. '<div class="markdown"><p><a href="irc://pagure.io">irc://pagure.io</a></p></div>',
  3752. # 'ircs://pagure.io' - This is getting cleaned by python-bleach
  3753. # and the version 1.4.3 that we have won't let us adjust the
  3754. # list of supported protocols
  3755. # '<p><a href="ircs://pagure.io">ircs://pagure.io</a></p>',
  3756. '<div class="markdown"><p><a href="ircs://pagure.io">ircs://pagure.io</a></p></div>'
  3757. if tuple(bleach_v) >= (1, 5, 0)
  3758. else '<div class="markdown"><p><a>ircs://pagure.io</a></p></div>',
  3759. # 'http://pagure.io'
  3760. '<div class="markdown"><p><a href="http://pagure.io">http://pagure.io</a></p></div>',
  3761. # 'https://pagure.io'
  3762. '<div class="markdown"><p><a href="https://pagure.io">https://pagure.io</a></p></div>',
  3763. # '<https://pagure.io/pagure>'
  3764. '<div class="markdown"><p><a href="https://pagure.io/pagure">https://pagure.io/pagure'
  3765. "</a></p></div>",
  3766. # '~~foo~~'
  3767. '<div class="markdown"><p><del>foo</del></p></div>',
  3768. # '~~foo bar~~'
  3769. '<div class="markdown"><p><del>foo bar</del></p></div>',
  3770. # '~~[BZ#1435310](https://bugzilla.redhat.com/1435310)~~'
  3771. '<div class="markdown"><p><del><a href="https://bugzilla.redhat.com/1435310">'
  3772. "BZ#1435310</a></del></p></div>",
  3773. # '~~[BZ#1435310](https://bugzilla.redhat.com/1435310) avc
  3774. # denial during F26AH boot 'error_name=org.freedesktop.systemd1
  3775. # .NoSuchDynamicUser~~'
  3776. '<div class="markdown"><p><del><a href="https://bugzilla.redhat.com/1435310">'
  3777. "BZ#1435310</a> avc denial during F26AH boot 'error_name="
  3778. "org.freedesktop.systemd1.NoSuchDynamicUser'</del></p></div>",
  3779. # '``~~foo bar~~``'
  3780. '<div class="markdown"><p><code>~~foo bar~~</code></p></div>',
  3781. # '~~foo bar~~ and ~~another ~~',
  3782. '<div class="markdown"><p><del>foo bar</del> and <del>another </del></p></div>',
  3783. # 'lets mention @pingou',
  3784. '<div class="markdown"><p>lets mention <a href="http://localhost.localdomain/user/pingou">@pingou</a></p></div>',
  3785. # '@pingou at start of line',
  3786. '<div class="markdown"><p><a href="http://localhost.localdomain/user/pingou">@pingou</a> at start of line</p></div>',
  3787. # 'but not someone@pingou.com',
  3788. '<div class="markdown"><p>but not someone@pingou.com</p></div>',
  3789. ]
  3790. if mk_321:
  3791. print("**** Markdown 3.2.1+ behavior")
  3792. expected.append(
  3793. # '[![Fedora_infinity_small.png]'
  3794. # '(/test/issue/raw/Fedora_infinity_small.png)]'
  3795. # '(/test/issue/raw/Fedora_infinity_small.png)',
  3796. '<div class="markdown"><p><a href="/test/issue/raw/Fedora_infinity_small.png"><span>'
  3797. '<img alt="Fedora_infinity_small.png" class="lazyload" '
  3798. 'data-src="/test/issue/raw/Fedora_infinity_small.png" src="">'
  3799. "<noscript>"
  3800. '&lt;img alt="Fedora_infinity_small.png" '
  3801. 'src="/test/issue/raw/Fedora_infinity_small.png" /&gt;'
  3802. "</noscript></span></a></p></div>"
  3803. )
  3804. elif old_markdown:
  3805. print("**** Old markdown behavior")
  3806. expected.append(
  3807. # '[![Fedora_infinity_small.png]'
  3808. # '(/test/issue/raw/Fedora_infinity_small.png)]'
  3809. # '(/test/issue/raw/Fedora_infinity_small.png)',
  3810. '<div class="markdown"><p><a href="/test/issue/raw/Fedora_infinity_small.png"><span>'
  3811. '<img alt="Fedora_infinity_small.png" class="lazyload" '
  3812. 'data-src="/test/issue/raw/Fedora_infinity_small.png" src="">'
  3813. "<noscript>"
  3814. '<img alt="Fedora_infinity_small.png" '
  3815. 'src="/test/issue/raw/Fedora_infinity_small.png" />'
  3816. "</noscript></span></a></p></div>"
  3817. )
  3818. else:
  3819. print("**** Not old but no longer new markdown")
  3820. expected.append(
  3821. # '[![Fedora_infinity_small.png]'
  3822. # '(/test/issue/raw/Fedora_infinity_small.png)]'
  3823. # '(/test/issue/raw/Fedora_infinity_small.png)',
  3824. '<div class="markdown"><p><a href="/test/issue/raw/Fedora_infinity_small.png"><span>'
  3825. '<img alt="Fedora_infinity_small.png" class="lazyload" '
  3826. 'data-src="/test/issue/raw/Fedora_infinity_small.png" src="">'
  3827. "<noscript>"
  3828. '<img alt="Fedora_infinity_small.png" '
  3829. 'src="/test/issue/raw/Fedora_infinity_small.png">'
  3830. "</noscript></span></a></p></div>"
  3831. )
  3832. with self.app.application.app_context() as ctx:
  3833. ctx.g.session = self.session
  3834. with ctx.app.test_request_context() as reqctx:
  3835. reqctx.request.url_root = "http://localhost.localdomain/"
  3836. reqctx.request.url = (
  3837. "http://localhost.localdomain/test/issue/69"
  3838. )
  3839. reqctx.request.args = Mock()
  3840. reqctx.request.args.get = Mock(return_value=None)
  3841. for idx, text in enumerate(texts):
  3842. print(text)
  3843. html = pagure.lib.query.text2markdown(text)
  3844. self.assertEqual(html, expected[idx])
  3845. def test_text2markdown_exception(self):
  3846. """ Test the text2markdown method in pagure.lib.query. """
  3847. text = "test#1 bazinga!"
  3848. expected_html = "test#1 bazinga!"
  3849. with self.app.application.app_context() as ctx:
  3850. html = pagure.lib.query.text2markdown(text)
  3851. self.assertEqual(html, expected_html)
  3852. def test_text2markdown_empty_string(self):
  3853. """ Test the text2markdown method in pagure.lib.query. """
  3854. text = ""
  3855. expected_html = ""
  3856. html = pagure.lib.query.text2markdown(text)
  3857. self.assertEqual(html, expected_html)
  3858. def test_get_access_levels(self):
  3859. """ Test the get_access_levels method in pagure.lib """
  3860. acls = pagure.lib.query.get_access_levels(self.session)
  3861. self.assertEqual(
  3862. sorted(["admin", "collaborator", "commit", "ticket"]), sorted(acls)
  3863. )
  3864. def test_get_project_users(self):
  3865. """Test the get_project_users method when combine is True"""
  3866. tests.create_projects(self.session)
  3867. project = pagure.lib.query.get_authorized_project(
  3868. self.session, project_name="test"
  3869. )
  3870. # Default value of combine is True
  3871. # which means the an admin is a user, committer as well
  3872. # and a committer is also a user
  3873. # and a user is just a user
  3874. users = project.get_project_users(access="admin")
  3875. # Only pingou is the admin as of now
  3876. # But, he is the creator and
  3877. # the creator of the project is not listed in user_projects
  3878. # table. Thus, get_projec_users won't return him as an admin
  3879. # He has all the access of an admin though
  3880. self.assertEqual(len(users), 0)
  3881. self.assertEqual(project.user.username, "pingou")
  3882. # Wrong access level, should raise Accesslevelnotfound exception
  3883. self.assertRaises(
  3884. pagure.exceptions.AccessLevelNotFound,
  3885. project.get_project_users,
  3886. access="owner",
  3887. )
  3888. # Let's add a new user to the project, 'foo'
  3889. # By default, if no access is specified, he becomes an admin
  3890. msg = pagure.lib.query.add_user_to_project(
  3891. self.session, project=project, new_user="foo", user="pingou"
  3892. )
  3893. self.session.commit()
  3894. # since, he is an admin, the msg should be 'User added'
  3895. self.assertEqual(msg, "User added")
  3896. project = pagure.lib.query.get_authorized_project(
  3897. self.session, project_name="test"
  3898. )
  3899. users = project.get_project_users(access="admin")
  3900. self.assertEqual(len(users), 1)
  3901. self.assertEqual(users[0].username, "foo")
  3902. # foo should be a committer as well, since he is an admin
  3903. users = project.get_project_users(access="commit")
  3904. self.assertEqual(len(users), 1)
  3905. self.assertEqual(users[0].username, "foo")
  3906. # the admin also has ticket access
  3907. users = project.get_project_users(access="ticket")
  3908. self.assertEqual(len(users), 1)
  3909. self.assertEqual(users[0].username, "foo")
  3910. # let's update the access of foo to 'committer'
  3911. msg = pagure.lib.query.add_user_to_project(
  3912. self.session,
  3913. project=project,
  3914. new_user="foo",
  3915. user="pingou",
  3916. access="commit",
  3917. )
  3918. self.session.commit()
  3919. self.assertEqual(msg, "User access updated")
  3920. project = pagure.lib.query.get_authorized_project(
  3921. self.session, project_name="test"
  3922. )
  3923. # No admin now, even though pingou the creator is there
  3924. users = project.get_project_users(access="admin")
  3925. self.assertEqual(len(users), 0)
  3926. users = project.get_project_users(access="commit")
  3927. # foo is the committer currently
  3928. self.assertEqual(len(users), 1)
  3929. self.assertEqual(users[0].username, "foo")
  3930. users = project.get_project_users(access="ticket")
  3931. # foo also has ticket rights
  3932. self.assertEqual(len(users), 1)
  3933. self.assertEqual(users[0].username, "foo")
  3934. # let's update the access of foo to 'ticket'
  3935. msg = pagure.lib.query.add_user_to_project(
  3936. self.session,
  3937. project=project,
  3938. new_user="foo",
  3939. user="pingou",
  3940. access="ticket",
  3941. )
  3942. self.session.commit()
  3943. self.assertEqual(msg, "User access updated")
  3944. project = pagure.lib.query.get_authorized_project(
  3945. self.session, project_name="test"
  3946. )
  3947. # No admin now, even though pingou the creator is there
  3948. users = project.get_project_users(access="admin")
  3949. self.assertEqual(len(users), 0)
  3950. users = project.get_project_users(access="commit")
  3951. # foo deosn't have commit rights now
  3952. self.assertEqual(len(users), 0)
  3953. users = project.get_project_users(access="ticket")
  3954. # foo does have tickets right though
  3955. self.assertEqual(len(users), 1)
  3956. self.assertEqual(users[0].username, "foo")
  3957. def test_get_project_users_combine_false(self):
  3958. """Test the get_project_users method when combine is False"""
  3959. tests.create_projects(self.session)
  3960. project = pagure.lib.query.get_authorized_project(
  3961. self.session, project_name="test"
  3962. )
  3963. # Let's add a new user to the project, 'foo'
  3964. # By default, if no access is specified, he becomes an admin
  3965. msg = pagure.lib.query.add_user_to_project(
  3966. self.session, project=project, new_user="foo", user="pingou"
  3967. )
  3968. self.session.commit()
  3969. # since, he is an admin, the msg should be 'User added'
  3970. self.assertEqual(msg, "User added")
  3971. # only one admin
  3972. users = project.get_project_users(access="admin", combine=False)
  3973. self.assertEqual(len(users), 1)
  3974. self.assertEqual(users[0].username, "foo")
  3975. # No user with only commit access
  3976. users = project.get_project_users(access="commit", combine=False)
  3977. self.assertEqual(len(users), 0)
  3978. # No user with only ticket access
  3979. users = project.get_project_users(access="ticket", combine=False)
  3980. self.assertEqual(len(users), 0)
  3981. # Update the access level of foo user to commit
  3982. msg = pagure.lib.query.add_user_to_project(
  3983. self.session,
  3984. project=project,
  3985. new_user="foo",
  3986. user="pingou",
  3987. access="commit",
  3988. )
  3989. self.session.commit()
  3990. self.assertEqual(msg, "User access updated")
  3991. # He is just a committer
  3992. project = pagure.lib.query.get_authorized_project(
  3993. self.session, project_name="test"
  3994. )
  3995. users = project.get_project_users(access="admin", combine=False)
  3996. self.assertEqual(len(users), 0)
  3997. # He is just a committer
  3998. users = project.get_project_users(access="commit", combine=False)
  3999. self.assertEqual(len(users), 1)
  4000. self.assertEqual(users[0].username, "foo")
  4001. # He is just a committer
  4002. users = project.get_project_users(access="ticket", combine=False)
  4003. self.assertEqual(len(users), 0)
  4004. # Update the access level of foo user to ticket
  4005. msg = pagure.lib.query.add_user_to_project(
  4006. self.session,
  4007. project=project,
  4008. new_user="foo",
  4009. user="pingou",
  4010. access="ticket",
  4011. )
  4012. self.session.commit()
  4013. self.assertEqual(msg, "User access updated")
  4014. # He is just a ticketer
  4015. project = pagure.lib.query.get_authorized_project(
  4016. self.session, project_name="test"
  4017. )
  4018. users = project.get_project_users(access="admin", combine=False)
  4019. self.assertEqual(len(users), 0)
  4020. # He is just a ticketer
  4021. users = project.get_project_users(access="commit", combine=False)
  4022. self.assertEqual(len(users), 0)
  4023. # He is just a ticketer
  4024. users = project.get_project_users(access="ticket", combine=False)
  4025. self.assertEqual(len(users), 1)
  4026. self.assertEqual(users[0].username, "foo")
  4027. def test_get_project_groups(self):
  4028. """Test the get_project_groups method when combine is True"""
  4029. # Create some projects
  4030. tests.create_projects(self.session)
  4031. # Create a group in database
  4032. msg = pagure.lib.query.add_group(
  4033. self.session,
  4034. group_name="JL",
  4035. display_name="Justice League",
  4036. description="Nope, it's not JLA anymore",
  4037. group_type="user",
  4038. user="foo",
  4039. is_admin=False,
  4040. blacklist=pagure.config.config.get("BLACKLISTED_PROJECTS"),
  4041. )
  4042. self.assertEqual(msg, "User `foo` added to the group `JL`.")
  4043. # Add the group to project we just created, test
  4044. # First add it as an admin
  4045. project = pagure.lib.query.get_authorized_project(
  4046. self.session, project_name="test"
  4047. )
  4048. msg = pagure.lib.query.add_group_to_project(
  4049. self.session, project=project, new_group="JL", user="pingou"
  4050. )
  4051. self.session.commit()
  4052. self.assertEqual(msg, "Group added")
  4053. # Now, the group is an admin in the project
  4054. # so, it must have access to everything
  4055. project = pagure.lib.query.get_authorized_project(
  4056. self.session, project_name="test"
  4057. )
  4058. groups = project.get_project_groups(access="admin")
  4059. self.assertEqual(len(groups), 1)
  4060. self.assertEqual(groups[0].display_name, "Justice League")
  4061. self.assertEqual(len(project.admin_groups), 1)
  4062. self.assertEqual(
  4063. project.admin_groups[0].display_name, "Justice League"
  4064. )
  4065. # The group should be committer as well
  4066. groups = project.get_project_groups(access="commit")
  4067. self.assertEqual(len(groups), 1)
  4068. self.assertEqual(groups[0].display_name, "Justice League")
  4069. self.assertEqual(len(project.committer_groups), 1)
  4070. self.assertEqual(
  4071. project.committer_groups[0].display_name, "Justice League"
  4072. )
  4073. # The group should be ticketer as well
  4074. groups = project.get_project_groups(access="ticket")
  4075. self.assertEqual(len(groups), 1)
  4076. self.assertEqual(groups[0].display_name, "Justice League")
  4077. self.assertEqual(len(project.groups), 1)
  4078. self.assertEqual(project.groups[0].display_name, "Justice League")
  4079. # Update the access level of the group, JL to commit
  4080. project = pagure.lib.query.get_authorized_project(
  4081. self.session, project_name="test"
  4082. )
  4083. msg = pagure.lib.query.add_group_to_project(
  4084. self.session,
  4085. project=project,
  4086. new_group="JL",
  4087. user="pingou",
  4088. access="commit",
  4089. )
  4090. self.session.commit()
  4091. self.assertEqual(msg, "Group access updated")
  4092. # It shouldn't be an admin
  4093. project = pagure.lib.query.get_authorized_project(
  4094. self.session, project_name="test"
  4095. )
  4096. groups = project.get_project_groups(access="admin")
  4097. self.assertEqual(len(groups), 0)
  4098. self.assertEqual(len(project.admin_groups), 0)
  4099. # It is a committer
  4100. groups = project.get_project_groups(access="commit")
  4101. self.assertEqual(len(groups), 1)
  4102. self.assertEqual(groups[0].display_name, "Justice League")
  4103. self.assertEqual(len(project.committer_groups), 1)
  4104. self.assertEqual(
  4105. project.committer_groups[0].display_name, "Justice League"
  4106. )
  4107. # The group should be ticketer as well
  4108. groups = project.get_project_groups(access="ticket")
  4109. self.assertEqual(len(groups), 1)
  4110. self.assertEqual(groups[0].display_name, "Justice League")
  4111. self.assertEqual(len(project.groups), 1)
  4112. self.assertEqual(project.groups[0].display_name, "Justice League")
  4113. # Update the access of group JL to ticket
  4114. msg = pagure.lib.query.add_group_to_project(
  4115. self.session,
  4116. project=project,
  4117. new_group="JL",
  4118. user="pingou",
  4119. access="ticket",
  4120. )
  4121. self.session.commit()
  4122. self.assertEqual(msg, "Group access updated")
  4123. # It is not an admin
  4124. project = pagure.lib.query.get_authorized_project(
  4125. self.session, project_name="test"
  4126. )
  4127. groups = project.get_project_groups(access="admin")
  4128. self.assertEqual(len(groups), 0)
  4129. self.assertEqual(len(project.admin_groups), 0)
  4130. # The group shouldn't be a committer
  4131. groups = project.get_project_groups(access="commit")
  4132. self.assertEqual(len(groups), 0)
  4133. self.assertEqual(len(project.committer_groups), 0)
  4134. # The group should be ticketer
  4135. groups = project.get_project_groups(access="ticket")
  4136. self.assertEqual(len(groups), 1)
  4137. self.assertEqual(groups[0].display_name, "Justice League")
  4138. self.assertEqual(len(project.groups), 1)
  4139. self.assertEqual(project.groups[0].display_name, "Justice League")
  4140. def test_get_project_groups_combine_false(self):
  4141. """Test the get_project_groups method when combine is False"""
  4142. # Create some projects
  4143. tests.create_projects(self.session)
  4144. # Create a group in database
  4145. msg = pagure.lib.query.add_group(
  4146. self.session,
  4147. group_name="JL",
  4148. display_name="Justice League",
  4149. description="Nope, it's not JLA anymore",
  4150. group_type="user",
  4151. user="foo",
  4152. is_admin=False,
  4153. blacklist=pagure.config.config.get("BLACKLISTED_PROJECTS"),
  4154. )
  4155. self.assertEqual(msg, "User `foo` added to the group `JL`.")
  4156. # Add the group to project we just created, test
  4157. # First add it as an admin
  4158. project = pagure.lib.query.get_authorized_project(
  4159. self.session, project_name="test"
  4160. )
  4161. msg = pagure.lib.query.add_group_to_project(
  4162. self.session, project=project, new_group="JL", user="pingou"
  4163. )
  4164. self.session.commit()
  4165. self.assertEqual(msg, "Group added")
  4166. # Now, the group is an admin in the project
  4167. # so, it must have access to everything
  4168. project = pagure.lib.query.get_authorized_project(
  4169. self.session, project_name="test"
  4170. )
  4171. groups = project.get_project_groups(access="admin", combine=False)
  4172. self.assertEqual(len(groups), 1)
  4173. self.assertEqual(groups[0].display_name, "Justice League")
  4174. self.assertEqual(len(project.admin_groups), 1)
  4175. self.assertEqual(
  4176. project.admin_groups[0].display_name, "Justice League"
  4177. )
  4178. # The group shoudn't be a committer
  4179. groups = project.get_project_groups(access="commit", combine=False)
  4180. self.assertEqual(len(groups), 0)
  4181. # The group shoudn't be a ticketer
  4182. groups = project.get_project_groups(access="ticket", combine=False)
  4183. self.assertEqual(len(groups), 0)
  4184. # Update the access level of the group, JL to commit
  4185. project = pagure.lib.query.get_authorized_project(
  4186. self.session, project_name="test"
  4187. )
  4188. msg = pagure.lib.query.add_group_to_project(
  4189. self.session,
  4190. project=project,
  4191. new_group="JL",
  4192. user="pingou",
  4193. access="commit",
  4194. )
  4195. self.session.commit()
  4196. self.assertEqual(msg, "Group access updated")
  4197. # It shouldn't be an admin
  4198. project = pagure.lib.query.get_authorized_project(
  4199. self.session, project_name="test"
  4200. )
  4201. groups = project.get_project_groups(access="admin", combine=False)
  4202. self.assertEqual(len(groups), 0)
  4203. # It is a committer
  4204. groups = project.get_project_groups(access="commit", combine=False)
  4205. self.assertEqual(len(groups), 1)
  4206. self.assertEqual(groups[0].display_name, "Justice League")
  4207. self.assertEqual(len(project.committer_groups), 1)
  4208. self.assertEqual(
  4209. project.committer_groups[0].display_name, "Justice League"
  4210. )
  4211. # The group shouldn't be ticketer
  4212. groups = project.get_project_groups(access="ticket", combine=False)
  4213. self.assertEqual(len(groups), 0)
  4214. # Update the access of group JL to ticket
  4215. msg = pagure.lib.query.add_group_to_project(
  4216. self.session,
  4217. project=project,
  4218. new_group="JL",
  4219. user="pingou",
  4220. access="ticket",
  4221. )
  4222. self.session.commit()
  4223. self.assertEqual(msg, "Group access updated")
  4224. # It is not an admin
  4225. project = pagure.lib.query.get_authorized_project(
  4226. self.session, project_name="test"
  4227. )
  4228. groups = project.get_project_groups(access="admin", combine=False)
  4229. self.assertEqual(len(groups), 0)
  4230. # The group shouldn't be a committer
  4231. groups = project.get_project_groups(access="commit", combine=False)
  4232. self.assertEqual(len(groups), 0)
  4233. # The group should be ticketer
  4234. groups = project.get_project_groups(access="ticket", combine=False)
  4235. self.assertEqual(len(groups), 1)
  4236. self.assertEqual(groups[0].display_name, "Justice League")
  4237. self.assertEqual(len(project.groups), 1)
  4238. self.assertEqual(project.groups[0].display_name, "Justice League")
  4239. def test_get_obj_access_user(self):
  4240. """Test the get_obj_access method of pagure.lib
  4241. for model.User object"""
  4242. # Create the projects
  4243. tests.create_projects(self.session)
  4244. # Add a user object - make him an admin first
  4245. project = pagure.lib.query.get_authorized_project(
  4246. self.session, project_name="test"
  4247. )
  4248. msg = pagure.lib.query.add_user_to_project(
  4249. self.session, project=project, new_user="foo", user="pingou"
  4250. )
  4251. self.session.commit()
  4252. self.assertEqual(msg, "User added")
  4253. user = pagure.lib.query.get_user(self.session, key="foo")
  4254. # He should be an admin
  4255. access_obj = pagure.lib.query.get_obj_access(
  4256. self.session, project_obj=project, obj=user
  4257. )
  4258. self.assertEqual(access_obj.access, "admin")
  4259. # Update and check for commit access
  4260. msg = pagure.lib.query.add_user_to_project(
  4261. self.session,
  4262. project=project,
  4263. new_user="foo",
  4264. user="pingou",
  4265. access="commit",
  4266. )
  4267. self.session.commit()
  4268. self.assertEqual(msg, "User access updated")
  4269. project = pagure.lib.query.get_authorized_project(
  4270. self.session, project_name="test"
  4271. )
  4272. # He should be a committer
  4273. access_obj = pagure.lib.query.get_obj_access(
  4274. self.session, project_obj=project, obj=user
  4275. )
  4276. self.assertEqual(access_obj.access, "commit")
  4277. # Update and check for ticket access
  4278. msg = pagure.lib.query.add_user_to_project(
  4279. self.session,
  4280. project=project,
  4281. new_user="foo",
  4282. user="pingou",
  4283. access="ticket",
  4284. )
  4285. self.session.commit()
  4286. self.assertEqual(msg, "User access updated")
  4287. project = pagure.lib.query.get_authorized_project(
  4288. self.session, project_name="test"
  4289. )
  4290. # He should be a ticketer
  4291. access_obj = pagure.lib.query.get_obj_access(
  4292. self.session, project_obj=project, obj=user
  4293. )
  4294. self.assertEqual(access_obj.access, "ticket")
  4295. def test_get_obj_access_group(self):
  4296. """Test the get_obj_access method of pagure.lib
  4297. for model.PagureGroup object"""
  4298. # Create the projects
  4299. tests.create_projects(self.session)
  4300. # Create a group in database
  4301. msg = pagure.lib.query.add_group(
  4302. self.session,
  4303. group_name="JL",
  4304. display_name="Justice League",
  4305. description="Nope, it's not JLA anymore",
  4306. group_type="user",
  4307. user="foo",
  4308. is_admin=False,
  4309. blacklist=pagure.config.config.get("BLACKLISTED_PROJECTS"),
  4310. )
  4311. self.assertEqual(msg, "User `foo` added to the group `JL`.")
  4312. # Add a group object - make him an admin first
  4313. project = pagure.lib.query.get_authorized_project(
  4314. self.session, project_name="test"
  4315. )
  4316. msg = pagure.lib.query.add_group_to_project(
  4317. self.session, project=project, new_group="JL", user="pingou"
  4318. )
  4319. self.session.commit()
  4320. self.assertEqual(msg, "Group added")
  4321. group = pagure.lib.query.search_groups(self.session, group_name="JL")
  4322. # He should be an admin
  4323. access_obj = pagure.lib.query.get_obj_access(
  4324. self.session, project_obj=project, obj=group
  4325. )
  4326. self.assertEqual(access_obj.access, "admin")
  4327. # Update and check for commit access
  4328. msg = pagure.lib.query.add_group_to_project(
  4329. self.session,
  4330. project=project,
  4331. new_group="JL",
  4332. user="pingou",
  4333. access="commit",
  4334. )
  4335. self.session.commit()
  4336. self.assertEqual(msg, "Group access updated")
  4337. project = pagure.lib.query.get_authorized_project(
  4338. self.session, project_name="test"
  4339. )
  4340. # He should be a committer
  4341. access_obj = pagure.lib.query.get_obj_access(
  4342. self.session, project_obj=project, obj=group
  4343. )
  4344. self.assertEqual(access_obj.access, "commit")
  4345. # Update and check for ticket access
  4346. msg = pagure.lib.query.add_group_to_project(
  4347. self.session,
  4348. project=project,
  4349. new_group="JL",
  4350. user="pingou",
  4351. access="ticket",
  4352. )
  4353. self.session.commit()
  4354. self.assertEqual(msg, "Group access updated")
  4355. project = pagure.lib.query.get_authorized_project(
  4356. self.session, project_name="test"
  4357. )
  4358. # He should be a ticketer
  4359. access_obj = pagure.lib.query.get_obj_access(
  4360. self.session, project_obj=project, obj=group
  4361. )
  4362. self.assertEqual(access_obj.access, "ticket")
  4363. def test_set_watch_obj(self):
  4364. """ Test the set_watch_obj method in pagure.lib """
  4365. # Create the project ns/test
  4366. item = pagure.lib.model.Project(
  4367. user_id=1, # pingou
  4368. name="test3",
  4369. namespace="ns",
  4370. description="test project #1",
  4371. hook_token="aaabbbcccdd",
  4372. )
  4373. item.close_status = ["Invalid", "Insufficient data", "Fixed"]
  4374. self.session.add(item)
  4375. self.session.commit()
  4376. # Create the ticket
  4377. iss = pagure.lib.query.new_issue(
  4378. issue_id=4,
  4379. session=self.session,
  4380. repo=item,
  4381. title="test issue",
  4382. content="content test issue",
  4383. user="pingou",
  4384. )
  4385. self.session.commit()
  4386. self.assertEqual(iss.id, 4)
  4387. self.assertEqual(iss.title, "test issue")
  4388. # Unknown user
  4389. self.assertRaises(
  4390. pagure.exceptions.PagureException,
  4391. pagure.lib.query.set_watch_obj,
  4392. self.session,
  4393. "unknown",
  4394. iss,
  4395. True,
  4396. )
  4397. # Invalid object to watch - project
  4398. self.assertRaises(
  4399. pagure.exceptions.InvalidObjectException,
  4400. pagure.lib.query.set_watch_obj,
  4401. self.session,
  4402. "foo",
  4403. iss.project,
  4404. True,
  4405. )
  4406. # Invalid object to watch - string
  4407. self.assertRaises(
  4408. AttributeError,
  4409. pagure.lib.query.set_watch_obj,
  4410. self.session,
  4411. "foo",
  4412. "ticket",
  4413. True,
  4414. )
  4415. # Watch the ticket
  4416. out = pagure.lib.query.set_watch_obj(self.session, "foo", iss, True)
  4417. self.assertEqual(out, "You are now watching this issue")
  4418. # Un-watch the ticket
  4419. out = pagure.lib.query.set_watch_obj(self.session, "foo", iss, False)
  4420. self.assertEqual(out, "You are no longer watching this issue")
  4421. def test_tokenize_search_string(self):
  4422. """ Test the tokenize_search_string function. """
  4423. # These are the tests performed to make sure we tokenize correctly.
  4424. # This is in the form: input string, custom fields, remaining pattern
  4425. tests = [
  4426. ("test123", {}, "test123"),
  4427. ("test:key test123", {"test": "key"}, "test123"),
  4428. (
  4429. 'test:"key with spaces" test123',
  4430. {"test": "key with spaces"},
  4431. "test123",
  4432. ),
  4433. ("test123 test:key test456", {"test": "key"}, "test123 test456"),
  4434. (
  4435. 'test123 test:"key with spaces" key2:value12 test456',
  4436. {"test": "key with spaces", "key2": "value12"},
  4437. "test123 test456",
  4438. ),
  4439. ]
  4440. for inp, flds, rem in tests:
  4441. self.assertEqual(
  4442. pagure.lib.query.tokenize_search_string(inp), (flds, rem)
  4443. )
  4444. def test_save_report(self):
  4445. """ Test the save_report function. """
  4446. # Create the projects
  4447. tests.create_projects(self.session)
  4448. project = pagure.lib.query.get_authorized_project(
  4449. self.session, project_name="test"
  4450. )
  4451. self.assertEqual(project.reports, {})
  4452. name = "test report"
  4453. url = "?foo=bar&baz=biz"
  4454. pagure.lib.query.save_report(
  4455. self.session, repo=project, name=name, url=url, username=None
  4456. )
  4457. project = pagure.lib.query.get_authorized_project(
  4458. self.session, project_name="test"
  4459. )
  4460. self.assertEqual(
  4461. project.reports, {"test report": {"baz": "biz", "foo": "bar"}}
  4462. )
  4463. name = "test report #2"
  4464. url = "?foo=bar&foo=none&foo=baz"
  4465. pagure.lib.query.save_report(
  4466. self.session, repo=project, name=name, url=url, username=None
  4467. )
  4468. project = pagure.lib.query.get_authorized_project(
  4469. self.session, project_name="test"
  4470. )
  4471. self.assertEqual(
  4472. project.reports,
  4473. {
  4474. "test report": {"baz": "biz", "foo": "bar"},
  4475. "test report #2": {"foo": ["bar", "none", "baz"]},
  4476. },
  4477. )
  4478. def test_text2markdown_table(self):
  4479. """ Test the text2markdown function with a markdown table. """
  4480. try:
  4481. md_version = markdown.__version__.version_info
  4482. except AttributeError:
  4483. md_version = markdown.__version_info__
  4484. if md_version < (2, 6, 7):
  4485. raise unittest.case.SkipTest(
  4486. "Skipping on old markdown that do not strip the orientation row"
  4487. )
  4488. text = """
  4489. | Left-aligned | Center-aligned | Right-aligned |
  4490. | :--- | :---: | ---: |
  4491. | git status | git status | git status |
  4492. | git diff | git diff | git diff |
  4493. foo bar
  4494. """
  4495. expected = """<div class="markdown"><table>
  4496. <thead>
  4497. <tr>
  4498. <th align="left">Left-aligned</th>
  4499. <th align="center">Center-aligned</th>
  4500. <th align="right">Right-aligned</th>
  4501. </tr>
  4502. </thead>
  4503. <tbody>
  4504. <tr>
  4505. <td align="left">git status</td>
  4506. <td align="center">git status</td>
  4507. <td align="right">git status</td>
  4508. </tr>
  4509. <tr>
  4510. <td align="left">git diff</td>
  4511. <td align="center">git diff</td>
  4512. <td align="right">git diff</td>
  4513. </tr>
  4514. </tbody>
  4515. </table>
  4516. <p>foo bar</p></div>"""
  4517. with self.app.application.app_context():
  4518. html = pagure.lib.query.text2markdown(text)
  4519. self.assertEqual(html, expected)
  4520. def test_text2markdown_table_old_mk(self):
  4521. """Test the text2markdown function with a markdown table using the old
  4522. format where the orientation instruction are provided next to the column
  4523. delimiter unlike what can be done with more recent version of markdown.
  4524. """
  4525. text = """
  4526. | Left-aligned | Center-aligned | Right-aligned |
  4527. |:--- |:--------------:| ---:|
  4528. | git status | git status | git status |
  4529. | git diff | git diff | git diff |
  4530. foo bar
  4531. """
  4532. expected = """<div class="markdown"><table>
  4533. <thead>
  4534. <tr>
  4535. <th align="left">Left-aligned</th>
  4536. <th align="center">Center-aligned</th>
  4537. <th align="right">Right-aligned</th>
  4538. </tr>
  4539. </thead>
  4540. <tbody>
  4541. <tr>
  4542. <td align="left">git status</td>
  4543. <td align="center">git status</td>
  4544. <td align="right">git status</td>
  4545. </tr>
  4546. <tr>
  4547. <td align="left">git diff</td>
  4548. <td align="center">git diff</td>
  4549. <td align="right">git diff</td>
  4550. </tr>
  4551. </tbody>
  4552. </table>
  4553. <p>foo bar</p></div>"""
  4554. with self.app.application.app_context():
  4555. html = pagure.lib.query.text2markdown(text)
  4556. self.assertEqual(html, expected)
  4557. def test_set_redis(self):
  4558. """ Test the set_redis function of pagure.lib.query. """
  4559. self.assertIsNone(pagure.lib.query.REDIS)
  4560. pagure.lib.query.set_redis("0.0.0.0", 6379, 0)
  4561. self.assertIsNotNone(pagure.lib.query.REDIS)
  4562. def test_set_pagure_ci(self):
  4563. """ Test the set_pagure_ci function of pagure.lib.query. """
  4564. # self.assertIn(pagure.lib.query.PAGURE_CI, [None, ['jenkins']])
  4565. pagure.lib.query.set_pagure_ci(True)
  4566. self.assertIsNotNone(pagure.lib.query.PAGURE_CI)
  4567. self.assertTrue(pagure.lib.query.PAGURE_CI)
  4568. def test_get_user_invalid_user(self):
  4569. """ Test the get_user function of pagure.lib.query. """
  4570. self.assertRaises(
  4571. pagure.exceptions.PagureException,
  4572. pagure.lib.query.get_user,
  4573. self.session,
  4574. "unknown",
  4575. )
  4576. def test_get_user_username(self):
  4577. """ Test the get_user function of pagure.lib.query. """
  4578. user = pagure.lib.query.get_user(self.session, "foo")
  4579. self.assertEqual(user.username, "foo")
  4580. def test_get_user_email(self):
  4581. """ Test the get_user function of pagure.lib.query. """
  4582. user = pagure.lib.query.get_user(self.session, "bar@pingou.com")
  4583. self.assertEqual(user.username, "pingou")
  4584. def test_is_valid_ssh_key_empty(self):
  4585. """ Test the is_valid_ssh_key function of pagure.lib.query. """
  4586. self.assertIsNone(pagure.lib.query.is_valid_ssh_key(""))
  4587. def test_is_valid_ssh_key_invalid(self):
  4588. """ Test the is_valid_ssh_key function of pagure.lib.query. """
  4589. self.assertEqual(pagure.lib.query.is_valid_ssh_key("nonsense"), False)
  4590. def test_is_valid_ssh_key(self):
  4591. """ Test the is_valid_ssh_key function of pagure.lib.query. """
  4592. keyinfo = pagure.lib.query.is_valid_ssh_key(
  4593. """ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDtgzSO9d1IrKdmyBFUvtAJPLgGOhp0lSySkWRSe+/+3KXYjSnsLnCJQlO5M7JfaXhtTHEow86rh4W9+FoJdzo5iocAwH5xPZ5ttHLy7VHgTzNMUeMgKpjy6bBOdPoGPPG4mo7QCMCRJdWBRDv4OSEMLU5jQAvC272YK2V8L918VQ== root@test"""
  4594. )
  4595. keys = keyinfo.split("\n")
  4596. self.assertEqual(len(keys), 2)
  4597. self.assertEqual(keys[1], "")
  4598. key = keys[0].split(" ")
  4599. self.assertEqual(key[0], "1024")
  4600. # We should always get the SHA256 version
  4601. self.assertEqual(
  4602. key[1], "SHA256:ztcRXIq7y/HNfwwscrexCyDF46/Pn/oHTkBZx87GwI8"
  4603. )
  4604. self.assertEqual(key[3], "(RSA)")
  4605. def test_create_deploykeys_ssh_keys_on_disk_empty(self):
  4606. """Test the create_deploykeys_ssh_keys_on_disk function of
  4607. pagure.lib.query."""
  4608. self.assertIsNone(
  4609. pagure.lib.query.create_deploykeys_ssh_keys_on_disk(None, None)
  4610. )
  4611. self.assertFalse(
  4612. os.path.exists(os.path.join(self.path, "deploykeys", "test"))
  4613. )
  4614. def test_create_deploykeys_ssh_keys_on_disk_nokey(self):
  4615. """Test the create_deploykeys_ssh_keys_on_disk function of
  4616. pagure.lib.query."""
  4617. tests.create_projects(self.session)
  4618. project = pagure.lib.query._get_project(self.session, "test")
  4619. self.assertIsNone(
  4620. pagure.lib.query.create_deploykeys_ssh_keys_on_disk(
  4621. project, self.path
  4622. )
  4623. )
  4624. self.assertTrue(
  4625. os.path.exists(os.path.join(self.path, "deploykeys", "test"))
  4626. )
  4627. self.assertEqual(
  4628. os.listdir(os.path.join(self.path, "deploykeys", "test")), []
  4629. )
  4630. @patch(
  4631. "pagure.lib.query.is_valid_ssh_key", MagicMock(return_value="foo bar")
  4632. )
  4633. def test_create_deploykeys_ssh_keys_on_disk(self):
  4634. """Test the create_deploykeys_ssh_keys_on_disk function of
  4635. pagure.lib.query."""
  4636. tests.create_projects(self.session)
  4637. project = pagure.lib.query._get_project(self.session, "test")
  4638. # Add a deploy key to the project
  4639. pingou = pagure.lib.query.get_user(self.session, "pingou")
  4640. msg = pagure.lib.query.add_sshkey_to_project_or_user(
  4641. self.session,
  4642. project=project,
  4643. ssh_key="foo bar",
  4644. pushaccess=False,
  4645. creator=pingou,
  4646. )
  4647. self.assertEqual(msg, "SSH key added")
  4648. self.assertIsNone(
  4649. pagure.lib.query.create_deploykeys_ssh_keys_on_disk(
  4650. project, self.path
  4651. )
  4652. )
  4653. self.assertTrue(
  4654. os.path.exists(os.path.join(self.path, "deploykeys", "test"))
  4655. )
  4656. self.assertEqual(
  4657. os.listdir(os.path.join(self.path, "deploykeys", "test")),
  4658. ["deploykey_test_1.pub"],
  4659. )
  4660. # Remove the deploykey
  4661. project = pagure.lib.query._get_project(self.session, "test")
  4662. self.session.delete(project.deploykeys[0])
  4663. self.session.commit()
  4664. # Remove the file on disk
  4665. self.assertIsNone(
  4666. pagure.lib.query.create_deploykeys_ssh_keys_on_disk(
  4667. project, self.path
  4668. )
  4669. )
  4670. self.assertTrue(
  4671. os.path.exists(os.path.join(self.path, "deploykeys", "test"))
  4672. )
  4673. self.assertEqual(
  4674. os.listdir(os.path.join(self.path, "deploykeys", "test")), []
  4675. )
  4676. @patch(
  4677. "pagure.lib.query.is_valid_ssh_key",
  4678. MagicMock(return_value="\nfoo bar"),
  4679. )
  4680. def test_create_deploykeys_ssh_keys_on_disk_empty_first_key(self):
  4681. """Test the create_deploykeys_ssh_keys_on_disk function of
  4682. pagure.lib.query."""
  4683. tests.create_projects(self.session)
  4684. project = pagure.lib.query._get_project(self.session, "test")
  4685. # Add a deploy key to the project
  4686. new_key_obj = pagure.lib.model.SSHKey(
  4687. project_id=project.id,
  4688. pushaccess=False,
  4689. public_ssh_key="\n foo bar",
  4690. ssh_short_key="\n foo bar",
  4691. ssh_search_key="\n foo bar",
  4692. creator_user_id=1, # pingou
  4693. )
  4694. self.session.add(new_key_obj)
  4695. self.session.commit()
  4696. self.assertIsNone(
  4697. pagure.lib.query.create_deploykeys_ssh_keys_on_disk(
  4698. project, self.path
  4699. )
  4700. )
  4701. self.assertTrue(
  4702. os.path.exists(os.path.join(self.path, "deploykeys", "test"))
  4703. )
  4704. self.assertEqual(
  4705. os.listdir(os.path.join(self.path, "deploykeys", "test")), []
  4706. )
  4707. def test_create_deploykeys_ssh_keys_on_disk_invalid(self):
  4708. """Test the create_deploykeys_ssh_keys_on_disk function of
  4709. pagure.lib.query."""
  4710. tests.create_projects(self.session)
  4711. project = pagure.lib.query._get_project(self.session, "test")
  4712. # Add a deploy key to the project
  4713. new_key_obj = pagure.lib.model.SSHKey(
  4714. project_id=project.id,
  4715. pushaccess=False,
  4716. public_ssh_key="foo bar",
  4717. ssh_short_key="foo bar",
  4718. ssh_search_key="foo bar",
  4719. creator_user_id=1, # pingou
  4720. )
  4721. self.session.add(new_key_obj)
  4722. self.session.commit()
  4723. self.assertIsNone(
  4724. pagure.lib.query.create_deploykeys_ssh_keys_on_disk(
  4725. project, self.path
  4726. )
  4727. )
  4728. self.assertTrue(
  4729. os.path.exists(os.path.join(self.path, "deploykeys", "test"))
  4730. )
  4731. self.assertEqual(
  4732. os.listdir(os.path.join(self.path, "deploykeys", "test")), []
  4733. )
  4734. def test_create_user_ssh_keys_on_disk_none(self):
  4735. """ Test the create_user_ssh_keys_on_disk function of pagure.lib.query. """
  4736. self.assertIsNone(
  4737. pagure.lib.query.create_user_ssh_keys_on_disk(None, None)
  4738. )
  4739. def test_update_user_settings_invalid_user(self):
  4740. """ Test the update_user_settings function of pagure.lib.query. """
  4741. self.assertRaises(
  4742. pagure.exceptions.PagureException,
  4743. pagure.lib.query.update_user_settings,
  4744. session=self.session,
  4745. settings={},
  4746. user="invalid",
  4747. )
  4748. def test_update_user_settings_no_change(self):
  4749. """ Test the update_user_settings function of pagure.lib.query. """
  4750. # First update the setting
  4751. msg = pagure.lib.query.update_user_settings(
  4752. session=self.session,
  4753. settings={"cc_me_to_my_actions": True},
  4754. user="pingou",
  4755. )
  4756. self.assertEqual(msg, "Successfully edited your settings")
  4757. # Then change it back to its default
  4758. msg = pagure.lib.query.update_user_settings(
  4759. session=self.session, settings={}, user="pingou"
  4760. )
  4761. self.assertEqual(msg, "Successfully edited your settings")
  4762. def test_update_user_settings_no_data(self):
  4763. """ Test the update_user_settings function of pagure.lib.query. """
  4764. msg = pagure.lib.query.update_user_settings(
  4765. session=self.session,
  4766. settings={"cc_me_to_my_actions": False},
  4767. user="pingou",
  4768. )
  4769. self.assertEqual(msg, "No settings to change")
  4770. def test_update_user_settings(self):
  4771. """ Test the update_user_settings function of pagure.lib.query. """
  4772. msg = pagure.lib.query.update_user_settings(
  4773. session=self.session,
  4774. settings={"cc_me_to_my_actions": True},
  4775. user="pingou",
  4776. )
  4777. self.assertEqual(msg, "Successfully edited your settings")
  4778. @patch.dict(
  4779. pagure.config.config,
  4780. {"ALLOWED_EMAIL_DOMAINS": ["pingoured.fr"]},
  4781. clear=True,
  4782. )
  4783. def test_add_email_to_user_with_logs(self):
  4784. """Test the add_email_to_user function of pagure.lib when there
  4785. are log entries associated to the email added.
  4786. """
  4787. user = pagure.lib.query.search_user(self.session, username="pingou")
  4788. # Add a couple of log entries associated with the new email
  4789. for i in range(3):
  4790. log = pagure.lib.model.PagureLog(
  4791. user_email="new_email@pingoured.fr",
  4792. log_type="commit",
  4793. ref_id=i,
  4794. )
  4795. self.session.add(log)
  4796. self.session.commit()
  4797. # Check emails before
  4798. self.assertEqual(len(user.emails), 2)
  4799. # Add the new_email to the user
  4800. pagure.lib.query.add_email_to_user(
  4801. self.session, user, "new_email@pingoured.fr"
  4802. )
  4803. self.session.commit()
  4804. # Check emails after
  4805. self.assertEqual(len(user.emails), 3)
  4806. # add another email address that is not in an allowed domain
  4807. self.assertRaises(
  4808. pagure.exceptions.PagureException,
  4809. pagure.lib.query.add_email_to_user,
  4810. session=self.session,
  4811. user=user,
  4812. user_email="foo@bar.com",
  4813. )
  4814. self.session.commit()
  4815. # Check emails after
  4816. self.assertEqual(len(user.emails), 3)
  4817. # redo tests with 2 allowed domains
  4818. pagure.config.config["ALLOWED_EMAIL_DOMAINS"] = [
  4819. "pingoured.fr",
  4820. "example.com",
  4821. ]
  4822. # Add the new_email to the user
  4823. pagure.lib.query.add_email_to_user(
  4824. self.session, user, "another_email@example.com"
  4825. )
  4826. self.session.commit()
  4827. # Check emails after
  4828. self.assertEqual(len(user.emails), 4)
  4829. # add another email address that is not in an allowed domain
  4830. self.assertRaises(
  4831. pagure.exceptions.PagureException,
  4832. pagure.lib.query.add_email_to_user,
  4833. session=self.session,
  4834. user=user,
  4835. user_email="another_foo@bar.com",
  4836. )
  4837. self.session.commit()
  4838. # Check emails after
  4839. self.assertEqual(len(user.emails), 4)
  4840. @patch(
  4841. "pagure.lib.query.is_valid_ssh_key", MagicMock(return_value="foo bar")
  4842. )
  4843. def test_update_user_ssh_valid_key(self):
  4844. """ Test the update_user_ssh function of pagure.lib.query. """
  4845. pagure.SESSION = self.session
  4846. pagure.lib.query.update_user_ssh(
  4847. self.session, user="pingou", ssh_key="foo key", keydir=self.path
  4848. )
  4849. self.session.commit()
  4850. self.assertTrue(os.path.exists(os.path.join(self.path, "keys_0")))
  4851. self.assertEqual(
  4852. os.listdir(os.path.join(self.path, "keys_0")), ["pingou.pub"]
  4853. )
  4854. def test_add_user_pending_email_existing_email(self):
  4855. """ Test the add_user_pending_email function of pagure.lib.query. """
  4856. user = pagure.lib.query.search_user(self.session, username="pingou")
  4857. self.assertRaises(
  4858. pagure.exceptions.PagureException,
  4859. pagure.lib.query.add_user_pending_email,
  4860. session=self.session,
  4861. userobj=user,
  4862. email="foo@bar.com",
  4863. )
  4864. @patch("pagure.lib.notify.notify_new_email", MagicMock(return_value=True))
  4865. @patch.dict(
  4866. pagure.config.config,
  4867. {"ALLOWED_EMAIL_DOMAINS": ["pingoured.fr"]},
  4868. clear=True,
  4869. )
  4870. def test_add_user_pending_email(self):
  4871. """ Test the add_user_pending_email function of pagure.lib.query. """
  4872. user = pagure.lib.query.search_user(self.session, username="pingou")
  4873. self.assertEqual(len(user.emails), 2)
  4874. self.assertEqual(len(user.emails_pending), 0)
  4875. pagure.lib.query.add_user_pending_email(
  4876. session=self.session, userobj=user, email="new_mail@pingoured.fr"
  4877. )
  4878. self.session.commit()
  4879. self.assertEqual(len(user.emails), 2)
  4880. self.assertEqual(len(user.emails_pending), 1)
  4881. # add another email address that is not allowed
  4882. self.assertRaises(
  4883. pagure.exceptions.PagureException,
  4884. pagure.lib.query.add_user_pending_email,
  4885. session=self.session,
  4886. userobj=user,
  4887. email="foo@bar.com",
  4888. )
  4889. self.session.commit()
  4890. self.assertEqual(len(user.emails), 2)
  4891. self.assertEqual(len(user.emails_pending), 1)
  4892. def test_resend_pending_email_someone_else_email(self):
  4893. """ Test the resend_pending_email function of pagure.lib.query. """
  4894. user = pagure.lib.query.search_user(self.session, username="pingou")
  4895. self.assertRaises(
  4896. pagure.exceptions.PagureException,
  4897. pagure.lib.query.resend_pending_email,
  4898. session=self.session,
  4899. userobj=user,
  4900. email="foo@bar.com",
  4901. )
  4902. def test_resend_pending_email_email_validated(self):
  4903. """ Test the resend_pending_email function of pagure.lib.query. """
  4904. user = pagure.lib.query.search_user(self.session, username="pingou")
  4905. self.assertRaises(
  4906. pagure.exceptions.PagureException,
  4907. pagure.lib.query.resend_pending_email,
  4908. session=self.session,
  4909. userobj=user,
  4910. email="foo@pingou.com",
  4911. )
  4912. def test_get_acls(self):
  4913. """ Test the get_acls function of pagure.lib.query. """
  4914. acls = pagure.lib.query.get_acls(self.session)
  4915. self.assertEqual(
  4916. sorted([a.name for a in acls]),
  4917. [
  4918. "commit",
  4919. "commit_flag",
  4920. "create_branch",
  4921. "create_git_alias",
  4922. "create_project",
  4923. "delete_git_alias",
  4924. "fork_project",
  4925. "generate_acls_project",
  4926. "internal_access",
  4927. "issue_assign",
  4928. "issue_change_status",
  4929. "issue_comment",
  4930. "issue_create",
  4931. "issue_subscribe",
  4932. "issue_update",
  4933. "issue_update_custom_fields",
  4934. "issue_update_milestone",
  4935. "modify_git_alias",
  4936. "modify_project",
  4937. "pull_request_assign",
  4938. "pull_request_close",
  4939. "pull_request_comment",
  4940. "pull_request_create",
  4941. "pull_request_flag",
  4942. "pull_request_merge",
  4943. "pull_request_rebase",
  4944. "pull_request_subscribe",
  4945. "pull_request_update",
  4946. "tag_project",
  4947. "update_watch_status",
  4948. ],
  4949. )
  4950. def test_get_acls_restrict_one(self):
  4951. """ Test the get_acls function of pagure.lib.query. """
  4952. acls = pagure.lib.query.get_acls(
  4953. self.session, restrict="create_project"
  4954. )
  4955. self.assertEqual([a.name for a in acls], ["create_project"])
  4956. def test_get_acls_restrict_two(self):
  4957. """ Test the get_acls function of pagure.lib.query. """
  4958. acls = pagure.lib.query.get_acls(
  4959. self.session, restrict=["create_project", "issue_create"]
  4960. )
  4961. self.assertEqual(
  4962. [a.name for a in acls], ["create_project", "issue_create"]
  4963. )
  4964. def test_filter_img_src(self):
  4965. """ Test the filter_img_src function of pagure.lib.query. """
  4966. for name in ("alt", "height", "width", "class"):
  4967. self.assertTrue(pagure.lib.query.filter_img_src(name, "caption"))
  4968. self.assertTrue(
  4969. pagure.lib.query.filter_img_src("src", "/path/to/image")
  4970. )
  4971. self.assertTrue(
  4972. pagure.lib.query.filter_img_src(
  4973. "src", "http://localhost.localdomain/path/to/image"
  4974. )
  4975. )
  4976. self.assertFalse(
  4977. pagure.lib.query.filter_img_src(
  4978. "src", "http://foo.org/path/to/image"
  4979. )
  4980. )
  4981. self.assertFalse(
  4982. pagure.lib.query.filter_img_src(
  4983. "anything", "http://foo.org/path/to/image"
  4984. )
  4985. )
  4986. def test_clean_input(self):
  4987. """ Test the clean_input function of pagure.lib.query. """
  4988. text = '<a href="/path" title="click me!">Click here</a>'
  4989. output = pagure.lib.query.clean_input(text)
  4990. self.assertEqual(output, text)
  4991. def test_could_be_text(self):
  4992. """ Test the could_be_text function of pagure.lib.query. """
  4993. self.assertTrue(pagure.lib.query.could_be_text(b"foo"))
  4994. self.assertTrue(pagure.lib.query.could_be_text("fâö".encode("utf-8")))
  4995. self.assertFalse(
  4996. pagure.lib.query.could_be_text(b"\x89PNG\r\n\x1a\n\x00")
  4997. )
  4998. def test_set_custom_key_fields_empty(self):
  4999. """ Test the set_custom_key_fields function of pagure.lib.query. """
  5000. tests.create_projects(self.session)
  5001. project = pagure.lib.query._get_project(self.session, "test")
  5002. self.assertIsNotNone(project)
  5003. msg = pagure.lib.query.set_custom_key_fields(
  5004. session=self.session,
  5005. project=project,
  5006. fields=[],
  5007. types=[],
  5008. data=[],
  5009. notify=False,
  5010. )
  5011. self.session.commit()
  5012. self.assertEqual(msg, "List of custom fields updated")
  5013. def test_set_custom_key_fields(self):
  5014. """ Test the set_custom_key_fields function of pagure.lib.query. """
  5015. tests.create_projects(self.session)
  5016. project = pagure.lib.query._get_project(self.session, "test")
  5017. self.assertIsNotNone(project)
  5018. # Set a custom key
  5019. msg = pagure.lib.query.set_custom_key_fields(
  5020. session=self.session,
  5021. project=project,
  5022. fields=["upstream"],
  5023. types=["url"],
  5024. data=[None],
  5025. notify=False,
  5026. )
  5027. self.session.commit()
  5028. self.assertEqual(msg, "List of custom fields updated")
  5029. # Set another one, with notifications on
  5030. msg = pagure.lib.query.set_custom_key_fields(
  5031. session=self.session,
  5032. project=project,
  5033. fields=["bugzilla_url"],
  5034. types=["url"],
  5035. data=[None],
  5036. notify=["on"],
  5037. )
  5038. self.session.commit()
  5039. self.assertEqual(msg, "List of custom fields updated")
  5040. # Re-set the second one but with notifications off
  5041. msg = pagure.lib.query.set_custom_key_fields(
  5042. session=self.session,
  5043. project=project,
  5044. fields=["bugzilla_url"],
  5045. types=["url"],
  5046. data=[None],
  5047. notify=["off"],
  5048. )
  5049. self.session.commit()
  5050. self.assertEqual(msg, "List of custom fields updated")
  5051. @patch("pagure.lib.query.REDIS")
  5052. def test_set_custom_key_value_boolean(self, mock_redis):
  5053. """ Test the set_custom_key_value function of pagure.lib.query. """
  5054. mock_redis.return_value = True
  5055. tests.create_projects(self.session)
  5056. project = pagure.lib.query._get_project(self.session, "test")
  5057. self.assertIsNotNone(project)
  5058. # Set a custom key
  5059. msg = pagure.lib.query.set_custom_key_fields(
  5060. session=self.session,
  5061. project=project,
  5062. fields=["tested"],
  5063. types=["boolean"],
  5064. data=[None],
  5065. notify=False,
  5066. )
  5067. self.session.commit()
  5068. self.assertEqual(msg, "List of custom fields updated")
  5069. # Create issues
  5070. msg = pagure.lib.query.new_issue(
  5071. session=self.session,
  5072. repo=project,
  5073. title="Test issue",
  5074. content="We should work on this",
  5075. user="pingou",
  5076. )
  5077. self.session.commit()
  5078. self.assertEqual(msg.title, "Test issue")
  5079. issue = pagure.lib.query.search_issues(
  5080. self.session, project, issueid=1
  5081. )
  5082. self.assertEqual(len(project.issue_keys), 1)
  5083. self.assertEqual(project.issue_keys[0].key_type, "boolean")
  5084. msg = pagure.lib.query.set_custom_key_value(
  5085. session=self.session,
  5086. issue=issue,
  5087. key=project.issue_keys[0],
  5088. value=True,
  5089. )
  5090. self.session.commit()
  5091. self.assertEqual(msg, "Custom field tested adjusted to True")
  5092. # Update it a second time to trigger edit
  5093. msg = pagure.lib.query.set_custom_key_value(
  5094. session=self.session,
  5095. issue=issue,
  5096. key=project.issue_keys[0],
  5097. value=False,
  5098. )
  5099. if str(self.session.bind.engine.url).startswith("sqlite"):
  5100. self.assertEqual(msg, "Custom field tested reset (from 1)")
  5101. else:
  5102. self.assertEqual(msg, "Custom field tested reset (from true)")
  5103. self.assertEqual(mock_redis.publish.call_count, 2)
  5104. @patch("pagure.lib.query.REDIS")
  5105. def test_set_custom_key_value_boolean_private_issue(self, mock_redis):
  5106. """ Test the set_custom_key_value function of pagure.lib.query. """
  5107. mock_redis.return_value = True
  5108. tests.create_projects(self.session)
  5109. project = pagure.lib.query._get_project(self.session, "test")
  5110. self.assertIsNotNone(project)
  5111. # Set a custom key
  5112. msg = pagure.lib.query.set_custom_key_fields(
  5113. session=self.session,
  5114. project=project,
  5115. fields=["tested"],
  5116. types=["boolean"],
  5117. data=[None],
  5118. notify=False,
  5119. )
  5120. self.session.commit()
  5121. self.assertEqual(msg, "List of custom fields updated")
  5122. # Create issues
  5123. msg = pagure.lib.query.new_issue(
  5124. session=self.session,
  5125. repo=project,
  5126. title="Test issue",
  5127. content="We should work on this",
  5128. user="pingou",
  5129. private=True,
  5130. )
  5131. self.session.commit()
  5132. self.assertEqual(msg.title, "Test issue")
  5133. issue = pagure.lib.query.search_issues(
  5134. self.session, project, issueid=1
  5135. )
  5136. self.assertEqual(len(project.issue_keys), 1)
  5137. self.assertEqual(project.issue_keys[0].key_type, "boolean")
  5138. msg = pagure.lib.query.set_custom_key_value(
  5139. session=self.session,
  5140. issue=issue,
  5141. key=project.issue_keys[0],
  5142. value=True,
  5143. )
  5144. self.session.commit()
  5145. self.assertEqual(msg, "Custom field tested adjusted to True")
  5146. # Update it a second time to trigger edit
  5147. msg = pagure.lib.query.set_custom_key_value(
  5148. session=self.session,
  5149. issue=issue,
  5150. key=project.issue_keys[0],
  5151. value=False,
  5152. )
  5153. self.session.commit()
  5154. if str(self.session.bind.engine.url).startswith("sqlite"):
  5155. self.assertEqual(msg, "Custom field tested reset (from 1)")
  5156. else:
  5157. self.assertEqual(msg, "Custom field tested reset (from true)")
  5158. self.assertEqual(mock_redis.publish.call_count, 2)
  5159. @patch("pagure.lib.query.REDIS")
  5160. def test_set_custom_key_value_text(self, mock_redis):
  5161. """ Test the set_custom_key_value function of pagure.lib.query. """
  5162. mock_redis.return_value = True
  5163. tests.create_projects(self.session)
  5164. project = pagure.lib.query._get_project(self.session, "test")
  5165. self.assertIsNotNone(project)
  5166. # Set a custom key
  5167. msg = pagure.lib.query.set_custom_key_fields(
  5168. session=self.session,
  5169. project=project,
  5170. fields=["tested"],
  5171. types=["text"],
  5172. data=[None],
  5173. notify=False,
  5174. )
  5175. self.session.commit()
  5176. self.assertEqual(msg, "List of custom fields updated")
  5177. # Create issues
  5178. msg = pagure.lib.query.new_issue(
  5179. session=self.session,
  5180. repo=project,
  5181. title="Test issue",
  5182. content="We should work on this",
  5183. user="pingou",
  5184. )
  5185. self.session.commit()
  5186. self.assertEqual(msg.title, "Test issue")
  5187. issue = pagure.lib.query.search_issues(
  5188. self.session, project, issueid=1
  5189. )
  5190. self.assertEqual(len(project.issue_keys), 1)
  5191. self.assertEqual(project.issue_keys[0].key_type, "text")
  5192. msg = pagure.lib.query.set_custom_key_value(
  5193. session=self.session,
  5194. issue=issue,
  5195. key=project.issue_keys[0],
  5196. value="In progress",
  5197. )
  5198. self.session.commit()
  5199. self.assertEqual(msg, "Custom field tested adjusted to In progress")
  5200. # Update it a second time to trigger edit
  5201. msg = pagure.lib.query.set_custom_key_value(
  5202. session=self.session,
  5203. issue=issue,
  5204. key=project.issue_keys[0],
  5205. value="Done",
  5206. )
  5207. self.assertEqual(
  5208. msg, "Custom field tested adjusted to Done (was: In progress)"
  5209. )
  5210. self.assertEqual(mock_redis.publish.call_count, 2)
  5211. def test_log_action_invalid(self):
  5212. """ Test the log_action function of pagure.lib.query. """
  5213. obj = MagicMock
  5214. obj.isa = "invalid"
  5215. self.assertRaises(
  5216. pagure.exceptions.InvalidObjectException,
  5217. pagure.lib.query.log_action,
  5218. session=self.session,
  5219. action="foo",
  5220. obj=obj,
  5221. user_obj=None,
  5222. )
  5223. def test_search_token_no_acls(self):
  5224. """ Test the search_token function of pagure.lib.query. """
  5225. tests.create_projects(self.session)
  5226. tests.create_tokens(self.session)
  5227. tests.create_tokens_acl(self.session)
  5228. out = pagure.lib.query.search_token(self.session, [])
  5229. self.assertEqual(len(out), 1)
  5230. self.assertEqual(out[0].id, "aaabbbcccddd")
  5231. def test_search_token_single_acls(self):
  5232. """ Test the search_token function of pagure.lib.query. """
  5233. tests.create_projects(self.session)
  5234. tests.create_tokens(self.session)
  5235. tests.create_tokens_acl(self.session)
  5236. out = pagure.lib.query.search_token(self.session, "issue_create")
  5237. self.assertEqual(len(out), 1)
  5238. self.assertEqual(out[0].id, "aaabbbcccddd")
  5239. def test_search_token_single_acls_user(self):
  5240. """ Test the search_token function of pagure.lib.query. """
  5241. tests.create_projects(self.session)
  5242. tests.create_tokens(self.session)
  5243. tests.create_tokens_acl(self.session)
  5244. out = pagure.lib.query.search_token(
  5245. self.session, "issue_create", user="pingou"
  5246. )
  5247. self.assertEqual(len(out), 1)
  5248. self.assertEqual(out[0].id, "aaabbbcccddd")
  5249. out = pagure.lib.query.search_token(
  5250. self.session, "issue_create", user="foo"
  5251. )
  5252. self.assertEqual(len(out), 0)
  5253. def test_search_token_single_acls_active(self):
  5254. """ Test the search_token function of pagure.lib.query. """
  5255. tests.create_projects(self.session)
  5256. tests.create_tokens(self.session)
  5257. tests.create_tokens_acl(self.session)
  5258. out = pagure.lib.query.search_token(
  5259. self.session, "issue_create", active=True
  5260. )
  5261. self.assertEqual(len(out), 1)
  5262. self.assertEqual(out[0].id, "aaabbbcccddd")
  5263. out = pagure.lib.query.search_token(
  5264. self.session, "issue_create", expired=True
  5265. )
  5266. self.assertEqual(len(out), 0)
  5267. def test_update_read_only_mode(self):
  5268. """ Test the update_read_only_mode method of pagure.lib """
  5269. tests.create_projects(self.session)
  5270. project_obj = pagure.lib.query._get_project(self.session, "test")
  5271. # Default mode of project is read only
  5272. self.assertEqual(project_obj.read_only, True)
  5273. # Remove read only
  5274. pagure.lib.query.update_read_only_mode(
  5275. self.session, project_obj, False
  5276. )
  5277. self.session.commit()
  5278. project_obj = pagure.lib.query._get_project(self.session, "test")
  5279. self.assertEqual(project_obj.read_only, False)
  5280. # Try reversing it back
  5281. pagure.lib.query.update_read_only_mode(self.session, project_obj, True)
  5282. self.session.commit()
  5283. project_obj = pagure.lib.query._get_project(self.session, "test")
  5284. self.assertEqual(project_obj.read_only, True)
  5285. if __name__ == "__main__":
  5286. unittest.main(verbosity=2)