File: source/socket-channel.js

  /**
   * Function that sends a socket message over the socket connection to the Signaling.
   * Messages whose type is listed in <code>_GROUP_MESSAGE_LIST</code> are throttled: at most one of
   *   them is sent directly per interval (1000ms); the others are queued and flushed later as
   *   grouped messages of at most 16 items each. Any other message type is sent immediately.
   * Status messages (<code>UPDATE_USER</code>, <code>MUTE_AUDIO</code>, <code>MUTE_VIDEO</code>) carry
   *   a <code>stamp</code> and are dropped when they are not newer than the last one sent.
   * @method _sendChannelMessage
   * @param {JSON} message The socket message object to send.
   * @private
   * @for Skylink
   * @since 0.5.8
   */
  Skylink.prototype._sendChannelMessage = function(message) {
    var self = this;
    var interval = 1000;
    var throughput = 16;

    if (!self._channelOpen || !self._user || !self._socket) {
      log.warn([message.target || 'Server', 'Socket', message.type, 'Dropping of message as Socket connection is not opened or is at ' +
        'incorrect step ->'], message);
      return;
    }

    // Makes sure the stamps record of the current user exists before it is read or written.
    // This is also invoked right before a grouped send, as the user session ID may have
    // changed between the time a message was queued and the time the queue is flushed.
    var ensureOwnStampsFn = function () {
      if (self._user.sid && !self._peerMessagesStamps[self._user.sid]) {
        self._peerMessagesStamps[self._user.sid] = {
          userData: 0,
          audioMuted: 0,
          videoMuted: 0
        };
      }
    };

    ensureOwnStampsFn();

    // Returns the stamps record key for a status message, or null for any other message type.
    var getStampKeyFn = function (statusMessage) {
      if (statusMessage.type === self._SIG_MESSAGE_TYPE.UPDATE_USER) {
        return 'userData';
      } else if (statusMessage.type === self._SIG_MESSAGE_TYPE.MUTE_VIDEO) {
        return 'videoMuted';
      } else if (statusMessage.type === self._SIG_MESSAGE_TYPE.MUTE_AUDIO) {
        return 'audioMuted';
      }
      return null;
    };

    // Returns false when a status message is not newer than the last one sent for its type.
    // Messages that are not status messages always pass.
    var checkStampFn = function (statusMessage) {
      var stampKey = getStampKeyFn(statusMessage);

      if (!stampKey) {
        return true;
      }
      if (!self._user.sid) {
        return false;
      }
      return statusMessage.stamp > self._peerMessagesStamps[self._user.sid][stampKey];
    };

    // Records the stamp of a status message that has just been sent.
    var setStampFn = function (statusMessage) {
      var stampKey = getStampKeyFn(statusMessage);

      if (stampKey) {
        self._peerMessagesStamps[self._user.sid][stampKey] = statusMessage.stamp;
      }
    };

    // Schedules the next flush of the queue for when the current interval has elapsed.
    var setQueueFn = function () {
      log.debug([null, 'Socket', null, 'Starting queue timeout']);

      self._socketMessageTimeout = setTimeout(function () {
        // The timer has fired, hence the stored handle is stale. It is reset here as a
        // leftover handle prevents any later queued message from scheduling a flush.
        self._socketMessageTimeout = null;

        if (((new Date ()).getTime() - self._timestamp.socketMessage) <= interval) {
          log.debug([null, 'Socket', null, 'Restarting queue timeout']);
          setQueueFn();
          return;
        }
        startSendingQueuedMessageFn();
      }, interval - ((new Date ()).getTime() - self._timestamp.socketMessage));
    };

    // Public messages sent by the user are echoed back locally as "incomingMessage" events.
    var triggerEventFn = function (eventMessage) {
      if (eventMessage.type === self._SIG_MESSAGE_TYPE.PUBLIC_MESSAGE) {
        self._trigger('incomingMessage', {
          content: eventMessage.data,
          isPrivate: false,
          targetPeerId: null,
          listOfPeers: Object.keys(self._peerInformations),
          isDataChannel: false,
          senderPeerId: self._user.sid
        }, self._user.sid, self.getPeerInfo(), true);
      }
    };

    // Sends a batch of queued messages as one grouped message, dropping outdated status messages.
    var sendGroupMessageFn = function (groupMessageList) {
      if (!self._channelOpen || !(self._user && self._user.sid) || !self._socket) {
        log.warn([message.target || 'Server', 'Socket', null, 'Dropping of group messages as Socket connection is not opened or is at ' +
          'incorrect step ->'], groupMessageList);
        return;
      }

      ensureOwnStampsFn();

      var strGroupMessageList = [];
      var sentMessageList = [];
      var stamps = {
        userData: 0,
        audioMuted: 0,
        videoMuted: 0
      };
      var stampKey = null;

      // Find the most recent stamp for each status message type within this batch
      for (var k = 0; k < groupMessageList.length; k++) {
        stampKey = getStampKeyFn(groupMessageList[k]);

        if (stampKey && checkStampFn(groupMessageList[k]) && groupMessageList[k].stamp > stamps[stampKey]) {
          stamps[stampKey] = groupMessageList[k].stamp;
        }
      }

      for (var i = 0; i < groupMessageList.length; i++) {
        stampKey = getStampKeyFn(groupMessageList[i]);

        // A status message is outdated when a newer one is in this batch, or when it is not
        // newer than the one already sent. Sending the latter would move the recorded stamp backwards.
        if (stampKey && (!checkStampFn(groupMessageList[i]) || groupMessageList[i].stamp < stamps[stampKey])) {
          log.warn([message.target || 'Server', 'Socket', groupMessageList[i].type, 'Dropping of outdated status message ->'],
            clone(groupMessageList[i]));
          continue;
        }
        strGroupMessageList.push(JSON.stringify(groupMessageList[i]));
        sentMessageList.push(groupMessageList[i]);
      }

      if (strGroupMessageList.length > 0) {
        var groupMessage = {
          type: self._SIG_MESSAGE_TYPE.GROUP,
          lists: strGroupMessageList,
          mid: self._user.sid,
          rid: self._room.id
        };

        log.log([message.target || 'Server', 'Socket', groupMessage.type,
          'Sending queued grouped message (max: ' + throughput + ' per group) ->'], clone(groupMessage));

        self._socket.send(JSON.stringify(groupMessage));
        self._timestamp.socketMessage = (new Date()).getTime();

        for (var j = 0; j < sentMessageList.length; j++) {
          setStampFn(sentMessageList[j]);
          triggerEventFn(sentMessageList[j]);
        }
      }
    };

    // Flushes up to "throughput" queued messages and reschedules itself while messages remain.
    var startSendingQueuedMessageFn = function(){
      if (self._socketMessageQueue.length > 0){
        sendGroupMessageFn(self._socketMessageQueue.splice(0, throughput));

        // The handle check prevents a second timer when an event listener has queued
        // (and scheduled) a new message while the grouped message was being sent.
        if (self._socketMessageQueue.length > 0 && !self._socketMessageTimeout) {
          setQueueFn();
        }
      }
    };

    if (self._GROUP_MESSAGE_LIST.indexOf(message.type) > -1) {
      if (!(self._timestamp.socketMessage && ((new Date ()).getTime() - self._timestamp.socketMessage) <= interval)) {
        if (!checkStampFn(message)) {
          log.warn([message.target || 'Server', 'Socket', message.type, 'Dropping of outdated status message ->'], clone(message));
          return;
        }
        if (self._socketMessageTimeout) {
          clearTimeout(self._socketMessageTimeout);
          self._socketMessageTimeout = null;
        }
        log.log([message.target || 'Server', 'Socket', message.type, 'Sending message ->'], clone(message));
        self._socket.send(JSON.stringify(message));
        setStampFn(message);
        triggerEventFn(message);

        self._timestamp.socketMessage = (new Date()).getTime();

        // The pending flush was cancelled above, so messages that are still queued
        // need a new timer or they would never be sent.
        if (self._socketMessageQueue.length > 0 && !self._socketMessageTimeout) {
          setQueueFn();
        }

      } else {
        log.debug([message.target || 'Server', 'Socket', message.type,
          'Queueing socket message to prevent message drop ->'], clone(message));

        self._socketMessageQueue.push(message);

        if (!self._socketMessageTimeout) {
          setQueueFn();
        }
      }
    } else {
      log.log([message.target || 'Server', 'Socket', message.type, 'Sending message ->'], clone(message));
      self._socket.send(JSON.stringify(message));

      // If Peer sends "bye" on its own, we trigger it as session disconnected abruptly
      if (message.type === self._SIG_MESSAGE_TYPE.BYE && self._inRoom &&
        self._user && self._user.sid && message.mid === self._user.sid) {
        self.leaveRoom(false);
        self._trigger('sessionDisconnect', self._user.sid, self.getPeerInfo());
      }
    }
  };
  203.  
  /**
   * Function that creates and opens a socket connection to the Signaling.
   * Each invocation selects the next port (and eventually the long-polling transport) to attempt,
   *   closes any existing socket connection, creates the new socket connection with
   *   <code>io.connect()</code> and subscribes to its events.
   * @method _createSocket
   * @param {String} type The transport type to attempt: <code>"WebSocket"</code> or <code>"Polling"</code>.
   * @param {Any} joinRoomTimestamp The value compared (strictly) against the value handed to the
   *   <code>_joinRoomManager.socketsFn</code> callbacks. The created socket connection is disconnected
   *   when these differ.
   * @private
   * @for Skylink
   * @since 0.5.10
   */
  Skylink.prototype._createSocket = function (type, joinRoomTimestamp) {
    var self = this;
    var options = {
      forceNew: true,
      reconnection: true,
      timeout: self._initOptions.socketTimeout,
      reconnectionAttempts: 2,
      reconnectionDelayMax: 5000,
      reconnectionDelay: 1000,
      transports: ['websocket'],
      query: { // ESS-1038: Adding custom headers to signaling
        Skylink_SDK_type: 'WEB_SDK',
        Skylink_SDK_version: self.VERSION
      }
    };
    var ports = self._initOptions.socketServer && typeof self._initOptions.socketServer === 'object' && Array.isArray(self._initOptions.socketServer.ports) &&
      self._initOptions.socketServer.ports.length > 0 ? self._initOptions.socketServer.ports : self._socketPorts[self._signalingServerProtocol];
    var fallbackType = null;

    // just beginning
    if (self._signalingServerPort === null) {
      self._signalingServerPort = ports[0];
      fallbackType = self.SOCKET_FALLBACK.NON_FALLBACK;

    // reached the end of the last port for the protocol type
    } else if (ports.indexOf(self._signalingServerPort) === ports.length - 1 || typeof self._initOptions.socketServer === 'string') {
      // re-refresh to long-polling port
      if (type === 'WebSocket') {
        type = 'Polling';
        self._signalingServerPort = ports[0];
      } else {
        self._socketSession.finalAttempts++;
      }
    // move to the next port
    } else {
      self._signalingServerPort = ports[ ports.indexOf(self._signalingServerPort) + 1 ];
    }

    if (type === 'Polling') {
      options.reconnectionDelayMax = 1000;
      options.reconnectionAttempts = 4;
      options.transports = ['xhr-polling', 'jsonp-polling', 'polling'];
    }

    var url = self._signalingServerProtocol + '//' + self._signalingServer + ':' + self._signalingServerPort + '?rand=' + Date.now();
    var retries = 0;

    if (self._initOptions.socketServer) {
      // Provided as string, make it as just the fixed server
      url = typeof self._initOptions.socketServer === 'string' ? self._initOptions.socketServer :
        (self._initOptions.socketServer.protocol ? self._initOptions.socketServer.protocol : self._signalingServerProtocol) + '//' +
        self._initOptions.socketServer.url + ':' + self._signalingServerPort;
    }

    self._socketSession.transportType = type;
    self._socketSession.socketOptions = options;
    self._socketSession.socketServer = url;

    // Any attempt that is not the very first one is reported as a reconnection attempt
    if (fallbackType === null) {
      fallbackType = self._signalingServerProtocol === 'http:' ?
        (type === 'Polling' ? self.SOCKET_FALLBACK.LONG_POLLING : self.SOCKET_FALLBACK.FALLBACK_PORT) :
        (type === 'Polling' ? self.SOCKET_FALLBACK.LONG_POLLING_SSL : self.SOCKET_FALLBACK.FALLBACK_SSL_PORT);

      self._socketSession.attempts++;
      self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_ATTEMPT, null, fallbackType, clone(self._socketSession));
      self._trigger('channelRetry', fallbackType, self._socketSession.attempts, clone(self._socketSession));
    }

    // if socket instance already exists, close it before creating the new one
    if (self._socket) {
      self._closeChannel();
    }

    self._channelOpen = false;

    log.log('Opening channel with signaling server url:', clone(self._socketSession));

    var socket = null;

    try {
      socket = io.connect(url, options);
    } catch (error){
      log.error('Failed creating socket connection object ->', error);
      if (fallbackType === self.SOCKET_FALLBACK.NON_FALLBACK) {
        self._trigger('socketError', self.SOCKET_ERROR.CONNECTION_FAILED, error, fallbackType, clone(self._socketSession));
      } else {
        self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_FAILED, error, fallbackType, clone(self._socketSession));
      }
      self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_ABORTED, new Error('Reconnection aborted as ' +
        'there no more available ports, transports and final attempts left.'), fallbackType, clone(self._socketSession));
      return;
    }

    // Marks the channel as opened (once) for both the "connect" and "reconnect" events.
    var openChannelFn = function (eventName) {
      self._handleSignalingStats(eventName, retries);
      if (!self._channelOpen) {
        log.log([null, 'Socket', null, 'Channel opened']);
        self._channelOpen = true;
        self._trigger('channelOpen', clone(self._socketSession));
      }
    };

    // Parses a received message string. Returns null (after logging) when it is not valid JSON,
    // so that a malformed message does not throw out of the socket event handler.
    var parseMessageFn = function (messageStr) {
      try {
        return JSON.parse(messageStr);
      } catch (parseError) {
        log.error([null, 'Socket', null, 'Dropping of received message as it is malformed ->'], {
          message: messageStr,
          error: parseError
        });
        return null;
      }
    };

    socket.on('reconnect_attempt', function () {
      retries++;
      self._socketSession.attempts++;
      self._handleSignalingStats('reconnect_attempt', retries);
      self._trigger('channelRetry', fallbackType, self._socketSession.attempts, clone(self._socketSession));
    });

    socket.on('reconnect_failed', function () {
      var errorMsg = 'Failed reconnection with transport "' + type + '" and port ' + self._signalingServerPort + '.';
      self._handleSignalingStats('reconnect_failed', retries, errorMsg);

      if (fallbackType === self.SOCKET_FALLBACK.NON_FALLBACK) {
        errorMsg = errorMsg.replace(/ reconnection /g, ' connection ');
        self._trigger('socketError', self.SOCKET_ERROR.CONNECTION_FAILED, new Error(errorMsg), fallbackType, clone(self._socketSession));

      } else {
        self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_FAILED, new Error(errorMsg), fallbackType, clone(self._socketSession));
      }

      // Fallback to the next port / transport until the final attempts are used up
      if (self._socketSession.finalAttempts < 2) {
        self._createSocket(type, joinRoomTimestamp);
      } else {
        self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_ABORTED, new Error('Reconnection aborted as ' +
          'there no more available ports, transports and final attempts left.'), fallbackType, clone(self._socketSession));
      }
    });

    socket.on('reconnect_error', function (error) {
      self._handleSignalingStats('reconnect_error', retries, error);
    });

    socket.on('connect', function () {
      openChannelFn('connect');
    });

    socket.on('reconnect', function () {
      openChannelFn('reconnect');
    });

    socket.on('error', function(error) {
      self._handleSignalingStats('error', retries, error);
      // The error payload is not guaranteed to be an Error object, so the message is
      // checked to be a string before it is searched.
      if (error && typeof error.message === 'string' && error.message.indexOf('xhr poll error') > -1) {
        log.error([null, 'Socket', null, 'XHR poll connection unstable. Disconnecting.. ->'], error);
        self._closeChannel();
        return;
      }
      log.error([null, 'Socket', null, 'Exception occurred ->'], error);
      self._trigger('channelError', error, clone(self._socketSession));
    });

    socket.on('disconnect', function() {
      self._handleSignalingStats('disconnect', retries);
      if (self._channelOpen) {
        self._channelOpen = false;
        self._trigger('channelClose', clone(self._socketSession));
        log.log([null, 'Socket', null, 'Channel closed']);

        if (self._inRoom && self._user && self._user.sid) {
          self.leaveRoom(false);
          self._trigger('sessionDisconnect', self._user.sid, self.getPeerInfo());
        }
      }
    });

    socket.on('message', function(messageStr) {
      var message = parseMessageFn(messageStr);

      if (!message) {
        return;
      }

      log.log([null, 'Socket', null, 'Received message ->'], message);

      if (message.type === self._SIG_MESSAGE_TYPE.GROUP) {
        log.debug('Bundle of ' + message.lists.length + ' messages');
        for (var i = 0; i < message.lists.length; i++) {
          var indiMessage = parseMessageFn(message.lists[i]);

          // One malformed item does not prevent the rest of the bundle from being processed
          if (!indiMessage) {
            continue;
          }
          self._processSigMessage(indiMessage);
          self._trigger('channelMessage', indiMessage, clone(self._socketSession));
        }
      } else {
        self._processSigMessage(message);
        self._trigger('channelMessage', message, clone(self._socketSession));
      }
    });

    // Disconnects this socket connection when it does not belong to the current joinRoom() attempt
    self._joinRoomManager.socketsFn.push(function (currentJoinRoomTimestamp) {
      if (currentJoinRoomTimestamp !== joinRoomTimestamp) {
        socket.disconnect();
      }
    });
    self._socket = socket;
  };
  403.  
  /**
   * Function that starts the socket connection to the Signaling.
   * This is the entry point of the first connection attempt: it resets the socket session
   *   counters and the selected port before delegating to <code>_createSocket()</code>.
   *   It does nothing (besides logging an error) when the channel is already opened or
   *   when the ready state is not completed.
   * @method _openChannel
   * @param {Any} joinRoomTimestamp The value that is forwarded as it is to <code>_createSocket()</code>.
   * @private
   * @for Skylink
   * @since 0.5.5
   */
  Skylink.prototype._openChannel = function(joinRoomTimestamp) {
    var sdk = this;

    // Guard: only one channel connection at a time
    if (sdk._channelOpen) {
      log.error([null, 'Socket', null, 'Unable to instantiate a new channel connection ' +
        'as there is already an ongoing channel connection']);
      return;
    }

    // Guard: the SDK has to be initialised first
    if (sdk._readyState !== sdk.READY_STATE_CHANGE.COMPLETED) {
      log.error([null, 'Socket', null, 'Unable to instantiate a new channel connection ' +
        'as readyState is not ready']);
      return;
    }

    // Use SSL when forced to, else follow the protocol of the current page
    sdk._signalingServerProtocol = sdk._initOptions.forceSSL ? 'https:' : window.location.protocol;

    // Begin with a websocket connection, unless the browser does not provide WebSocket at all
    var transport = window.WebSocket ? 'WebSocket' : 'Polling';
    var session = sdk._socketSession;

    session.finalAttempts = 0;
    session.attempts = 0;
    sdk._signalingServerPort = null;

    sdk._createSocket(transport, joinRoomTimestamp);
  };
  447.  
  /**
   * Function that stops the socket connection to the Signaling.
   * The socket event listeners are removed and the socket is disconnected even when the channel
   *   has not been opened yet, so that a socket that is still connecting (or reconnecting) is not
   *   left running in the background without any listeners attached.
   * When the channel was opened, this triggers <code>channelClose</code> and, if the user is
   *   still in the Room, leaves the Room and triggers <code>sessionDisconnect</code>.
   * @method _closeChannel
   * @private
   * @for Skylink
   * @since 0.5.5
   */
  Skylink.prototype._closeChannel = function() {
    var socket = this._socket;
    var socketEvents = ['connect_error', 'reconnect_attempt', 'reconnect_error', 'reconnect_failed',
      'connect', 'reconnect', 'error', 'disconnect', 'message'];

    if (socket) {
      // Listeners are removed before disconnecting so that the "disconnect" handler does not
      // run in addition to the channel closing steps below.
      for (var i = 0; i < socketEvents.length; i++) {
        socket.removeAllListeners(socketEvents[i]);
      }

      // Disconnect regardless of the channel state, otherwise a socket that has not connected
      // yet keeps attempting to connect after its reference is dropped.
      socket.disconnect();
    }

    if (this._channelOpen) {
      log.log([null, 'Socket', null, 'Channel closed']);

      this._channelOpen = false;
      this._trigger('channelClose', clone(this._socketSession));

      if (this._inRoom && this._user && this._user.sid) {
        this.leaveRoom(false);
        this._trigger('sessionDisconnect', this._user.sid, this.getPeerInfo());
      }
    }

    this._socket = null;
  };