index.js 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446
  1. // @ts-check
  2. const fs = require('fs');
  3. const http = require('http');
  4. const path = require('path');
  5. const url = require('url');
  6. const dotenv = require('dotenv');
  7. const express = require('express');
  8. const Redis = require('ioredis');
  9. const { JSDOM } = require('jsdom');
  10. const log = require('npmlog');
  11. const pg = require('pg');
  12. const dbUrlToConfig = require('pg-connection-string').parse;
  13. const uuid = require('uuid');
  14. const WebSocket = require('ws');
  15. const { setupMetrics } = require('./metrics');
  16. const { isTruthy } = require("./utils");
  17. const environment = process.env.NODE_ENV || 'development';
  18. // Correctly detect and load .env or .env.production file based on environment:
  19. const dotenvFile = environment === 'production' ? '.env.production' : '.env';
  20. dotenv.config({
  21. path: path.resolve(__dirname, path.join('..', dotenvFile))
  22. });
  23. log.level = process.env.LOG_LEVEL || 'verbose';
  24. /**
  25. * @param {Object.<string, any>} config
  26. */
  27. const createRedisClient = async (config) => {
  28. const { redisParams, redisUrl } = config;
  29. const client = new Redis(redisUrl, redisParams);
  30. client.on('error', (err) => log.error('Redis Client Error!', err));
  31. return client;
  32. };
  33. /**
  34. * Attempts to safely parse a string as JSON, used when both receiving a message
  35. * from redis and when receiving a message from a client over a websocket
  36. * connection, this is why it accepts a `req` argument.
  37. * @param {string} json
  38. * @param {any?} req
  39. * @returns {Object.<string, any>|null}
  40. */
  41. const parseJSON = (json, req) => {
  42. try {
  43. return JSON.parse(json);
  44. } catch (err) {
  45. /* FIXME: This logging isn't great, and should probably be done at the
  46. * call-site of parseJSON, not in the method, but this would require changing
  47. * the signature of parseJSON to return something akin to a Result type:
  48. * [Error|null, null|Object<string,any}], and then handling the error
  49. * scenarios.
  50. */
  51. if (req) {
  52. if (req.accountId) {
  53. log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
  54. } else {
  55. log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
  56. }
  57. } else {
  58. log.warn(`Error parsing message from redis: ${err}`);
  59. }
  60. return null;
  61. }
  62. };
  63. /**
  64. * @param {Object.<string, any>} env the `process.env` value to read configuration from
  65. * @returns {Object.<string, any>} the configuration for the PostgreSQL connection
  66. */
  67. const pgConfigFromEnv = (env) => {
  68. const pgConfigs = {
  69. development: {
  70. user: env.DB_USER || pg.defaults.user,
  71. password: env.DB_PASS || pg.defaults.password,
  72. database: env.DB_NAME || 'mastodon_development',
  73. host: env.DB_HOST || pg.defaults.host,
  74. port: env.DB_PORT || pg.defaults.port,
  75. },
  76. production: {
  77. user: env.DB_USER || 'mastodon',
  78. password: env.DB_PASS || '',
  79. database: env.DB_NAME || 'mastodon_production',
  80. host: env.DB_HOST || 'localhost',
  81. port: env.DB_PORT || 5432,
  82. },
  83. };
  84. let baseConfig;
  85. if (env.DATABASE_URL) {
  86. baseConfig = dbUrlToConfig(env.DATABASE_URL);
  87. // Support overriding the database password in the connection URL
  88. if (!baseConfig.password && env.DB_PASS) {
  89. baseConfig.password = env.DB_PASS;
  90. }
  91. } else {
  92. baseConfig = pgConfigs[environment];
  93. if (env.DB_SSLMODE) {
  94. switch(env.DB_SSLMODE) {
  95. case 'disable':
  96. case '':
  97. baseConfig.ssl = false;
  98. break;
  99. case 'no-verify':
  100. baseConfig.ssl = { rejectUnauthorized: false };
  101. break;
  102. default:
  103. baseConfig.ssl = {};
  104. break;
  105. }
  106. }
  107. }
  108. return {
  109. ...baseConfig,
  110. max: env.DB_POOL || 10,
  111. connectionTimeoutMillis: 15000,
  112. application_name: '',
  113. };
  114. };
  115. /**
  116. * @param {Object.<string, any>} env the `process.env` value to read configuration from
  117. * @returns {Object.<string, any>} configuration for the Redis connection
  118. */
  119. const redisConfigFromEnv = (env) => {
  120. // ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
  121. // which means we can't use it. But this is something that should be looked into.
  122. const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';
  123. const redisParams = {
  124. host: env.REDIS_HOST || '127.0.0.1',
  125. port: env.REDIS_PORT || 6379,
  126. db: env.REDIS_DB || 0,
  127. password: env.REDIS_PASSWORD || undefined,
  128. };
  129. // redisParams.path takes precedence over host and port.
  130. if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
  131. redisParams.path = env.REDIS_URL.slice(7);
  132. }
  133. return {
  134. redisParams,
  135. redisPrefix,
  136. redisUrl: env.REDIS_URL,
  137. };
  138. };
  139. const PUBLIC_CHANNELS = [
  140. 'public',
  141. 'public:media',
  142. 'public:local',
  143. 'public:local:media',
  144. 'public:remote',
  145. 'public:remote:media',
  146. 'hashtag',
  147. 'hashtag:local',
  148. ];
  149. // Used for priming the counters/gauges for the various metrics that are
  150. // per-channel
  151. const CHANNEL_NAMES = [
  152. 'system',
  153. 'user',
  154. 'user:notification',
  155. 'list',
  156. 'direct',
  157. ...PUBLIC_CHANNELS
  158. ];
  159. const startServer = async () => {
  160. const app = express();
  161. app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');
  162. const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
  163. const server = http.createServer(app);
  164. /**
  165. * @type {Object.<string, Array.<function(Object<string, any>): void>>}
  166. */
  167. const subs = {};
  168. const redisConfig = redisConfigFromEnv(process.env);
  169. const redisSubscribeClient = await createRedisClient(redisConfig);
  170. const redisClient = await createRedisClient(redisConfig);
  171. const { redisPrefix } = redisConfig;
  172. const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
  173. // TODO: migrate all metrics to metrics.X.method() instead of just X.method()
  174. const {
  175. connectedClients,
  176. connectedChannels,
  177. redisSubscriptions,
  178. redisMessagesReceived,
  179. messagesSent,
  180. } = metrics;
  181. // When checking metrics in the browser, the favicon is requested this
  182. // prevents the request from falling through to the API Router, which would
  183. // error for this endpoint:
  184. app.get('/favicon.ico', (req, res) => res.status(404).end());
  185. app.get('/api/v1/streaming/health', (req, res) => {
  186. res.writeHead(200, { 'Content-Type': 'text/plain' });
  187. res.end('OK');
  188. });
  189. app.get('/metrics', async (req, res) => {
  190. try {
  191. res.set('Content-Type', metrics.register.contentType);
  192. res.end(await metrics.register.metrics());
  193. } catch (ex) {
  194. log.error(ex);
  195. res.status(500).end();
  196. }
  197. });
  198. /**
  199. * @param {string[]} channels
  200. * @returns {function(): void}
  201. */
  202. const subscriptionHeartbeat = channels => {
  203. const interval = 6 * 60;
  204. const tellSubscribed = () => {
  205. channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3));
  206. };
  207. tellSubscribed();
  208. const heartbeat = setInterval(tellSubscribed, interval * 1000);
  209. return () => {
  210. clearInterval(heartbeat);
  211. };
  212. };
  213. /**
  214. * @param {string} channel
  215. * @param {string} message
  216. */
  217. const onRedisMessage = (channel, message) => {
  218. redisMessagesReceived.inc();
  219. const callbacks = subs[channel];
  220. log.silly(`New message on channel ${redisPrefix}${channel}`);
  221. if (!callbacks) {
  222. return;
  223. }
  224. const json = parseJSON(message, null);
  225. if (!json) return;
  226. callbacks.forEach(callback => callback(json));
  227. };
  228. redisSubscribeClient.on("message", onRedisMessage);
  229. /**
  230. * @callback SubscriptionListener
  231. * @param {ReturnType<parseJSON>} json of the message
  232. * @returns void
  233. */
  234. /**
  235. * @param {string} channel
  236. * @param {SubscriptionListener} callback
  237. */
  238. const subscribe = (channel, callback) => {
  239. log.silly(`Adding listener for ${channel}`);
  240. subs[channel] = subs[channel] || [];
  241. if (subs[channel].length === 0) {
  242. log.verbose(`Subscribe ${channel}`);
  243. redisSubscribeClient.subscribe(channel, (err, count) => {
  244. if (err) {
  245. log.error(`Error subscribing to ${channel}`);
  246. }
  247. else {
  248. redisSubscriptions.set(count);
  249. }
  250. });
  251. }
  252. subs[channel].push(callback);
  253. };
  254. /**
  255. * @param {string} channel
  256. * @param {SubscriptionListener} callback
  257. */
  258. const unsubscribe = (channel, callback) => {
  259. log.silly(`Removing listener for ${channel}`);
  260. if (!subs[channel]) {
  261. return;
  262. }
  263. subs[channel] = subs[channel].filter(item => item !== callback);
  264. if (subs[channel].length === 0) {
  265. log.verbose(`Unsubscribe ${channel}`);
  266. redisSubscribeClient.unsubscribe(channel, (err, count) => {
  267. if (err) {
  268. log.error(`Error unsubscribing to ${channel}`);
  269. }
  270. else {
  271. redisSubscriptions.set(count);
  272. }
  273. });
  274. delete subs[channel];
  275. }
  276. };
  277. /**
  278. * @param {any} req
  279. * @param {any} res
  280. * @param {function(Error=): void} next
  281. */
  282. const allowCrossDomain = (req, res, next) => {
  283. res.header('Access-Control-Allow-Origin', '*');
  284. res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control');
  285. res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');
  286. next();
  287. };
  288. /**
  289. * @param {any} req
  290. * @param {any} res
  291. * @param {function(Error=): void} next
  292. */
  293. const setRequestId = (req, res, next) => {
  294. req.requestId = uuid.v4();
  295. res.header('X-Request-Id', req.requestId);
  296. next();
  297. };
  298. /**
  299. * @param {any} req
  300. * @param {any} res
  301. * @param {function(Error=): void} next
  302. */
  303. const setRemoteAddress = (req, res, next) => {
  304. req.remoteAddress = req.connection.remoteAddress;
  305. next();
  306. };
  307. /**
  308. * @param {any} req
  309. * @param {string[]} necessaryScopes
  310. * @returns {boolean}
  311. */
  312. const isInScope = (req, necessaryScopes) =>
  313. req.scopes.some(scope => necessaryScopes.includes(scope));
  314. /**
  315. * @param {string} token
  316. * @param {any} req
  317. * @returns {Promise.<void>}
  318. */
  319. const accountFromToken = (token, req) => new Promise((resolve, reject) => {
  320. pgPool.connect((err, client, done) => {
  321. if (err) {
  322. reject(err);
  323. return;
  324. }
  325. client.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
  326. done();
  327. if (err) {
  328. reject(err);
  329. return;
  330. }
  331. if (result.rows.length === 0) {
  332. err = new Error('Invalid access token');
  333. err.status = 401;
  334. reject(err);
  335. return;
  336. }
  337. req.accessTokenId = result.rows[0].id;
  338. req.scopes = result.rows[0].scopes.split(' ');
  339. req.accountId = result.rows[0].account_id;
  340. req.chosenLanguages = result.rows[0].chosen_languages;
  341. req.deviceId = result.rows[0].device_id;
  342. resolve();
  343. });
  344. });
  345. });
  346. /**
  347. * @param {any} req
  348. * @returns {Promise.<void>}
  349. */
  350. const accountFromRequest = (req) => new Promise((resolve, reject) => {
  351. const authorization = req.headers.authorization;
  352. const location = url.parse(req.url, true);
  353. const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
  354. if (!authorization && !accessToken) {
  355. const err = new Error('Missing access token');
  356. err.status = 401;
  357. reject(err);
  358. return;
  359. }
  360. const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
  361. resolve(accountFromToken(token, req));
  362. });
  363. /**
  364. * @param {any} req
  365. * @returns {string|undefined}
  366. */
  367. const channelNameFromPath = req => {
  368. const { path, query } = req;
  369. const onlyMedia = isTruthy(query.only_media);
  370. switch (path) {
  371. case '/api/v1/streaming/user':
  372. return 'user';
  373. case '/api/v1/streaming/user/notification':
  374. return 'user:notification';
  375. case '/api/v1/streaming/public':
  376. return onlyMedia ? 'public:media' : 'public';
  377. case '/api/v1/streaming/public/local':
  378. return onlyMedia ? 'public:local:media' : 'public:local';
  379. case '/api/v1/streaming/public/remote':
  380. return onlyMedia ? 'public:remote:media' : 'public:remote';
  381. case '/api/v1/streaming/hashtag':
  382. return 'hashtag';
  383. case '/api/v1/streaming/hashtag/local':
  384. return 'hashtag:local';
  385. case '/api/v1/streaming/direct':
  386. return 'direct';
  387. case '/api/v1/streaming/list':
  388. return 'list';
  389. default:
  390. return undefined;
  391. }
  392. };
  393. /**
  394. * @param {any} req
  395. * @param {string|undefined} channelName
  396. * @returns {Promise.<void>}
  397. */
  398. const checkScopes = (req, channelName) => new Promise((resolve, reject) => {
  399. log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);
  400. // When accessing public channels, no scopes are needed
  401. if (PUBLIC_CHANNELS.includes(channelName)) {
  402. resolve();
  403. return;
  404. }
  405. // The `read` scope has the highest priority, if the token has it
  406. // then it can access all streams
  407. const requiredScopes = ['read'];
  408. // When accessing specifically the notifications stream,
  409. // we need a read:notifications, while in all other cases,
  410. // we can allow access with read:statuses. Mind that the
  411. // user stream will not contain notifications unless
  412. // the token has either read or read:notifications scope
  413. // as well, this is handled separately.
  414. if (channelName === 'user:notification') {
  415. requiredScopes.push('read:notifications');
  416. } else {
  417. requiredScopes.push('read:statuses');
  418. }
  419. if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) {
  420. resolve();
  421. return;
  422. }
  423. const err = new Error('Access token does not cover required scopes');
  424. err.status = 401;
  425. reject(err);
  426. });
  427. /**
  428. * @param {any} info
  429. * @param {function(boolean, number, string): void} callback
  430. */
  431. const wsVerifyClient = (info, callback) => {
  432. // When verifying the websockets connection, we no longer pre-emptively
  433. // check OAuth scopes and drop the connection if they're missing. We only
  434. // drop the connection if access without token is not allowed by environment
  435. // variables. OAuth scope checks are moved to the point of subscription
  436. // to a specific stream.
  437. accountFromRequest(info.req).then(() => {
  438. callback(true, undefined, undefined);
  439. }).catch(err => {
  440. log.error(info.req.requestId, err.toString());
  441. callback(false, 401, 'Unauthorized');
  442. });
  443. };
  444. /**
  445. * @typedef SystemMessageHandlers
  446. * @property {function(): void} onKill
  447. */
  448. /**
  449. * @param {any} req
  450. * @param {SystemMessageHandlers} eventHandlers
  451. * @returns {function(object): void}
  452. */
  453. const createSystemMessageListener = (req, eventHandlers) => {
  454. return message => {
  455. const { event } = message;
  456. log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);
  457. if (event === 'kill') {
  458. log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`);
  459. eventHandlers.onKill();
  460. } else if (event === 'filters_changed') {
  461. log.verbose(req.requestId, `Invalidating filters cache for ${req.accountId}`);
  462. req.cachedFilters = null;
  463. }
  464. };
  465. };
  466. /**
  467. * @param {any} req
  468. * @param {any} res
  469. */
  470. const subscribeHttpToSystemChannel = (req, res) => {
  471. const accessTokenChannelId = `timeline:access_token:${req.accessTokenId}`;
  472. const systemChannelId = `timeline:system:${req.accountId}`;
  473. const listener = createSystemMessageListener(req, {
  474. onKill() {
  475. res.end();
  476. },
  477. });
  478. res.on('close', () => {
  479. unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
  480. unsubscribe(`${redisPrefix}${systemChannelId}`, listener);
  481. connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
  482. });
  483. subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
  484. subscribe(`${redisPrefix}${systemChannelId}`, listener);
  485. connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
  486. };
  487. /**
  488. * @param {any} req
  489. * @param {any} res
  490. * @param {function(Error=): void} next
  491. */
  492. const authenticationMiddleware = (req, res, next) => {
  493. if (req.method === 'OPTIONS') {
  494. next();
  495. return;
  496. }
  497. const channelName = channelNameFromPath(req);
  498. // If no channelName can be found for the request, then we should terminate
  499. // the connection, as there's nothing to stream back
  500. if (!channelName) {
  501. const err = new Error('Unknown channel requested');
  502. err.status = 400;
  503. next(err);
  504. return;
  505. }
  506. accountFromRequest(req).then(() => checkScopes(req, channelName)).then(() => {
  507. subscribeHttpToSystemChannel(req, res);
  508. }).then(() => {
  509. next();
  510. }).catch(err => {
  511. next(err);
  512. });
  513. };
  514. /**
  515. * @param {Error} err
  516. * @param {any} req
  517. * @param {any} res
  518. * @param {function(Error=): void} next
  519. */
  520. const errorMiddleware = (err, req, res, next) => {
  521. log.error(req.requestId, err.toString());
  522. if (res.headersSent) {
  523. next(err);
  524. return;
  525. }
  526. res.writeHead(err.status || 500, { 'Content-Type': 'application/json' });
  527. res.end(JSON.stringify({ error: err.status ? err.toString() : 'An unexpected error occurred' }));
  528. };
  529. /**
  530. * @param {array} arr
  531. * @param {number=} shift
  532. * @returns {string}
  533. */
  534. const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
  535. /**
  536. * @param {string} listId
  537. * @param {any} req
  538. * @returns {Promise.<void>}
  539. */
  540. const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
  541. const { accountId } = req;
  542. pgPool.connect((err, client, done) => {
  543. if (err) {
  544. reject();
  545. return;
  546. }
  547. client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (err, result) => {
  548. done();
  549. if (err || result.rows.length === 0 || result.rows[0].account_id !== accountId) {
  550. reject();
  551. return;
  552. }
  553. resolve();
  554. });
  555. });
  556. });
  557. /**
  558. * @param {string[]} ids
  559. * @param {any} req
  560. * @param {function(string, string): void} output
  561. * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
  562. * @param {'websocket' | 'eventsource'} destinationType
  563. * @param {boolean=} needsFiltering
  564. * @returns {SubscriptionListener}
  565. */
  566. const streamFrom = (ids, req, output, attachCloseHandler, destinationType, needsFiltering = false) => {
  567. const accountId = req.accountId || req.remoteAddress;
  568. log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
  569. const transmit = (event, payload) => {
  570. // TODO: Replace "string"-based delete payloads with object payloads:
  571. const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
  572. messagesSent.labels({ type: destinationType }).inc(1);
  573. log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
  574. output(event, encodedPayload);
  575. };
  576. // The listener used to process each message off the redis subscription,
  577. // message here is an object with an `event` and `payload` property. Some
  578. // events also include a queued_at value, but this is being removed shortly.
  579. /** @type {SubscriptionListener} */
  580. const listener = message => {
  581. const { event, payload } = message;
  582. // Streaming only needs to apply filtering to some channels and only to
  583. // some events. This is because majority of the filtering happens on the
  584. // Ruby on Rails side when producing the event for streaming.
  585. //
  586. // The only events that require filtering from the streaming server are
  587. // `update` and `status.update`, all other events are transmitted to the
  588. // client as soon as they're received (pass-through).
  589. //
  590. // The channels that need filtering are determined in the function
  591. // `channelNameToIds` defined below:
  592. if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
  593. transmit(event, payload);
  594. return;
  595. }
  596. // The rest of the logic from here on in this function is to handle
  597. // filtering of statuses:
  598. // Filter based on language:
  599. if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
  600. log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
  601. return;
  602. }
  603. // When the account is not logged in, it is not necessary to confirm the block or mute
  604. if (!req.accountId) {
  605. transmit(event, payload);
  606. return;
  607. }
  608. // Filter based on domain blocks, blocks, mutes, or custom filters:
  609. const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
  610. const accountDomain = payload.account.acct.split('@')[1];
  611. // TODO: Move this logic out of the message handling loop
  612. pgPool.connect((err, client, releasePgConnection) => {
  613. if (err) {
  614. log.error(err);
  615. return;
  616. }
  617. const queries = [
  618. client.query(`SELECT 1
  619. FROM blocks
  620. WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
  621. OR (account_id = $2 AND target_account_id = $1)
  622. UNION
  623. SELECT 1
  624. FROM mutes
  625. WHERE account_id = $1
  626. AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
  627. ];
  628. if (accountDomain) {
  629. queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
  630. }
  631. if (!payload.filtered && !req.cachedFilters) {
  632. queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
  633. }
  634. Promise.all(queries).then(values => {
  635. releasePgConnection();
  636. // Handling blocks & mutes and domain blocks: If one of those applies,
  637. // then we don't transmit the payload of the event to the client
  638. if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
  639. return;
  640. }
  641. // If the payload already contains the `filtered` property, it means
  642. // that filtering has been applied on the ruby on rails side, as
  643. // such, we don't need to construct or apply the filters in streaming:
  644. if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
  645. transmit(event, payload);
  646. return;
  647. }
  648. // Handling for constructing the custom filters and caching them on the request
  649. // TODO: Move this logic out of the message handling lifecycle
  650. if (!req.cachedFilters) {
  651. const filterRows = values[accountDomain ? 2 : 1].rows;
  652. req.cachedFilters = filterRows.reduce((cache, filter) => {
  653. if (cache[filter.id]) {
  654. cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
  655. } else {
  656. cache[filter.id] = {
  657. keywords: [[filter.keyword, filter.whole_word]],
  658. expires_at: filter.expires_at,
  659. filter: {
  660. id: filter.id,
  661. title: filter.title,
  662. context: filter.context,
  663. expires_at: filter.expires_at,
  664. // filter.filter_action is the value from the
  665. // custom_filters.action database column, it is an integer
  666. // representing a value in an enum defined by Ruby on Rails:
  667. //
  668. // enum { warn: 0, hide: 1 }
  669. filter_action: ['warn', 'hide'][filter.filter_action],
  670. },
  671. };
  672. }
  673. return cache;
  674. }, {});
  675. // Construct the regular expressions for the custom filters: This
  676. // needs to be done in a separate loop as the database returns one
  677. // filterRow per keyword, so we need all the keywords before
  678. // constructing the regular expression
  679. Object.keys(req.cachedFilters).forEach((key) => {
  680. req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
  681. let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  682. if (whole_word) {
  683. if (/^[\w]/.test(expr)) {
  684. expr = `\\b${expr}`;
  685. }
  686. if (/[\w]$/.test(expr)) {
  687. expr = `${expr}\\b`;
  688. }
  689. }
  690. return expr;
  691. }).join('|'), 'i');
  692. });
  693. }
  694. // Apply cachedFilters against the payload, constructing a
  695. // `filter_results` array of FilterResult entities
  696. if (req.cachedFilters) {
  697. const status = payload;
  698. // TODO: Calculate searchableContent in Ruby on Rails:
  699. const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
  700. const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
  701. const now = new Date();
  702. const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
  703. // Check the filter hasn't expired before applying:
  704. if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
  705. return results;
  706. }
  707. // Just in-case JSDOM fails to find textContent in searchableContent
  708. if (!searchableTextContent) {
  709. return results;
  710. }
  711. const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
  712. if (keyword_matches) {
  713. // results is an Array of FilterResult; status_matches is always
  714. // null as we only are only applying the keyword-based custom
  715. // filters, not the status-based custom filters.
  716. // https://docs.joinmastodon.org/entities/FilterResult/
  717. results.push({
  718. filter: cachedFilter.filter,
  719. keyword_matches,
  720. status_matches: null
  721. });
  722. }
  723. return results;
  724. }, []);
  725. // Send the payload + the FilterResults as the `filtered` property
  726. // to the streaming connection. To reach this code, the `event` must
  727. // have been either `update` or `status.update`, meaning the
  728. // `payload` is a Status entity, which has a `filtered` property:
  729. //
  730. // filtered: https://docs.joinmastodon.org/entities/Status/#filtered
  731. transmit(event, {
  732. ...payload,
  733. filtered: filter_results
  734. });
  735. } else {
  736. transmit(event, payload);
  737. }
  738. }).catch(err => {
  739. log.error(err);
  740. releasePgConnection();
  741. });
  742. });
  743. };
  744. ids.forEach(id => {
  745. subscribe(`${redisPrefix}${id}`, listener);
  746. });
  747. if (typeof attachCloseHandler === 'function') {
  748. attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
  749. }
  750. return listener;
  751. };
  752. /**
  753. * @param {any} req
  754. * @param {any} res
  755. * @returns {function(string, string): void}
  756. */
  757. const streamToHttp = (req, res) => {
  758. const accountId = req.accountId || req.remoteAddress;
  759. const channelName = channelNameFromPath(req);
  760. connectedClients.labels({ type: 'eventsource' }).inc();
  761. // In theory we'll always have a channel name, but channelNameFromPath can return undefined:
  762. if (typeof channelName === 'string') {
  763. connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc();
  764. }
  765. res.setHeader('Content-Type', 'text/event-stream');
  766. res.setHeader('Cache-Control', 'no-store');
  767. res.setHeader('Transfer-Encoding', 'chunked');
  768. res.write(':)\n');
  769. const heartbeat = setInterval(() => res.write(':thump\n'), 15000);
  770. req.on('close', () => {
  771. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  772. // We decrement these counters here instead of in streamHttpEnd as in that
  773. // method we don't have knowledge of the channel names
  774. connectedClients.labels({ type: 'eventsource' }).dec();
  775. // In theory we'll always have a channel name, but channelNameFromPath can return undefined:
  776. if (typeof channelName === 'string') {
  777. connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec();
  778. }
  779. clearInterval(heartbeat);
  780. });
  781. return (event, payload) => {
  782. res.write(`event: ${event}\n`);
  783. res.write(`data: ${payload}\n\n`);
  784. };
  785. };
  786. /**
  787. * @param {any} req
  788. * @param {function(): void} [closeHandler]
  789. * @returns {function(string[], SubscriptionListener): void}
  790. */
  791. const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
  792. req.on('close', () => {
  793. ids.forEach(id => {
  794. unsubscribe(id, listener);
  795. });
  796. if (closeHandler) {
  797. closeHandler();
  798. }
  799. });
  800. };
  801. /**
  802. * @param {any} req
  803. * @param {any} ws
  804. * @param {string[]} streamName
  805. * @returns {function(string, string): void}
  806. */
  807. const streamToWs = (req, ws, streamName) => (event, payload) => {
  808. if (ws.readyState !== ws.OPEN) {
  809. log.error(req.requestId, 'Tried writing to closed socket');
  810. return;
  811. }
  812. ws.send(JSON.stringify({ stream: streamName, event, payload }), (err) => {
  813. if (err) {
  814. log.error(req.requestId, `Failed to send to websocket: ${err}`);
  815. }
  816. });
  817. };
  818. /**
  819. * @param {any} res
  820. */
  821. const httpNotFound = res => {
  822. res.writeHead(404, { 'Content-Type': 'application/json' });
  823. res.end(JSON.stringify({ error: 'Not found' }));
  824. };
  825. const api = express.Router();
  826. app.use(api);
  827. api.use(setRequestId);
  828. api.use(setRemoteAddress);
  829. api.use(allowCrossDomain);
  830. api.use(authenticationMiddleware);
  831. api.use(errorMiddleware);
  832. api.get('/api/v1/streaming/*', (req, res) => {
  833. channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
  834. const onSend = streamToHttp(req, res);
  835. const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
  836. streamFrom(channelIds, req, onSend, onEnd, 'eventsource', options.needsFiltering);
  837. }).catch(err => {
  838. log.verbose(req.requestId, 'Subscription error:', err.toString());
  839. httpNotFound(res);
  840. });
  841. });
  842. const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
  843. /**
  844. * @typedef StreamParams
  845. * @property {string} [tag]
  846. * @property {string} [list]
  847. * @property {string} [only_media]
  848. */
  849. /**
  850. * @param {any} req
  851. * @returns {string[]}
  852. */
  853. const channelsForUserStream = req => {
  854. const arr = [`timeline:${req.accountId}`];
  855. if (isInScope(req, ['crypto']) && req.deviceId) {
  856. arr.push(`timeline:${req.accountId}:${req.deviceId}`);
  857. }
  858. if (isInScope(req, ['read', 'read:notifications'])) {
  859. arr.push(`timeline:${req.accountId}:notifications`);
  860. }
  861. return arr;
  862. };
  863. /**
  864. * See app/lib/ascii_folder.rb for the canon definitions
  865. * of these constants
  866. */
  867. const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂ㥹ÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇňʼnŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž';
  868. const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz';
  869. /**
  870. * @param {string} str
  871. * @returns {string}
  872. */
  873. const foldToASCII = str => {
  874. const regex = new RegExp(NON_ASCII_CHARS.split('').join('|'), 'g');
  875. return str.replace(regex, match => {
  876. const index = NON_ASCII_CHARS.indexOf(match);
  877. return EQUIVALENT_ASCII_CHARS[index];
  878. });
  879. };
  880. /**
  881. * @param {string} str
  882. * @returns {string}
  883. */
  884. const normalizeHashtag = str => {
  885. return foldToASCII(str.normalize('NFKC').toLowerCase()).replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
  886. };
  887. /**
  888. * @param {any} req
  889. * @param {string} name
  890. * @param {StreamParams} params
  891. * @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
  892. */
  893. const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
  894. switch (name) {
  895. case 'user':
  896. resolve({
  897. channelIds: channelsForUserStream(req),
  898. options: { needsFiltering: false },
  899. });
  900. break;
  901. case 'user:notification':
  902. resolve({
  903. channelIds: [`timeline:${req.accountId}:notifications`],
  904. options: { needsFiltering: false },
  905. });
  906. break;
  907. case 'public':
  908. resolve({
  909. channelIds: ['timeline:public'],
  910. options: { needsFiltering: true },
  911. });
  912. break;
  913. case 'public:local':
  914. resolve({
  915. channelIds: ['timeline:public:local'],
  916. options: { needsFiltering: true },
  917. });
  918. break;
  919. case 'public:remote':
  920. resolve({
  921. channelIds: ['timeline:public:remote'],
  922. options: { needsFiltering: true },
  923. });
  924. break;
  925. case 'public:media':
  926. resolve({
  927. channelIds: ['timeline:public:media'],
  928. options: { needsFiltering: true },
  929. });
  930. break;
  931. case 'public:local:media':
  932. resolve({
  933. channelIds: ['timeline:public:local:media'],
  934. options: { needsFiltering: true },
  935. });
  936. break;
  937. case 'public:remote:media':
  938. resolve({
  939. channelIds: ['timeline:public:remote:media'],
  940. options: { needsFiltering: true },
  941. });
  942. break;
  943. case 'direct':
  944. resolve({
  945. channelIds: [`timeline:direct:${req.accountId}`],
  946. options: { needsFiltering: false },
  947. });
  948. break;
  949. case 'hashtag':
  950. if (!params.tag || params.tag.length === 0) {
  951. reject('No tag for stream provided');
  952. } else {
  953. resolve({
  954. channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`],
  955. options: { needsFiltering: true },
  956. });
  957. }
  958. break;
  959. case 'hashtag:local':
  960. if (!params.tag || params.tag.length === 0) {
  961. reject('No tag for stream provided');
  962. } else {
  963. resolve({
  964. channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`],
  965. options: { needsFiltering: true },
  966. });
  967. }
  968. break;
  969. case 'list':
  970. authorizeListAccess(params.list, req).then(() => {
  971. resolve({
  972. channelIds: [`timeline:list:${params.list}`],
  973. options: { needsFiltering: false },
  974. });
  975. }).catch(() => {
  976. reject('Not authorized to stream this list');
  977. });
  978. break;
  979. default:
  980. reject('Unknown stream type');
  981. }
  982. });
  983. /**
  984. * @param {string} channelName
  985. * @param {StreamParams} params
  986. * @returns {string[]}
  987. */
  988. const streamNameFromChannelName = (channelName, params) => {
  989. if (channelName === 'list') {
  990. return [channelName, params.list];
  991. } else if (['hashtag', 'hashtag:local'].includes(channelName)) {
  992. return [channelName, params.tag];
  993. } else {
  994. return [channelName];
  995. }
  996. };
  997. /**
  998. * @typedef WebSocketSession
  999. * @property {any} socket
  1000. * @property {any} request
  1001. * @property {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
  1002. */
  1003. /**
  1004. * @param {WebSocketSession} session
  1005. * @param {string} channelName
  1006. * @param {StreamParams} params
  1007. * @returns {void}
  1008. */
  1009. const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => {
  1010. checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
  1011. channelIds,
  1012. options,
  1013. }) => {
  1014. if (subscriptions[channelIds.join(';')]) {
  1015. return;
  1016. }
  1017. const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
  1018. const stopHeartbeat = subscriptionHeartbeat(channelIds);
  1019. const listener = streamFrom(channelIds, request, onSend, undefined, 'websocket', options.needsFiltering);
  1020. connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
  1021. subscriptions[channelIds.join(';')] = {
  1022. channelName,
  1023. listener,
  1024. stopHeartbeat,
  1025. };
  1026. }).catch(err => {
  1027. log.verbose(request.requestId, 'Subscription error:', err.toString());
  1028. socket.send(JSON.stringify({ error: err.toString() }));
  1029. });
  1030. };
  1031. const removeSubscription = (subscriptions, channelIds, request) => {
  1032. log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);
  1033. const subscription = subscriptions[channelIds.join(';')];
  1034. if (!subscription) {
  1035. return;
  1036. }
  1037. channelIds.forEach(channelId => {
  1038. unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
  1039. });
  1040. connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
  1041. subscription.stopHeartbeat();
  1042. delete subscriptions[channelIds.join(';')];
  1043. };
  1044. /**
  1045. * @param {WebSocketSession} session
  1046. * @param {string} channelName
  1047. * @param {StreamParams} params
  1048. * @returns {void}
  1049. */
  1050. const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
  1051. channelNameToIds(request, channelName, params).then(({ channelIds }) => {
  1052. removeSubscription(subscriptions, channelIds, request);
  1053. }).catch(err => {
  1054. log.verbose(request.requestId, 'Unsubscribe error:', err);
  1055. // If we have a socket that is alive and open still, send the error back to the client:
  1056. // FIXME: In other parts of the code ws === socket
  1057. if (socket.isAlive && socket.readyState === socket.OPEN) {
  1058. socket.send(JSON.stringify({ error: "Error unsubscribing from channel" }));
  1059. }
  1060. });
  1061. };
  1062. /**
  1063. * @param {WebSocketSession} session
  1064. */
  1065. const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => {
  1066. const accessTokenChannelId = `timeline:access_token:${request.accessTokenId}`;
  1067. const systemChannelId = `timeline:system:${request.accountId}`;
  1068. const listener = createSystemMessageListener(request, {
  1069. onKill() {
  1070. socket.close();
  1071. },
  1072. });
  1073. subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
  1074. subscribe(`${redisPrefix}${systemChannelId}`, listener);
  1075. subscriptions[accessTokenChannelId] = {
  1076. channelName: 'system',
  1077. listener,
  1078. stopHeartbeat: () => {
  1079. },
  1080. };
  1081. subscriptions[systemChannelId] = {
  1082. channelName: 'system',
  1083. listener,
  1084. stopHeartbeat: () => {
  1085. },
  1086. };
  1087. connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2);
  1088. };
  1089. /**
  1090. * @param {string|string[]} arrayOrString
  1091. * @returns {string}
  1092. */
  1093. const firstParam = arrayOrString => {
  1094. if (Array.isArray(arrayOrString)) {
  1095. return arrayOrString[0];
  1096. } else {
  1097. return arrayOrString;
  1098. }
  1099. };
  1100. wss.on('connection', (ws, req) => {
  1101. // Note: url.parse could throw, which would terminate the connection, so we
  1102. // increment the connected clients metric straight away when we establish
  1103. // the connection, without waiting:
  1104. connectedClients.labels({ type: 'websocket' }).inc();
  1105. // Setup request properties:
  1106. req.requestId = uuid.v4();
  1107. req.remoteAddress = ws._socket.remoteAddress;
  1108. // Setup connection keep-alive state:
  1109. ws.isAlive = true;
  1110. ws.on('pong', () => {
  1111. ws.isAlive = true;
  1112. });
  1113. /**
  1114. * @type {WebSocketSession}
  1115. */
  1116. const session = {
  1117. socket: ws,
  1118. request: req,
  1119. subscriptions: {},
  1120. };
  1121. ws.on('close', function onWebsocketClose() {
  1122. const subscriptions = Object.keys(session.subscriptions);
  1123. subscriptions.forEach(channelIds => {
  1124. removeSubscription(session.subscriptions, channelIds.split(';'), req);
  1125. });
  1126. // Decrement the metrics for connected clients:
  1127. connectedClients.labels({ type: 'websocket' }).dec();
  1128. // ensure garbage collection:
  1129. session.socket = null;
  1130. session.request = null;
  1131. session.subscriptions = {};
  1132. });
  1133. // Note: immediately after the `error` event is emitted, the `close` event
  1134. // is emitted. As such, all we need to do is log the error here.
  1135. ws.on('error', (err) => {
  1136. log.error('websocket', err.toString());
  1137. });
  1138. ws.on('message', (data, isBinary) => {
  1139. if (isBinary) {
  1140. log.warn('websocket', 'Received binary data, closing connection');
  1141. ws.close(1003, 'The mastodon streaming server does not support binary messages');
  1142. return;
  1143. }
  1144. const message = data.toString('utf8');
  1145. const json = parseJSON(message, session.request);
  1146. if (!json) return;
  1147. const { type, stream, ...params } = json;
  1148. if (type === 'subscribe') {
  1149. subscribeWebsocketToChannel(session, firstParam(stream), params);
  1150. } else if (type === 'unsubscribe') {
  1151. unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
  1152. } else {
  1153. // Unknown action type
  1154. }
  1155. });
  1156. subscribeWebsocketToSystemChannel(session);
  1157. // Parse the URL for the connection arguments (if supplied), url.parse can throw:
  1158. const location = req.url && url.parse(req.url, true);
  1159. if (location && location.query.stream) {
  1160. subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
  1161. }
  1162. });
  1163. setInterval(() => {
  1164. wss.clients.forEach(ws => {
  1165. if (ws.isAlive === false) {
  1166. ws.terminate();
  1167. return;
  1168. }
  1169. ws.isAlive = false;
  1170. ws.ping('', false);
  1171. });
  1172. }, 30000);
  1173. attachServerWithConfig(server, address => {
  1174. log.warn(`Streaming API now listening on ${address}`);
  1175. });
  1176. const onExit = () => {
  1177. server.close();
  1178. process.exit(0);
  1179. };
  1180. const onError = (err) => {
  1181. log.error(err);
  1182. server.close();
  1183. process.exit(0);
  1184. };
  1185. process.on('SIGINT', onExit);
  1186. process.on('SIGTERM', onExit);
  1187. process.on('exit', onExit);
  1188. process.on('uncaughtException', onError);
  1189. };
  1190. /**
  1191. * @param {any} server
  1192. * @param {function(string): void} [onSuccess]
  1193. */
  1194. const attachServerWithConfig = (server, onSuccess) => {
  1195. if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
  1196. server.listen(process.env.SOCKET || process.env.PORT, () => {
  1197. if (onSuccess) {
  1198. fs.chmodSync(server.address(), 0o666);
  1199. onSuccess(server.address());
  1200. }
  1201. });
  1202. } else {
  1203. server.listen(+process.env.PORT || 4000, process.env.BIND || '127.0.0.1', () => {
  1204. if (onSuccess) {
  1205. onSuccess(`${server.address().address}:${server.address().port}`);
  1206. }
  1207. });
  1208. }
  1209. };
  1210. startServer();