index.js 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405
  1. // @ts-check
  2. const os = require('os');
  3. const throng = require('throng');
  4. const dotenv = require('dotenv');
  5. const express = require('express');
  6. const http = require('http');
  7. const redis = require('redis');
  8. const pg = require('pg');
  9. const log = require('npmlog');
  10. const url = require('url');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const WebSocket = require('ws');
  14. const { JSDOM } = require('jsdom');
  // Resolve the runtime environment first: it selects which dotenv file is read.
  const env = process.env.NODE_ENV || 'development';

  // Load the dotenv file before reading any other setting, so that values
  // defined only in `.env` / `.env.production` are visible to the checks below.
  dotenv.config({
    path: env === 'production' ? '.env.production' : '.env',
  });

  // True when any of these three modes is set to the literal string 'true'.
  // It is passed as the `required` flag when authenticating requests.
  const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';

  log.level = process.env.LOG_LEVEL || 'verbose';
  /**
   * Converts a database connection URL into a configuration object carrying
   * the `user`, `password`, `host`, `port`, `database` and `ssl` keys that
   * could be derived from it. Keys whose URL component is absent are omitted,
   * except `password`, which is set to `undefined` when the credentials hold
   * only a user name.
   * @param {string} dbUrl
   * @return {Object.<string, any>} empty object when `dbUrl` is falsy
   */
  const dbUrlToConfig = (dbUrl) => {
    if (!dbUrl) {
      return {};
    }

    const params = url.parse(dbUrl, true);
    const config = {};

    if (params.auth) {
      // Split on the FIRST colon only: everything after it is the password,
      // which may itself legitimately contain colons.
      const separator = params.auth.indexOf(':');

      config.user = separator === -1 ? params.auth : params.auth.slice(0, separator);
      config.password = separator === -1 ? undefined : params.auth.slice(separator + 1);
    }

    if (params.hostname) {
      config.host = params.hostname;
    }

    if (params.port) {
      // Kept as the string produced by the URL parser.
      config.port = params.port;
    }

    if (params.pathname) {
      config.database = params.pathname.split('/')[1];
    }

    const ssl = params.query && params.query.ssl;

    if (ssl === 'true' || ssl === '1') {
      config.ssl = true;
    }

    return config;
  };
  /**
   * Creates a redis client and connects it before returning.
   *
   * The options are derived from `defaultConfig`; a `unix://` URL replaces the
   * `socket` option with the socket path, any other URL is passed as `url`.
   * `defaultConfig` itself is never modified, so the same object can safely be
   * reused to create several clients.
   * @param {Object.<string, any>} defaultConfig
   * @param {string} redisUrl
   * @returns {Promise<any>} the connected client
   */
  const redisUrlToClient = async (defaultConfig, redisUrl) => {
    let config = defaultConfig;

    if (redisUrl && redisUrl.startsWith('unix://')) {
      // Strip the 7-character `unix://` scheme to obtain the socket path.
      config = { ...defaultConfig, socket: { path: redisUrl.slice(7) } };
    } else if (redisUrl) {
      config = { ...defaultConfig, url: redisUrl };
    }

    const client = redis.createClient(config);

    client.on('error', (err) => log.error('Redis Client Error!', err));
    await client.connect();

    return client;
  };
  // Worker count: STREAMING_CLUSTER_NUM when it is a non-zero number, otherwise
  // one worker in development and (CPU count - 1, minimum 1) elsewhere.
  const numWorkers = Number(process.env.STREAMING_CLUSTER_NUM)
    || (env === 'development' ? 1 : Math.max(1, os.cpus().length - 1));
  /**
   * Parses a JSON string without throwing. It is used both for messages
   * arriving from redis and for messages sent by a client over a websocket
   * connection, hence the optional `req` argument, which only affects how a
   * parse failure is logged.
   * @param {string} json
   * @param {any?} req
   * @returns {Object.<string, any>|null} the parsed value, or null on failure
   */
  const parseJSON = (json, req) => {
    let parsed = null;

    try {
      parsed = JSON.parse(json);
    } catch (err) {
      /* FIXME: Logging here is not ideal; it belongs at the call-site of
       * parseJSON. Doing so would require parseJSON to return something like a
       * Result type ([Error|null, null|Object<string,any>]) and every caller
       * to handle the error case.
       */
      if (!req) {
        log.warn(`Error parsing message from redis: ${err}`);
      } else if (req.accountId) {
        log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
      } else {
        log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
      }
    }

    return parsed;
  };
  /**
   * Logs the start-up banner, preceded by a migration hint when PORT holds a
   * non-numeric value while SOCKET is unset.
   */
  const startMaster = () => {
    const { SOCKET, PORT } = process.env;
    const portIsNotNumeric = PORT && Number.isNaN(Number(PORT));

    if (!SOCKET && portIsNotNumeric) {
      log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
    }

    log.warn(`Starting streaming API server master with ${numWorkers} workers`);
  };
  110. const startWorker = async (workerId) => {
  log.warn(`Starting worker ${workerId}`);

  // PostgreSQL connection defaults per environment. Each value can be
  // overridden through the matching DB_* environment variable.
  const pgConfigs = {
    development: {
      user: process.env.DB_USER || pg.defaults.user,
      password: process.env.DB_PASS || pg.defaults.password,
      database: process.env.DB_NAME || 'mastodon_development',
      host: process.env.DB_HOST || pg.defaults.host,
      port: process.env.DB_PORT || pg.defaults.port,
    },

    production: {
      user: process.env.DB_USER || 'mastodon',
      password: process.env.DB_PASS || '',
      database: process.env.DB_NAME || 'mastodon_production',
      host: process.env.DB_HOST || 'localhost',
      port: process.env.DB_PORT || 5432,
    },
  };

  const app = express();

  // TRUSTED_PROXY_IP accepts a comma- and/or whitespace-separated list.
  app.set(
    'trust proxy',
    process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal',
  );

  // Later sources win: environment defaults, then DATABASE_URL, then the
  // fixed pool options.
  // NOTE(review): the `ssl` option below is always set, so it replaces any
  // `ssl` value derived from DATABASE_URL — confirm this is intended.
  const pgPool = new pg.Pool(Object.assign(
    pgConfigs[env],
    dbUrlToConfig(process.env.DATABASE_URL),
    {
      max: process.env.DB_POOL || 10,
      connectionTimeoutMillis: 15000,
      ssl: !!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable',
    },
  ));

  const server = http.createServer(app);
  const redisNamespace = process.env.REDIS_NAMESPACE || null;

  const redisParams = {
    socket: {
      host: process.env.REDIS_HOST || '127.0.0.1',
      port: process.env.REDIS_PORT || 6379,
    },
    database: process.env.REDIS_DB || 0,
    password: process.env.REDIS_PASSWORD || undefined,
  };

  if (redisNamespace) {
    redisParams.namespace = redisNamespace;
  }

  // Prefix prepended to every redis key and channel name used below.
  const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';

  /**
   * Listeners registered per redis channel name.
   * @type {Object.<string, Array.<function(Object<string, any>): void>>}
   */
  const subs = {};

  // Two clients are created from the same parameters: one is used for
  // subscribe/unsubscribe, the other for regular commands.
  const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  /**
   * Marks the given channels as subscribed in redis and keeps refreshing the
   * markers until the returned function is called.
   *
   * Each marker is the key `<prefix>subscribed:<channel>` with value '1' and a
   * time-to-live of three heartbeat intervals, so it disappears by itself when
   * refreshing stops.
   * @param {string[]} channels
   * @return {function(): void} stops the periodic refresh
   */
  const subscriptionHeartbeat = channels => {
    const interval = 6 * 60; // seconds

    const tellSubscribed = () => {
      channels.forEach(channel => {
        // The promise-based client takes the expiry as an options object;
        // positional 'EX', <seconds> arguments would not set any TTL.
        redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', { EX: interval * 3 })
          .catch(err => log.error(`Error refreshing subscription marker for ${channel}: ${err}`));
      });
    };

    tellSubscribed();

    const heartbeat = setInterval(tellSubscribed, interval * 1000);

    return () => {
      clearInterval(heartbeat);
    };
  };
  /**
   * Dispatches a message received on a redis channel to every listener
   * registered for that channel. Messages for channels without listeners, and
   * messages that fail to parse as JSON, are dropped.
   * @param {string} message
   * @param {string} channel
   */
  const onRedisMessage = (message, channel) => {
    log.silly(`New message on channel ${channel}`);

    const listeners = subs[channel];

    if (listeners) {
      const parsed = parseJSON(message, null);

      if (parsed) {
        listeners.forEach(notify => notify(parsed));
      }
    }
  };
  /**
   * @callback SubscriptionListener
   * @param {ReturnType<parseJSON>} json of the message
   * @returns void
   */

  /**
   * Registers `callback` for `channel`. The redis subscription itself is only
   * opened when the channel has no listener yet.
   * @param {string} channel
   * @param {SubscriptionListener} callback
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    if (!subs[channel]) {
      subs[channel] = [];
    }

    const listeners = subs[channel];

    if (listeners.length === 0) {
      log.verbose(`Subscribe ${channel}`);
      redisSubscribeClient.subscribe(channel, onRedisMessage);
    }

    listeners.push(callback);
  };
  /**
   * Removes `callback` from the listeners of `channel`. When no listener is
   * left, the redis subscription is closed and the channel entry is deleted.
   * @param {string} channel
   * @param {SubscriptionListener} callback
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    const current = subs[channel];

    if (!current) {
      return;
    }

    const remaining = current.filter(item => item !== callback);
    subs[channel] = remaining;

    if (remaining.length > 0) {
      return;
    }

    log.verbose(`Unsubscribe ${channel}`);
    redisSubscribeClient.unsubscribe(channel);
    delete subs[channel];
  };
  // Values that are treated as "false" even though some of them (the
  // non-empty strings) are truthy in JavaScript.
  const FALSE_VALUES = [
    false,
    0,
    '0',
    'f',
    'F',
    'false',
    'FALSE',
    'off',
    'OFF',
  ];

  /**
   * Tells whether a loosely-typed flag is switched on. Always returns a real
   * boolean: falsy inputs (undefined, null, '') yield `false` rather than
   * being passed through.
   * @param {any} value
   * @return {boolean} true unless `value` is falsy or listed in FALSE_VALUES
   */
  const isTruthy = value =>
    Boolean(value) && !FALSE_VALUES.includes(value);
  /**
   * Middleware that adds the CORS response headers to every response and then
   * passes control on.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const allowCrossDomain = (req, res, next) => {
    const corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'],
      ['Access-Control-Allow-Methods', 'GET, OPTIONS'],
    ];

    corsHeaders.forEach(([name, value]) => {
      res.header(name, value);
    });

    next();
  };
  /**
   * Middleware that tags the request with a fresh UUID (v4), stored as
   * `req.requestId` and echoed in the `X-Request-Id` response header.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRequestId = (req, res, next) => {
    const requestId = uuid.v4();

    req.requestId = requestId;
    res.header('X-Request-Id', requestId);

    next();
  };
  /**
   * Middleware that copies the remote address of the underlying connection
   * onto `req.remoteAddress`.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRemoteAddress = (req, res, next) => {
    const { remoteAddress } = req.connection;

    req.remoteAddress = remoteAddress;

    next();
  };
  /**
   * Tells whether at least one of the scopes attached to the request is among
   * the scopes considered sufficient.
   * @param {any} req
   * @param {string[]} necessaryScopes
   * @return {boolean}
   */
  const isInScope = (req, necessaryScopes) => {
    const isSufficient = grantedScope => necessaryScopes.includes(grantedScope);

    return req.scopes.some(isSufficient);
  };
  /**
   * Looks up a non-revoked OAuth access token in the database and, when
   * found, copies the token id, scopes, account id, chosen languages and
   * device id onto `req`.
   * @param {string} token
   * @param {any} req
   * @return {Promise.<void>} rejects with the database error, or with an
   *   Error carrying `status = 401` when the token is unknown
   */
  const accountFromToken = (token, req) => new Promise((resolve, reject) => {
    const sql = 'SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1';

    pgPool.connect((connectError, client, done) => {
      if (connectError) {
        reject(connectError);
        return;
      }

      client.query(sql, [token], (queryError, result) => {
        // Hand the connection back before inspecting the outcome.
        done();

        if (queryError) {
          reject(queryError);
          return;
        }

        if (result.rows.length === 0) {
          reject(Object.assign(new Error('Invalid access token'), { status: 401 }));
          return;
        }

        const row = result.rows[0];

        req.accessTokenId = row.id;
        req.scopes = row.scopes.split(' ');
        req.accountId = row.account_id;
        req.chosenLanguages = row.chosen_languages;
        req.deviceId = row.device_id;

        resolve();
      });
    });
  });
  /**
   * Authenticates a request from its credentials. The `Authorization` header
   * takes precedence; otherwise the `access_token` query parameter or the
   * `sec-websocket-protocol` header is used as the token.
   * @param {any} req
   * @param {boolean=} required whether a missing token is an error
   * @return {Promise.<void>} rejects with an Error carrying `status = 401`
   *   when a token is required but absent
   */
  const accountFromRequest = async (req, required = true) => {
    const { authorization } = req.headers;
    const { query } = url.parse(req.url, true);
    const accessToken = query.access_token || req.headers['sec-websocket-protocol'];

    if (authorization) {
      return accountFromToken(authorization.replace(/^Bearer /, ''), req);
    }

    if (accessToken) {
      return accountFromToken(accessToken, req);
    }

    if (required) {
      throw Object.assign(new Error('Missing access token'), { status: 401 });
    }

    return undefined;
  };
  /**
   * Maps the path of an HTTP streaming request to a channel name. The three
   * public timelines get a `:media` suffix when the `only_media` query
   * parameter is truthy.
   * @param {any} req
   * @returns {string|undefined} undefined for unknown paths
   */
  const channelNameFromPath = req => {
    const { path, query } = req;
    const onlyMedia = isTruthy(query.only_media);

    // path -> [channel name, whether a `:media` variant exists]
    const routes = new Map([
      ['/api/v1/streaming/user', ['user', false]],
      ['/api/v1/streaming/user/notification', ['user:notification', false]],
      ['/api/v1/streaming/public', ['public', true]],
      ['/api/v1/streaming/public/local', ['public:local', true]],
      ['/api/v1/streaming/public/remote', ['public:remote', true]],
      ['/api/v1/streaming/hashtag', ['hashtag', false]],
      ['/api/v1/streaming/hashtag/local', ['hashtag:local', false]],
      ['/api/v1/streaming/direct', ['direct', false]],
      ['/api/v1/streaming/list', ['list', false]],
    ]);

    const route = routes.get(path);

    if (!route) {
      return undefined;
    }

    const [channelName, hasMediaVariant] = route;

    return hasMediaVariant && onlyMedia ? `${channelName}:media` : channelName;
  };
  // Channels that can be accessed without any OAuth scope.
  const PUBLIC_CHANNELS = [
    'public',
    'public:media',
    'public:local',
    'public:local:media',
    'public:remote',
    'public:remote:media',
    'hashtag',
    'hashtag:local',
  ];

  /**
   * Verifies that the scopes attached to the request allow access to the
   * given channel.
   * @param {any} req
   * @param {string} channelName
   * @return {Promise.<void>} rejects with an Error carrying `status = 401`
   *   when the scopes are insufficient
   */
  const checkScopes = async (req, channelName) => {
    log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);

    // Public channels need no scope at all.
    if (PUBLIC_CHANNELS.includes(channelName)) {
      return;
    }

    // `read` grants access to every stream. Besides it, the notifications
    // stream accepts `read:notifications` and every other stream accepts
    // `read:statuses`. Whether the user stream also carries notifications is
    // decided separately, not here.
    const narrowScope = channelName === 'user:notification' ? 'read:notifications' : 'read:statuses';
    const acceptedScopes = ['read', narrowScope];
    const grantedScopes = req.scopes;

    if (grantedScopes && acceptedScopes.some(scope => grantedScopes.includes(scope))) {
      return;
    }

    throw Object.assign(new Error('Access token does not cover required scopes'), { status: 401 });
  };
  /**
   * Decides whether an incoming websocket connection is accepted.
   *
   * OAuth scopes are no longer checked at this stage; they are checked when
   * subscribing to a specific stream. The connection is refused only when
   * authenticating the request fails, which includes a missing token when the
   * environment configuration forbids access without one.
   * @param {any} info
   * @param {function(boolean, number, string): void} callback
   */
  const wsVerifyClient = (info, callback) => {
    const accept = () => {
      callback(true, undefined, undefined);
    };

    const refuse = err => {
      log.error(info.req.requestId, err.toString());
      callback(false, 401, 'Unauthorized');
    };

    accountFromRequest(info.req, alwaysRequireAuth).then(accept).catch(refuse);
  };
  /**
   * @typedef SystemMessageHandlers
   * @property {function(): void} onKill
   */

  /**
   * Builds the listener for system messages. A `kill` event invokes
   * `eventHandlers.onKill`; a `filters_changed` event clears the filters
   * cached on the request. Any other event is only logged.
   * @param {any} req
   * @param {SystemMessageHandlers} eventHandlers
   * @returns {function(object): void}
   */
  const createSystemMessageListener = (req, eventHandlers) => message => {
    const { event } = message;

    log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);

    switch (event) {
    case 'kill':
      log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`);
      eventHandlers.onKill();
      break;
    case 'filters_changed':
      log.verbose(req.requestId, `Invalidating filters cache for ${req.accountId}`);
      req.cachedFilters = null;
      break;
    default:
      break;
    }
  };
  /**
   * Subscribes an HTTP streaming response to the access-token channel and the
   * system channel of the authenticated account. A `kill` system message ends
   * the response; closing the response removes both subscriptions.
   * @param {any} req
   * @param {any} res
   */
  const subscribeHttpToSystemChannel = (req, res) => {
    const channels = [
      `${redisPrefix}timeline:access_token:${req.accessTokenId}`,
      `${redisPrefix}timeline:system:${req.accountId}`,
    ];

    const listener = createSystemMessageListener(req, {
      onKill: () => {
        res.end();
      },
    });

    res.on('close', () => {
      channels.forEach(channel => unsubscribe(channel, listener));
    });

    channels.forEach(channel => subscribe(channel, listener));
  };
  /**
   * Middleware that authenticates the request, checks the scopes required by
   * the channel derived from the request path, and subscribes the response to
   * the system channels. OPTIONS requests are passed through untouched. Any
   * failure along the way is forwarded to `next`.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    accountFromRequest(req, alwaysRequireAuth)
      .then(() => checkScopes(req, channelNameFromPath(req)))
      .then(() => {
        subscribeHttpToSystemChannel(req, res);
        next();
      })
      .catch(err => {
        next(err);
      });
  };
  /**
   * Error-handling middleware. Logs the error and, unless the response
   * headers were already sent, answers with a JSON body. The error text is
   * only exposed when the error carries a `status`; otherwise a generic
   * message is sent with status 500.
   * @param {Error} err
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    if (res.headersSent) {
      next(err);
      return;
    }

    const hasStatus = Boolean(err.status);
    const statusCode = hasStatus ? err.status : 500;
    const message = hasStatus ? err.toString() : 'An unexpected error occurred';

    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: message }));
  };
  /**
   * Builds a comma-separated list of positional SQL placeholders, one per
   * element of `arr`, numbered from `shift + 1`
   * (e.g. three elements with shift 2 give `$3, $4, $5`).
   * @param {array} arr
   * @param {number=} shift
   * @return {string}
   */
  const placeholders = (arr, shift = 0) => {
    const numbered = arr.map((_, position) => `$${position + 1 + shift}`);

    return numbered.join(', ');
  };
  /**
   * Checks that the list with the given id belongs to the account attached to
   * the request.
   * @param {string} listId
   * @param {any} req
   * @return {Promise.<void>} rejects (without a reason) when the database
   *   cannot be reached, the query fails, the list does not exist or it is
   *   owned by another account
   */
  const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
    const { accountId } = req;

    pgPool.connect((connectError, client, done) => {
      if (connectError) {
        reject();
        return;
      }

      client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (queryError, result) => {
        done();

        const ownedByRequester = !queryError
          && result.rows.length > 0
          && result.rows[0].account_id === accountId;

        if (ownedByRequester) {
          resolve();
        } else {
          reject();
        }
      });
    });
  });
  /**
   * Subscribes to the given redis channels and forwards their messages to
   * `output`, optionally filtering status events for the requesting account.
   *
   * When `needsFiltering` is set, `update` and `status.update` events are
   * checked against the account's chosen languages, blocks, mutes, domain
   * blocks and custom filters before being transmitted. All other events are
   * passed through unchanged.
   * @param {string[]} ids channel ids, without the redis prefix
   * @param {any} req
   * @param {function(string, string): void} output
   * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
   *   receives the prefixed channel ids and the listener so the caller can
   *   unsubscribe later
   * @param {boolean=} needsFiltering
   * @returns {SubscriptionListener}
   */
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
    const accountId = req.accountId || req.remoteAddress;

    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);

    const transmit = (event, payload) => {
      // TODO: Replace "string"-based delete payloads with object payloads:
      const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;

      log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
      output(event, encodedPayload);
    };

    // The listener used to process each message off the redis subscription,
    // message here is an object with an `event` and `payload` property. Some
    // events also include a queued_at value, but this is being removed shortly.

    /** @type {SubscriptionListener} */
    const listener = message => {
      const { event, payload } = message;

      // Streaming only needs to apply filtering to some channels and only to
      // some events, because the majority of the filtering happens on the
      // Ruby on Rails side when producing the event for streaming.
      //
      // The only events that require filtering from the streaming server are
      // `update` and `status.update`; all other events are transmitted to the
      // client as soon as they are received (pass-through).
      if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
        transmit(event, payload);
        return;
      }

      // The rest of the logic from here on in this function is to handle
      // filtering of statuses:

      // Filter based on language:
      if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
        log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
        return;
      }

      // When the account is not logged in, it is not necessary to confirm the block or mute
      if (!req.accountId) {
        transmit(event, payload);
        return;
      }

      // Filter based on domain blocks, blocks, mutes, or custom filters:
      const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
      const accountDomain = payload.account.acct.split('@')[1];

      // TODO: Move this logic out of the message handling loop
      pgPool.connect((err, client, releasePgConnection) => {
        if (err) {
          log.error(err);
          return;
        }

        // The connection must be handed back exactly once. Without this
        // guard, an exception thrown while processing the query results
        // (after the connection was already released) would make the error
        // handler release it a second time.
        let released = false;
        const releaseOnce = () => {
          if (!released) {
            released = true;
            releasePgConnection();
          }
        };

        const queries = [
          client.query(`SELECT 1
                        FROM blocks
                        WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
                           OR (account_id = $2 AND target_account_id = $1)
                        UNION
                        SELECT 1
                        FROM mutes
                        WHERE account_id = $1
                          AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
        ];

        if (accountDomain) {
          queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
        }

        if (!payload.filtered && !req.cachedFilters) {
          queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
        }

        Promise.all(queries).then(values => {
          releaseOnce();

          // Handling blocks & mutes and domain blocks: If one of those applies,
          // then we don't transmit the payload of the event to the client
          if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
            return;
          }

          // If the payload already contains the `filtered` property, it means
          // that filtering has been applied on the ruby on rails side, as
          // such, we don't need to construct or apply the filters in streaming:
          if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
            transmit(event, payload);
            return;
          }

          // Handling for constructing the custom filters and caching them on the request
          // TODO: Move this logic out of the message handling lifecycle
          if (!req.cachedFilters) {
            // The filter rows come after the blocks/mutes result and, when
            // present, the domain-block result.
            const filterRows = values[accountDomain ? 2 : 1].rows;

            req.cachedFilters = filterRows.reduce((cache, filter) => {
              if (cache[filter.id]) {
                cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
              } else {
                cache[filter.id] = {
                  keywords: [[filter.keyword, filter.whole_word]],
                  expires_at: filter.expires_at,
                  filter: {
                    id: filter.id,
                    title: filter.title,
                    context: filter.context,
                    expires_at: filter.expires_at,
                    // filter.filter_action is the value from the
                    // custom_filters.action database column, it is an integer
                    // representing a value in an enum defined by Ruby on Rails:
                    //
                    // enum { warn: 0, hide: 1 }
                    filter_action: ['warn', 'hide'][filter.filter_action],
                  },
                };
              }

              return cache;
            }, {});

            // Construct the regular expressions for the custom filters: This
            // needs to be done in a separate loop as the database returns one
            // filterRow per keyword, so we need all the keywords before
            // constructing the regular expression
            Object.keys(req.cachedFilters).forEach((key) => {
              req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
                // Escape regular-expression metacharacters in the keyword.
                let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');

                if (whole_word) {
                  if (/^[\w]/.test(expr)) {
                    expr = `\\b${expr}`;
                  }

                  if (/[\w]$/.test(expr)) {
                    expr = `${expr}\\b`;
                  }
                }

                return expr;
              }).join('|'), 'i');
            });
          }

          // Apply cachedFilters against the payload, constructing a
          // `filter_results` array of FilterResult entities
          if (req.cachedFilters) {
            const status = payload;
            // TODO: Calculate searchableContent in Ruby on Rails:
            const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
            const searchableTextContent = JSDOM.fragment(searchableContent).textContent;

            const now = new Date();
            const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
              // Check the filter hasn't expired before applying:
              if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
                return results;
              }

              // Just in-case JSDOM fails to find textContent in searchableContent
              if (!searchableTextContent) {
                return results;
              }

              const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
              if (keyword_matches) {
                // results is an Array of FilterResult; status_matches is always
                // null as we are only applying the keyword-based custom
                // filters, not the status-based custom filters.
                // https://docs.joinmastodon.org/entities/FilterResult/
                results.push({
                  filter: cachedFilter.filter,
                  keyword_matches,
                  status_matches: null
                });
              }

              return results;
            }, []);

            // Send the payload + the FilterResults as the `filtered` property
            // to the streaming connection. To reach this code, the `event` must
            // have been either `update` or `status.update`, meaning the
            // `payload` is a Status entity, which has a `filtered` property:
            //
            // filtered: https://docs.joinmastodon.org/entities/Status/#filtered
            transmit(event, {
              ...payload,
              filtered: filter_results
            });
          } else {
            transmit(event, payload);
          }
        }).catch(err => {
          releaseOnce();
          log.error(err);
        });
      });
    };

    ids.forEach(id => {
      subscribe(`${redisPrefix}${id}`, listener);
    });

    if (typeof attachCloseHandler === 'function') {
      attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
    }

    return listener;
  };
  /**
   * Turns an HTTP response into a Server-Sent Events stream: sets the
   * event-stream headers, writes an opening `:)` comment line, and keeps the
   * connection busy with a `:thump` comment line every 15 seconds until the
   * request emits `close`.
   * @param {any} req
   * @param {any} res
   * @return {function(string, string): void} writer that emits one
   *   `event:` / `data:` frame per call
   */
  const streamToHttp = (req, res) => {
    const subscriber = req.accountId || req.remoteAddress;

    const sseHeaders = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-store'],
      ['Transfer-Encoding', 'chunked'],
    ];

    for (const [headerName, headerValue] of sseHeaders) {
      res.setHeader(headerName, headerValue);
    }

    res.write(':)\n');

    const keepAliveTimer = setInterval(() => res.write(':thump\n'), 15000);

    const handleClose = () => {
      log.verbose(req.requestId, `Ending stream for ${subscriber}`);
      clearInterval(keepAliveTimer);
    };

    req.on('close', handleClose);

    const sendEvent = (event, payload) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };

    return sendEvent;
  };
  /**
   * Builds the close-handler attacher used for HTTP streams. The returned
   * function registers a `close` listener on the request which unsubscribes
   * the listener from every given id and then runs the optional closeHandler.
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @returns {function(string[], SubscriptionListener): void}
   */
  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
    const releaseSubscriptions = () => {
      for (const channelId of ids) {
        unsubscribe(channelId, listener);
      }

      if (closeHandler) {
        closeHandler();
      }
    };

    req.on('close', releaseSubscriptions);
  };
  /**
   * Builds a writer that forwards events to a WebSocket as a JSON frame of
   * the form `{ stream, event, payload }`. When the socket is not open the
   * event is dropped and an error is logged instead.
   * @param {any} req
   * @param {any} ws
   * @param {string[]} streamName
   * @return {function(string, string): void}
   */
  const streamToWs = (req, ws, streamName) => (event, payload) => {
    if (ws.readyState === ws.OPEN) {
      const frame = JSON.stringify({ stream: streamName, event, payload });
      ws.send(frame);
    } else {
      log.error(req.requestId, 'Tried writing to closed socket');
    }
  };
  /**
   * Ends the response with HTTP 404 and a JSON body of
   * `{"error":"Not found"}`.
   * @param {any} res
   */
  const httpNotFound = res => {
    const body = JSON.stringify({ error: 'Not found' });

    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(body);
  };
  // Middleware applied to every request, registered in this order.
  for (const middleware of [setRequestId, setRemoteAddress, allowCrossDomain]) {
    app.use(middleware);
  }

  // Health endpoint: always answers 200 with a plain-text "OK". It is
  // registered before the authentication middleware below.
  const respondHealthy = (req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('OK');
  };

  app.get('/api/v1/streaming/health', respondHealthy);
  // Metrics endpoint in the OpenMetrics text format. Every metric is a gauge
  // and is written as a TYPE line, a HELP line and a sample line; the
  // document is terminated by `# EOF`.
  app.get('/metrics', (req, res) => server.getConnections((err, count) => {
    if (err) {
      // Without a connection count the first sample would be rendered as
      // "undefined.0", so fail the scrape instead of emitting a bogus value.
      log.error(err);
      res.writeHead(500, { 'Content-Type': 'text/plain' });
      res.end('Could not collect metrics');
      return;
    }

    const gauges = [
      ['connected_clients', 'The number of clients connected to the streaming server', count],
      ['connected_channels', 'The number of Redis channels the streaming server is subscribed to', Object.keys(subs).length],
      ['pg_pool_total_connections', 'The total number of clients existing within the pool', pgPool.totalCount],
      ['pg_pool_idle_connections', 'The number of clients which are not checked out but are currently idle in the pool', pgPool.idleCount],
      ['pg_pool_waiting_queries', 'The number of queued requests waiting on a client when all clients are checked out', pgPool.waitingCount],
    ];

    // writeHead() is the documented API; writeHeader() is a deprecated alias.
    res.writeHead(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' });

    for (const [metricName, help, value] of gauges) {
      res.write(`# TYPE ${metricName} gauge\n`);
      res.write(`# HELP ${metricName} ${help}\n`);
      res.write(`${metricName} ${value}.0\n`);
    }

    res.write('# EOF\n');
    res.end();
  }));
  // Everything registered from here on goes through authentication.
  app.use(authenticationMiddleware);
  app.use(errorMiddleware);

  // HTTP streaming: resolve the channel ids for the requested path, then
  // stream them to the response. Any rejection while resolving is answered
  // with a 404.
  app.get('/api/v1/streaming/*', (req, res) => {
    const openStream = ({ channelIds, options }) => {
      const onSend = streamToHttp(req, res);
      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));

      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
    };

    const rejectStream = err => {
      log.verbose(req.requestId, 'Subscription error:', err.toString());
      httpNotFound(res);
    };

    channelNameToIds(req, channelNameFromPath(req), req.query)
      .then(openStream)
      .catch(rejectStream);
  });

  // WebSocket server attached to the same HTTP server.
  const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
  809. /**
  810. * @typedef StreamParams
  811. * @property {string} [tag]
  812. * @property {string} [list]
  813. * @property {string} [only_media]
  814. */
  /**
   * Lists the channels that make up the `user` stream for a request: the
   * account timeline, plus the per-device channel when the request has the
   * `crypto` scope and a device id, plus the notifications channel when it
   * has the `read` or `read:notifications` scope.
   * @param {any} req
   * @return {string[]}
   */
  const channelsForUserStream = req => {
    const channels = [`timeline:${req.accountId}`];

    const wantsDeviceChannel = isInScope(req, ['crypto']) && req.deviceId;
    if (wantsDeviceChannel) {
      channels.push(`timeline:${req.accountId}:${req.deviceId}`);
    }

    const wantsNotifications = isInScope(req, ['read', 'read:notifications']);
    if (wantsNotifications) {
      channels.push(`timeline:${req.accountId}:notifications`);
    }

    return channels;
  };
  /**
   * See app/lib/ascii_folder.rb for the canon definitions
   * of these constants.
   *
   * The two strings are parallel: the character at index i of
   * NON_ASCII_CHARS folds to the character at index i of
   * EQUIVALENT_ASCII_CHARS, so they must always have the same length.
   *
   * NOTE: U+0103 (a with breve), U+0104/U+0105 (A/a with ogonek) and U+0149
   * (n preceded by apostrophe) are written as escapes because they are easily
   * mangled when the file passes through a legacy single-byte encoding or a
   * compatibility normalisation, which silently breaks the table.
   */
  const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂ\u0103\u0104\u0105ÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇň\u0149ŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž';
  const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz';

  // Lookup table and matcher derived once from the constants above, instead
  // of rebuilding a regular expression and scanning the string on every call.
  const ASCII_FOLD_TABLE = new Map(
    [...NON_ASCII_CHARS].map((char, index) => [char, EQUIVALENT_ASCII_CHARS[index]]),
  );
  const ASCII_FOLD_PATTERN = new RegExp(`[${NON_ASCII_CHARS}]`, 'g');

  /**
   * Replaces every character listed in NON_ASCII_CHARS with its counterpart
   * from EQUIVALENT_ASCII_CHARS; all other characters are left untouched.
   * @param {string} str
   * @return {string}
   */
  const foldToASCII = str => str.replace(ASCII_FOLD_PATTERN, match => ASCII_FOLD_TABLE.get(match));
  /**
   * Normalises a hashtag: NFKC-normalise, lower-case, fold to ASCII, then
   * drop every character that is not a letter, a number, an underscore,
   * U+00B7 or U+200C.
   * @param {string} str
   * @return {string}
   */
  const normalizeHashtag = str => {
    const lowered = str.normalize('NFKC').toLowerCase();
    const folded = foldToASCII(lowered);

    return folded.replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
  };
  /**
   * Maps a stream name (plus its parameters) to the channel ids to subscribe
   * to and to whether payloads on those channels need filtering. The promise
   * rejects with a string when the stream name is unknown, when a hashtag
   * stream is requested without a tag, or when list access is not authorized.
   * @param {any} req
   * @param {string} name
   * @param {StreamParams} params
   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
   */
  const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
    // Resolves with the common result shape.
    const respond = (channelIds, needsFiltering) => {
      resolve({
        channelIds,
        options: { needsFiltering },
      });
    };

    // Shared handling of the two hashtag streams; they differ only in the
    // suffix appended to the channel id.
    const respondWithHashtag = suffix => {
      if (!params.tag || params.tag.length === 0) {
        reject('No tag for stream provided');
        return;
      }

      respond([`timeline:hashtag:${normalizeHashtag(params.tag)}${suffix}`], true);
    };

    switch (name) {
    case 'user':
      respond(channelsForUserStream(req), false);
      break;
    case 'user:notification':
      respond([`timeline:${req.accountId}:notifications`], false);
      break;
    // For the public timelines the channel id is the stream name itself,
    // prefixed with "timeline:".
    case 'public':
    case 'public:local':
    case 'public:remote':
    case 'public:media':
    case 'public:local:media':
    case 'public:remote:media':
      respond([`timeline:${name}`], true);
      break;
    case 'direct':
      respond([`timeline:direct:${req.accountId}`], false);
      break;
    case 'hashtag':
      respondWithHashtag('');
      break;
    case 'hashtag:local':
      respondWithHashtag(':local');
      break;
    case 'list':
      authorizeListAccess(params.list, req).then(() => {
        respond([`timeline:list:${params.list}`], false);
      }).catch(() => {
        reject('Not authorized to stream this list');
      });
      break;
    default:
      reject('Unknown stream type');
    }
  });
  /**
   * Builds the stream name reported to WebSocket clients: the channel name,
   * followed by the list id for `list` streams or by the tag for the hashtag
   * streams.
   * @param {string} channelName
   * @param {StreamParams} params
   * @return {string[]}
   */
  const streamNameFromChannelName = (channelName, params) => {
    switch (channelName) {
    case 'list':
      return [channelName, params.list];
    case 'hashtag':
    case 'hashtag:local':
      return [channelName, params.tag];
    default:
      return [channelName];
    }
  };
  963. /**
  964. * @typedef WebSocketSession
  965. * @property {any} socket
  966. * @property {any} request
  967. * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
  968. */
  /**
   * Subscribes a WebSocket session to a channel after checking scopes and
   * resolving the channel ids. The subscription is recorded in
   * `session.subscriptions` under the channel ids joined with `;`; when an
   * entry with that key already exists nothing is done. Failures are logged
   * and reported to the client as `{ error }`.
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const register = ({ channelIds, options }) => {
      const subscriptionKey = channelIds.join(';');

      if (subscriptions[subscriptionKey]) {
        return;
      }

      const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
      const stopHeartbeat = subscriptionHeartbeat(channelIds);
      const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);

      subscriptions[subscriptionKey] = { listener, stopHeartbeat };
    };

    const reportFailure = err => {
      log.verbose(request.requestId, 'Subscription error:', err.toString());
      socket.send(JSON.stringify({ error: err.toString() }));
    };

    return checkScopes(request, channelName)
      .then(() => channelNameToIds(request, channelName, params))
      .then(register)
      .catch(reportFailure);
  };
  /**
   * Ends a WebSocket session's subscription to a channel: resolves the
   * channel ids, and if the session holds a subscription under those ids
   * (joined with `;`) unsubscribes its listener from each prefixed channel,
   * stops its heartbeat and removes the entry. Failures are logged and
   * reported to the client as `{ error }`.
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const release = ({ channelIds }) => {
      log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

      const subscriptionKey = channelIds.join(';');
      const subscription = subscriptions[subscriptionKey];

      if (!subscription) {
        return;
      }

      const { listener, stopHeartbeat } = subscription;

      for (const channelId of channelIds) {
        unsubscribe(`${redisPrefix}${channelId}`, listener);
      }

      stopHeartbeat();

      delete subscriptions[subscriptionKey];
    };

    const reportFailure = err => {
      log.verbose(request.requestId, 'Unsubscription error:', err);
      socket.send(JSON.stringify({ error: err.toString() }));
    };

    return channelNameToIds(request, channelName, params)
      .then(release)
      .catch(reportFailure);
  };
  /**
   * Subscribes a WebSocket session to its access-token channel and to its
   * account system channel with one shared system-message listener, which
   * closes the socket on kill. Both channels are recorded in
   * `session.subscriptions` with a no-op heartbeat stopper.
   * @param {WebSocketSession} session
   */
  const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => {
    const systemChannelIds = [
      `timeline:access_token:${request.accessTokenId}`,
      `timeline:system:${request.accountId}`,
    ];

    const listener = createSystemMessageListener(request, {
      onKill() {
        socket.close();
      },
    });

    // Subscribe to both channels first, then record both subscriptions.
    for (const channelId of systemChannelIds) {
      subscribe(`${redisPrefix}${channelId}`, listener);
    }

    for (const channelId of systemChannelIds) {
      subscriptions[channelId] = {
        listener,
        stopHeartbeat: () => {
        },
      };
    }
  };
  /**
   * Returns the first element when given an array, otherwise the value
   * itself.
   * @param {string|string[]} arrayOrString
   * @return {string}
   */
  const firstParam = arrayOrString => (
    Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString
  );
  // Sets up a session for every new WebSocket connection: tags the request,
  // tracks liveness through pong frames, wires teardown, handles
  // subscribe/unsubscribe messages, and subscribes to the system channels
  // plus the stream named in the query string, if any.
  wss.on('connection', (ws, req) => {
    const location = url.parse(req.url, true);

    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    // Reset by every pong; the ping interval terminates sockets whose flag
    // is still false on its next pass.
    ws.isAlive = true;

    ws.on('pong', () => {
      ws.isAlive = true;
    });

    /**
     * @type {WebSocketSession}
     */
    const session = {
      socket: ws,
      request: req,
      subscriptions: {},
    };

    // Releases every subscription held by the session. It is registered for
    // both `close` and `error`, so it can run more than once for the same
    // socket; each entry is removed once released so that a repeated run
    // does not unsubscribe or stop heartbeats a second time.
    const onEnd = () => {
      for (const key of Object.keys(session.subscriptions)) {
        const { listener, stopHeartbeat } = session.subscriptions[key];

        // Keys are channel ids joined with ';'.
        key.split(';').forEach(channelId => {
          unsubscribe(`${redisPrefix}${channelId}`, listener);
        });

        stopHeartbeat();

        delete session.subscriptions[key];
      }
    };

    ws.on('close', onEnd);
    ws.on('error', onEnd);

    ws.on('message', (data, isBinary) => {
      if (isBinary) {
        log.warn('socket', 'Received binary data, closing connection');
        ws.close(1003, 'The mastodon streaming server does not support binary messages');
        return;
      }

      const message = data.toString('utf8');
      const json = parseJSON(message, session.request);

      if (!json) return;

      // Everything besides `type` and `stream` is passed on as stream params.
      const { type, stream, ...params } = json;

      if (type === 'subscribe') {
        subscribeWebsocketToChannel(session, firstParam(stream), params);
      } else if (type === 'unsubscribe') {
        unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
      } else {
        // Unknown action type
      }
    });

    subscribeWebsocketToSystemChannel(session);

    if (location.query.stream) {
      subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
    }
  });
  // Every 30 seconds: terminate each client whose isAlive flag is still
  // false from the previous pass, and for the others clear the flag and send
  // a ping.
  setInterval(() => {
    for (const client of wss.clients) {
      if (client.isAlive === false) {
        client.terminate();
        continue;
      }

      client.isAlive = false;
      client.ping('', false);
    }
  }, 30000);
  attachServerWithConfig(server, address => {
    log.warn(`Worker ${workerId} now listening on ${address}`);
  });

  // process.exit() emits the `exit` event, which is itself wired to onExit
  // below. The flag keeps the shutdown sequence from running a second time
  // and from overriding the exit status chosen by the first call.
  let shuttingDown = false;

  /**
   * Logs the shutdown, closes the server and exits the process.
   * @param {number} exitCode
   */
  const shutdown = exitCode => {
    if (shuttingDown) {
      return;
    }

    shuttingDown = true;

    log.warn(`Worker ${workerId} exiting`);
    server.close();
    process.exit(exitCode);
  };

  const onExit = () => {
    shutdown(0);
  };

  const onError = (err) => {
    log.error(err);
    // An uncaught exception is a failure: report it with a non-zero status.
    shutdown(1);
  };

  process.on('SIGINT', onExit);
  process.on('SIGTERM', onExit);
  process.on('exit', onExit);
  process.on('uncaughtException', onError);
  1128. };
  /**
   * Starts the server listening according to the environment: on the path
   * in SOCKET (or in PORT, when PORT is set and not numeric), otherwise on
   * PORT (default 4000) at BIND (default 127.0.0.1). When onSuccess is given
   * it is called with the listening address once the server is listening;
   * in the socket case the socket is first made world readable and writable.
   * @param {any} server
   * @param {function(string): void} [onSuccess]
   */
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT, BIND } = process.env;
    const listensOnSocket = SOCKET || (PORT && isNaN(+PORT));

    if (listensOnSocket) {
      server.listen(SOCKET || PORT, () => {
        if (!onSuccess) {
          return;
        }

        fs.chmodSync(server.address(), 0o666);
        onSuccess(server.address());
      });
      return;
    }

    server.listen(+PORT || 4000, BIND || '127.0.0.1', () => {
      if (!onSuccess) {
        return;
      }

      onSuccess(`${server.address().address}:${server.address().port}`);
    });
  };
  /**
   * Probes the configured port or socket with a throwaway HTTP server. The
   * callback receives the error when the probe fails to listen, and is
   * called without arguments once the probe has listened and closed again.
   * @param {function(Error=): void} onSuccess
   */
  const onPortAvailable = onSuccess => {
    const probe = http.createServer();

    const reportFailure = err => {
      onSuccess(err);
    };

    const closeProbe = () => {
      // Report availability only after the probe has released the address.
      probe.once('close', () => onSuccess());
      probe.close();
    };

    probe.once('error', reportFailure);
    probe.once('listening', closeProbe);

    attachServerWithConfig(probe);
  };
  // Entry point: start the worker pool only once the port/socket probe has
  // succeeded; otherwise log the problem and start nothing.
  onPortAvailable(err => {
    if (!err) {
      throng({
        workers: numWorkers,
        lifetime: Infinity,
        start: startWorker,
        master: startMaster,
      });
      return;
    }

    log.error('Could not start server, the port or socket is in use');
  });