index.js 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335
  1. // @ts-check
  2. const os = require('os');
  3. const throng = require('throng');
  4. const dotenv = require('dotenv');
  5. const express = require('express');
  6. const http = require('http');
  7. const redis = require('redis');
  8. const pg = require('pg');
  9. const log = require('npmlog');
  10. const url = require('url');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const WebSocket = require('ws');
  14. const { JSDOM } = require('jsdom');
  const env = process.env.NODE_ENV || 'development';

  // Load .env / .env.production before reading any other setting from
  // process.env. NODE_ENV must already be set to pick the file, but every
  // other flag (including the ones below) may live only in that file.
  dotenv.config({
    path: env === 'production' ? '.env.production' : '.env',
  });

  // Read after dotenv.config() so values from the .env file are honoured.
  const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';

  log.level = process.env.LOG_LEVEL || 'verbose';
  /**
   * Converts a PostgreSQL connection URL (such as DATABASE_URL) into a partial
   * `pg` configuration object. Only the parts present in the URL are set, so
   * the result can be merged over defaults with Object.assign.
   *
   * @param {string} dbUrl
   * @return {Object.<string, any>}
   */
  const dbUrlToConfig = (dbUrl) => {
    if (!dbUrl) {
      return {};
    }

    const params = url.parse(dbUrl, true);
    const config = {};

    if (params.auth) {
      // url.parse() has already percent-decoded the userinfo, so the password
      // may legitimately contain ':'. Split on the first separator only.
      const separatorIndex = params.auth.indexOf(':');

      if (separatorIndex === -1) {
        // No password in the URL.
        config.user = params.auth;
        config.password = undefined;
      } else {
        config.user = params.auth.slice(0, separatorIndex);
        config.password = params.auth.slice(separatorIndex + 1);
      }
    }

    if (params.hostname) {
      config.host = params.hostname;
    }

    if (params.port) {
      config.port = params.port;
    }

    if (params.pathname) {
      config.database = params.pathname.split('/')[1];
    }

    const ssl = params.query && params.query.ssl;

    if (ssl === 'true' || ssl === '1') {
      config.ssl = true;
    }

    return config;
  };
  /**
   * Creates and connects a redis client.
   *
   * `defaultConfig` is copied, not modified, so the same defaults object can be
   * reused to create several clients.
   *
   * @param {Object.<string, any>} defaultConfig base client options
   * @param {string} redisUrl optional `unix://` socket path or connection URL;
   *   when empty, `defaultConfig` is used as-is
   * @return {Promise<any>} the connected client
   */
  const redisUrlToClient = async (defaultConfig, redisUrl) => {
    // Shallow-copy the options and the nested socket options, so that neither
    // this function nor the client library writes into the caller's object.
    const config = Object.assign({}, defaultConfig);

    if (config.socket) {
      config.socket = Object.assign({}, config.socket);
    }

    if (redisUrl && redisUrl.startsWith('unix://')) {
      config.socket = {
        path: redisUrl.slice(7),
      };
    } else if (redisUrl) {
      config.url = redisUrl;
    }

    const client = redis.createClient(config);

    client.on('error', (err) => log.error('Redis Client Error!', err));

    await client.connect();

    return client;
  };
  // STREAMING_CLUSTER_NUM wins when it parses to a non-zero number. Otherwise
  // use one worker in development and (CPU count - 1, at least 1) elsewhere.
  const numWorkers = Number(process.env.STREAMING_CLUSTER_NUM) || (env !== 'development' ? Math.max(os.cpus().length - 1, 1) : 1);
  /**
   * Parses a JSON string, returning null instead of throwing on bad input.
   *
   * Shared by the redis message handler and the websocket message handler.
   * `req`, when given, only changes how a parse failure is logged.
   *
   * @param {string} json
   * @param {any?} req
   * @returns {Object.<string, any>|null}
   */
  const parseJSON = (json, req) => {
    let parsed;

    try {
      parsed = JSON.parse(json);
    } catch (err) {
      /* FIXME: This logging should live at the call sites of parseJSON rather
       * than in here. That would require parseJSON to return something akin to
       * a Result type, [Error|null, null|Object<string,any>], and the callers
       * to handle the error scenarios.
       */
      if (!req) {
        log.warn(`Error parsing message from redis: ${err}`);
      } else if (req.accountId) {
        log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
      } else {
        log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
      }

      return null;
    }

    return parsed;
  };
  /**
   * Logs start-up information for the cluster master process. If SOCKET is
   * unset and PORT holds a non-numeric value, it first logs a hint about
   * migrating to SOCKET.
   */
  const startMaster = () => {
    const { SOCKET, PORT } = process.env;

    if (!SOCKET && PORT && Number.isNaN(Number(PORT))) {
      log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
    }

    log.warn(`Starting streaming API server master with ${numWorkers} workers`);
  };
  110. const startWorker = async (workerId) => {
  111. log.warn(`Starting worker ${workerId}`);
  const pgConfigs = {
    development: {
      user: process.env.DB_USER || pg.defaults.user,
      password: process.env.DB_PASS || pg.defaults.password,
      database: process.env.DB_NAME || 'mastodon_development',
      host: process.env.DB_HOST || pg.defaults.host,
      port: process.env.DB_PORT || pg.defaults.port,
    },

    production: {
      user: process.env.DB_USER || 'mastodon',
      password: process.env.DB_PASS || '',
      database: process.env.DB_NAME || 'mastodon_production',
      host: process.env.DB_HOST || 'localhost',
      port: process.env.DB_PORT || 5432,
    },
  };

  const app = express();

  app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');

  // Pool options are merged onto a fresh object (pgConfigs is left untouched).
  // Later sources override earlier ones:
  //   1. per-environment defaults. There are none for environments other than
  //      development/production; Object.assign skips the `undefined` source
  //      instead of throwing on it.
  //   2. DB_SSLMODE.
  //   3. DATABASE_URL, so that an explicit `?ssl=true` there takes effect
  //      instead of being overwritten by the DB_SSLMODE-derived value.
  //   4. pool settings.
  const pgPool = new pg.Pool(Object.assign(
    {},
    pgConfigs[env],
    { ssl: !!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable' },
    dbUrlToConfig(process.env.DATABASE_URL),
    {
      max: process.env.DB_POOL || 10,
      connectionTimeoutMillis: 15000,
    }
  ));
  const server = http.createServer(app);

  const redisNamespace = process.env.REDIS_NAMESPACE || null;

  // Prepended to redis key and channel names (e.g. by subscriptionHeartbeat
  // and by the subscribe/unsubscribe callers).
  const redisPrefix = redisNamespace === null ? '' : `${redisNamespace}:`;

  const redisParams = {
    socket: {
      host: process.env.REDIS_HOST || '127.0.0.1',
      port: process.env.REDIS_PORT || 6379,
    },
    database: process.env.REDIS_DB || 0,
    password: process.env.REDIS_PASSWORD || undefined,
  };

  if (redisNamespace !== null) {
    redisParams.namespace = redisNamespace;
  }

  /**
   * Listeners registered for each (prefixed) redis channel.
   * @type {Object.<string, Array.<function(Object<string, any>): void>>}
   */
  const subs = {};

  // A redis connection in subscriber mode cannot run regular commands. One
  // client is therefore used for SUBSCRIBE/UNSUBSCRIBE and the other for
  // everything else, such as the subscription heartbeat.
  const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  /**
   * Marks each channel as having a live subscriber. For every channel it sets
   * `<prefix>subscribed:<channel>` now and then every 6 minutes. Each key gets
   * a TTL of three heartbeat intervals, so it lapses by itself once the
   * heartbeat stops.
   *
   * @param {string[]} channels
   * @return {function(): void} stops the heartbeat
   */
  const subscriptionHeartbeat = channels => {
    const interval = 6 * 60;

    const tellSubscribed = () => {
      channels.forEach(channel => {
        // The redis client takes SET modifiers as an options object. The
        // positional ('EX', seconds) form does not produce SET ... EX.
        redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', { EX: interval * 3 }).catch(err => {
          log.error(`Failed to set subscription heartbeat for ${channel}:`, err);
        });
      });
    };

    tellSubscribed();

    const heartbeat = setInterval(tellSubscribed, interval * 1000);

    return () => {
      clearInterval(heartbeat);
    };
  };
  /**
   * Passes a raw redis pub/sub message to every listener registered for its
   * channel. Messages that do not parse as JSON are dropped.
   *
   * @param {string} message
   * @param {string} channel
   */
  const onRedisMessage = (message, channel) => {
    const callbacks = subs[channel];

    log.silly(`New message on channel ${channel}`);

    if (callbacks) {
      const json = parseJSON(message, null);

      if (json) {
        callbacks.forEach(callback => callback(json));
      }
    }
  };
  /**
   * Registers `callback` for messages on a redis channel. The first listener
   * added for a channel issues the actual redis SUBSCRIBE.
   *
   * @param {string} channel
   * @param {function(Object<string, any>): void} callback receives the parsed message
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    subs[channel] = subs[channel] || [];

    if (subs[channel].length === 0) {
      log.verbose(`Subscribe ${channel}`);

      // The redis client returns a promise here. Handle its failure so a redis
      // error is logged instead of becoming an unhandled rejection.
      redisSubscribeClient.subscribe(channel, onRedisMessage).catch(err => {
        log.error(`Failed to subscribe to ${channel}:`, err);
      });
    }

    subs[channel].push(callback);
  };
  /**
   * Removes `callback` from a redis channel's listeners. When no listeners
   * remain, the channel is unsubscribed in redis and forgotten locally.
   *
   * @param {string} channel
   * @param {function(Object<string, any>): void} callback the exact function
   *   passed to subscribe(); other listeners are left in place
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    if (!subs[channel]) {
      return;
    }

    subs[channel] = subs[channel].filter(item => item !== callback);

    if (subs[channel].length === 0) {
      log.verbose(`Unsubscribe ${channel}`);

      // The redis client returns a promise here. Handle its failure so a redis
      // error is logged instead of becoming an unhandled rejection.
      redisSubscribeClient.unsubscribe(channel).catch(err => {
        log.error(`Failed to unsubscribe from ${channel}:`, err);
      });

      delete subs[channel];
    }
  };
  /**
   * Values treated as "false" when interpreting a boolean-ish query parameter
   * such as `only_media`.
   */
  const FALSE_VALUES = [
    false,
    0,
    '0',
    'f',
    'F',
    'false',
    'FALSE',
    'off',
    'OFF',
  ];

  /**
   * Interprets a query-string flag. Any falsy value, and any entry of
   * FALSE_VALUES, yields false; everything else yields true. The result is
   * always a real boolean, as documented.
   *
   * @param {any} value
   * @return {boolean}
   */
  const isTruthy = value =>
    !!value && !FALSE_VALUES.includes(value);
  /**
   * Express middleware that adds permissive CORS headers to every response.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const allowCrossDomain = (req, res, next) => {
    const corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'],
      ['Access-Control-Allow-Methods', 'GET, OPTIONS'],
    ];

    corsHeaders.forEach(([name, value]) => {
      res.header(name, value);
    });

    next();
  };
  /**
   * Express middleware that gives each request a random UUID. The UUID is
   * stored as `req.requestId` (used as the log prefix) and echoed to the
   * client in the `X-Request-Id` header.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRequestId = (req, res, next) => {
    const requestId = uuid.v4();

    req.requestId = requestId;
    res.header('X-Request-Id', requestId);

    next();
  };
  /**
   * Express middleware that copies the peer address of the underlying socket
   * to `req.remoteAddress`. That value identifies anonymous clients in logs.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRemoteAddress = (req, res, next) => {
    // `req.connection` is a deprecated alias of `req.socket`.
    req.remoteAddress = req.socket.remoteAddress;

    next();
  };
  /**
   * Tells whether the request's token carries at least one of the given
   * scopes. Reads `req.scopes`, as populated by accountFromToken.
   *
   * @param {any} req
   * @param {string[]} necessaryScopes
   * @return {boolean}
   */
  const isInScope = (req, necessaryScopes) => {
    for (const scope of req.scopes) {
      if (necessaryScopes.includes(scope)) {
        return true;
      }
    }

    return false;
  };
  /**
   * Looks up a non-revoked OAuth access token. On success it copies the token
   * id, its scopes (split on spaces), the account id, the chosen languages and
   * the device id onto `req`.
   *
   * @param {string} token
   * @param {any} req
   * @return {Promise.<void>} rejects with a 401 error when no matching
   *   non-revoked token exists, or with the database error
   */
  const accountFromToken = (token, req) => new Promise((resolve, reject) => {
    pgPool.connect((connectError, client, done) => {
      if (connectError) {
        reject(connectError);
        return;
      }

      client.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (queryError, result) => {
        // Return the connection to the pool before looking at the result.
        done();

        if (queryError) {
          reject(queryError);
          return;
        }

        if (result.rows.length === 0) {
          const invalidToken = new Error('Invalid access token');
          invalidToken.status = 401;
          reject(invalidToken);
          return;
        }

        const { id, scopes, account_id, chosen_languages, device_id } = result.rows[0];

        req.accessTokenId = id;
        req.scopes = scopes.split(' ');
        req.accountId = account_id;
        req.chosenLanguages = chosen_languages;
        req.deviceId = device_id;

        resolve();
      });
    });
  });
  /**
   * Authenticates a request from its bearer token. The token is taken from
   * the `Authorization` header if present, otherwise from the `access_token`
   * query parameter, otherwise from the `Sec-WebSocket-Protocol` header.
   *
   * @param {any} req
   * @param {boolean=} required when true, a request without any token is rejected
   * @return {Promise.<void>} rejects with a 401 error when a required token is
   *   missing; otherwise settles like accountFromToken (resolves immediately
   *   when no token is present and none is required)
   */
  const accountFromRequest = (req, required = true) => new Promise((resolve, reject) => {
    const { authorization } = req.headers;
    const { query } = url.parse(req.url, true);
    const accessToken = query.access_token || req.headers['sec-websocket-protocol'];

    if (authorization) {
      resolve(accountFromToken(authorization.replace(/^Bearer /, ''), req));
      return;
    }

    if (accessToken) {
      resolve(accountFromToken(accessToken, req));
      return;
    }

    if (required) {
      const err = new Error('Missing access token');
      err.status = 401;
      reject(err);
      return;
    }

    resolve();
  });
  /**
   * Maps an HTTP streaming endpoint path to its stream name. For the public
   * timelines, a truthy `only_media` query flag selects the `:media` variant.
   *
   * @param {any} req
   * @returns {string|undefined} undefined for unknown paths
   */
  const channelNameFromPath = req => {
    const { path, query } = req;
    const onlyMedia = isTruthy(query.only_media);

    // path -> [stream name, stream name when only_media is set]. A Map is
    // used so that paths like "constructor" never hit Object.prototype.
    const stream = new Map([
      ['/api/v1/streaming/user', ['user']],
      ['/api/v1/streaming/user/notification', ['user:notification']],
      ['/api/v1/streaming/public', ['public', 'public:media']],
      ['/api/v1/streaming/public/local', ['public:local', 'public:local:media']],
      ['/api/v1/streaming/public/remote', ['public:remote', 'public:remote:media']],
      ['/api/v1/streaming/hashtag', ['hashtag']],
      ['/api/v1/streaming/hashtag/local', ['hashtag:local']],
      ['/api/v1/streaming/direct', ['direct']],
      ['/api/v1/streaming/list', ['list']],
    ]).get(path);

    if (stream === undefined) {
      return undefined;
    }

    const [name, mediaName = name] = stream;

    return onlyMedia ? mediaName : name;
  };
  /**
   * Streams that can be read without any OAuth scope.
   */
  const PUBLIC_CHANNELS = [
    'public',
    'public:media',
    'public:local',
    'public:local:media',
    'public:remote',
    'public:remote:media',
    'hashtag',
    'hashtag:local',
  ];

  /**
   * Checks that the request's token may read `channelName`.
   *
   * Public streams need no scope. Otherwise `read` grants every stream.
   * Besides `read`, the `user:notification` stream accepts `read:notifications`
   * and every other stream accepts `read:statuses`. The user stream only
   * includes notifications when the token has `read` or `read:notifications`;
   * that is decided separately, in channelsForUserStream.
   *
   * @param {any} req
   * @param {string} channelName
   * @return {Promise.<void>} rejects with a 401 error when no accepted scope is present
   */
  const checkScopes = (req, channelName) => new Promise((resolve, reject) => {
    log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);

    if (PUBLIC_CHANNELS.includes(channelName)) {
      resolve();
      return;
    }

    const acceptedScopes = [
      'read',
      channelName === 'user:notification' ? 'read:notifications' : 'read:statuses',
    ];
    const isAuthorized = Boolean(req.scopes) && acceptedScopes.some(scope => req.scopes.includes(scope));

    if (!isAuthorized) {
      const err = new Error('Access token does not cover required scopes');
      err.status = 401;
      reject(err);
      return;
    }

    resolve();
  });
  /**
   * `verifyClient` hook for the websocket server.
   *
   * OAuth scopes are not checked here; that happens when a specific stream is
   * subscribed to. The handshake is refused (401) only when accountFromRequest
   * fails: a missing token while alwaysRequireAuth is set, an invalid token,
   * or a lookup error.
   *
   * @param {any} info
   * @param {function(boolean, number, string): void} callback
   */
  const wsVerifyClient = (info, callback) => {
    const accept = () => {
      callback(true, undefined, undefined);
    };

    const refuse = err => {
      log.error(info.req.requestId, err.toString());
      callback(false, 401, 'Unauthorized');
    };

    accountFromRequest(info.req, alwaysRequireAuth).then(accept).catch(refuse);
  };
  /**
   * @typedef SystemMessageHandlers
   * @property {function(): void} onKill
   */

  /**
   * Builds a listener for a connection's system channels. A `kill` event
   * calls `eventHandlers.onKill()`; a `filters_changed` event clears the
   * request's cached filters. Any other event is only logged.
   *
   * @param {any} req
   * @param {SystemMessageHandlers} eventHandlers
   * @returns {function(object): void}
   */
  const createSystemMessageListener = (req, eventHandlers) => message => {
    const { event } = message;

    log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);

    switch (event) {
      case 'kill':
        log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`);
        eventHandlers.onKill();
        break;
      case 'filters_changed':
        log.verbose(req.requestId, `Invalidating filters cache for ${req.accountId}`);
        req.cachedFilters = null;
        break;
      default:
        break;
    }
  };
  /**
   * Subscribes an authenticated HTTP stream to its access-token and account
   * system channels. A `kill` message ends the response. Both subscriptions
   * are removed when the response closes.
   *
   * @param {any} req
   * @param {any} res
   */
  const subscribeHttpToSystemChannel = (req, res) => {
    // Anonymous requests have no access token or account, so there is no
    // system channel to follow. Subscribing anyway would only register
    // listeners on `timeline:access_token:undefined` / `timeline:system:undefined`.
    if (!req.accountId) {
      return;
    }

    const accessTokenChannelId = `timeline:access_token:${req.accessTokenId}`;
    const systemChannelId = `timeline:system:${req.accountId}`;

    const listener = createSystemMessageListener(req, {
      onKill() {
        res.end();
      },
    });

    res.on('close', () => {
      unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
      unsubscribe(`${redisPrefix}${systemChannelId}`, listener);
    });

    subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
    subscribe(`${redisPrefix}${systemChannelId}`, listener);
  };
  /**
   * Express middleware guarding the HTTP streaming endpoints. It lets OPTIONS
   * requests through untouched. Otherwise it authenticates the request (a
   * token is required only when alwaysRequireAuth is set), checks the OAuth
   * scopes for the requested stream and subscribes the response to its system
   * channels. Any failure is passed to `next`.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    accountFromRequest(req, alwaysRequireAuth)
      .then(() => checkScopes(req, channelNameFromPath(req)))
      .then(() => {
        subscribeHttpToSystemChannel(req, res);
        next();
      })
      .catch(err => {
        next(err);
      });
  };
  /**
   * Express error handler. It logs the error and answers with a JSON body.
   * Errors carrying a `status` expose their message; any other error becomes
   * a generic 500. If headers were already sent, the error is passed on.
   *
   * @param {Error} err
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    if (!res.headersSent) {
      res.writeHead(err.status || 500, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ error: err.status ? err.toString() : 'An unexpected error occurred' }));
      return;
    }

    next(err);
  };
  /**
   * Builds a comma-separated list of positional SQL placeholders, one per
   * element of `arr`, numbered from `shift + 1`.
   * For example, placeholders(['a', 'b'], 2) is '$3, $4'.
   *
   * @param {array} arr
   * @param {number=} shift
   * @return {string}
   */
  const placeholders = (arr, shift = 0) => arr
    .map((_, index) => shift + index + 1)
    .map(position => `$${position}`)
    .join(', ');
  /**
   * Resolves when list `listId` exists and belongs to the requesting account.
   *
   * Always rejects with an Error, so callers can safely call `toString()` or
   * read `.status`. The error is a 404 `List not found` when the list is
   * missing or owned by another account; otherwise it is the underlying
   * database error.
   *
   * @param {string} listId
   * @param {any} req
   * @return {Promise.<void>}
   */
  const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
    const { accountId } = req;

    pgPool.connect((connectError, client, done) => {
      if (connectError) {
        reject(connectError);
        return;
      }

      client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (queryError, result) => {
        done();

        if (queryError) {
          reject(queryError);
          return;
        }

        // "Not yours" and "does not exist" get the same answer on purpose,
        // so the response does not reveal whether someone else's list exists.
        if (result.rows.length === 0 || result.rows[0].account_id !== accountId) {
          const notFound = new Error('List not found');
          notFound.status = 404;
          reject(notFound);
          return;
        }

        resolve();
      });
    });
  });
  /**
   * Subscribes a connection to one or more redis channels and forwards their
   * events through `output`.
   *
   * With `needsFiltering`, `update` events are dropped when the status
   * language is not among `req.chosenLanguages`. For logged-in requests they
   * are also dropped when:
   *   - the user blocks or mutes the author or a mentioned account,
   *   - the user blocks the author's domain, or
   *   - the author blocks the user.
   * Statuses that reach a user with keyword filters are sent with a
   * per-connection `filtered` array describing the matching filters.
   *
   * @param {string[]} ids channel names, without the redis prefix
   * @param {any} req
   * @param {function(string, string): void} output
   * @param {function(string[], function(object): void): void} attachCloseHandler
   * @param {boolean=} needsFiltering
   * @returns {function(object): void} the listener registered on every channel
   */
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
    const accountId = req.accountId || req.remoteAddress;

    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);

    /**
     * Sends one event to this connection. Object payloads are JSON-encoded.
     * @param {string} event
     * @param {any} payload
     * @param {number} queuedAt
     */
    const transmit = (event, payload, queuedAt) => {
      const now = new Date().getTime();
      const delta = now - queuedAt;
      const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;

      log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
      output(event, encodedPayload);
    };

    /**
     * Groups custom filter rows (one row per keyword) by filter id. It then
     * compiles one case-insensitive RegExp per filter from its keywords.
     * @param {Object.<string, any>[]} rows
     * @return {Object.<string, any>}
     */
    const buildFilterCache = rows => {
      const cache = rows.reduce((acc, row) => {
        if (acc[row.id]) {
          acc[row.id].keywords.push([row.keyword, row.whole_word]);
        } else {
          acc[row.id] = {
            keywords: [[row.keyword, row.whole_word]],
            expires_at: row.expires_at,
            repr: {
              id: row.id,
              title: row.title,
              context: row.context,
              expires_at: row.expires_at,
              filter_action: ['warn', 'hide'][row.filter_action],
            },
          };
        }

        return acc;
      }, {});

      Object.keys(cache).forEach(key => {
        cache[key].regexp = new RegExp(cache[key].keywords.map(([keyword, whole_word]) => {
          let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');

          if (whole_word) {
            if (/^[\w]/.test(expr)) {
              expr = `\\b${expr}`;
            }

            if (/[\w]$/.test(expr)) {
              expr = `${expr}\\b`;
            }
          }

          return expr;
        }).join('|'), 'i');
      });

      return cache;
    };

    /**
     * Returns one `{ filter, keyword_matches }` entry for every non-expired
     * filter whose keywords match the status text. The text covers the
     * spoiler, content, poll options and media descriptions.
     * @param {Object.<string, any>} status
     * @param {Object.<string, any>} filters a cache built by buildFilterCache
     * @return {Object.<string, any>[]}
     */
    const matchFilters = (status, filters) => {
      const pollOptions = (status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [];
      const mediaDescriptions = status.media_attachments.map(att => att.description);
      const searchContent = [status.spoiler_text || '', status.content]
        .concat(pollOptions)
        .concat(mediaDescriptions)
        .join('\n\n')
        .replace(/<br\s*\/?>/g, '\n')
        .replace(/<\/p><p>/g, '\n\n');
      const searchIndex = JSDOM.fragment(searchContent).textContent;
      const now = new Date();
      const results = [];

      Object.values(filters).forEach(cachedFilter => {
        if (cachedFilter.expires_at === null || cachedFilter.expires_at > now) {
          const keyword_matches = searchIndex.match(cachedFilter.regexp);

          if (keyword_matches) {
            results.push({
              filter: cachedFilter.repr,
              keyword_matches,
            });
          }
        }
      });

      return results;
    };

    const listener = message => {
      const { event, payload, queued_at } = message;

      // Only statuses may need filtering: notifications are already
      // personalized and deletes do not matter.
      if (!needsFiltering || event !== 'update') {
        transmit(event, payload, queued_at);
        return;
      }

      const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
      const accountDomain = payload.account.acct.split('@')[1];

      if (Array.isArray(req.chosenLanguages) && payload.language !== null && !req.chosenLanguages.includes(payload.language)) {
        log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
        return;
      }

      // When the account is not logged in, it is not necessary to confirm the block or mute
      if (!req.accountId) {
        transmit(event, payload, queued_at);
        return;
      }

      pgPool.connect((err, client, done) => {
        if (err) {
          log.error(err);
          return;
        }

        // Both the success path and the error handler below release the
        // client, and pg throws if a client is released twice. Make release
        // idempotent.
        let released = false;
        const release = () => {
          if (!released) {
            released = true;
            done();
          }
        };

        // Decided once, so the position of the filter rows in `values` always
        // matches the queries that were actually sent.
        const needsFilterRows = !payload.filtered && !req.cachedFilters;

        const queries = [
          client.query(`SELECT 1
            FROM blocks
            WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
              OR (account_id = $2 AND target_account_id = $1)
            UNION
            SELECT 1
            FROM mutes
            WHERE account_id = $1
              AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
        ];

        if (accountDomain) {
          queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
        }

        if (needsFilterRows) {
          queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
        }

        Promise.all(queries).then(values => {
          release();

          if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
            return;
          }

          if (needsFilterRows) {
            req.cachedFilters = buildFilterCache(values[accountDomain ? 2 : 1].rows);
          }

          // onRedisMessage passes the same parsed object to every listener on
          // the channel. This user's filter results therefore go on a copy;
          // writing them into `payload` would leak them to other subscribers
          // and make those subscribers skip their own filtering.
          if (req.cachedFilters && !payload.filtered) {
            transmit(event, Object.assign({}, payload, { filtered: matchFilters(payload, req.cachedFilters) }), queued_at);
            return;
          }

          transmit(event, payload, queued_at);
        }).catch(error => {
          log.error(error);
          release();
        });
      });
    };

    ids.forEach(id => {
      subscribe(`${redisPrefix}${id}`, listener);
    });

    if (attachCloseHandler) {
      attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
    }

    return listener;
  };
  /**
   * Turns an HTTP response into a Server-Sent Events stream. It writes an
   * initial comment and a `:thump` comment every 15 seconds; the heartbeat
   * stops when the request closes.
   *
   * @param {any} req
   * @param {any} res
   * @return {function(string, string): void} writes one `event:`/`data:` frame
   */
  const streamToHttp = (req, res) => {
    const accountId = req.accountId || req.remoteAddress;
    const sseHeaders = {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-store',
      'Transfer-Encoding': 'chunked',
    };

    Object.entries(sseHeaders).forEach(([name, value]) => {
      res.setHeader(name, value);
    });

    res.write(':)\n');

    const heartbeat = setInterval(() => res.write(':thump\n'), 15000);

    req.on('close', () => {
      log.verbose(req.requestId, `Ending stream for ${accountId}`);
      clearInterval(heartbeat);
    });

    const writeFrame = (event, payload) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };

    return writeFrame;
  };
  /**
   * Builds the `attachCloseHandler` used for HTTP (SSE) streams. When the
   * request closes, `listener` is removed from every channel it was added to,
   * and then `closeHandler` (if any) runs.
   *
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @return {function(string[], function(Object<string, any>): void): void}
   */
  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
    req.on('close', () => {
      ids.forEach(id => {
        // unsubscribe() only removes the callback it is given. Without the
        // listener nothing would be removed, and the closed stream would keep
        // receiving events.
        unsubscribe(id, listener);
      });

      if (closeHandler) {
        closeHandler();
      }
    });
  };
  /**
   * Builds the output function for a websocket stream. Each event is sent as
   * a JSON frame `{ stream, event, payload }`. Writes to a socket that is not
   * open are logged and skipped.
   *
   * @param {any} req
   * @param {any} ws
   * @param {string[]} streamName
   * @return {function(string, string): void}
   */
  const streamToWs = (req, ws, streamName) => (event, payload) => {
    if (ws.readyState === ws.OPEN) {
      ws.send(JSON.stringify({ stream: streamName, event, payload }));
    } else {
      log.error(req.requestId, 'Tried writing to closed socket');
    }
  };
  /**
   * Answers with `404` and a JSON `{ "error": "Not found" }` body.
   *
   * @param {any} res
   */
  const httpNotFound = res => {
    const body = JSON.stringify({ error: 'Not found' });

    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(body);
  };
  706. app.use(setRequestId);
  707. app.use(setRemoteAddress);
  708. app.use(allowCrossDomain);
  709. app.get('/api/v1/streaming/health', (req, res) => {
  710. res.writeHead(200, { 'Content-Type': 'text/plain' });
  711. res.end('OK');
  712. });
  /**
   * OpenMetrics endpoint. Reports these gauges:
   * - the number of connections currently open on `server`;
   * - the number of keys in `subs` (reported as subscribed Redis channels);
   * - the pg pool's total, idle and waiting counters.
   * It is registered before authenticationMiddleware.
   */
  app.get('/metrics', (req, res) => server.getConnections((err, count) => {
    if (err) {
      // Without a connection count the output would contain
      // `connected_clients undefined.0`, which is not valid OpenMetrics;
      // report the failure instead of publishing a broken exposition.
      log.error(req.requestId, 'Could not count connections for metrics:', err.toString());
      res.writeHead(500, { 'Content-Type': 'text/plain' });
      res.end('Could not collect metrics');
      return;
    }

    // [metric name, HELP text, current value] for each gauge, in output order.
    const gauges = [
      ['connected_clients', 'The number of clients connected to the streaming server', count],
      ['connected_channels', 'The number of Redis channels the streaming server is subscribed to', Object.keys(subs).length],
      ['pg_pool_total_connections', 'The total number of clients existing within the pool', pgPool.totalCount],
      ['pg_pool_idle_connections', 'The number of clients which are not checked out but are currently idle in the pool', pgPool.idleCount],
      ['pg_pool_waiting_queries', 'The number of queued requests waiting on a client when all clients are checked out', pgPool.waitingCount],
    ];

    const body = gauges
      .map(([name, help, value]) => `# TYPE ${name} gauge\n# HELP ${name} ${help}\n${name} ${value}.0\n`)
      .join('');

    // writeHead is the documented API (writeHeader is only a legacy alias) and
    // matches the other routes in this file.
    res.writeHead(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' });
    res.end(`${body}# EOF\n`);
  }));
  // Routes registered from here on are only reached after authenticationMiddleware.
  // errorMiddleware is registered directly after it, presumably to turn
  // authentication failures into responses (its body is not visible here).
  app.use(authenticationMiddleware);
  app.use(errorMiddleware);

  /**
   * HTTP streaming endpoint. It resolves the stream named in the path (plus
   * query parameters) to Redis channel ids, then starts streaming them with an
   * HTTP sender. If resolution fails, the reason is logged and the client
   * receives a 404.
   */
  app.get('/api/v1/streaming/*', (req, res) => {
    const channelName = channelNameFromPath(req);

    const startStreaming = ({ channelIds, options }) => {
      const onSend = streamToHttp(req, res);
      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
    };

    const rejectSubscription = err => {
      log.verbose(req.requestId, 'Subscription error:', err.toString());
      httpNotFound(res);
    };

    channelNameToIds(req, channelName, req.query)
      .then(startStreaming)
      .catch(rejectSubscription);
  });

  // WebSocket server attached to the same HTTP `server`; wsVerifyClient is
  // handed to ws as its `verifyClient` hook for incoming upgrade requests.
  const wss = new WebSocket.Server({ verifyClient: wsVerifyClient, server });

  /**
   * Extra parameters that accompany a stream name. They come from the HTTP
   * query string or from the fields of a WebSocket subscribe/unsubscribe message.
   *
   * @typedef StreamParams
   * @property {string} [tag] hashtag name, required by the `hashtag` and `hashtag:local` streams
   * @property {string} [list] list id, required by the `list` stream
   * @property {string} [only_media] NOTE(review): not read in this part of the file; confirm where it is consumed
   */
  /**
   * Lists the Redis channels backing the `user` stream.
   * - The account's own `timeline:<accountId>` channel is always included.
   * - A per-device channel is added when the token has the `crypto` scope and
   *   the request carries a device id.
   * - A notifications channel is added when the token can read notifications.
   *
   * @param {any} req
   * @return {string[]}
   */
  const channelsForUserStream = req => {
    const accountChannel = `timeline:${req.accountId}`;
    const wantsDeviceChannel = isInScope(req, ['crypto']) && req.deviceId;
    const wantsNotifications = isInScope(req, ['read', 'read:notifications']);

    return [
      accountChannel,
      ...(wantsDeviceChannel ? [`${accountChannel}:${req.deviceId}`] : []),
      ...(wantsNotifications ? [`${accountChannel}:notifications`] : []),
    ];
  };
  766. /**
  767. * See app/lib/ascii_folder.rb for the canon definitions
  768. * of these constants
  769. */
  770. const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂ㥹ÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇňʼnŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž';
  771. const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz';
  772. /**
  773. * @param {string} str
  774. * @return {string}
  775. */
  776. const foldToASCII = str => {
  777. const regex = new RegExp(NON_ASCII_CHARS.split('').join('|'), 'g');
  778. return str.replace(regex, match => {
  779. const index = NON_ASCII_CHARS.indexOf(match);
  780. return EQUIVALENT_ASCII_CHARS[index];
  781. });
  782. };
  /**
   * Canonical form of a hashtag name, as embedded in `timeline:hashtag:*` channel ids.
   * The steps are:
   * 1. NFKC-normalise.
   * 2. Lower-case.
   * 3. Fold accented Latin letters to ASCII.
   * 4. Drop every character that is not a letter, a digit, `_`, U+00B7
   *    (middle dot) or U+200C (zero-width non-joiner).
   *
   * @param {string} str
   * @return {string}
   */
  const normalizeHashtag = str => {
    const lowered = str.normalize('NFKC').toLowerCase();
    const folded = foldToASCII(lowered);
    return folded.replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
  };
  /**
   * Redis channel for each public timeline stream name. All of these are
   * delivered with `needsFiltering: true`.
   * @type {Map<string, string>}
   */
  const PUBLIC_STREAM_CHANNELS = new Map([
    ['public', 'timeline:public'],
    ['public:local', 'timeline:public:local'],
    ['public:remote', 'timeline:public:remote'],
    ['public:media', 'timeline:public:media'],
    ['public:local:media', 'timeline:public:local:media'],
    ['public:remote:media', 'timeline:public:remote:media'],
  ]);

  /**
   * Resolves a stream name and its parameters to the Redis channel ids to
   * subscribe to, plus whether delivered events need filtering.
   *
   * The function is async, so any exception thrown while resolving (e.g. from
   * normalizeHashtag) surfaces as a rejection, never as a synchronous throw.
   * Rejection reasons are plain strings on purpose: callers log and send
   * `err.toString()` to clients, and those messages must not gain an `Error:` prefix.
   *
   * @param {any} req request carrying `accountId` for per-account streams
   * @param {string} name stream name, e.g. `user`, `public:local`, `hashtag`, `list`
   * @param {StreamParams} params
   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
   */
  const channelNameToIds = async (req, name, params) => {
    const channelsResult = (channelIds, needsFiltering) => ({ channelIds, options: { needsFiltering } });

    if (PUBLIC_STREAM_CHANNELS.has(name)) {
      return channelsResult([PUBLIC_STREAM_CHANNELS.get(name)], true);
    }

    switch (name) {
    case 'user':
      return channelsResult(channelsForUserStream(req), false);
    case 'user:notification':
      return channelsResult([`timeline:${req.accountId}:notifications`], false);
    case 'direct':
      return channelsResult([`timeline:direct:${req.accountId}`], false);
    case 'hashtag':
    case 'hashtag:local': {
      const { tag } = params;
      // Only a non-empty string can be normalized. A repeated query key
      // (e.g. `?tag=a&tag=b`) arrives as an array, and a WebSocket message may
      // carry any JSON type; previously those crashed inside normalizeHashtag and
      // leaked the TypeError text to the client.
      if (typeof tag !== 'string' || tag.length === 0) {
        return Promise.reject('No tag for stream provided');
      }
      const suffix = name === 'hashtag:local' ? ':local' : '';
      return channelsResult([`timeline:hashtag:${normalizeHashtag(tag)}${suffix}`], true);
    }
    case 'list':
      try {
        await authorizeListAccess(params.list, req);
      } catch {
        return Promise.reject('Not authorized to stream this list');
      }
      return channelsResult([`timeline:list:${params.list}`], false);
    default:
      return Promise.reject('Unknown stream type');
    }
  };
  /**
   * Builds the `stream` identifier sent with each WebSocket event. List and
   * hashtag streams include their list id or tag as the second element; every
   * other stream is identified by its name alone.
   *
   * @param {string} channelName
   * @param {StreamParams} params
   * @return {string[]}
   */
  const streamNameFromChannelName = (channelName, params) => {
    switch (channelName) {
    case 'list':
      return [channelName, params.list];
    case 'hashtag':
    case 'hashtag:local':
      return [channelName, params.tag];
    default:
      return [channelName];
    }
  };
  /**
   * @typedef WebSocketSession
   * @property {any} socket
   * @property {any} request
   * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
   *   active subscriptions, keyed by their channel ids joined with `;`
   */
  /**
   * Subscribes a WebSocket session to a stream after checking the token's scopes.
   * Subscribing to a stream that is already active is a no-op. Failures are
   * logged and reported to the client as `{ error }`.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   * @return {Promise<void>}
   */
  const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
    checkScopes(request, channelName)
      .then(() => channelNameToIds(request, channelName, params))
      .then(({ channelIds, options }) => {
        // The scope check and channel resolution are asynchronous, so the client
        // may have disconnected meanwhile. The socket's close handler has then
        // already released `subscriptions`; anything registered now would keep
        // its Redis listener and heartbeat alive forever.
        if (socket.readyState !== WebSocket.OPEN) {
          return;
        }

        const key = channelIds.join(';');
        if (subscriptions[key]) {
          return;
        }

        const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
        const stopHeartbeat = subscriptionHeartbeat(channelIds);
        const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);

        subscriptions[key] = {
          listener,
          stopHeartbeat,
        };
      })
      .catch(err => {
        log.verbose(request.requestId, 'Subscription error:', err.toString());
        socket.send(JSON.stringify({ error: err.toString() }));
      });
  /**
   * Ends a stream that was started with subscribeWebsocketToChannel: it
   * releases the stream's Redis listeners, stops its heartbeat and forgets the
   * subscription. Unsubscribing from a stream that is not active only logs.
   * If the stream cannot be resolved, the error is logged and reported to the
   * client as `{ error }`.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   * @return {Promise<void>}
   */
  const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const release = ({ channelIds }) => {
      const key = channelIds.join(';');
      log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

      const subscription = subscriptions[key];
      if (!subscription) {
        return;
      }

      const { listener, stopHeartbeat } = subscription;
      for (const channelId of channelIds) {
        unsubscribe(`${redisPrefix}${channelId}`, listener);
      }
      stopHeartbeat();
      delete subscriptions[key];
    };

    const report = err => {
      log.verbose(request.requestId, 'Unsubscription error:', err);
      socket.send(JSON.stringify({ error: err.toString() }));
    };

    return channelNameToIds(request, channelName, params).then(release).catch(report);
  };
  /**
   * Subscribes the session to its two control channels, one keyed by the
   * access-token id and one by the account id. Both share a single
   * system-message listener whose kill callback closes the socket.
   * Both are recorded in `subscriptions` with no-op heartbeats, so the session's
   * normal teardown unsubscribes them along with everything else.
   *
   * @param {WebSocketSession} session
   */
  const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => {
    const controlChannelIds = [
      `timeline:access_token:${request.accessTokenId}`,
      `timeline:system:${request.accountId}`,
    ];

    const listener = createSystemMessageListener(request, {
      onKill() {
        socket.close();
      },
    });

    controlChannelIds.forEach(channelId => {
      subscribe(`${redisPrefix}${channelId}`, listener);
    });

    controlChannelIds.forEach(channelId => {
      subscriptions[channelId] = {
        listener,
        stopHeartbeat: () => {
        },
      };
    });
  };
  /**
   * Collapses a possibly-repeated parameter to a single value. Given an array
   * (e.g. a query-string key supplied several times), it returns the first
   * element; any other value is returned unchanged.
   *
   * @param {string|string[]} arrayOrString
   * @return {string}
   */
  const firstParam = arrayOrString => (Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString);
  /**
   * Per-connection setup for WebSocket clients. It does the following:
   * - tags the request with an id and the remote address;
   * - tracks liveness for the ping interval;
   * - wires subscribe/unsubscribe messages;
   * - subscribes the system channels and any stream named in the connection URL.
   */
  wss.on('connection', (ws, req) => {
    const location = url.parse(req.url, true);

    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    // Liveness flag read by the keep-alive interval: it is cleared before each
    // ping and set again whenever the client answers with a pong.
    ws.isAlive = true;
    ws.on('pong', () => {
      ws.isAlive = true;
    });

    /**
     * @type {WebSocketSession}
     */
    const session = {
      socket: ws,
      request: req,
      subscriptions: {},
    };

    // Releases every subscription of this session. It is registered for both
    // 'error' and 'close', which may both fire for the same socket. Each entry
    // is deleted once released, so a second call is a no-op instead of
    // unsubscribing listeners and stopping heartbeats twice.
    const onEnd = () => {
      Object.keys(session.subscriptions).forEach(channelIds => {
        const { listener, stopHeartbeat } = session.subscriptions[channelIds];
        channelIds.split(';').forEach(channelId => {
          unsubscribe(`${redisPrefix}${channelId}`, listener);
        });
        stopHeartbeat();
        delete session.subscriptions[channelIds];
      });
    };

    ws.on('close', onEnd);
    ws.on('error', onEnd);

    ws.on('message', (data, isBinary) => {
      if (isBinary) {
        log.warn('socket', 'Received binary data, closing connection');
        ws.close(1003, 'The mastodon streaming server does not support binary messages');
        return;
      }

      const message = data.toString('utf8');
      const json = parseJSON(message, session.request);
      if (!json) return;

      const { type, stream, ...params } = json;

      if (type === 'subscribe') {
        subscribeWebsocketToChannel(session, firstParam(stream), params);
      } else if (type === 'unsubscribe') {
        unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
      }
      // Any other message type is deliberately ignored.
    });

    subscribeWebsocketToSystemChannel(session);

    // A stream named in the connection URL is subscribed immediately; the whole
    // parsed query object is passed along as its StreamParams.
    if (location.query.stream) {
      subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
    }
  });
  // Keep-alive sweep, every 30 seconds. A client that has not answered the
  // previous ping with a pong (the per-connection 'pong' handler sets isAlive)
  // is terminated. Every other client is marked pending and pinged again.
  setInterval(() => {
    for (const client of wss.clients) {
      if (client.isAlive === false) {
        client.terminate();
        continue;
      }
      client.isAlive = false;
      client.ping('', false);
    }
  }, 30000);
  attachServerWithConfig(server, address => {
    log.warn(`Worker ${workerId} now listening on ${address}`);
  });

  // Graceful shutdown on SIGINT/SIGTERM.
  const onExit = () => {
    log.warn(`Worker ${workerId} exiting`);
    server.close();
    process.exit(0);
  };

  // Last-resort handler for uncaught exceptions. It logs, closes the listener
  // and exits with a failure status (Node's own default for uncaught
  // exceptions), so supervisors can tell a crash from a clean shutdown.
  const onError = (err) => {
    log.error(err);
    server.close();
    process.exit(1);
  };

  // Runs while the process is already exiting, so it only performs synchronous
  // cleanup. Calling process.exit() here would overwrite the exit code chosen by
  // whoever started the exit (e.g. the 1 from onError) with 0.
  const onProcessExit = () => {
    log.warn(`Worker ${workerId} exiting`);
    server.close();
  };

  process.on('SIGINT', onExit);
  process.on('SIGTERM', onExit);
  process.on('exit', onProcessExit);
  process.on('uncaughtException', onError);
  1065. };
  /**
   * Starts `server` listening according to the environment.
   * - If SOCKET is set, it listens on that path.
   * - If PORT is set but not numeric, it listens on PORT as a path.
   * - Otherwise it listens on TCP port PORT (falling back to 4000 when PORT is
   *   missing or converts to 0), bound to BIND (default 127.0.0.1).
   *
   * @param {any} server
   * @param {function(string): void} [onSuccess] called with the listening address once
   *   bound. When listening on a path, the path is first chmod-ed to 0666.
   */
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT, BIND } = process.env;
    const listensOnPath = Boolean(SOCKET || (PORT && isNaN(+PORT)));

    if (listensOnPath) {
      server.listen(SOCKET || PORT, () => {
        if (!onSuccess) {
          return;
        }
        const socketPath = server.address();
        fs.chmodSync(socketPath, 0o666);
        onSuccess(socketPath);
      });
      return;
    }

    server.listen(+PORT || 4000, BIND || '127.0.0.1', () => {
      if (!onSuccess) {
        return;
      }
      const { address, port } = server.address();
      onSuccess(`${address}:${port}`);
    });
  };
  /**
   * Checks whether the configured port or socket can be bound. A throwaway HTTP
   * server is attached with the same configuration.
   * - If binding fails, `onSuccess` receives the error.
   * - If it succeeds, the probe is closed and `onSuccess` is called with no
   *   argument once it has closed.
   *
   * @param {function(Error=): void} onSuccess
   */
  const onPortAvailable = onSuccess => {
    const probeServer = http.createServer();

    const handleError = err => {
      onSuccess(err);
    };

    const handleListening = () => {
      probeServer.once('close', () => onSuccess());
      probeServer.close();
    };

    probeServer.once('error', handleError);
    probeServer.once('listening', handleListening);
    attachServerWithConfig(probeServer);
  };
  // Probe the configured listen address before handing control to throng.
  // Workers (startWorker) and the master (startMaster) are only started when
  // the address is free.
  onPortAvailable(err => {
    if (err) {
      log.error('Could not start server, the port or socket is in use:', err.message);
      // Previously this path left the exit status at 0, so process managers
      // saw a failed start as a clean stop.
      process.exitCode = 1;
      return;
    }

    throng({
      workers: numWorkers,
      lifetime: Infinity,
      start: startWorker,
      master: startMaster,
    });
  });