client.js 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. var urlparse = require('url').parse
  2. , OutgoingMessage = require('http').OutgoingMessage
  3. , Stream = require('net').Stream
  4. , options = require('./utils').options
  5. , encode = require('./utils').encode
  6. , decode = require('./utils').decode
  7. , merge = require('./utils').merge
  8. , util = require(process.binding('natives').util ? 'util' : 'sys');
  // `process.EventEmitter` is not available on current Node.js releases, so
  // the emitter base class is resolved from the `events` module instead.
  var EventEmitter = require('events').EventEmitter;

  /**
   * Server-side handle for one connected client.
   *
   * Merges the default timing options with the transport's `getOptions()`
   * (when the subclass defines it) and the caller-supplied `options`, resets
   * the connection bookkeeping and immediately attaches to the incoming
   * request through `_onConnect`.
   *
   * @param {Object} listener  Owner of this client; its `options`,
   *     `broadcast` and `_onClient*` hooks are used by the methods below.
   * @param {Object} req       Incoming request; must expose `connection`.
   * @param {Object} res       Response paired with `req` (may be absent).
   * @param {Object} options   Per-client overrides for the defaults.
   * @param {*} head           Stored untouched as `upgradeHead`.
   */
  var Client = module.exports = function(listener, req, res, options, head){
    EventEmitter.call(this);
    this.listener = listener;

    // `this.options` is the helper mixed into the prototype at the bottom of
    // this file. Later code reads `this.options.timeout` etc., so the helper
    // presumably replaces itself with the resolved settings (confirm in
    // ./utils). The values are used as setTimeout delays (milliseconds).
    var defaults = {
      timeout: 8000,
      heartbeatInterval: 10000,
      closeTimeout: 0
    };
    var transportOptions = this.getOptions ? this.getOptions() : {};
    this.options(merge(defaults, transportOptions), options);

    this.connections = 0;
    this._open = false;
    this._heartbeats = 0;
    this.connected = false;
    this.upgradeHead = head;
    this._onConnect(req, res);
  };

  util.inherits(Client, EventEmitter);
  /**
   * Delivers a message to the client.
   *
   * The message is encoded and written straight away when the client is open
   * and its connection reports a writable state ('open' or 'writeOnly');
   * otherwise it is handed to `_queue` for later delivery.
   *
   * @param {*} message  Value passed to `encode` before being written.
   * @return {Client} this when written, otherwise whatever `_queue` returns.
   */
  Client.prototype.send = function(message){
    var writable = false;

    // Only look at the connection once the client is flagged open.
    if (this._open){
      var state = this.connection.readyState;
      writable = state === 'open' || state === 'writeOnly';
    }

    if (writable){
      this._write(encode(message));
      return this;
    }
    return this._queue(message);
  };
  /**
   * Relays a message through the listener's `broadcast`, passing this
   * client's session id as the second argument.
   *
   * Nothing is sent while the client has no `sessionId` property at all.
   *
   * @param {*} message  Message forwarded to `listener.broadcast`.
   * @return {Client} this, for chaining.
   */
  Client.prototype.broadcast = function(message){
    var hasSession = 'sessionId' in this;
    if (hasSession){
      this.listener.broadcast(message, this.sessionId);
    }
    return this;
  };
  /**
   * Handles a raw payload received from the client.
   *
   * The payload is split by `decode` into individual frames. Each frame is
   * dispatched on its three-character prefix:
   *   - '~h~'  heartbeat reply, forwarded to `_onHeartbeat`, not emitted;
   *   - '~j~'  JSON message, parsed (an unparseable body becomes `{}`);
   *   - other  delivered unchanged.
   * Every non-heartbeat message is emitted as a 'message' event and handed to
   * the listener's `_onClientMessage`.
   *
   * A heartbeat frame no longer aborts the loop, so messages that follow it
   * in the same payload are still delivered.
   *
   * @param {*} data  Raw payload understood by `decode`.
   * @return {*} The logger's return value when the payload cannot be
   *     decoded, otherwise undefined.
   */
  Client.prototype._onMessage = function(data){
    var messages = decode(data);
    if (messages === false){
      return this.listener.options.log('Bad message received from client ' + this.sessionId);
    }

    for (var i = 0, l = messages.length; i < l; i++){
      var message = messages[i];
      var frame = message.substr(0, 3);

      if (frame === '~h~'){
        // Heartbeats are protocol-internal: handle them and move on to the
        // next frame instead of returning.
        this._onHeartbeat(message.substr(3));
        continue;
      }

      if (frame === '~j~'){
        try {
          message = JSON.parse(message.substr(3));
        } catch (e) {
          // Deliberate best effort: malformed JSON is delivered as an empty
          // object rather than dropping the message.
          message = {};
        }
      }

      this.emit('message', message);
      this.listener._onClientMessage(message, this);
    }
  };
  /**
   * Binds the client to a new request/response pair.
   *
   * Stores the request, the response and the request's connection on the
   * client. Connection-level handlers ('end', 'timeout', 'error') are added
   * when the client had no connection before, or when the connection has not
   * yet been flagged with `eventsAttached`; they close the client and destroy
   * its current connection. Error/timeout handlers that destroy the request
   * and the response are always added, and a pending disconnect timer is
   * cancelled.
   *
   * @param {Object} req  Incoming request exposing `connection`.
   * @param {Object} res  Matching response, if any.
   */
  Client.prototype._onConnect = function(req, res){
    var self = this;
    var hadConnection = Boolean(this.connection);

    this.request = req;
    this.response = res;
    this.connection = req.connection;

    // The flag is only consulted when the client already had a connection.
    var attachConnection = !hadConnection || this.connection.eventsAttached === undefined;
    this.connection.eventsAttached = true;

    if (attachConnection){
      var destroyConnection = function(){
        self._onClose();
        if (self.connection) self.connection.destroy();
      };
      ['end', 'timeout', 'error'].forEach(function(eventName){
        self.connection.addListener(eventName, destroyConnection);
      });
    }

    if (!req) return;

    var destroyRequest = function(){
      if (req.destroy) req.destroy();
    };
    req.addListener('error', destroyRequest);
    req.addListener('timeout', destroyRequest);

    if (res){
      var destroyResponse = function(){
        if (res.destroy) res.destroy();
      };
      res.addListener('error', destroyResponse);
      res.addListener('timeout', destroyResponse);
    }

    // A new connection arrived in time: cancel the scheduled disconnect.
    if (this._disconnectTimeout) clearTimeout(this._disconnectTimeout);
  };
  /**
   * Marks the client as open and flushes everything waiting to be written.
   *
   * On the first call after a (re)handshake a session id is generated and
   * placed in front of the queued messages. The combined payload, if any, is
   * encoded and written in one go. The listener is told about the client the
   * first time it connects, and the heartbeat cycle is started when a
   * heartbeat timeout is configured.
   */
  Client.prototype._payload = function(){
    var handshake = [];

    this.connections++;
    this._open = true;
    this.connected = true;

    if (!this.handshaked){
      this._generateSessionId();
      handshake.push(this.sessionId);
      this.handshaked = true;
    }

    // Take ownership of the pending messages and start a fresh queue.
    var pending = this._writeQueue || [];
    this._writeQueue = [];

    var payload = handshake.concat(pending);
    if (payload.length > 0){
      this._write(encode(payload));
    }

    if (this.connections === 1){
      this.listener._onClientConnect(this);
    }
    if (this.options.timeout){
      this._heartbeat();
    }
  };
  /**
   * Schedules the next heartbeat.
   *
   * After `options.heartbeatInterval` milliseconds a '~h~<n>' message with
   * the incremented heartbeat counter is sent, and a second timer is armed
   * that closes the client unless it is cancelled within `options.timeout`
   * milliseconds (see `_onHeartbeat`).
   */
  Client.prototype._heartbeat = function(){
    var client = this;

    // Fired when the client fails to answer the heartbeat in time.
    var expire = function(){
      client._onClose();
    };

    var beat = function(){
      var sequence = ++client._heartbeats;
      client.send('~h~' + sequence);
      client._heartbeatTimeout = setTimeout(expire, client.options.timeout);
    };

    this._heartbeatInterval = setTimeout(beat, this.options.heartbeatInterval);
  };
  /**
   * Handles a heartbeat reply from the client.
   *
   * When the reply matches the most recently sent heartbeat number, the
   * pending close timer is cancelled and the next heartbeat is scheduled.
   * Any other value is ignored.
   *
   * @param {string|number} h  Heartbeat number echoed by the client.
   */
  Client.prototype._onHeartbeat = function(h){
    // Loose comparison on purpose: the reply arrives as text while the
    // counter is a number.
    if (h != this._heartbeats) return;

    clearTimeout(this._heartbeatTimeout);
    this._heartbeat();
  };
  /**
   * Closes the current connection of the client.
   *
   * Stops the heartbeat timers, marks the client as not open and drops the
   * request/response references. Unless `skipDisconnect` is strictly
   * `false`, the disconnect then follows: after `options.closeTimeout`
   * milliseconds for a handshaked client, immediately otherwise.
   *
   * NOTE: despite its name, only the literal value `false` suppresses the
   * disconnect; `true` and `undefined` both let it proceed (`_onDisconnect`
   * in this file passes `true`).
   *
   * @param {boolean=} skipDisconnect  Pass `false` to close without
   *     scheduling or triggering a disconnect.
   * @return {Client|undefined} this when the client was already closed,
   *     otherwise undefined.
   */
  Client.prototype._onClose = function(skipDisconnect){
    if (!this._open) return this;

    var client = this;

    if (this._heartbeatInterval) clearTimeout(this._heartbeatInterval);
    if (this._heartbeatTimeout) clearTimeout(this._heartbeatTimeout);

    this._open = false;
    this.request = null;
    this.response = null;

    if (skipDisconnect === false) return;

    if (!this.handshaked){
      this._onDisconnect();
      return;
    }

    // Give a handshaked client the chance to come back on a new connection.
    this._disconnectTimeout = setTimeout(function(){
      client._onDisconnect();
    }, this.options.closeTimeout);
  };
  /**
   * Finalises the disconnection of the client.
   *
   * Closes the connection first if it is still open, cancels a pending
   * disconnect timer, discards queued messages and marks the client as not
   * connected. A handshaked client additionally emits 'disconnect', is
   * reported to the listener, and then loses its handshaked state.
   */
  Client.prototype._onDisconnect = function(){
    if (this._open){
      this._onClose(true);
    }

    var pendingTimer = this._disconnectTimeout;
    if (pendingTimer) clearTimeout(pendingTimer);

    this._writeQueue = [];
    this.connected = false;

    if (!this.handshaked) return;

    // Observers still see `handshaked` as true while they are notified.
    this.emit('disconnect');
    this.listener._onClientDisconnect(this);
    this.handshaked = false;
  };
  /**
   * Appends a message to the write queue, creating the queue on first use.
   * Queued messages are flushed by `_payload`.
   *
   * @param {*} message  Message to keep for later delivery.
   * @return {Client} this, for chaining.
   */
  Client.prototype._queue = function(message){
    if (!this._writeQueue){
      this._writeQueue = [];
    }
    this._writeQueue.push(message);
    return this;
  };
  154. Client.prototype._generateSessionId = function(){
  155. this.sessionId = Math.random().toString().substr(2);
  156. return this;
  157. };
  /**
   * Checks a request origin against the listener's `origins` whitelist.
   *
   * Whitelist entries have the form 'host:port', where either side may be
   * the wildcard '*'. An entry of '*:*' accepts everything, including
   * requests without an origin.
   *
   * The origin is matched by host name (without port) and port; when the
   * origin carries no explicit port, the default of its scheme is used (443
   * for https, 80 otherwise). Matching on the parsed `host` field would
   * never succeed for origins with an explicit port, because that field
   * already contains the port.
   *
   * @param {string=} origin  Origin URL sent by the client, if any.
   * @return {boolean} true when the origin is allowed.
   */
  Client.prototype._verifyOrigin = function(origin){
    var origins = this.listener.options.origins;

    if (origins.indexOf('*:*') !== -1) {
      return true;
    }
    if (!origin) {
      return false;
    }

    var parts;
    try {
      parts = urlparse(origin);
    } catch (ex) {
      // An origin that cannot be parsed is never trusted.
      return false;
    }

    var port = parts.port || (parts.protocol === 'https:' ? 443 : 80);

    return origins.indexOf(parts.hostname + ':' + port) !== -1 ||
      origins.indexOf(parts.hostname + ':*') !== -1 ||
      origins.indexOf('*:' + port) !== -1;
  };
  // Mix the shared `options` helper imported from ./utils into the prototype:
  // every enumerable property of it becomes available on Client instances
  // (the constructor relies on this when it calls `this.options(...)`).
  for (var i in options){
    Client.prototype[i] = options[i];
  }