// index.js (44 KB)
  1. // @ts-check
  2. const fs = require('fs');
  3. const http = require('http');
  4. const url = require('url');
  5. const dotenv = require('dotenv');
  6. const express = require('express');
  7. const Redis = require('ioredis');
  8. const { JSDOM } = require('jsdom');
  9. const log = require('npmlog');
  10. const pg = require('pg');
  11. const dbUrlToConfig = require('pg-connection-string').parse;
  12. const metrics = require('prom-client');
  13. const uuid = require('uuid');
  14. const WebSocket = require('ws');
  // Runtime environment name; it selects the dotenv file loaded below and is
  // also read by pgConfigFromEnv to pick the PostgreSQL defaults.
  const environment = process.env.NODE_ENV || 'development';

  // Production reads `.env.production`, every other environment reads `.env`.
  dotenv.config({
    path: environment === 'production' ? '.env.production' : '.env',
  });

  // Read after dotenv.config() so that a LOG_LEVEL supplied through the dotenv
  // file is presumably picked up as well; falls back to 'verbose'.
  log.level = process.env.LOG_LEVEL || 'verbose';
  /**
   * Builds a Redis client from the configuration produced by
   * `redisConfigFromEnv` and attaches an `error` listener that logs through
   * `log.error`.
   *
   * The function is `async` only so that callers can `await` it; the client is
   * returned as soon as it has been constructed.
   * @param {Object.<string, any>} config object carrying `redisUrl` and `redisParams`
   */
  const createRedisClient = async (config) => {
    const { redisParams, redisUrl } = config;
    const client = new Redis(redisUrl, redisParams);

    client.on('error', (err) => log.error('Redis Client Error!', err));

    return client;
  };
  /**
   * Attempts to safely parse a string as JSON, used when both receiving a message
   * from redis and when receiving a message from a client over a websocket
   * connection, this is why it accepts a `req` argument.
   *
   * Parse failures never throw: they are logged and `null` is returned. The
   * log level depends on the origin of the message:
   *  - `req` with an `accountId`: `warn`, tagged with the request id
   *  - `req` without an `accountId`: `silly`, tagged with the request id
   *  - no `req` (message came from redis): `warn`
   * @param {string} json
   * @param {any?} req
   * @returns {Object.<string, any>|null}
   */
  const parseJSON = (json, req) => {
    try {
      return JSON.parse(json);
    } catch (err) {
      /* FIXME: This logging isn't great, and should probably be done at the
       * call-site of parseJSON, not in the method, but this would require changing
       * the signature of parseJSON to return something akin to a Result type:
       * [Error|null, null|Object<string,any>], and then handling the error
       * scenarios.
       */
      if (req) {
        if (req.accountId) {
          log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
        } else {
          log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
        }
      } else {
        log.warn(`Error parsing message from redis: ${err}`);
      }

      return null;
    }
  };
  /**
   * Builds the configuration object handed to `pg.Pool`.
   *
   * When `DATABASE_URL` is set it is parsed and used as the base (with
   * `DB_PASS` filling in a missing password); `DB_SSLMODE` is not consulted on
   * that path. Otherwise the base is the preset for the current `environment`,
   * overridden field by field from `DB_USER`, `DB_PASS`, `DB_NAME`, `DB_HOST`
   * and `DB_PORT`, and `DB_SSLMODE` selects the `ssl` option.
   * @param {Object.<string, any>} env the `process.env` value to read configuration from
   * @returns {Object.<string, any>} the configuration for the PostgreSQL connection
   */
  const pgConfigFromEnv = (env) => {
    const pgConfigs = {
      development: {
        user: env.DB_USER || pg.defaults.user,
        password: env.DB_PASS || pg.defaults.password,
        database: env.DB_NAME || 'mastodon_development',
        host: env.DB_HOST || pg.defaults.host,
        port: env.DB_PORT || pg.defaults.port,
      },

      production: {
        user: env.DB_USER || 'mastodon',
        password: env.DB_PASS || '',
        database: env.DB_NAME || 'mastodon_production',
        host: env.DB_HOST || 'localhost',
        port: env.DB_PORT || 5432,
      },
    };

    let baseConfig;

    if (env.DATABASE_URL) {
      baseConfig = dbUrlToConfig(env.DATABASE_URL);

      // Support overriding the database password in the connection URL
      if (!baseConfig.password && env.DB_PASS) {
        baseConfig.password = env.DB_PASS;
      }
    } else {
      // Only `development` and `production` have presets. Any other NODE_ENV
      // starts from an empty object so that assigning `ssl` below cannot
      // throw on `undefined`.
      baseConfig = pgConfigs[environment] || {};

      if (env.DB_SSLMODE) {
        switch (env.DB_SSLMODE) {
        case 'disable':
          baseConfig.ssl = false;
          break;
        case 'no-verify':
          baseConfig.ssl = { rejectUnauthorized: false };
          break;
        default:
          baseConfig.ssl = {};
          break;
        }
      }
    }

    return {
      ...baseConfig,
      max: env.DB_POOL || 10,
      connectionTimeoutMillis: 15000,
      application_name: '',
    };
  };
  /**
   * Derives the Redis connection settings from environment variables.
   *
   * The result carries three fields: `redisParams` (host, port, db, password
   * and, for `unix://` URLs, the socket `path`), `redisPrefix` (the
   * `REDIS_NAMESPACE` followed by `:`, or an empty string) and `redisUrl`
   * (`REDIS_URL` passed through unchanged).
   * @param {Object.<string, any>} env the `process.env` value to read configuration from
   * @returns {Object.<string, any>} configuration for the Redis connection
   */
  const redisConfigFromEnv = (env) => {
    // ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
    // which means we can't use it. But this is something that should be looked into.
    const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';

    const redisParams = {
      host: env.REDIS_HOST || '127.0.0.1',
      port: env.REDIS_PORT || 6379,
      db: env.REDIS_DB || 0,
      password: env.REDIS_PASSWORD || undefined,
    };

    // redisParams.path takes precedence over host and port.
    if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
      // Drop the 7-character `unix://` scheme, leaving the socket path.
      redisParams.path = env.REDIS_URL.slice(7);
    }

    return {
      redisParams,
      redisPrefix,
      redisUrl: env.REDIS_URL,
    };
  };
  135. const startServer = async () => {
  const app = express();

  // TRUSTED_PROXY_IP may list several entries separated by commas and/or
  // whitespace; without it the 'loopback,uniquelocal' setting is used.
  app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');

  const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
  const server = http.createServer(app);

  /**
   * Listener registry keyed by the Redis channel name as passed to `subscribe`.
   * @type {Object.<string, Array.<function(Object<string, any>): void>>}
   */
  const subs = {};

  // Two separate clients are created from the same configuration: one is
  // used for SUBSCRIBE/UNSUBSCRIBE and message delivery, the other for
  // regular commands such as SET.
  const redisConfig = redisConfigFromEnv(process.env);
  const redisSubscribeClient = await createRedisClient(redisConfig);
  const redisClient = await createRedisClient(redisConfig);
  const { redisPrefix } = redisConfig;
  // Collect metrics from Node.js
  metrics.collectDefaultMetrics();

  // The three pool gauges below are constructed without keeping a reference;
  // each reads its value from pgPool inside `collect()`.
  // NOTE(review): presumably they register themselves with prom-client's
  // default registry, which the /metrics route serialises; confirm.
  new metrics.Gauge({
    name: 'pg_pool_total_connections',
    help: 'The total number of clients existing within the pool',
    collect() {
      this.set(pgPool.totalCount);
    },
  });

  new metrics.Gauge({
    name: 'pg_pool_idle_connections',
    help: 'The number of clients which are not checked out but are currently idle in the pool',
    collect() {
      this.set(pgPool.idleCount);
    },
  });

  new metrics.Gauge({
    name: 'pg_pool_waiting_queries',
    help: 'The number of queued requests waiting on a client when all clients are checked out',
    collect() {
      this.set(pgPool.waitingCount);
    },
  });

  const connectedClients = new metrics.Gauge({
    name: 'connected_clients',
    help: 'The number of clients connected to the streaming server',
    labelNames: ['type'],
  });

  // Initialise both label values so they are reported as 0 before any client
  // has connected.
  connectedClients.set({ type: 'websocket' }, 0);
  connectedClients.set({ type: 'eventsource' }, 0);

  const connectedChannels = new metrics.Gauge({
    name: 'connected_channels',
    help: 'The number of channels the streaming server is streaming to',
    labelNames: [ 'type', 'channel' ]
  });

  // Updated from the subscribe/unsubscribe callbacks with the subscription
  // count reported by Redis.
  const redisSubscriptions = new metrics.Gauge({
    name: 'redis_subscriptions',
    help: 'The number of Redis channels the streaming server is subscribed to',
  });
  // When checking metrics in the browser, the favicon is requested. This
  // prevents the request from falling through to the API Router, which would
  // error for this endpoint:
  app.get('/favicon.ico', (req, res) => res.status(404).end());

  // Health check: always answers 200 with a plain-text body.
  app.get('/api/v1/streaming/health', (req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('OK');
  });

  // Serialises the prom-client registry; a failure while collecting is logged
  // and answered with a bare 500.
  app.get('/metrics', async (req, res) => {
    try {
      res.set('Content-Type', metrics.register.contentType);
      res.end(await metrics.register.metrics());
    } catch (ex) {
      log.error(ex);
      res.status(500).end();
    }
  });
  /**
   * Marks the given channels as subscribed in Redis and keeps the markers
   * alive until the returned function is called.
   *
   * Each marker is the key `${redisPrefix}subscribed:${channel}` set to '1'
   * with an expiry of three heartbeat intervals. The keys are written
   * immediately and then once per interval (6 minutes).
   * @param {string[]} channels
   * @returns {function(): void} stops the heartbeat; already-written keys are left to expire
   */
  const subscriptionHeartbeat = channels => {
    const interval = 6 * 60;

    const tellSubscribed = () => {
      channels.forEach(channel => {
        // The write is fire-and-forget, but a failure must not surface as an
        // unhandled promise rejection, so it is logged here.
        Promise.resolve(redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3))
          .catch(err => log.error(`Error refreshing subscription marker for ${channel}: ${err}`));
      });
    };

    tellSubscribed();

    const heartbeat = setInterval(tellSubscribed, interval * 1000);

    return () => {
      clearInterval(heartbeat);
    };
  };
  /**
   * Dispatches a message received on a Redis channel to every listener
   * registered for that channel in `subs`.
   *
   * Messages for channels without listeners, and messages that are not valid
   * JSON (or parse to a falsy value), are dropped.
   * @param {string} channel channel name as subscribed, i.e. the key used in `subs`
   * @param {string} message raw JSON payload
   */
  const onRedisMessage = (channel, message) => {
    const callbacks = subs[channel];

    // `channel` is the name handed to `subscribe`, which callers already
    // build with `redisPrefix`, so the prefix is not prepended again here.
    log.silly(`New message on channel ${channel}`);

    if (!callbacks) {
      return;
    }

    const json = parseJSON(message, null);
    if (!json) return;

    callbacks.forEach(callback => callback(json));
  };
  // Route every message delivered on the subscribe connection through
  // onRedisMessage.
  redisSubscribeClient.on("message", onRedisMessage);
  /**
   * @callback SubscriptionListener
   * @param {ReturnType<parseJSON>} json of the message
   * @returns void
   */

  /**
   * Registers `callback` for `channel`. The Redis SUBSCRIBE command is only
   * issued when the channel has no listeners yet; later listeners share the
   * existing subscription.
   *
   * A failed SUBSCRIBE is logged together with the error. The listener stays
   * registered either way.
   * @param {string} channel
   * @param {SubscriptionListener} callback
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    subs[channel] = subs[channel] || [];

    if (subs[channel].length === 0) {
      log.verbose(`Subscribe ${channel}`);
      redisSubscribeClient.subscribe(channel, (err, count) => {
        if (err) {
          log.error(`Error subscribing to ${channel}: ${err}`);
        } else {
          // `count` is the number of channels the connection is now subscribed to.
          redisSubscriptions.set(count);
        }
      });
    }

    subs[channel].push(callback);
  };
  /**
   * Removes `callback` from the listeners of `channel`. When the last
   * listener is removed, the Redis UNSUBSCRIBE command is issued and the
   * channel entry is deleted from `subs`.
   *
   * Calling this for a channel without listeners is a no-op. A failed
   * UNSUBSCRIBE is logged together with the error.
   * @param {string} channel
   * @param {SubscriptionListener} callback
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    if (!subs[channel]) {
      return;
    }

    // filter() builds a new array, so an in-flight forEach over the previous
    // array in onRedisMessage is not disturbed.
    subs[channel] = subs[channel].filter(item => item !== callback);

    if (subs[channel].length === 0) {
      log.verbose(`Unsubscribe ${channel}`);
      redisSubscribeClient.unsubscribe(channel, (err, count) => {
        if (err) {
          log.error(`Error unsubscribing to ${channel}: ${err}`);
        } else {
          redisSubscriptions.set(count);
        }
      });
      delete subs[channel];
    }
  };
  // Values that count as "false" when they arrive as query parameters, in
  // addition to the ordinary JavaScript falsy values.
  const FALSE_VALUES = [
    false,
    0,
    '0',
    'f',
    'F',
    'false',
    'FALSE',
    'off',
    'OFF',
  ];

  /**
   * Interprets a loosely-typed flag (typically a query-string value) as a
   * boolean: falsy inputs and every entry of `FALSE_VALUES` yield `false`,
   * anything else yields `true`.
   *
   * The result is always a real boolean; falsy inputs such as `undefined` or
   * `''` are not passed through as-is.
   * @param {any} value
   * @returns {boolean}
   */
  const isTruthy = value =>
    Boolean(value) && !FALSE_VALUES.includes(value);
  /**
   * Middleware that sets permissive CORS headers on every response: any
   * origin, the `Authorization`, `Accept` and `Cache-Control` request
   * headers, and the `GET` and `OPTIONS` methods.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const allowCrossDomain = (req, res, next) => {
    res.header('Access-Control-Allow-Origin', '*');
    res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control');
    res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');

    next();
  };
  /**
   * Middleware that tags the request with a fresh UUID (v4), stored as
   * `req.requestId` and echoed to the client in the `X-Request-Id` header.
   * The id is used as the prefix of the log lines for the request.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRequestId = (req, res, next) => {
    req.requestId = uuid.v4();
    res.header('X-Request-Id', req.requestId);

    next();
  };
  /**
   * Middleware that copies the peer address of the underlying socket to
   * `req.remoteAddress`, where the logging code reads it.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRemoteAddress = (req, res, next) => {
    // `req.connection` is a deprecated alias of `req.socket` in Node.js, so
    // prefer the latter and only fall back for request-like objects that
    // expose `connection` alone.
    const socket = req.socket || req.connection;
    req.remoteAddress = socket.remoteAddress;

    next();
  };
  /**
   * Tells whether the request's access token carries at least one of the
   * given scopes.
   *
   * `req.scopes` is only populated once a token has been resolved, so a
   * request without it is treated as having no scopes rather than throwing.
   * @param {any} req
   * @param {string[]} necessaryScopes
   * @returns {boolean}
   */
  const isInScope = (req, necessaryScopes) =>
    Array.isArray(req.scopes) && req.scopes.some(scope => necessaryScopes.includes(scope));
  /**
   * Looks up a non-revoked OAuth access token and, on success, copies the
   * associated identity onto the request:
   * `accessTokenId`, `scopes` (split on spaces), `accountId`,
   * `chosenLanguages` and `deviceId`.
   *
   * Rejects with the database error when the query fails, or with an Error
   * carrying `status = 401` when the token is unknown or revoked.
   * @param {string} token
   * @param {any} req
   * @returns {Promise.<void>}
   */
  const accountFromToken = async (token, req) => {
    // pgPool.query checks a client out and releases it again itself. Running
    // inside an async function also means that any exception raised while
    // reading the row rejects the returned promise instead of escaping from a
    // driver callback.
    const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token]);

    if (result.rows.length === 0) {
      const err = new Error('Invalid access token');
      err.status = 401;
      throw err;
    }

    const [row] = result.rows;

    req.accessTokenId = row.id;
    req.scopes = row.scopes.split(' ');
    req.accountId = row.account_id;
    req.chosenLanguages = row.chosen_languages;
    req.deviceId = row.device_id;
  };
  /**
   * Extracts the access token from a request and resolves the account for it
   * through `accountFromToken`.
   *
   * The token is taken from, in order of precedence: the `Authorization`
   * header (with a leading `Bearer ` removed), the `access_token` query
   * parameter, or the `Sec-WebSocket-Protocol` header. Rejects with an Error
   * carrying `status = 401` when none of them is present.
   * @param {any} req
   * @returns {Promise.<void>}
   */
  const accountFromRequest = async (req) => {
    const { authorization } = req.headers;
    const location = url.parse(req.url, true);
    const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];

    if (!authorization && !accessToken) {
      const err = new Error('Missing access token');
      err.status = 401;
      throw err;
    }

    // HTTP authentication scheme names are case-insensitive, so `bearer` and
    // `BEARER` are accepted as well as `Bearer`.
    const token = authorization ? authorization.replace(/^Bearer /i, '') : accessToken;

    return accountFromToken(token, req);
  };
  /**
   * Maps the path of an HTTP streaming request to the name of the channel it
   * addresses. For the three public timelines a truthy `only_media` query
   * parameter selects the `:media` variant of the channel.
   * @param {any} req
   * @returns {string|undefined} the channel name, or `undefined` for an unknown path
   */
  const channelNameFromPath = req => {
    const { path, query } = req;
    const onlyMedia = isTruthy(query.only_media);

    switch (path) {
    case '/api/v1/streaming/user':
      return 'user';
    case '/api/v1/streaming/user/notification':
      return 'user:notification';
    case '/api/v1/streaming/public':
      return onlyMedia ? 'public:media' : 'public';
    case '/api/v1/streaming/public/local':
      return onlyMedia ? 'public:local:media' : 'public:local';
    case '/api/v1/streaming/public/remote':
      return onlyMedia ? 'public:remote:media' : 'public:remote';
    case '/api/v1/streaming/hashtag':
      return 'hashtag';
    case '/api/v1/streaming/hashtag/local':
      return 'hashtag:local';
    case '/api/v1/streaming/direct':
      return 'direct';
    case '/api/v1/streaming/list':
      return 'list';
    default:
      return undefined;
    }
  };
  // Channels that checkScopes lets through without looking at the token's
  // scopes.
  const PUBLIC_CHANNELS = [
    'public',
    'public:media',
    'public:local',
    'public:local:media',
    'public:remote',
    'public:remote:media',
    'hashtag',
    'hashtag:local',
  ];

  /**
   * Verifies that the scopes stored on the request allow access to the given
   * channel.
   *
   * Public channels always pass. Every other channel requires `read`, or
   * `read:notifications` for `user:notification`, or `read:statuses` for the
   * remaining channels. Rejects with an Error carrying `status = 401` when
   * the requirement is not met.
   * @param {any} req
   * @param {string|undefined} channelName
   * @returns {Promise.<void>}
   */
  const checkScopes = (req, channelName) => new Promise((resolve, reject) => {
    log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);

    // When accessing public channels, no scopes are needed
    if (PUBLIC_CHANNELS.includes(channelName)) {
      resolve();
      return;
    }

    // The `read` scope has the highest priority, if the token has it
    // then it can access all streams
    const requiredScopes = ['read'];

    // When accessing specifically the notifications stream,
    // we need a read:notifications, while in all other cases,
    // we can allow access with read:statuses. Mind that the
    // user stream will not contain notifications unless
    // the token has either read or read:notifications scope
    // as well, this is handled separately.
    if (channelName === 'user:notification') {
      requiredScopes.push('read:notifications');
    } else {
      requiredScopes.push('read:statuses');
    }

    if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) {
      resolve();
      return;
    }

    const err = new Error('Access token does not cover required scopes');
    err.status = 401;

    reject(err);
  });
  /**
   * Verification hook for incoming websocket upgrades: the connection is
   * accepted when an account can be resolved for the request, and refused
   * with `401 Unauthorized` otherwise. Any failure of `accountFromRequest`,
   * whatever its cause, leads to the 401.
   * @param {any} info object whose `req` is the upgrade request
   * @param {function(boolean, number, string): void} callback
   */
  const wsVerifyClient = (info, callback) => {
    // When verifying the websockets connection, we no longer pre-emptively
    // check OAuth scopes and drop the connection if they're missing. We only
    // drop the connection if access without token is not allowed by environment
    // variables. OAuth scope checks are moved to the point of subscription
    // to a specific stream.

    accountFromRequest(info.req).then(() => {
      callback(true, undefined, undefined);
    }).catch(err => {
      log.error(info.req.requestId, err.toString());
      callback(false, 401, 'Unauthorized');
    });
  };
  /**
   * @typedef SystemMessageHandlers
   * @property {function(): void} onKill
   */

  /**
   * Creates the listener for the per-token and per-account system channels
   * of a connection. Two events are acted upon:
   *  - `kill`: the connection must be closed, delegated to `eventHandlers.onKill`
   *  - `filters_changed`: the filters cached on the request are discarded
   * Every other event is only logged.
   * @param {any} req
   * @param {SystemMessageHandlers} eventHandlers
   * @returns {function(object): void}
   */
  const createSystemMessageListener = (req, eventHandlers) => {
    return message => {
      const { event } = message;

      log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);

      if (event === 'kill') {
        log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`);
        eventHandlers.onKill();
      } else if (event === 'filters_changed') {
        log.verbose(req.requestId, `Invalidating filters cache for ${req.accountId}`);
        req.cachedFilters = null;
      }
    };
  };
  /**
   * Subscribes an HTTP (EventSource) connection to its two system channels,
   * `timeline:access_token:<accessTokenId>` and `timeline:system:<accountId>`,
   * with a single shared listener. A `kill` event ends the response; when the
   * response closes, both subscriptions are removed again.
   *
   * The `connected_channels` gauge for `eventsource`/`system` is raised by
   * two on subscription and lowered by two on close.
   * @param {any} req
   * @param {any} res
   */
  const subscribeHttpToSystemChannel = (req, res) => {
    const accessTokenChannelId = `timeline:access_token:${req.accessTokenId}`;
    const systemChannelId = `timeline:system:${req.accountId}`;

    const listener = createSystemMessageListener(req, {
      onKill() {
        res.end();
      },
    });

    res.on('close', () => {
      unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
      unsubscribe(`${redisPrefix}${systemChannelId}`, listener);

      connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
    });

    subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
    subscribe(`${redisPrefix}${systemChannelId}`, listener);

    connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
  };
  /**
   * Middleware guarding the HTTP streaming endpoints. `OPTIONS` requests pass
   * straight through. For everything else the channel is derived from the
   * path, the account is resolved from the request, the token's scopes are
   * checked against the channel, and the connection is subscribed to its
   * system channels before the request continues.
   *
   * Failures are forwarded to `next` as errors; an unknown path yields an
   * Error carrying `status = 400`.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    const channelName = channelNameFromPath(req);

    // If no channelName can be found for the request, then we should terminate
    // the connection, as there's nothing to stream back
    if (!channelName) {
      const err = new Error('Unknown channel requested');
      err.status = 400;

      next(err);
      return;
    }

    accountFromRequest(req).then(() => checkScopes(req, channelName)).then(() => {
      subscribeHttpToSystemChannel(req, res);
    }).then(() => {
      next();
    }).catch(err => {
      next(err);
    });
  };
  /**
   * Error handler that logs the error and answers with a JSON body of the
   * form `{ "error": "<message>" }`.
   *
   * Errors carrying a `status` are reported with that status and their own
   * text. Errors without one are treated as internal: status 500 and a
   * generic message, so that details are not exposed to the client. Once the
   * headers have been sent the error is handed on to `next` instead.
   * @param {Error} err
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    if (res.headersSent) {
      next(err);
      return;
    }

    res.writeHead(err.status || 500, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: err.status ? err.toString() : 'An unexpected error occurred' }));
  };
  /**
   * Builds a comma-separated list of positional SQL placeholders, one per
   * element of `arr`. Numbering starts at `$1`, or at `$(shift + 1)` when the
   * query already uses `shift` earlier parameters.
   *
   * Example: `placeholders(['a', 'b'], 2)` yields `'$3, $4'`.
   * @param {array} arr
   * @param {number=} shift
   * @returns {string}
   */
  const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
  /**
   * Resolves when the list with the given id exists and belongs to the
   * account of the request. Rejects otherwise.
   *
   * The rejection always carries a reason: the database error when the query
   * fails, or a descriptive Error when the list is missing or owned by
   * another account.
   * @param {string} listId
   * @param {any} req
   * @returns {Promise.<void>}
   */
  const authorizeListAccess = async (listId, req) => {
    const { accountId } = req;

    // pgPool.query checks a client out and releases it again itself.
    const result = await pgPool.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId]);

    if (result.rows.length === 0 || result.rows[0].account_id !== accountId) {
      throw new Error('Not authorized to access this list');
    }
  };
  589. /**
  590. * @param {string[]} ids
  591. * @param {any} req
  592. * @param {function(string, string): void} output
  593. * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
  594. * @param {boolean=} needsFiltering
  595. * @returns {SubscriptionListener}
  596. */
  597. const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
  598. const accountId = req.accountId || req.remoteAddress;
  599. log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
  600. const transmit = (event, payload) => {
  601. // TODO: Replace "string"-based delete payloads with object payloads:
  602. const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
  603. log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
  604. output(event, encodedPayload);
  605. };
  606. // The listener used to process each message off the redis subscription,
  607. // message here is an object with an `event` and `payload` property. Some
  608. // events also include a queued_at value, but this is being removed shortly.
  609. /** @type {SubscriptionListener} */
  610. const listener = message => {
  611. const { event, payload } = message;
  612. // Streaming only needs to apply filtering to some channels and only to
  613. // some events. This is because majority of the filtering happens on the
  614. // Ruby on Rails side when producing the event for streaming.
  615. //
  616. // The only events that require filtering from the streaming server are
  617. // `update` and `status.update`, all other events are transmitted to the
  618. // client as soon as they're received (pass-through).
  619. //
  620. // The channels that need filtering are determined in the function
  621. // `channelNameToIds` defined below:
  622. if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
  623. transmit(event, payload);
  624. return;
  625. }
  626. // The rest of the logic from here on in this function is to handle
  627. // filtering of statuses:
  628. // Filter based on language:
  629. if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
  630. log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
  631. return;
  632. }
  633. // When the account is not logged in, it is not necessary to confirm the block or mute
  634. if (!req.accountId) {
  635. transmit(event, payload);
  636. return;
  637. }
  638. // Filter based on domain blocks, blocks, mutes, or custom filters:
  639. const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
  640. const accountDomain = payload.account.acct.split('@')[1];
  641. // TODO: Move this logic out of the message handling loop
  642. pgPool.connect((err, client, releasePgConnection) => {
  643. if (err) {
  644. log.error(err);
  645. return;
  646. }
  647. const queries = [
  648. client.query(`SELECT 1
  649. FROM blocks
  650. WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
  651. OR (account_id = $2 AND target_account_id = $1)
  652. UNION
  653. SELECT 1
  654. FROM mutes
  655. WHERE account_id = $1
  656. AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
  657. ];
  658. if (accountDomain) {
  659. queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
  660. }
  661. if (!payload.filtered && !req.cachedFilters) {
  662. queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
  663. }
  664. Promise.all(queries).then(values => {
  665. releasePgConnection();
  666. // Handling blocks & mutes and domain blocks: If one of those applies,
  667. // then we don't transmit the payload of the event to the client
  668. if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
  669. return;
  670. }
  671. // If the payload already contains the `filtered` property, it means
  672. // that filtering has been applied on the ruby on rails side, as
  673. // such, we don't need to construct or apply the filters in streaming:
  674. if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
  675. transmit(event, payload);
  676. return;
  677. }
  678. // Handling for constructing the custom filters and caching them on the request
  679. // TODO: Move this logic out of the message handling lifecycle
  680. if (!req.cachedFilters) {
  681. const filterRows = values[accountDomain ? 2 : 1].rows;
  682. req.cachedFilters = filterRows.reduce((cache, filter) => {
  683. if (cache[filter.id]) {
  684. cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
  685. } else {
  686. cache[filter.id] = {
  687. keywords: [[filter.keyword, filter.whole_word]],
  688. expires_at: filter.expires_at,
  689. filter: {
  690. id: filter.id,
  691. title: filter.title,
  692. context: filter.context,
  693. expires_at: filter.expires_at,
  694. // filter.filter_action is the value from the
  695. // custom_filters.action database column, it is an integer
  696. // representing a value in an enum defined by Ruby on Rails:
  697. //
  698. // enum { warn: 0, hide: 1 }
  699. filter_action: ['warn', 'hide'][filter.filter_action],
  700. },
  701. };
  702. }
  703. return cache;
  704. }, {});
  705. // Construct the regular expressions for the custom filters: This
  706. // needs to be done in a separate loop as the database returns one
  707. // filterRow per keyword, so we need all the keywords before
  708. // constructing the regular expression
  709. Object.keys(req.cachedFilters).forEach((key) => {
  710. req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
  711. let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  712. if (whole_word) {
  713. if (/^[\w]/.test(expr)) {
  714. expr = `\\b${expr}`;
  715. }
  716. if (/[\w]$/.test(expr)) {
  717. expr = `${expr}\\b`;
  718. }
  719. }
  720. return expr;
  721. }).join('|'), 'i');
  722. });
  723. }
  724. // Apply cachedFilters against the payload, constructing a
  725. // `filter_results` array of FilterResult entities
  726. if (req.cachedFilters) {
  727. const status = payload;
  728. // TODO: Calculate searchableContent in Ruby on Rails:
  729. const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
  730. const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
  731. const now = new Date();
  732. const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
  733. // Check the filter hasn't expired before applying:
  734. if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
  735. return results;
  736. }
  737. // Just in-case JSDOM fails to find textContent in searchableContent
  738. if (!searchableTextContent) {
  739. return results;
  740. }
  741. const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
  742. if (keyword_matches) {
  743. // results is an Array of FilterResult; status_matches is always
  744. // null as we only are only applying the keyword-based custom
  745. // filters, not the status-based custom filters.
  746. // https://docs.joinmastodon.org/entities/FilterResult/
  747. results.push({
  748. filter: cachedFilter.filter,
  749. keyword_matches,
  750. status_matches: null
  751. });
  752. }
  753. return results;
  754. }, []);
  755. // Send the payload + the FilterResults as the `filtered` property
  756. // to the streaming connection. To reach this code, the `event` must
  757. // have been either `update` or `status.update`, meaning the
  758. // `payload` is a Status entity, which has a `filtered` property:
  759. //
  760. // filtered: https://docs.joinmastodon.org/entities/Status/#filtered
  761. transmit(event, {
  762. ...payload,
  763. filtered: filter_results
  764. });
  765. } else {
  766. transmit(event, payload);
  767. }
  768. }).catch(err => {
  769. log.error(err);
  770. releasePgConnection();
  771. });
  772. });
  773. };
  774. ids.forEach(id => {
  775. subscribe(`${redisPrefix}${id}`, listener);
  776. });
  777. if (typeof attachCloseHandler === 'function') {
  778. attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
  779. }
  780. return listener;
  781. };
  /**
   * Turns an HTTP response into a Server-Sent Events stream and returns the
   * function used to push events onto it.
   *
   * The `eventsource` gauges are incremented, the event-stream headers and an
   * initial comment line are written, and a keep-alive comment is scheduled
   * every 15 seconds. All of this is reverted when the request emits `close`.
   * @param {any} req
   * @param {any} res
   * @returns {function(string, string): void}
   */
  const streamToHttp = (req, res) => {
    const subscriber = req.accountId || req.remoteAddress;
    const channel = channelNameFromPath(req);

    // In theory there is always a channel name, but channelNameFromPath can
    // return undefined; in that case the per-channel gauge is left untouched:
    const hasChannel = typeof channel === 'string';

    connectedClients.labels({ type: 'eventsource' }).inc();
    if (hasChannel) {
      connectedChannels.labels({ type: 'eventsource', channel }).inc();
    }

    const headers = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-store'],
      ['Transfer-Encoding', 'chunked'],
    ];
    for (const [header, value] of headers) {
      res.setHeader(header, value);
    }
    res.write(':)\n');

    const keepAlive = setInterval(() => res.write(':thump\n'), 15000);

    const onClose = () => {
      log.verbose(req.requestId, `Ending stream for ${subscriber}`);

      // The gauges are decremented here rather than in streamHttpEnd, because
      // that function has no knowledge of the channel name
      connectedClients.labels({ type: 'eventsource' }).dec();
      if (hasChannel) {
        connectedChannels.labels({ type: 'eventsource', channel }).dec();
      }

      clearInterval(keepAlive);
    };
    req.on('close', onClose);

    return (event, payload) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };
  };
  /**
   * Builds the close-handler attacher for an HTTP stream. The returned
   * function waits for the request to emit `close`, then unsubscribes the
   * listener from each of the given ids and finally runs the optional
   * closeHandler.
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @returns {function(string[], SubscriptionListener): void}
   */
  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
    const onClose = () => {
      for (const id of ids) {
        unsubscribe(id, listener);
      }

      if (closeHandler) {
        closeHandler();
      }
    };

    req.on('close', onClose);
  };
  /**
   * Builds the sender for a websocket subscription. Each event is wrapped in
   * a `{ stream, event, payload }` envelope and serialised as JSON. Nothing is
   * sent when the socket is no longer open; that case and send failures are
   * logged as errors.
   * @param {any} req
   * @param {any} ws
   * @param {string[]} streamName
   * @returns {function(string, string): void}
   */
  const streamToWs = (req, ws, streamName) => (event, payload) => {
    const socketIsOpen = ws.readyState === ws.OPEN;

    if (!socketIsOpen) {
      log.error(req.requestId, 'Tried writing to closed socket');
      return;
    }

    const frame = JSON.stringify({ stream: streamName, event, payload });
    const reportFailure = (err) => {
      if (err) {
        log.error(req.requestId, `Failed to send to websocket: ${err}`);
      }
    };

    ws.send(frame, reportFailure);
  };
  /**
   * Ends the response with a 404 status and a JSON `Not found` error body.
   * @param {any} res
   */
  const httpNotFound = res => {
    const body = JSON.stringify({ error: 'Not found' });

    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(body);
  };
  const api = express.Router();

  app.use(api);

  // Middleware is registered in this exact order
  [
    setRequestId,
    setRemoteAddress,
    allowCrossDomain,
    authenticationMiddleware,
    errorMiddleware,
  ].forEach((middleware) => {
    api.use(middleware);
  });

  // EventSource endpoint: resolve the channel named by the request path, then
  // stream it over the HTTP response. Any failure along the way is answered
  // with a 404.
  api.get('/api/v1/streaming/*', (req, res) => {
    const startStreaming = ({ channelIds, options }) => {
      const onSend = streamToHttp(req, res);
      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));

      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
    };

    const refuse = (err) => {
      log.verbose(req.requestId, 'Subscription error:', err.toString());
      httpNotFound(res);
    };

    channelNameToIds(req, channelNameFromPath(req), req.query)
      .then(startStreaming)
      .catch(refuse);
  });

  const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
  873. /**
  874. * @typedef StreamParams
  875. * @property {string} [tag]
  876. * @property {string} [list]
  877. * @property {string} [only_media]
  878. */
  /**
   * Lists the channel ids that make up the `user` stream for this request:
   * always the account timeline, plus the per-device channel when the request
   * has the `crypto` scope and a device id, plus the notifications channel
   * when it has the `read` or `read:notifications` scope.
   * @param {any} req
   * @returns {string[]}
   */
  const channelsForUserStream = req => {
    const channels = [`timeline:${req.accountId}`];

    const includeDeviceChannel = isInScope(req, ['crypto']) && req.deviceId;
    if (includeDeviceChannel) {
      channels.push(`timeline:${req.accountId}:${req.deviceId}`);
    }

    const includeNotifications = isInScope(req, ['read', 'read:notifications']);
    if (includeNotifications) {
      channels.push(`timeline:${req.accountId}:notifications`);
    }

    return channels;
  };
  /**
   * See app/lib/ascii_folder.rb for the canon definitions
   * of these constants.
   *
   * The two strings are parallel lookup tables: the character at index i of
   * NON_ASCII_CHARS folds to the character at index i of
   * EQUIVALENT_ASCII_CHARS, so both must hold exactly the same number of
   * characters. U+0149 is written as an escape because normalising tools
   * decompose it into two characters, which would shift every later entry by
   * one position.
   */
  const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂăĄąÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇň\u0149ŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž';
  const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz';
  /**
   * Replaces every character listed in NON_ASCII_CHARS with the character at
   * the same index of EQUIVALENT_ASCII_CHARS; all other characters are kept.
   * @param {string} str
   * @returns {string}
   */
  const foldToASCII = str => {
    const alternation = NON_ASCII_CHARS.split('').join('|');
    const foldable = new RegExp(alternation, 'g');
    const toEquivalent = (char) => EQUIVALENT_ASCII_CHARS[NON_ASCII_CHARS.indexOf(char)];

    return str.replace(foldable, (char) => toEquivalent(char));
  };
  /**
   * Canonicalises a hashtag: NFKC-normalise, lowercase, fold to ASCII, then
   * drop every character that is not a letter, a number, an underscore,
   * U+00B7 or U+200C.
   * @param {string} str
   * @returns {string}
   */
  const normalizeHashtag = str => {
    const lowered = str.normalize('NFKC').toLowerCase();
    const folded = foldToASCII(lowered);

    return folded.replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
  };
  /**
   * Resolves a stream name (plus its parameters) to the channel ids to
   * subscribe to and whether events on them need filtering. The promise is
   * rejected with a message string when the name is unknown, when a hashtag
   * stream has no tag, or when access to a list is not authorized.
   * @param {any} req
   * @param {string} name
   * @param {StreamParams} params
   * @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
   */
  const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
    const resolveWith = (channelIds, needsFiltering) => {
      resolve({
        channelIds,
        options: { needsFiltering },
      });
    };

    // These stream names map one-to-one onto a `timeline:<name>` channel:
    const publicTimelines = [
      'public',
      'public:local',
      'public:remote',
      'public:media',
      'public:local:media',
      'public:remote:media',
    ];

    if (name === 'user') {
      resolveWith(channelsForUserStream(req), false);
    } else if (name === 'user:notification') {
      resolveWith([`timeline:${req.accountId}:notifications`], false);
    } else if (publicTimelines.includes(name)) {
      resolveWith([`timeline:${name}`], true);
    } else if (name === 'direct') {
      resolveWith([`timeline:direct:${req.accountId}`], false);
    } else if (name === 'hashtag' || name === 'hashtag:local') {
      if (!params.tag || params.tag.length === 0) {
        reject('No tag for stream provided');
      } else {
        const suffix = name === 'hashtag:local' ? ':local' : '';
        resolveWith([`timeline:hashtag:${normalizeHashtag(params.tag)}${suffix}`], true);
      }
    } else if (name === 'list') {
      // The list channel is only handed out once access has been confirmed
      authorizeListAccess(params.list, req).then(() => {
        resolveWith([`timeline:list:${params.list}`], false);
      }).catch(() => {
        reject('Not authorized to stream this list');
      });
    } else {
      reject('Unknown stream type');
    }
  });
  /**
   * Builds the `stream` identifier reported back to websocket clients: the
   * channel name, followed by the list id for list streams or by the tag for
   * hashtag streams.
   * @param {string} channelName
   * @param {StreamParams} params
   * @returns {string[]}
   */
  const streamNameFromChannelName = (channelName, params) => {
    switch (channelName) {
    case 'list':
      return [channelName, params.list];
    case 'hashtag':
    case 'hashtag:local':
      return [channelName, params.tag];
    default:
      return [channelName];
    }
  };
  1027. /**
  1028. * @typedef WebSocketSession
  1029. * @property {any} socket
  1030. * @property {any} request
  1031. * @property {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
  1032. */
  /**
   * Subscribes a websocket session to the named channel once the scope check
   * and the channel lookup have both succeeded. A session is subscribed to a
   * given set of channel ids at most once. Failures are logged and, while the
   * socket is still open, reported back to the client.
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   * @returns {void}
   */
  const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => {
    checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
      channelIds,
      options,
    }) => {
      const subscriptionKey = channelIds.join(';');

      if (subscriptions[subscriptionKey]) {
        return;
      }

      // The checks above are asynchronous, so the socket may have gone away
      // in the meantime. Subscribing now would register a listener and a
      // heartbeat that nothing ever removes, because the connection teardown
      // only cleans up the subscriptions that exist when it runs.
      if (socket.readyState !== socket.OPEN) {
        return;
      }

      const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
      const stopHeartbeat = subscriptionHeartbeat(channelIds);
      const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);

      connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();

      subscriptions[subscriptionKey] = {
        channelName,
        listener,
        stopHeartbeat,
      };
    }).catch(err => {
      log.verbose(request.requestId, 'Subscription error:', err.toString());

      // Only report the error to a client that can still receive it
      if (socket.readyState === socket.OPEN) {
        socket.send(JSON.stringify({ error: err.toString() }));
      }
    });
  };
  /**
   * Tears down the subscription stored under the given channel ids, if there
   * is one: unsubscribes its listener from every channel, decrements the
   * websocket channel gauge, stops its heartbeat and removes it from the
   * subscriptions map. The attempt is logged even when nothing is stored.
   * @param {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
   * @param {string[]} channelIds
   * @param {any} request
   * @returns {void}
   */
  const removeSubscription = (subscriptions, channelIds, request) => {
    log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

    const key = channelIds.join(';');
    const subscription = subscriptions[key];

    if (!subscription) {
      return;
    }

    for (const channelId of channelIds) {
      unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
    }

    connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
    subscription.stopHeartbeat();

    delete subscriptions[key];
  };
  /**
   * Resolves the channel ids for the named channel and removes the matching
   * subscription from the session. If the lookup fails, the failure is logged
   * and a generic error is sent to the client, provided the socket is still
   * alive and open.
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   * @returns {void}
   */
  const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const dropSubscription = ({ channelIds }) => {
      removeSubscription(subscriptions, channelIds, request);
    };

    const reportFailure = (err) => {
      log.verbose(request.requestId, 'Unsubscribe error:', err);

      // Only a socket that is still alive and open can be told about the error:
      // FIXME: In other parts of the code ws === socket
      const canNotify = socket.isAlive && socket.readyState === socket.OPEN;
      if (canNotify) {
        socket.send(JSON.stringify({ error: "Error unsubscribing from channel" }));
      }
    };

    channelNameToIds(request, channelName, params).then(dropSubscription).catch(reportFailure);
  };
  /**
   * Subscribes the session to its two system channels (one keyed by access
   * token id, one keyed by account id) with a single shared listener that
   * closes the socket on kill, records both under the `system` channel name
   * and counts them on the websocket channel gauge.
   * @param {WebSocketSession} session
   */
  const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => {
    const systemChannelIds = [
      `timeline:access_token:${request.accessTokenId}`,
      `timeline:system:${request.accountId}`,
    ];

    const listener = createSystemMessageListener(request, {
      onKill() {
        socket.close();
      },
    });

    systemChannelIds.forEach((channelId) => {
      subscribe(`${redisPrefix}${channelId}`, listener);
    });

    // System channels have no heartbeat, so stopping it is a no-op
    systemChannelIds.forEach((channelId) => {
      subscriptions[channelId] = {
        channelName: 'system',
        listener,
        stopHeartbeat: () => {
        },
      };
    });

    connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2);
  };
  /**
   * Returns the first element when given an array, and the value itself
   * otherwise.
   * @param {string|string[]} arrayOrString
   * @returns {string}
   */
  const firstParam = arrayOrString => (
    Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString
  );
  // Handles every accepted websocket connection: tags the request, tracks
  // liveness, builds the per-connection session, wires up teardown and
  // message handling, and performs the initial subscriptions.
  wss.on('connection', (ws, req) => {
    const location = url.parse(req.url, true);

    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    // Every pong marks the socket as alive again
    ws.isAlive = true;

    ws.on('pong', () => {
      ws.isAlive = true;
    });

    connectedClients.labels({ type: 'websocket' }).inc();

    /**
     * @type {WebSocketSession}
     */
    const session = {
      socket: ws,
      request: req,
      subscriptions: {},
    };

    // The same teardown is registered for both `close` and `error`, and both
    // events can be emitted for a single connection. The flag makes the
    // teardown run only once, so the connected-clients gauge is decremented
    // exactly once per connection.
    let hasEnded = false;

    const onEnd = () => {
      if (hasEnded) {
        return;
      }

      hasEnded = true;

      const subscriptions = Object.keys(session.subscriptions);

      subscriptions.forEach(channelIds => {
        removeSubscription(session.subscriptions, channelIds.split(';'), req);
      });

      // ensure garbage collection:
      session.socket = null;
      session.request = null;
      session.subscriptions = {};

      connectedClients.labels({ type: 'websocket' }).dec();
    };

    ws.on('close', onEnd);
    ws.on('error', onEnd);

    ws.on('message', (data, isBinary) => {
      if (isBinary) {
        log.warn('socket', 'Received binary data, closing connection');
        ws.close(1003, 'The mastodon streaming server does not support binary messages');
        return;
      }

      const message = data.toString('utf8');
      const json = parseJSON(message, session.request);

      if (!json) return;

      // Everything besides `type` and `stream` is passed on as stream params
      const { type, stream, ...params } = json;

      if (type === 'subscribe') {
        subscribeWebsocketToChannel(session, firstParam(stream), params);
      } else if (type === 'unsubscribe') {
        unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
      } else {
        // Unknown action type
      }
    });

    subscribeWebsocketToSystemChannel(session);

    // A stream named in the connection URL is subscribed to straight away
    if (location.query.stream) {
      subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
    }
  });
  // Liveness sweep, every 30 seconds: a client whose isAlive flag is still
  // false from the previous sweep is terminated; every other client has the
  // flag cleared and is pinged.
  setInterval(() => {
    for (const ws of wss.clients) {
      if (ws.isAlive === false) {
        ws.terminate();
        continue;
      }

      ws.isAlive = false;
      ws.ping('', false);
    }
  }, 30000);
  attachServerWithConfig(server, (address) => {
    log.warn(`Streaming API now listening on ${address}`);
  });

  // Closes the server and exits the process with status 0
  const onExit = () => {
    server.close();
    process.exit(0);
  };

  // Logs the error, then closes the server and exits the process with status 0
  const onError = (err) => {
    log.error(err);
    server.close();
    process.exit(0);
  };

  ['SIGINT', 'SIGTERM', 'exit'].forEach((eventName) => {
    process.on(eventName, onExit);
  });
  process.on('uncaughtException', onError);
  1209. };
  /**
   * Starts listening according to the environment.
   *
   * When SOCKET is set, or PORT is set to something that is not a number, the
   * server listens on that value and the address it reports is made readable
   * and writable by everyone (0o666). Otherwise it listens on PORT (default
   * 4000) bound to BIND (default 127.0.0.1).
   *
   * The permissions are applied whether or not onSuccess is given, so they do
   * not depend on the caller passing a callback.
   * @param {any} server
   * @param {function(string): void} [onSuccess] called with the socket path, or with `address:port`, once listening
   */
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT, BIND } = process.env;
    const listensOnSocketPath = Boolean(SOCKET || (PORT && Number.isNaN(Number(PORT))));

    if (listensOnSocketPath) {
      server.listen(SOCKET || PORT, () => {
        fs.chmodSync(server.address(), 0o666);

        if (onSuccess) {
          onSuccess(server.address());
        }
      });
    } else {
      server.listen(+PORT || 4000, BIND || '127.0.0.1', () => {
        if (onSuccess) {
          const { address, port } = server.address();
          onSuccess(`${address}:${port}`);
        }
      });
    }
  };
  1230. startServer();