You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
581 lines
19 KiB
581 lines
19 KiB
import { transports } from "./transports/index.js"; |
|
import { installTimerFunctions, byteLength } from "./util.js"; |
|
import { decode } from "./contrib/parseqs.js"; |
|
import { parse } from "./contrib/parseuri.js"; |
|
import { Emitter } from "@socket.io/component-emitter"; |
|
import { protocol } from "engine.io-parser"; |
|
export class Socket extends Emitter { |
|
/** |
|
* Socket constructor. |
|
* |
|
* @param {String|Object} uri or options |
|
* @param {Object} opts - options |
|
* @api public |
|
*/ |
|
constructor(uri, opts = {}) { |
|
super(); |
|
if (uri && "object" === typeof uri) { |
|
opts = uri; |
|
uri = null; |
|
} |
|
if (uri) { |
|
uri = parse(uri); |
|
opts.hostname = uri.host; |
|
opts.secure = uri.protocol === "https" || uri.protocol === "wss"; |
|
opts.port = uri.port; |
|
if (uri.query) |
|
opts.query = uri.query; |
|
} |
|
else if (opts.host) { |
|
opts.hostname = parse(opts.host).host; |
|
} |
|
installTimerFunctions(this, opts); |
|
this.secure = |
|
null != opts.secure |
|
? opts.secure |
|
: typeof location !== "undefined" && "https:" === location.protocol; |
|
if (opts.hostname && !opts.port) { |
|
// if no port is specified manually, use the protocol default |
|
opts.port = this.secure ? "443" : "80"; |
|
} |
|
this.hostname = |
|
opts.hostname || |
|
(typeof location !== "undefined" ? location.hostname : "localhost"); |
|
this.port = |
|
opts.port || |
|
(typeof location !== "undefined" && location.port |
|
? location.port |
|
: this.secure |
|
? "443" |
|
: "80"); |
|
this.transports = opts.transports || ["polling", "websocket"]; |
|
this.readyState = ""; |
|
this.writeBuffer = []; |
|
this.prevBufferLen = 0; |
|
this.opts = Object.assign({ |
|
path: "/engine.io", |
|
agent: false, |
|
withCredentials: false, |
|
upgrade: true, |
|
timestampParam: "t", |
|
rememberUpgrade: false, |
|
rejectUnauthorized: true, |
|
perMessageDeflate: { |
|
threshold: 1024 |
|
}, |
|
transportOptions: {}, |
|
closeOnBeforeunload: true |
|
}, opts); |
|
this.opts.path = this.opts.path.replace(/\/$/, "") + "/"; |
|
if (typeof this.opts.query === "string") { |
|
this.opts.query = decode(this.opts.query); |
|
} |
|
// set on handshake |
|
this.id = null; |
|
this.upgrades = null; |
|
this.pingInterval = null; |
|
this.pingTimeout = null; |
|
// set on heartbeat |
|
this.pingTimeoutTimer = null; |
|
if (typeof addEventListener === "function") { |
|
if (this.opts.closeOnBeforeunload) { |
|
// Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener |
|
// ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is |
|
// closed/reloaded) |
|
this.beforeunloadEventListener = () => { |
|
if (this.transport) { |
|
// silently close the transport |
|
this.transport.removeAllListeners(); |
|
this.transport.close(); |
|
} |
|
}; |
|
addEventListener("beforeunload", this.beforeunloadEventListener, false); |
|
} |
|
if (this.hostname !== "localhost") { |
|
this.offlineEventListener = () => { |
|
this.onClose("transport close", { |
|
description: "network connection lost" |
|
}); |
|
}; |
|
addEventListener("offline", this.offlineEventListener, false); |
|
} |
|
} |
|
this.open(); |
|
} |
|
/** |
|
* Creates transport of the given type. |
|
* |
|
* @param {String} transport name |
|
* @return {Transport} |
|
* @api private |
|
*/ |
|
createTransport(name) { |
|
const query = Object.assign({}, this.opts.query); |
|
// append engine.io protocol identifier |
|
query.EIO = protocol; |
|
// transport name |
|
query.transport = name; |
|
// session id if we already have one |
|
if (this.id) |
|
query.sid = this.id; |
|
const opts = Object.assign({}, this.opts.transportOptions[name], this.opts, { |
|
query, |
|
socket: this, |
|
hostname: this.hostname, |
|
secure: this.secure, |
|
port: this.port |
|
}); |
|
return new transports[name](opts); |
|
} |
|
/** |
|
* Initializes transport to use and starts probe. |
|
* |
|
* @api private |
|
*/ |
|
open() { |
|
let transport; |
|
if (this.opts.rememberUpgrade && |
|
Socket.priorWebsocketSuccess && |
|
this.transports.indexOf("websocket") !== -1) { |
|
transport = "websocket"; |
|
} |
|
else if (0 === this.transports.length) { |
|
// Emit error on next tick so it can be listened to |
|
this.setTimeoutFn(() => { |
|
this.emitReserved("error", "No transports available"); |
|
}, 0); |
|
return; |
|
} |
|
else { |
|
transport = this.transports[0]; |
|
} |
|
this.readyState = "opening"; |
|
// Retry with the next transport if the transport is disabled (jsonp: false) |
|
try { |
|
transport = this.createTransport(transport); |
|
} |
|
catch (e) { |
|
this.transports.shift(); |
|
this.open(); |
|
return; |
|
} |
|
transport.open(); |
|
this.setTransport(transport); |
|
} |
|
/** |
|
* Sets the current transport. Disables the existing one (if any). |
|
* |
|
* @api private |
|
*/ |
|
setTransport(transport) { |
|
if (this.transport) { |
|
this.transport.removeAllListeners(); |
|
} |
|
// set up transport |
|
this.transport = transport; |
|
// set up transport listeners |
|
transport |
|
.on("drain", this.onDrain.bind(this)) |
|
.on("packet", this.onPacket.bind(this)) |
|
.on("error", this.onError.bind(this)) |
|
.on("close", reason => this.onClose("transport close", reason)); |
|
} |
|
/** |
|
* Probes a transport. |
|
* |
|
* @param {String} transport name |
|
* @api private |
|
*/ |
|
probe(name) { |
|
let transport = this.createTransport(name); |
|
let failed = false; |
|
Socket.priorWebsocketSuccess = false; |
|
const onTransportOpen = () => { |
|
if (failed) |
|
return; |
|
transport.send([{ type: "ping", data: "probe" }]); |
|
transport.once("packet", msg => { |
|
if (failed) |
|
return; |
|
if ("pong" === msg.type && "probe" === msg.data) { |
|
this.upgrading = true; |
|
this.emitReserved("upgrading", transport); |
|
if (!transport) |
|
return; |
|
Socket.priorWebsocketSuccess = "websocket" === transport.name; |
|
this.transport.pause(() => { |
|
if (failed) |
|
return; |
|
if ("closed" === this.readyState) |
|
return; |
|
cleanup(); |
|
this.setTransport(transport); |
|
transport.send([{ type: "upgrade" }]); |
|
this.emitReserved("upgrade", transport); |
|
transport = null; |
|
this.upgrading = false; |
|
this.flush(); |
|
}); |
|
} |
|
else { |
|
const err = new Error("probe error"); |
|
// @ts-ignore |
|
err.transport = transport.name; |
|
this.emitReserved("upgradeError", err); |
|
} |
|
}); |
|
}; |
|
function freezeTransport() { |
|
if (failed) |
|
return; |
|
// Any callback called by transport should be ignored since now |
|
failed = true; |
|
cleanup(); |
|
transport.close(); |
|
transport = null; |
|
} |
|
// Handle any error that happens while probing |
|
const onerror = err => { |
|
const error = new Error("probe error: " + err); |
|
// @ts-ignore |
|
error.transport = transport.name; |
|
freezeTransport(); |
|
this.emitReserved("upgradeError", error); |
|
}; |
|
function onTransportClose() { |
|
onerror("transport closed"); |
|
} |
|
// When the socket is closed while we're probing |
|
function onclose() { |
|
onerror("socket closed"); |
|
} |
|
// When the socket is upgraded while we're probing |
|
function onupgrade(to) { |
|
if (transport && to.name !== transport.name) { |
|
freezeTransport(); |
|
} |
|
} |
|
// Remove all listeners on the transport and on self |
|
const cleanup = () => { |
|
transport.removeListener("open", onTransportOpen); |
|
transport.removeListener("error", onerror); |
|
transport.removeListener("close", onTransportClose); |
|
this.off("close", onclose); |
|
this.off("upgrading", onupgrade); |
|
}; |
|
transport.once("open", onTransportOpen); |
|
transport.once("error", onerror); |
|
transport.once("close", onTransportClose); |
|
this.once("close", onclose); |
|
this.once("upgrading", onupgrade); |
|
transport.open(); |
|
} |
|
/** |
|
* Called when connection is deemed open. |
|
* |
|
* @api private |
|
*/ |
|
onOpen() { |
|
this.readyState = "open"; |
|
Socket.priorWebsocketSuccess = "websocket" === this.transport.name; |
|
this.emitReserved("open"); |
|
this.flush(); |
|
// we check for `readyState` in case an `open` |
|
// listener already closed the socket |
|
if ("open" === this.readyState && |
|
this.opts.upgrade && |
|
this.transport.pause) { |
|
let i = 0; |
|
const l = this.upgrades.length; |
|
for (; i < l; i++) { |
|
this.probe(this.upgrades[i]); |
|
} |
|
} |
|
} |
|
/** |
|
* Handles a packet. |
|
* |
|
* @api private |
|
*/ |
|
onPacket(packet) { |
|
if ("opening" === this.readyState || |
|
"open" === this.readyState || |
|
"closing" === this.readyState) { |
|
this.emitReserved("packet", packet); |
|
// Socket is live - any packet counts |
|
this.emitReserved("heartbeat"); |
|
switch (packet.type) { |
|
case "open": |
|
this.onHandshake(JSON.parse(packet.data)); |
|
break; |
|
case "ping": |
|
this.resetPingTimeout(); |
|
this.sendPacket("pong"); |
|
this.emitReserved("ping"); |
|
this.emitReserved("pong"); |
|
break; |
|
case "error": |
|
const err = new Error("server error"); |
|
// @ts-ignore |
|
err.code = packet.data; |
|
this.onError(err); |
|
break; |
|
case "message": |
|
this.emitReserved("data", packet.data); |
|
this.emitReserved("message", packet.data); |
|
break; |
|
} |
|
} |
|
else { |
|
} |
|
} |
|
/** |
|
* Called upon handshake completion. |
|
* |
|
* @param {Object} data - handshake obj |
|
* @api private |
|
*/ |
|
onHandshake(data) { |
|
this.emitReserved("handshake", data); |
|
this.id = data.sid; |
|
this.transport.query.sid = data.sid; |
|
this.upgrades = this.filterUpgrades(data.upgrades); |
|
this.pingInterval = data.pingInterval; |
|
this.pingTimeout = data.pingTimeout; |
|
this.maxPayload = data.maxPayload; |
|
this.onOpen(); |
|
// In case open handler closes socket |
|
if ("closed" === this.readyState) |
|
return; |
|
this.resetPingTimeout(); |
|
} |
|
/** |
|
* Sets and resets ping timeout timer based on server pings. |
|
* |
|
* @api private |
|
*/ |
|
resetPingTimeout() { |
|
this.clearTimeoutFn(this.pingTimeoutTimer); |
|
this.pingTimeoutTimer = this.setTimeoutFn(() => { |
|
this.onClose("ping timeout"); |
|
}, this.pingInterval + this.pingTimeout); |
|
if (this.opts.autoUnref) { |
|
this.pingTimeoutTimer.unref(); |
|
} |
|
} |
|
/** |
|
* Called on `drain` event |
|
* |
|
* @api private |
|
*/ |
|
onDrain() { |
|
this.writeBuffer.splice(0, this.prevBufferLen); |
|
// setting prevBufferLen = 0 is very important |
|
// for example, when upgrading, upgrade packet is sent over, |
|
// and a nonzero prevBufferLen could cause problems on `drain` |
|
this.prevBufferLen = 0; |
|
if (0 === this.writeBuffer.length) { |
|
this.emitReserved("drain"); |
|
} |
|
else { |
|
this.flush(); |
|
} |
|
} |
|
/** |
|
* Flush write buffers. |
|
* |
|
* @api private |
|
*/ |
|
flush() { |
|
if ("closed" !== this.readyState && |
|
this.transport.writable && |
|
!this.upgrading && |
|
this.writeBuffer.length) { |
|
const packets = this.getWritablePackets(); |
|
this.transport.send(packets); |
|
// keep track of current length of writeBuffer |
|
// splice writeBuffer and callbackBuffer on `drain` |
|
this.prevBufferLen = packets.length; |
|
this.emitReserved("flush"); |
|
} |
|
} |
|
/** |
|
* Ensure the encoded size of the writeBuffer is below the maxPayload value sent by the server (only for HTTP |
|
* long-polling) |
|
* |
|
* @private |
|
*/ |
|
getWritablePackets() { |
|
const shouldCheckPayloadSize = this.maxPayload && |
|
this.transport.name === "polling" && |
|
this.writeBuffer.length > 1; |
|
if (!shouldCheckPayloadSize) { |
|
return this.writeBuffer; |
|
} |
|
let payloadSize = 1; // first packet type |
|
for (let i = 0; i < this.writeBuffer.length; i++) { |
|
const data = this.writeBuffer[i].data; |
|
if (data) { |
|
payloadSize += byteLength(data); |
|
} |
|
if (i > 0 && payloadSize > this.maxPayload) { |
|
return this.writeBuffer.slice(0, i); |
|
} |
|
payloadSize += 2; // separator + packet type |
|
} |
|
return this.writeBuffer; |
|
} |
|
/** |
|
* Sends a message. |
|
* |
|
* @param {String} message. |
|
* @param {Function} callback function. |
|
* @param {Object} options. |
|
* @return {Socket} for chaining. |
|
* @api public |
|
*/ |
|
write(msg, options, fn) { |
|
this.sendPacket("message", msg, options, fn); |
|
return this; |
|
} |
|
send(msg, options, fn) { |
|
this.sendPacket("message", msg, options, fn); |
|
return this; |
|
} |
|
/** |
|
* Sends a packet. |
|
* |
|
* @param {String} packet type. |
|
* @param {String} data. |
|
* @param {Object} options. |
|
* @param {Function} callback function. |
|
* @api private |
|
*/ |
|
sendPacket(type, data, options, fn) { |
|
if ("function" === typeof data) { |
|
fn = data; |
|
data = undefined; |
|
} |
|
if ("function" === typeof options) { |
|
fn = options; |
|
options = null; |
|
} |
|
if ("closing" === this.readyState || "closed" === this.readyState) { |
|
return; |
|
} |
|
options = options || {}; |
|
options.compress = false !== options.compress; |
|
const packet = { |
|
type: type, |
|
data: data, |
|
options: options |
|
}; |
|
this.emitReserved("packetCreate", packet); |
|
this.writeBuffer.push(packet); |
|
if (fn) |
|
this.once("flush", fn); |
|
this.flush(); |
|
} |
|
/** |
|
* Closes the connection. |
|
* |
|
* @api public |
|
*/ |
|
close() { |
|
const close = () => { |
|
this.onClose("forced close"); |
|
this.transport.close(); |
|
}; |
|
const cleanupAndClose = () => { |
|
this.off("upgrade", cleanupAndClose); |
|
this.off("upgradeError", cleanupAndClose); |
|
close(); |
|
}; |
|
const waitForUpgrade = () => { |
|
// wait for upgrade to finish since we can't send packets while pausing a transport |
|
this.once("upgrade", cleanupAndClose); |
|
this.once("upgradeError", cleanupAndClose); |
|
}; |
|
if ("opening" === this.readyState || "open" === this.readyState) { |
|
this.readyState = "closing"; |
|
if (this.writeBuffer.length) { |
|
this.once("drain", () => { |
|
if (this.upgrading) { |
|
waitForUpgrade(); |
|
} |
|
else { |
|
close(); |
|
} |
|
}); |
|
} |
|
else if (this.upgrading) { |
|
waitForUpgrade(); |
|
} |
|
else { |
|
close(); |
|
} |
|
} |
|
return this; |
|
} |
|
/** |
|
* Called upon transport error |
|
* |
|
* @api private |
|
*/ |
|
onError(err) { |
|
Socket.priorWebsocketSuccess = false; |
|
this.emitReserved("error", err); |
|
this.onClose("transport error", err); |
|
} |
|
/** |
|
* Called upon transport close. |
|
* |
|
* @api private |
|
*/ |
|
onClose(reason, description) { |
|
if ("opening" === this.readyState || |
|
"open" === this.readyState || |
|
"closing" === this.readyState) { |
|
// clear timers |
|
this.clearTimeoutFn(this.pingTimeoutTimer); |
|
// stop event from firing again for transport |
|
this.transport.removeAllListeners("close"); |
|
// ensure transport won't stay open |
|
this.transport.close(); |
|
// ignore further transport communication |
|
this.transport.removeAllListeners(); |
|
if (typeof removeEventListener === "function") { |
|
removeEventListener("beforeunload", this.beforeunloadEventListener, false); |
|
removeEventListener("offline", this.offlineEventListener, false); |
|
} |
|
// set ready state |
|
this.readyState = "closed"; |
|
// clear session id |
|
this.id = null; |
|
// emit close event |
|
this.emitReserved("close", reason, description); |
|
// clean buffers after, so users can still |
|
// grab the buffers on `close` event |
|
this.writeBuffer = []; |
|
this.prevBufferLen = 0; |
|
} |
|
} |
|
/** |
|
* Filters upgrades, returning only those matching client transports. |
|
* |
|
* @param {Array} server upgrades |
|
* @api private |
|
* |
|
*/ |
|
filterUpgrades(upgrades) { |
|
const filteredUpgrades = []; |
|
let i = 0; |
|
const j = upgrades.length; |
|
for (; i < j; i++) { |
|
if (~this.transports.indexOf(upgrades[i])) |
|
filteredUpgrades.push(upgrades[i]); |
|
} |
|
return filteredUpgrades; |
|
} |
|
} |
|
Socket.protocol = protocol;
|
|
|