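// NOTE: the following lines are residue from the web code viewer this file was
// copied from; they are kept as comments so that the module still parses.
// You cannot select more than 25 topics.
// Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
// 377 lines
// 11 KiB
// 377 lines
// 11 KiB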
// Module dependencies. The stray "|" column separators left behind by the
// code viewer were removed: they made every line a syntax error.
import { Socket as Engine, installTimerFunctions, nextTick } from "engine.io-client";
import { Socket } from "./socket.js";
import * as parser from "socket.io-parser";
import { on } from "./on.js";
import { Backoff } from "./contrib/backo2.js";
import { Emitter } from "@socket.io/component-emitter";
import debugModule from "debug"; // debug()

// Namespaced logger used by every method of the Manager class below.
const debug = debugModule("socket.io-client:manager"); // debug()

/**
 * Owns one low-level engine connection and shares it between the namespace
 * sockets created through `socket()`. It also drives the reconnection loop
 * (delays are provided by the `Backoff` helper).
 *
 * Events emitted through `emitReserved`: "open", "error", "ping", "packet",
 * "close", "reconnect_attempt", "reconnect_error", "reconnect_failed" and
 * "reconnect".
 */
export class Manager extends Emitter {
  /**
   * @param {string|Object} [uri] - target passed as-is to the engine; when an
   *   object is given here it is treated as `opts`
   * @param {Object} [opts] - options; also forwarded to the engine. NOTE: the
   *   object is mutated (`path` gets its default value written back).
   */
  constructor(uri, opts) {
    super();
    this.nsps = {};
    this.subs = [];
    if (uri && "object" === typeof uri) {
      opts = uri;
      uri = undefined;
    }
    opts = opts || {};
    opts.path = opts.path || "/socket.io";
    this.opts = opts;
    // Installs the timer helpers on this instance (`setTimeoutFn`, and per the
    // engine.io-client utility also `clearTimeoutFn`).
    installTimerFunctions(this, opts);
    this.reconnection(opts.reconnection !== false);
    // NOTE: `||` is kept on purpose for backward compatibility, which means a
    // value of 0 for any of these three options falls back to the default.
    this.reconnectionAttempts(opts.reconnectionAttempts || Infinity);
    this.reconnectionDelay(opts.reconnectionDelay || 1000);
    this.reconnectionDelayMax(opts.reconnectionDelayMax || 5000);
    // Here 0 is a valid value: only null/undefined select the default.
    this.randomizationFactor(null == opts.randomizationFactor ? 0.5 : opts.randomizationFactor);
    this.backoff = new Backoff({
      min: this.reconnectionDelay(),
      max: this.reconnectionDelayMax(),
      jitter: this.randomizationFactor(),
    });
    this.timeout(null == opts.timeout ? 20000 : opts.timeout);
    this._readyState = "closed";
    this.uri = uri;
    const codec = opts.parser || parser;
    this.encoder = new codec.Encoder();
    this.decoder = new codec.Decoder();
    this._autoConnect = opts.autoConnect !== false;
    if (this._autoConnect) {
      this.open();
    }
  }

  /**
   * Gets or sets whether reconnection is enabled.
   *
   * @param {boolean} [v] - when passed (even as `undefined`), coerced to a
   *   boolean and stored
   * @return the current flag when called without arguments, otherwise self
   * @public
   */
  reconnection(v) {
    // `arguments.length` (not `v === undefined`) so that an explicit
    // `undefined` still acts as a setter, as it always did.
    if (!arguments.length) {
      return this._reconnection;
    }
    this._reconnection = !!v;
    return this;
  }

  /**
   * Gets or sets the maximum number of reconnection attempts.
   *
   * @param {number} [v]
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  reconnectionAttempts(v) {
    if (v === undefined) {
      return this._reconnectionAttempts;
    }
    this._reconnectionAttempts = v;
    return this;
  }

  /**
   * Gets or sets the reconnection delay; a new value is also pushed to the
   * backoff helper (as its minimum) once that helper exists.
   *
   * @param {number} [v]
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  reconnectionDelay(v) {
    if (v === undefined) {
      return this._reconnectionDelay;
    }
    this._reconnectionDelay = v;
    // The constructor calls this setter before `this.backoff` is created.
    if (this.backoff != null) {
      this.backoff.setMin(v);
    }
    return this;
  }

  /**
   * Gets or sets the randomization factor; a new value is also pushed to the
   * backoff helper (as its jitter) once that helper exists.
   *
   * @param {number} [v]
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  randomizationFactor(v) {
    if (v === undefined) {
      return this._randomizationFactor;
    }
    this._randomizationFactor = v;
    if (this.backoff != null) {
      this.backoff.setJitter(v);
    }
    return this;
  }

  /**
   * Gets or sets the maximum reconnection delay; a new value is also pushed
   * to the backoff helper (as its maximum) once that helper exists.
   *
   * @param {number} [v]
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  reconnectionDelayMax(v) {
    if (v === undefined) {
      return this._reconnectionDelayMax;
    }
    this._reconnectionDelayMax = v;
    if (this.backoff != null) {
      this.backoff.setMax(v);
    }
    return this;
  }

  /**
   * Gets or sets the connection timeout used by `open()`.
   *
   * @param {number|boolean} [v] - delay passed to the timer; `false` disables
   *   the connect timer
   * @return the current value when called without arguments, otherwise self
   * @public
   */
  timeout(v) {
    if (!arguments.length) {
      return this._timeout;
    }
    this._timeout = v;
    return this;
  }

  /**
   * Starts trying to reconnect if reconnection is enabled and we have not
   * started reconnecting yet.
   *
   * @private
   */
  maybeReconnectOnOpen() {
    // Only when this is the very first connection attempt (no backoff attempt
    // recorded yet) and no reconnection loop is already running.
    const isFirstAttempt = this.backoff.attempts === 0;
    if (!this._reconnecting && this._reconnection && isFirstAttempt) {
      // keeps reconnection from firing twice for the same reconnection loop
      this.reconnect();
    }
  }

  /**
   * Creates a new engine and starts connecting, unless the manager is already
   * open or opening.
   *
   * @param {Function} [fn] - optional callback, called without argument once
   *   the engine is open, or with the error if the attempt fails
   * @return self
   * @public
   */
  open(fn) {
    debug("readyState %s", this._readyState);
    // matches both "open" and "opening"
    if (this._readyState.includes("open")) {
      return this;
    }
    debug("opening %s", this.uri);
    this.engine = new Engine(this.uri, this.opts);
    const socket = this.engine;
    this._readyState = "opening";
    this.skipReconnect = false;

    // emit `open`
    const openSubDestroy = on(socket, "open", () => {
      this.onopen();
      if (fn) {
        fn();
      }
    });

    // emit `error`
    const errorSub = on(socket, "error", (err) => {
      debug("error");
      this.cleanup();
      this._readyState = "closed";
      this.emitReserved("error", err);
      if (fn) {
        fn(err);
      } else {
        // Only do this if there is no fn to handle the error
        this.maybeReconnectOnOpen();
      }
    });

    if (false !== this._timeout) {
      const timeout = this._timeout;
      debug("connect attempt will timeout after %d", timeout);
      if (timeout === 0) {
        openSubDestroy(); // prevents a race condition with the 'open' event
      }
      // set timer
      const timer = this.setTimeoutFn(() => {
        debug("connect attempt timed out after %d", timeout);
        openSubDestroy();
        socket.close();
        // @ts-ignore
        socket.emit("error", new Error("timeout"));
      }, timeout);
      if (this.opts.autoUnref) {
        timer.unref();
      }
      // The timer was created with the installed `setTimeoutFn`, so it must be
      // cancelled with the matching `clearTimeoutFn` rather than with whatever
      // the global `clearTimeout` currently is.
      this.subs.push(() => {
        this.clearTimeoutFn(timer);
      });
    }

    this.subs.push(openSubDestroy);
    this.subs.push(errorSub);
    return this;
  }

  /**
   * Alias for open()
   *
   * @param {Function} [fn] - optional callback, see open()
   * @return self
   * @public
   */
  connect(fn) {
    return this.open(fn);
  }

  /**
   * Called upon transport open.
   *
   * @private
   */
  onopen() {
    debug("open");
    // clear old subs
    this.cleanup();
    // mark as open
    this._readyState = "open";
    this.emitReserved("open");
    // add new subs
    const socket = this.engine;
    this.subs.push(
      on(socket, "ping", this.onping.bind(this)),
      on(socket, "data", this.ondata.bind(this)),
      on(socket, "error", this.onerror.bind(this)),
      on(socket, "close", this.onclose.bind(this)),
      on(this.decoder, "decoded", this.ondecoded.bind(this)),
    );
  }

  /**
   * Called upon a ping.
   *
   * @private
   */
  onping() {
    this.emitReserved("ping");
  }

  /**
   * Called with data: feeds it to the decoder. A decoder exception closes the
   * manager with the reason "parse error".
   *
   * @param data - raw payload received from the engine
   * @private
   */
  ondata(data) {
    try {
      this.decoder.add(data);
    } catch (e) {
      this.onclose("parse error", e);
    }
  }

  /**
   * Called when parser fully decodes a packet.
   *
   * @param packet
   * @private
   */
  ondecoded(packet) {
    // the nextTick call prevents an exception in a user-provided event listener from triggering a disconnection due to a "parse error"
    nextTick(() => {
      this.emitReserved("packet", packet);
    }, this.setTimeoutFn);
  }

  /**
   * Called upon socket error.
   *
   * @param err
   * @private
   */
  onerror(err) {
    debug("error", err);
    this.emitReserved("error", err);
  }

  /**
   * Returns the socket registered for the given `nsp`, creating it on first
   * use. `opts` is only used when the socket is created.
   *
   * @param {string} nsp
   * @param {Object} [opts]
   * @return {Socket}
   * @public
   */
  socket(nsp, opts) {
    let socket = this.nsps[nsp];
    if (!socket) {
      socket = new Socket(this, nsp, opts);
      this.nsps[nsp] = socket;
    }
    return socket;
  }

  /**
   * Called upon a socket close: closes the manager unless at least one of its
   * sockets is still active.
   *
   * @param socket - the socket being closed (not used by the check itself)
   * @private
   */
  _destroy(socket) {
    for (const nsp of Object.keys(this.nsps)) {
      if (this.nsps[nsp].active) {
        debug("socket %s is still active, skipping close", nsp);
        return;
      }
    }
    this._close();
  }

  /**
   * Writes a packet: encodes it and hands every encoded chunk to the engine.
   *
   * @param packet
   * @private
   */
  _packet(packet) {
    debug("writing packet %j", packet);
    const encodedPackets = this.encoder.encode(packet);
    for (const chunk of encodedPackets) {
      this.engine.write(chunk, packet.options);
    }
  }

  /**
   * Clean up transport subscriptions and packet buffer.
   *
   * @private
   */
  cleanup() {
    debug("cleanup");
    this.subs.forEach((subDestroy) => subDestroy());
    this.subs.length = 0;
    this.decoder.destroy();
  }

  /**
   * Close the current socket and prevent any further reconnection.
   *
   * @private
   */
  _close() {
    debug("disconnect");
    this.skipReconnect = true;
    this._reconnecting = false;
    // onclose() also closes the engine
    this.onclose("forced close");
  }

  /**
   * Alias for _close()
   *
   * @private
   */
  disconnect() {
    return this._close();
  }

  /**
   * Called when the connection must be considered closed: upon engine close,
   * upon a forced close, or upon a decoding error.
   *
   * @param {string} reason
   * @param [description] - optional details, forwarded with the "close" event
   * @private
   */
  onclose(reason, description) {
    debug("closed due to %s", reason);
    this.cleanup();
    // Not every caller comes from the engine's own "close" event (see the
    // "parse error" path in ondata): without this the old engine would stay
    // connected while reconnect() creates a new one.
    if (this.engine) {
      this.engine.close();
    }
    this.backoff.reset();
    this._readyState = "closed";
    this.emitReserved("close", reason, description);
    if (this._reconnection && !this.skipReconnect) {
      this.reconnect();
    }
  }

  /**
   * Attempt a reconnection: either gives up ("reconnect_failed") when the
   * maximum number of attempts is reached, or schedules a new open() after
   * the delay provided by the backoff helper.
   *
   * @return self
   * @private
   */
  reconnect() {
    if (this._reconnecting || this.skipReconnect) {
      return this;
    }

    if (this.backoff.attempts >= this._reconnectionAttempts) {
      debug("reconnect failed");
      this.backoff.reset();
      this.emitReserved("reconnect_failed");
      this._reconnecting = false;
      return this;
    }

    const delay = this.backoff.duration();
    debug("will wait %dms before reconnect attempt", delay);
    this._reconnecting = true;
    const timer = this.setTimeoutFn(() => {
      if (this.skipReconnect) {
        return;
      }
      debug("attempting reconnect");
      this.emitReserved("reconnect_attempt", this.backoff.attempts);
      // check again for the case socket closed in above events
      if (this.skipReconnect) {
        return;
      }
      this.open((err) => {
        if (err) {
          debug("reconnect attempt error");
          this._reconnecting = false;
          // the next attempt is scheduled before listeners are notified
          this.reconnect();
          this.emitReserved("reconnect_error", err);
        } else {
          debug("reconnect success");
          this.onreconnect();
        }
      });
    }, delay);
    if (this.opts.autoUnref) {
      timer.unref();
    }
    // cancel with the function matching the installed `setTimeoutFn`
    this.subs.push(() => {
      this.clearTimeoutFn(timer);
    });
    return this;
  }

  /**
   * Called upon successful reconnect.
   *
   * @private
   */
  onreconnect() {
    // read before reset() so that listeners get the attempt count
    const attempt = this.backoff.attempts;
    this._reconnecting = false;
    this.backoff.reset();
    this.emitReserved("reconnect", attempt);
  }
}