- /**
- * Function that sends a socket message over the socket connection to the Signaling.
- * @method _sendChannelMessage
- * @private
- * @for Skylink
- * @since 0.5.8
- */
- Skylink.prototype._sendChannelMessage = function(message) {
- var self = this;
- var interval = 1000;
- var throughput = 16;
-
- if (!self._channelOpen || !self._user || !self._socket) {
- log.warn([message.target || 'Server', 'Socket', message.type, 'Dropping of message as Socket connection is not opened or is at ' +
- 'incorrect step ->'], message);
- return;
- }
-
- if (self._user.sid && !self._peerMessagesStamps[self._user.sid]) {
- self._peerMessagesStamps[self._user.sid] = {
- userData: 0,
- audioMuted: 0,
- videoMuted: 0
- };
- }
-
- var checkStampFn = function (statusMessage) {
- if (statusMessage.type === self._SIG_MESSAGE_TYPE.UPDATE_USER) {
- if (!self._user.sid) {
- return false;
- }
- return statusMessage.stamp > self._peerMessagesStamps[self._user.sid].userData;
- } else if (statusMessage.type === self._SIG_MESSAGE_TYPE.MUTE_VIDEO) {
- if (!self._user.sid) {
- return false;
- }
- return statusMessage.stamp > self._peerMessagesStamps[self._user.sid].videoMuted;
- } else if (statusMessage.type === self._SIG_MESSAGE_TYPE.MUTE_AUDIO) {
- if (!self._user.sid) {
- return false;
- }
- return statusMessage.stamp > self._peerMessagesStamps[self._user.sid].audioMuted;
- }
- return true;
- };
-
- var setStampFn = function (statusMessage) {
- if (statusMessage.type === self._SIG_MESSAGE_TYPE.UPDATE_USER) {
- self._peerMessagesStamps[self._user.sid].userData = statusMessage.stamp;
- } else if (statusMessage.type === self._SIG_MESSAGE_TYPE.MUTE_VIDEO) {
- self._peerMessagesStamps[self._user.sid].videoMuted = statusMessage.stamp;
- } else if (statusMessage.type === self._SIG_MESSAGE_TYPE.MUTE_AUDIO) {
- self._peerMessagesStamps[self._user.sid].audioMuted = statusMessage.stamp;
- }
- };
-
- var setQueueFn = function () {
- log.debug([null, 'Socket', null, 'Starting queue timeout']);
-
- self._socketMessageTimeout = setTimeout(function () {
- if (((new Date ()).getTime() - self._timestamp.socketMessage) <= interval) {
- log.debug([null, 'Socket', null, 'Restarting queue timeout']);
- setQueueFn();
- return;
- }
- startSendingQueuedMessageFn();
- }, interval - ((new Date ()).getTime() - self._timestamp.socketMessage));
- };
-
- var triggerEventFn = function (eventMessage) {
- if (eventMessage.type === self._SIG_MESSAGE_TYPE.PUBLIC_MESSAGE) {
- self._trigger('incomingMessage', {
- content: eventMessage.data,
- isPrivate: false,
- targetPeerId: null,
- listOfPeers: Object.keys(self._peerInformations),
- isDataChannel: false,
- senderPeerId: self._user.sid
- }, self._user.sid, self.getPeerInfo(), true);
- }
- };
-
- var sendGroupMessageFn = function (groupMessageList) {
- self._socketMessageTimeout = null;
-
- if (!self._channelOpen || !(self._user && self._user.sid) || !self._socket) {
- log.warn([message.target || 'Server', 'Socket', null, 'Dropping of group messages as Socket connection is not opened or is at ' +
- 'incorrect step ->'], groupMessageList);
- return;
- }
-
- var strGroupMessageList = [];
- var stamps = {
- userData: 0,
- audioMuted: 0,
- videoMuted: 0
- };
-
- for (var k = 0; k < groupMessageList.length; k++) {
- if (checkStampFn(groupMessageList[k])) {
- if (groupMessageList[k].type === self._SIG_MESSAGE_TYPE.UPDATE_USER &&
- groupMessageList[k].stamp > self._peerMessagesStamps[self._user.sid].userData &&
- groupMessageList[k].stamp > stamps.userData) {
- stamps.userData = groupMessageList[k].stamp;
- } else if (groupMessageList[k].type === self._SIG_MESSAGE_TYPE.MUTE_AUDIO &&
- groupMessageList[k].stamp > self._peerMessagesStamps[self._user.sid].audioMuted &&
- groupMessageList[k].stamp > stamps.audioMuted) {
- stamps.audioMuted = groupMessageList[k].stamp;
- } else if (groupMessageList[k].type === self._SIG_MESSAGE_TYPE.MUTE_VIDEO &&
- groupMessageList[k].stamp > self._peerMessagesStamps[self._user.sid].videoMuted &&
- groupMessageList[k].stamp > stamps.videoMuted) {
- stamps.videoMuted = groupMessageList[k].stamp;
- }
- }
- }
-
- for (var i = 0; i < groupMessageList.length; i++) {
- if ((groupMessageList[i].type === self._SIG_MESSAGE_TYPE.UPDATE_USER &&
- groupMessageList[i].stamp < stamps.userData) ||
- (groupMessageList[i].type === self._SIG_MESSAGE_TYPE.MUTE_AUDIO &&
- groupMessageList[i].stamp < stamps.audioMuted) ||
- (groupMessageList[i].type === self._SIG_MESSAGE_TYPE.MUTE_VIDEO &&
- groupMessageList[i].stamp < stamps.videoMuted)) {
- log.warn([message.target || 'Server', 'Socket', groupMessageList[i], 'Dropping of outdated status message ->'],
- clone(groupMessageList[i]));
- groupMessageList.splice(i, 1);
- i--;
- continue;
- }
- strGroupMessageList.push(JSON.stringify(groupMessageList[i]));
- }
-
- if (strGroupMessageList.length > 0) {
- var groupMessage = {
- type: self._SIG_MESSAGE_TYPE.GROUP,
- lists: strGroupMessageList,
- mid: self._user.sid,
- rid: self._room.id
- };
-
- log.log([message.target || 'Server', 'Socket', groupMessage.type,
- 'Sending queued grouped message (max: 16 per group) ->'], clone(groupMessage));
-
- self._socket.send(JSON.stringify(groupMessage));
- self._timestamp.socketMessage = (new Date()).getTime();
-
- for (var j = 0; j < groupMessageList.length; j++) {
- setStampFn(groupMessageList[j]);
- triggerEventFn(groupMessageList[j]);
- }
- }
- };
-
- var startSendingQueuedMessageFn = function(){
- if (self._socketMessageQueue.length > 0){
- if (self._socketMessageQueue.length < throughput){
- sendGroupMessageFn(self._socketMessageQueue.splice(0, self._socketMessageQueue.length));
- } else {
- sendGroupMessageFn(self._socketMessageQueue.splice(0, throughput));
- setQueueFn();
- }
- }
- };
-
- if (self._GROUP_MESSAGE_LIST.indexOf(message.type) > -1) {
- if (!(self._timestamp.socketMessage && ((new Date ()).getTime() - self._timestamp.socketMessage) <= interval)) {
- if (!checkStampFn(message)) {
- log.warn([message.target || 'Server', 'Socket', message.type, 'Dropping of outdated status message ->'], clone(message));
- return;
- }
- if (self._socketMessageTimeout) {
- clearTimeout(self._socketMessageTimeout);
- }
- log.log([message.target || 'Server', 'Socket', message.type, 'Sending message ->'], clone(message));
- self._socket.send(JSON.stringify(message));
- setStampFn(message);
- triggerEventFn(message);
-
- self._timestamp.socketMessage = (new Date()).getTime();
-
- } else {
- log.debug([message.target || 'Server', 'Socket', message.type,
- 'Queueing socket message to prevent message drop ->'], clone(message));
-
- self._socketMessageQueue.push(message);
-
- if (!self._socketMessageTimeout) {
- setQueueFn();
- }
- }
- } else {
- log.log([message.target || 'Server', 'Socket', message.type, 'Sending message ->'], clone(message));
- self._socket.send(JSON.stringify(message));
-
- // If Peer sends "bye" on its own, we trigger it as session disconnected abruptly
- if (message.type === self._SIG_MESSAGE_TYPE.BYE && self._inRoom &&
- self._user && self._user.sid && message.mid === self._user.sid) {
- self.leaveRoom(false);
- self._trigger('sessionDisconnect', self._user.sid, self.getPeerInfo());
- }
- }
- };
-
- /**
- * Function that creates and opens a socket connection to the Signaling.
- * @method _createSocket
- * @private
- * @for Skylink
- * @since 0.5.10
- */
- Skylink.prototype._createSocket = function (type, joinRoomTimestamp) {
- var self = this;
- var options = {
- forceNew: true,
- reconnection: true,
- timeout: self._initOptions.socketTimeout,
- reconnectionAttempts: 2,
- reconnectionDelayMax: 5000,
- reconnectionDelay: 1000,
- transports: ['websocket'],
- query: { // ESS-1038: Adding custom headers to signaling
- Skylink_SDK_type: 'WEB_SDK',
- Skylink_SDK_version: self.VERSION
- }
- };
- var ports = self._initOptions.socketServer && typeof self._initOptions.socketServer === 'object' && Array.isArray(self._initOptions.socketServer.ports) &&
- self._initOptions.socketServer.ports.length > 0 ? self._initOptions.socketServer.ports : self._socketPorts[self._signalingServerProtocol];
- var fallbackType = null;
-
- // just beginning
- if (self._signalingServerPort === null) {
- self._signalingServerPort = ports[0];
- fallbackType = self.SOCKET_FALLBACK.NON_FALLBACK;
-
- // reached the end of the last port for the protocol type
- } else if (ports.indexOf(self._signalingServerPort) === ports.length - 1 || typeof self._initOptions.socketServer === 'string') {
- // re-refresh to long-polling port
- if (type === 'WebSocket') {
- type = 'Polling';
- self._signalingServerPort = ports[0];
- } else {
- self._socketSession.finalAttempts++;
- }
- // move to the next port
- } else {
- self._signalingServerPort = ports[ ports.indexOf(self._signalingServerPort) + 1 ];
- }
-
- if (type === 'Polling') {
- options.reconnectionDelayMax = 1000;
- options.reconnectionAttempts = 4;
- options.transports = ['xhr-polling', 'jsonp-polling', 'polling'];
- }
-
- var url = self._signalingServerProtocol + '//' + self._signalingServer + ':' + self._signalingServerPort + '?rand=' + Date.now();
- var retries = 0;
-
- if (self._initOptions.socketServer) {
- // Provided as string, make it as just the fixed server
- url = typeof self._initOptions.socketServer === 'string' ? self._initOptions.socketServer :
- (self._initOptions.socketServer.protocol ? self._initOptions.socketServer.protocol : self._signalingServerProtocol) + '//' +
- self._initOptions.socketServer.url + ':' + self._signalingServerPort;
- }
-
- self._socketSession.transportType = type;
- self._socketSession.socketOptions = options;
- self._socketSession.socketServer = url;
-
- if (fallbackType === null) {
- fallbackType = self._signalingServerProtocol === 'http:' ?
- (type === 'Polling' ? self.SOCKET_FALLBACK.LONG_POLLING : self.SOCKET_FALLBACK.FALLBACK_PORT) :
- (type === 'Polling' ? self.SOCKET_FALLBACK.LONG_POLLING_SSL : self.SOCKET_FALLBACK.FALLBACK_SSL_PORT);
-
- self._socketSession.attempts++;
- self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_ATTEMPT, null, fallbackType, clone(self._socketSession));
- self._trigger('channelRetry', fallbackType, self._socketSession.attempts, clone(self._socketSession));
- }
-
- // if socket instance already exists, exit
- if (self._socket) {
- self._closeChannel();
- }
-
- self._channelOpen = false;
-
- log.log('Opening channel with signaling server url:', clone(self._socketSession));
-
- var socket = null;
-
- try {
- socket = io.connect(url, options);
- } catch (error){
- log.error('Failed creating socket connection object ->', error);
- if (fallbackType === self.SOCKET_FALLBACK.NON_FALLBACK) {
- self._trigger('socketError', self.SOCKET_ERROR.CONNECTION_FAILED, error, fallbackType, clone(self._socketSession));
- } else {
- self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_FAILED, error, fallbackType, clone(self._socketSession));
- }
- self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_ABORTED, new Error('Reconnection aborted as ' +
- 'there no more available ports, transports and final attempts left.'), fallbackType, clone(self._socketSession));
- return;
- }
-
- socket.on('reconnect_attempt', function (attempt) {
- retries++;
- self._socketSession.attempts++;
- self._handleSignalingStats('reconnect_attempt', retries);
- self._trigger('channelRetry', fallbackType, self._socketSession.attempts, clone(self._socketSession));
- });
-
- socket.on('reconnect_failed', function () {
- var errorMsg = 'Failed reconnection with transport "' + type + '" and port ' + self._signalingServerPort + '.';
- self._handleSignalingStats('reconnect_failed', retries, errorMsg);
-
- if (fallbackType === self.SOCKET_FALLBACK.NON_FALLBACK) {
- errorMsg = errorMsg.replace(/ reconnection /g, ' connection ');
- self._trigger('socketError', self.SOCKET_ERROR.CONNECTION_FAILED, new Error(errorMsg), fallbackType, clone(self._socketSession));
-
- } else {
- self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_FAILED, new Error(errorMsg), fallbackType, clone(self._socketSession));
- }
-
- if (self._socketSession.finalAttempts < 2) {
- self._createSocket(type, joinRoomTimestamp);
- } else {
- self._trigger('socketError', self.SOCKET_ERROR.RECONNECTION_ABORTED, new Error('Reconnection aborted as ' +
- 'there no more available ports, transports and final attempts left.'), fallbackType, clone(self._socketSession));
- }
- });
-
- socket.on('reconnect_error', function (error) {
- self._handleSignalingStats('reconnect_error', retries, error);
- });
-
- socket.on('connect', function () {
- self._handleSignalingStats('connect', retries);
- if (!self._channelOpen) {
- log.log([null, 'Socket', null, 'Channel opened']);
- self._channelOpen = true;
- self._trigger('channelOpen', clone(self._socketSession));
- }
- });
-
- socket.on('reconnect', function () {
- self._handleSignalingStats('reconnect', retries);
- if (!self._channelOpen) {
- log.log([null, 'Socket', null, 'Channel opened']);
- self._channelOpen = true;
- self._trigger('channelOpen', clone(self._socketSession));
- }
- });
-
- socket.on('error', function(error) {
- self._handleSignalingStats('error', retries, error);
- if (error ? error.message.indexOf('xhr poll error') > -1 : false) {
- log.error([null, 'Socket', null, 'XHR poll connection unstable. Disconnecting.. ->'], error);
- self._closeChannel();
- return;
- }
- log.error([null, 'Socket', null, 'Exception occurred ->'], error);
- self._trigger('channelError', error, clone(self._socketSession));
- });
-
- socket.on('disconnect', function() {
- self._handleSignalingStats('disconnect', retries);
- if (self._channelOpen) {
- self._channelOpen = false;
- self._trigger('channelClose', clone(self._socketSession));
- log.log([null, 'Socket', null, 'Channel closed']);
-
- if (self._inRoom && self._user && self._user.sid) {
- self.leaveRoom(false);
- self._trigger('sessionDisconnect', self._user.sid, self.getPeerInfo());
- }
- }
- });
-
- socket.on('message', function(messageStr) {
- var message = JSON.parse(messageStr);
-
- log.log([null, 'Socket', null, 'Received message ->'], message);
-
- if (message.type === self._SIG_MESSAGE_TYPE.GROUP) {
- log.debug('Bundle of ' + message.lists.length + ' messages');
- for (var i = 0; i < message.lists.length; i++) {
- var indiMessage = JSON.parse(message.lists[i]);
- self._processSigMessage(indiMessage);
- self._trigger('channelMessage', indiMessage, clone(self._socketSession));
- }
- } else {
- self._processSigMessage(message);
- self._trigger('channelMessage', message, clone(self._socketSession));
- }
- });
-
- self._joinRoomManager.socketsFn.push(function (currentJoinRoomTimestamp) {
- if (currentJoinRoomTimestamp !== joinRoomTimestamp) {
- socket.disconnect();
- }
- });
- self._socket = socket;
- };
-
- /**
- * Function that starts the socket connection to the Signaling.
- * This starts creating the socket connection and called at first not when requiring to fallback.
- * @method _openChannel
- * @private
- * @for Skylink
- * @since 0.5.5
- */
- Skylink.prototype._openChannel = function(joinRoomTimestamp) {
- var self = this;
- if (self._channelOpen) {
- log.error([null, 'Socket', null, 'Unable to instantiate a new channel connection ' +
- 'as there is already an ongoing channel connection']);
- return;
- }
-
- if (self._readyState !== self.READY_STATE_CHANGE.COMPLETED) {
- log.error([null, 'Socket', null, 'Unable to instantiate a new channel connection ' +
- 'as readyState is not ready']);
- return;
- }
-
- // set if forceSSL
- if (self._initOptions.forceSSL) {
- self._signalingServerProtocol = 'https:';
- } else {
- self._signalingServerProtocol = window.location.protocol;
- }
-
- var socketType = 'WebSocket';
-
- // For IE < 9 that doesn't support WebSocket
- if (!window.WebSocket) {
- socketType = 'Polling';
- }
-
- self._socketSession.finalAttempts = 0;
- self._socketSession.attempts = 0;
- self._signalingServerPort = null;
-
- // Begin with a websocket connection
- self._createSocket(socketType, joinRoomTimestamp);
- };
-
- /**
- * Function that stops the socket connection to the Signaling.
- * @method _closeChannel
- * @private
- * @for Skylink
- * @since 0.5.5
- */
- Skylink.prototype._closeChannel = function() {
- if (this._socket) {
- this._socket.removeAllListeners('connect_error');
- this._socket.removeAllListeners('reconnect_attempt');
- this._socket.removeAllListeners('reconnect_error');
- this._socket.removeAllListeners('reconnect_failed');
- this._socket.removeAllListeners('connect');
- this._socket.removeAllListeners('reconnect');
- this._socket.removeAllListeners('error');
- this._socket.removeAllListeners('disconnect');
- this._socket.removeAllListeners('message');
- }
-
- if (this._channelOpen) {
- if (this._socket) {
- this._socket.disconnect();
- }
-
- log.log([null, 'Socket', null, 'Channel closed']);
-
- this._channelOpen = false;
- this._trigger('channelClose', clone(this._socketSession));
-
- if (this._inRoom && this._user && this._user.sid) {
- this.leaveRoom(false);
- this._trigger('sessionDisconnect', this._user.sid, this.getPeerInfo());
- }
- }
-
- this._socket = null;
- };
-