const Message = require('../lib/message');
const EventEmitterExtra = require('event-emitter-extra');
const assign = require('lodash/assign');
const forEach = require('lodash/forEach');
const isInteger = require('lodash/isInteger');
const isObject = require('lodash/isObject');
const debounce = require('lodash/debounce');
const Deferred = require('../lib/deferred');
const uuid = require('uuid');
const debug = require('debug')('line:server:connection');
const LineError = require('../lib/error');
const CloseStatus = require('../lib/closestatus');
/**
* Server connection class. Constructor of this class is not publicly accessible.
* When you listen `Server.Event.CONNECTION` or `Server.Event.HANDSHAKE`, an instance
* of `ServerConnection` will be emitted.
*
* @class ServerConnection
* @extends {EventEmitterExtra}
* @private
* @property {string} id Unique connection id
*/
class ServerConnection extends EventEmitterExtra {
constructor(socket, server) {
super();
this.id = uuid.v4();
debug(`Creating connection with id ${this.id} ...`);
this.socket = socket;
this.server = server;
this.state = ServerConnection.State.AWAITING_HANDSHAKE;
this.deferreds_ = {};
this.autoPing_ = debounce(() => {});
this.socket.on('message', this.onMessage_.bind(this));
this.socket.on('error', this.onError_.bind(this));
this.socket.on('close', this.onClose_.bind(this));
if (server.options.pingInterval > 0) {
this.autoPing_ = debounce(() => {
this
.ping()
.then(() => {
debug(`Auto-ping successful`);
if (server.options.pingInterval > 0 && this.state == ServerConnection.State.CONNECTED) {
this.autoPing_();
}
})
.catch((err) => {/* Disconnection is handled in ping */});
}, server.options.pingInterval);
}
if (server.options.handshakeTimeout > 0) {
this.handshakeTimeout_ = setTimeout(() => {
if (this.state != ServerConnection.State.AWAITING_HANDSHAKE) {
return debug(`Handshake is not awaiting, ignoring handshake timeout...`);
}
debug(`Handshake timeout exceed, closing the connection...`);
this.close(CloseStatus.HANDSHAKE_FAILED.code, `Handshake not completed after ${server.options.handshakeTimeout} ms`);
}, server.options.handshakeTimeout);
}
}
/**
* Native "message" event handler.
*
* @param {string|Buffer} data
* @param {Object} flags
* @param {boolean} flags.binary Specifies if data is binary.
* @param {boolean} flags.Boolean Specifies if data was masked.
* @ignore
*/
onMessage_(data, flags) {
debug(`Native "message" event recieved: ${data}`);
let message;
// A message is recieved, debounce our auto-ping handler if connected
if (this.state == ServerConnection.State.CONNECTED) {
this.autoPing_();
}
try {
message = Message.parse(data);
} catch (err) {
this.emit(ServerConnection.Event.ERROR, new LineError(
ServerConnection.ErrorCode.INVALID_JSON,
'Could not parse message, invalid json. Check payload for incoming data.',
data
));
return;
}
/**
* Route the incoming message
*/
if (message.name == Message.Name.HANDSHAKE) { // Handshake
this.onHandshakeMessage_(message);
} else if (message.name == Message.Name.PING) { // Ping
this.onPingMessage_(message);
} else if (message.name == Message.Name.RESPONSE) { // Message response
this.onResponseMessage_(message);
} else if (Message.ReservedNames.indexOf(message.name) == -1) { // If message name is not reserved
if (!message.id) { // Message without response (no id fields)
this.onMessageWithoutResponse_(message);
} else { // Message arrived awaiting its response
this.onMessageWithResponse_(message);
}
} else {
debug(`Could not route the message`, message);
}
}
/**
* On "handshake" message handler.
*
* @param {Message} message
* @ignore
*/
onHandshakeMessage_(message) {
if (this.state == ServerConnection.State.CONNECTED) {
debug(`Handshake message recieved but, handshake is already resolved, ignoring...`);
return this
.sendWithoutResponse_(message.createResponse(new Error('Handshake is already resolved')))
.catch(() => { /* Ignoring */ });
}
debug(`Handshake message recieved: ${message}`);
/**
* If handshake is resolved
*/
message.once('resolved', (payload) => {
debug(`Handshake is resolved, sending response...`);
this.state = ServerConnection.State.CONNECTED;
this.handshakeTimeout_ && clearTimeout(this.handshakeTimeout_);
this.autoPing_(); // Start auto-pinging
const responsePayload = {
payload,
id: this.id
};
this
.sendWithoutResponse_(message.createResponse(null, responsePayload))
.then(() => {
debug(`Handshake resolving response is sent, emitting connection...`);
this.server.rooms.root.add(this);
this.server.emit('connection', this);
})
.catch((err) => {
debug(`Could not send handshake response`, err);
// TODO: Emit these errors from the server
if (err instanceof LineError) {
switch (err.code) {
case ServerConnection.ErrorCode.DISCONNECTED:
debug(`Connection is gone before handshake completed, ignoring...`);
return;
case ServerConnection.ErrorCode.WEBSOCKET_ERROR:
// TODO: Try again!
debug('Native websocket error', err.payload);
return this.close(CloseStatus.HANDSHAKE_FAILED.code, CloseStatus.HANDSHAKE_FAILED.reason);
default:
debug('Unhandled line error', err);
return this.close(CloseStatus.HANDSHAKE_FAILED.code, CloseStatus.HANDSHAKE_FAILED.reason);
}
}
debug(`Unknown error`, err);
return this.close(CloseStatus.HANDSHAKE_FAILED.code, CloseStatus.HANDSHAKE_FAILED.reason);
})
.then(() => {
message.dispose();
});
});
/**
* Id handshake is rejected
*/
message.once('rejected', (err) => {
debug(`Handshake is rejected, sending response...`);
this
.sendWithoutResponse_(message.createResponse(err))
.catch(err => debug(`Handshake rejecting response could not sent, manually calling "close"...`, err))
.then(() => this.close(CloseStatus.HANDSHAKE_REJECTED.code, CloseStatus.HANDSHAKE_REJECTED.reason, 50))
.then(() => {
message.dispose();
});
});
/**
* Emit handshake event from the server
*/
debug(`Emitting server's "handshake" event...`);
const handshakeListener = this.server.emit('handshake', this, message);
if (!handshakeListener) {
debug(`There is no handshake listener, resolving the handshake by default...`);
message.resolve();
}
}
/**
* On "ping" message handler. Reply with pong.
*
* @param {Message} message
* @ignore
*/
onPingMessage_(message) {
debug('Ping received, responding with "pong"...');
this
.sendWithoutResponse_(message.createResponse(null, 'pong'))
.catch(err => debug('Ping response failed to send back, ignoring for now...', err));
}
/**
* A message is recieved, and its response is expected.
*
* @param {Message} message
* @ignore
*/
onResponseMessage_(message) {
const deferred = this.deferreds_[message.id];
if (!deferred) return;
if (message.err) {
debug(`Response (rejecting) recieved: ${message}`);
const err = new LineError(
ServerConnection.ErrorCode.MESSAGE_REJECTED,
'Message is rejected by server, check payload.',
message.err
);
deferred.reject(err);
} else {
debug(`Response (resolving) recieved: ${message}`);
deferred.resolve(message.payload);
}
delete this.deferreds_[message.id];
}
/**
* A message is arrived without waiting its response.
*
* @param {Message} message
* @ignore
*/
onMessageWithoutResponse_(message) {
debug(`Message without response: name="${message.name}"`);
this.emit(message.name, message);
}
/**
* A message is arrived and the client is expecting its response.
*
* @param {Message} message
* @ignore
*/
onMessageWithResponse_(message) {
debug(`Message with response: name="${message.name}" id="${message.id}"`);
message.once('resolved', (payload) => {
debug(`Message #${message.id} is resolved, sending response...`);
this
.sendWithoutResponse_(message.createResponse(null, payload))
.catch((err) => {
this.emit(ServerConnection.Event.ERROR, new LineError(
ServerConnection.ErrorCode.MESSAGE_NOT_RESPONDED,
`Message (name="${message.name}" id="${message.id}") could not responded (resolve)`,
err
));
})
.then(() => message.dispose());
});
message.once('rejected', (err) => {
debug(`Message #${message.id} is rejected, sending response...`);
this
.sendWithoutResponse_(message.createResponse(err))
.catch((err) => {
this.emit(ServerConnection.Event.ERROR, new LineError(
ServerConnection.ErrorCode.MESSAGE_NOT_RESPONDED,
`Message (name="${message.name}" id="${message.id}") could not responded (reject)`,
err
));
})
.then(() => message.dispose());
});
this.emit(message.name, message);
}
/**
* Native "error" event.
*
* @param {Error} err
* @ignore
*/
onError_(err) {
debug(`Native "error" event recieved, emitting line's "error" event: ${err}`);
this.emit(ServerConnection.Event.ERROR, err);
}
/**
* Native "close" event.
*
* @param {number} code
* @param {string=} reason
* @ignore
*/
onClose_(code, reason) {
debug(`Native "close" event recieved with code ${code}: ${reason}`);
debug(`Removing connection from all rooms, rejecting all waiting messages...`);
this.handshakeTimeout_ && clearTimeout(this.handshakeTimeout_);
this.autoPing_.cancel();
this.server.rooms.removeFromAll(this);
this.server.rooms.root.remove(this);
this.rejectAllDeferreds_(new LineError(ServerConnection.ErrorCode.DISCONNECTED, 'Socket connection closed!'));
debug(`Emitting line's "close" event...`);
this.state = ServerConnection.State.DISCONNECTED;
this.emit(ServerConnection.Event.DISCONNECTED, code, reason);
}
/**
* Changes connection's id, it's random by default. This method is helpful if you already have
* custom identification for your clients. You must do this before handshake resolved. If
* handshake is already resolved or there is conflict, this method will throw error.
*
* Throws:
* - `ServerConnection.ErrorCode.HANDSHAKE_ENDED`: Id could not be changed after handshake
* - `ServerConnection.ErrorCode.ID_CONFLICT`: There is alrady another connection with provided id.
*
* @param {string} newId New connection id
* @memberOf ServerConnection
* @example
* server.on(Server.Event.HANDSHAKE, (connection, handshake) => {
* // Assuming client's `options.handshake.payload` is something like `{authToken: '...'}`
*
* // Imaginary db
* db.find(handshake.payload.authToken, (record) => {
* if (!record) return handshake.reject(new Error('Invalid auth token'));
* connection.setId(record.id);
* handshake.resolve(record);
* });
* });
*/
setId(newId) {
if (this.state != ServerConnection.State.AWAITING_HANDSHAKE) {
throw new LineError(
ServerConnection.ErrorCode.HANDSHAKE_ENDED,
'Handshake already ended, you cannot change connection id anymore'
);
}
if (this.server.getConnectionById(newId)) {
throw new LineError(
ServerConnection.ErrorCode.ID_CONFLICT,
`Conflict! There is already connection with id ${newId}`
);
}
this.id = newId;
}
/**
* Joins the connection into provided room. If there is no room, it will be created automatically.
*
* @param {string} roomName
* @memberOf ServerConnection
*/
joinRoom(roomName) {
this.server.rooms.add(roomName, this);
}
/**
* Leaves the connection from provided room.
*
* @param {string} roomName
* @memberOf ServerConnection
*/
leaveRoom(roomName) {
this.server.rooms.remove(roomName, this);
}
/**
* Gets the joined room names.
*
* @returns {Array<string>}
* @memberOf ServerConnection
*/
getRooms() {
return this.server.rooms.getRoomsOf(this);
}
/**
* Sends a message to client with awaiting its response. This method returns a promise
* which resolves the payload parameter will be passed into `message.resolve(...)` in client-side.
*
* If client rejects the message with `message.reject(...)`, this promise will be rejected with
* `ServerConnection.ErrorCode.MESSAGE_REJECTED`. You can access the original error object with `err.payload`.
*
* Rejections:
* - `ServerConnection.ErrorCode.INVALID_JSON`: Could not stringify the message payload. Probably circular json.
* - `ServerConnection.ErrorCode.MESSAGE_REJECTED`: Message is explicitly rejected by the client.
* - `ServerConnection.ErrorCode.MESSAGE_TIMEOUT`: Message response did not arrived, timeout exceeded.
* - `ServerConnection.ErrorCode.DISCONNECTED`: Client is not connected (& handshake resolved) or connection is closing
* - `ServerConnection.ErrorCode.WEBSOCKET_ERROR`: Native websocket error
*
* @param {string} name
* @param {any=} payload
* @param {number=} timout
* @returns {Promise<any>}
* @memberOf ServerConnection
* @example
* connection
* .send('hello', {optional: 'payload'})
* .then((data) => {
* // Message is resolved by client
* })
* .catch((err) => {
* // Could not send message
* // or
* // Client rejected the message!
* });
*/
send(name, payload, opt_timeout) { // This method is for external usage!
if (this.state != ServerConnection.State.CONNECTED) {
return Promise.reject(new LineError(
ServerConnection.ErrorCode.DISCONNECTED,
`Could not send message, client is not connected.`
));
}
try {
const message = new Message({name, payload});
return this.send_(message, opt_timeout);
} catch (err) {
// `err` can only be Message.ErrorCode.INVALID_JSON
return Promise.reject(new LineError(
ServerConnection.ErrorCode.INVALID_JSON,
`Could not send message, "payload" stringify error. Probably circular json issue.`
));
}
}
/**
* Sends a message to client without waiting its response. This method returns a promise
* that resolves with nothing if the message is successfully sent.
*
* Rejections:
* - `ServerConnection.ErrorCode.INVALID_JSON`: Could not stringify the message payload. Probably circular json.
* - `ServerConnection.ErrorCode.DISCONNECTED`: Client is not connected (& handshake resolved) or connection is closing
* - `ServerConnection.ErrorCode.WEBSOCKET_ERROR`: Native websocket error
*
* @param {string} name
* @param {any=} payload
* @returns {Promise}
* @memberOf ServerConnection
* @example
* connection
* .sendWithoutResponse('hello', {optional: 'payload'})
* .then(() => {
* // Message sent successfully
* })
* .catch((err) => {
* // Message could not be sent to client
* })
*/
sendWithoutResponse(name, payload) { // For external usage
if (this.state != ServerConnection.State.CONNECTED) {
return Promise.reject(new LineError(
ServerConnection.ErrorCode.DISCONNECTED,
`Could not send message, client is not connected.`
));
}
try {
const message = new Message({name, payload}); // Can throw Message.ErrorCode.INVALID_JSON
return this.sendWithoutResponse_(message);
} catch (err) {
// `err` can only be Message.ErrorCode.INVALID_JSON
return Promise.reject(new LineError(
ServerConnection.ErrorCode.INVALID_JSON,
`Could not send message, "payload" stringify error. Probably circular json issue.`
));
}
}
/**
* Base method for sending a message with timeout. Please favor this method internally
* instead of using `send` method.
*
* Rejections:
* - `ServerConnection.ErrorCode.MESSAGE_REJECTED`: Message is explicitly rejected by the client.
* - `ServerConnection.ErrorCode.MESSAGE_TIMEOUT`: Message response did not arrived, timeout exceeded.
* - `ServerConnection.ErrorCode.DISCONNECTED`: Client is not connected (& handshake resolved) or connection is closing
* - `ServerConnection.ErrorCode.WEBSOCKET_ERROR`: Native websocket error
*
* @param {Message} message
* @param {number=} opt_timeout
* @returns {Promise}
* @ignore
*/
send_(message, opt_timeout) {
const timeout = isInteger(opt_timeout) && opt_timeout >= 0 ? opt_timeout : this.server.options.responseTimeout;
message.setId();
const deferred = this.deferreds_[message.id] = new Deferred({
onExpire: () => {
delete this.deferreds_[message.id];
},
timeout: timeout
});
return this
.sendWithoutResponse_(message)
.then(() => deferred)
.catch((err) => {
deferred.dispose();
// Convert expired -> timeout error
if (err instanceof LineError && err.code == Deferred.ErrorCode.EXPIRED) {
throw new LineError(
ServerConnection.ErrorCode.MESSAGE_TIMEOUT,
`Message timeout! Its response did not recived after ${timeout} ms`
);
}
throw err;
});
}
/**
* Base method for sending a message without response. Please favor this method internally
* instead of using `sendWithoutResponse` method.
*
* Rejections:
* - `ServerConnection.ErrorCode.DISCONNECTED`: Client is not connected (& handshake resolved) or connection is closing
* - `ServerConnection.ErrorCode.WEBSOCKET_ERROR`: Native websocket error
*
* @param {Message} message
* @returns {Promise}
* @ignore
*/
sendWithoutResponse_(message) {
if (!this.socket || this.socket.readyState != 1) {
return Promise.reject(new LineError(
ServerConnection.ErrorCode.DISCONNECTED,
`Could not send message, there is no open connection.`
));
}
return new Promise((resolve, reject) => {
debug(`Sending message: ${message}`);
const messageStr = message.toString();
this.socket.send(messageStr, (err) => {
if (err) {
return reject(new LineError(
ServerConnection.ErrorCode.WEBSOCKET_ERROR,
`Could not send message, native websocket error, check payload.`,
err
));
}
resolve();
});
});
}
/**
* Pings the client. If there is no respose, closes the connection.
*
* @returns {Promise}
* @memberOf ServerConnection
*/
ping() {
debug(`Pinging...`);
return this
.send_(new Message({name: Message.Name.PING}))
.catch(err => {
// No matter what error is, start disconnection process
debug('Auto-ping failed, manually disconnecting...');
this.close(CloseStatus.PING_FAILED.code, CloseStatus.PING_FAILED.reason);
throw new LineError(
ServerConnection.ErrorCode.PING_ERROR,
`Ping failed, manually disconnecting...`,
err
);
});
}
/**
* Gracefully closes the client connection.
*
* @param {number=} code
* @param {string=} reason
* @param {number=} delay
* @returns {Promise}
*/
close(code, reason, delay) {
debug(`Closing the connection in ${delay || 0} ms...`);
return new Promise((resolve) => {
setTimeout(() => {
this.socket.close(code || 1000, reason);
resolve();
}, delay || 0);
});
}
/**
* Reject all the awaiting deferred with given error.
*
* @param {Error} err An error object to reject all awaiting deferreds.
* @ignore
*/
rejectAllDeferreds_(err) {
forEach(this.deferreds_, deferred => deferred.reject(err));
this.deferreds_ = {};
}
}
/**
* @static
* @readonly
* @enum {string}
*/
ServerConnection.ErrorCode = {
/**
* This error can be seen in rejection of `serverConnection.send()` method.
*/
MESSAGE_TIMEOUT: 'scMessageTimeout',
/**
* This error can be seen in rejection of `serverConnection.send()` method,
* which again indicates that server is explicitly rejected the message.
*/
MESSAGE_REJECTED: 'scMessageRejected',
/**
* When the response of a message failed to send to client, this error
* will be emitted in `ServerConnection.Event.ERROR` event.
*/
MESSAGE_NOT_RESPONDED: 'cMessageNotResponded',
/**
* Indicates an error while json parsing/stringify.
*/
INVALID_JSON: 'scInvalidJson',
/**
* This error can be thrown in `serverConnection.setId()`. Connection id
* cannot be set after handshake.
*/
HANDSHAKE_ENDED: 'scHandshakeEnded',
/**
* This error can be seen while using `serverConnection.setId()`. If there is
* already connection with that id, this error will be thrown.
*/
ID_CONFLICT: 'scIdConflict',
/**
* This error indicates client is disconnected.
*/
DISCONNECTED: 'scDisconnected',
/**
* This error is for native websocket errors.
*/
WEBSOCKET_ERROR: 'scWebsocketError',
/**
* This error can be seen in rejection of `serverConnection.ping()` method.
*/
PING_ERROR: 'scPingError'
};
/**
* @static
* @readonly
* @enum {string}
*/
ServerConnection.State = {
/**
* `awaitingHandshake` Connection is open but handshake is not completed yet.
*/
AWAITING_HANDSHAKE: 'awaitingHandshake',
/**
* `connected` Connection is open and handshake resolved.
*/
CONNECTED: 'connected',
/**
* `disconnected` There is no open connection.
*/
DISCONNECTED: 'disconnected'
};
/**
* @static
* @readonly
* @enum {string}
*/
ServerConnection.Event = {
/**
* `_error`
*/
ERROR: '_error',
/**
* `_close`
*/
DISCONNECTED: '_disconnected'
};
module.exports = ServerConnection;