NewsfeedPlugin.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. import time
  2. import re
  3. from Plugin import PluginManager
  4. from Db import DbQuery
  5. from Debug import Debug
  6. from util import helper
  7. @PluginManager.registerTo("UiWebsocket")
  8. class UiWebsocketPlugin(object):
  9. def formatSiteInfo(self, site, create_user=True):
  10. site_info = super(UiWebsocketPlugin, self).formatSiteInfo(site, create_user=create_user)
  11. feed_following = self.user.sites.get(site.address, {}).get("follow", None)
  12. if feed_following == None:
  13. site_info["feed_follow_num"] = None
  14. else:
  15. site_info["feed_follow_num"] = len(feed_following)
  16. return site_info
  17. def actionFeedFollow(self, to, feeds):
  18. self.user.setFeedFollow(self.site.address, feeds)
  19. self.user.save()
  20. self.response(to, "ok")
  21. def actionFeedListFollow(self, to):
  22. feeds = self.user.sites[self.site.address].get("follow", {})
  23. self.response(to, feeds)
  24. def actionFeedQuery(self, to, limit=10, day_limit=3):
  25. if "ADMIN" not in self.site.settings["permissions"]:
  26. return self.response(to, "FeedQuery not allowed")
  27. from Site import SiteManager
  28. rows = []
  29. stats = []
  30. total_s = time.time()
  31. num_sites = 0
  32. for address, site_data in self.user.sites.items():
  33. feeds = site_data.get("follow")
  34. if not feeds:
  35. continue
  36. if type(feeds) is not dict:
  37. self.log.debug("Invalid feed for site %s" % address)
  38. continue
  39. num_sites += 1
  40. for name, query_set in feeds.iteritems():
  41. site = SiteManager.site_manager.get(address)
  42. if not site or not site.storage.has_db:
  43. continue
  44. s = time.time()
  45. try:
  46. query_raw, params = query_set
  47. query_parts = re.split(r"UNION(?:\s+ALL|)", query_raw)
  48. for i, query_part in enumerate(query_parts):
  49. db_query = DbQuery(query_part)
  50. if day_limit:
  51. where = " WHERE %s > strftime('%%s', 'now', '-%s day')" % (db_query.fields.get("date_added", "date_added"), day_limit)
  52. if "WHERE" in query_part:
  53. query_part = re.sub("WHERE (.*?)(?=$| GROUP BY)", where+" AND (\\1)", query_part)
  54. else:
  55. query_part += where
  56. query_parts[i] = query_part
  57. query = " UNION ".join(query_parts)
  58. if ":params" in query:
  59. query_params = map(helper.sqlquote, params)
  60. query = query.replace(":params", ",".join(query_params))
  61. res = site.storage.query(query + " ORDER BY date_added DESC LIMIT %s" % limit)
  62. except Exception as err: # Log error
  63. self.log.error("%s feed query %s error: %s" % (address, name, Debug.formatException(err)))
  64. stats.append({"site": site.address, "feed_name": name, "error": str(err)})
  65. continue
  66. for row in res:
  67. row = dict(row)
  68. if not isinstance(row["date_added"], (int, long, float, complex)):
  69. self.log.debug("Invalid date_added from site %s: %r" % (address, row["date_added"]))
  70. continue
  71. if row["date_added"] > 1000000000000: # Formatted as millseconds
  72. row["date_added"] = row["date_added"] / 1000
  73. if "date_added" not in row or row["date_added"] > time.time() + 120:
  74. self.log.debug("Newsfeed item from the future from from site %s" % address)
  75. continue # Feed item is in the future, skip it
  76. row["site"] = address
  77. row["feed_name"] = name
  78. rows.append(row)
  79. stats.append({"site": site.address, "feed_name": name, "taken": round(time.time() - s, 3)})
  80. time.sleep(0.0001)
  81. return self.response(to, {"rows": rows, "stats": stats, "num": len(rows), "sites": num_sites, "taken": round(time.time() - total_s, 3)})
  82. def parseSearch(self, search):
  83. parts = re.split("(site|type):", search)
  84. if len(parts) > 1: # Found filter
  85. search_text = parts[0]
  86. parts = [part.strip() for part in parts]
  87. filters = dict(zip(parts[1::2], parts[2::2]))
  88. else:
  89. search_text = search
  90. filters = {}
  91. return [search_text, filters]
  92. def actionFeedSearch(self, to, search, limit=30, day_limit=30):
  93. if "ADMIN" not in self.site.settings["permissions"]:
  94. return self.response(to, "FeedSearch not allowed")
  95. from Site import SiteManager
  96. rows = []
  97. stats = []
  98. num_sites = 0
  99. total_s = time.time()
  100. search_text, filters = self.parseSearch(search)
  101. for address, site in SiteManager.site_manager.list().iteritems():
  102. if not site.storage.has_db:
  103. continue
  104. if "site" in filters:
  105. if filters["site"].lower() not in [site.address, site.content_manager.contents["content.json"].get("title").lower()]:
  106. continue
  107. if site.storage.db: # Database loaded
  108. feeds = site.storage.db.schema.get("feeds")
  109. else:
  110. try:
  111. feeds = site.storage.loadJson("dbschema.json").get("feeds")
  112. except:
  113. continue
  114. if not feeds:
  115. continue
  116. num_sites += 1
  117. for name, query in feeds.iteritems():
  118. s = time.time()
  119. try:
  120. db_query = DbQuery(query)
  121. params = []
  122. # Filters
  123. if search_text:
  124. db_query.wheres.append("(%s LIKE ? OR %s LIKE ?)" % (db_query.fields["body"], db_query.fields["title"]))
  125. search_like = "%" + search_text.replace(" ", "%") + "%"
  126. params.append(search_like)
  127. params.append(search_like)
  128. if filters.get("type") and filters["type"] not in query:
  129. continue
  130. if day_limit:
  131. db_query.wheres.append(
  132. "%s > strftime('%%s', 'now', '-%s day')" % (db_query.fields.get("date_added", "date_added"), day_limit)
  133. )
  134. # Order
  135. db_query.parts["ORDER BY"] = "date_added DESC"
  136. db_query.parts["LIMIT"] = str(limit)
  137. res = site.storage.query(str(db_query), params)
  138. except Exception, err:
  139. self.log.error("%s feed query %s error: %s" % (address, name, Debug.formatException(err)))
  140. stats.append({"site": site.address, "feed_name": name, "error": str(err), "query": query})
  141. continue
  142. for row in res:
  143. row = dict(row)
  144. if row["date_added"] > time.time() + 120:
  145. continue # Feed item is in the future, skip it
  146. row["site"] = address
  147. row["feed_name"] = name
  148. rows.append(row)
  149. stats.append({"site": site.address, "feed_name": name, "taken": round(time.time() - s, 3)})
  150. return self.response(to, {"rows": rows, "num": len(rows), "sites": num_sites, "taken": round(time.time() - total_s, 3), "stats": stats})
  151. @PluginManager.registerTo("User")
  152. class UserPlugin(object):
  153. # Set queries that user follows
  154. def setFeedFollow(self, address, feeds):
  155. site_data = self.getSiteData(address)
  156. site_data["follow"] = feeds
  157. self.save()
  158. return site_data