|
@@ -81,6 +81,19 @@ const redisUrlToClient = (defaultConfig, redisUrl) => {
|
|
|
|
|
|
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
|
|
|
|
|
|
+/**
|
|
|
+ * @param {string} json
|
|
|
+ * @return {Object.<string, any>|null}
|
|
|
+ */
|
|
|
+const parseJSON = (json) => {
|
|
|
+ try {
|
|
|
+ return JSON.parse(json);
|
|
|
+ } catch (err) {
|
|
|
+ log.error(err);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
const startMaster = () => {
|
|
|
if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
|
|
|
log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
|
|
@@ -522,7 +535,9 @@ const startWorker = (workerId) => {
|
|
|
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
|
|
|
|
|
|
const listener = message => {
|
|
|
- const { event, payload, queued_at } = JSON.parse(message);
|
|
|
+ const json = parseJSON(message);
|
|
|
+ if (!json) return;
|
|
|
+ const { event, payload, queued_at } = json;
|
|
|
|
|
|
const transmit = () => {
|
|
|
const now = new Date().getTime();
|
|
@@ -932,7 +947,9 @@ const startWorker = (workerId) => {
|
|
|
ws.on('error', onEnd);
|
|
|
|
|
|
ws.on('message', data => {
|
|
|
- const { type, stream, ...params } = JSON.parse(data);
|
|
|
+ const json = parseJSON(data);
|
|
|
+ if (!json) return;
|
|
|
+ const { type, stream, ...params } = json;
|
|
|
|
|
|
if (type === 'subscribe') {
|
|
|
subscribeWebsocketToChannel(session, firstParam(stream), params);
|