src/emitter.js
'use strict';
const EventEmitter = require('events');
/**
 * EventEmitter extended with "blocking" listeners.
 *
 * Blocking listeners are registered with onBlocking()/onceBlocking() and are
 * invoked by emitBlocking() one after another, ordered by descending priority.
 * When a listener returns a promise (or any thenable), the next listener is
 * not invoked until it has fulfilled. After all blocking listeners are done,
 * the event is emitted to the regular EventEmitter listeners.
 *
 * maxParallel() limits how many emitBlocking() calls of one event may be in
 * progress at the same time.
 */
class Emitter extends EventEmitter {
  /**
   * @param {*} args Forwarded unchanged to the EventEmitter constructor
   */
  constructor(...args) {
    super(...args);

    // Null-prototype dictionaries keyed by event name, so that names such as
    // "constructor", "toString" or "__proto__" never collide with members
    // inherited from Object.prototype.
    this._blockingListeners = Object.create(null);
    this._processing = Object.create(null);
    this._parallel = Object.create(null);
  }

  /**
   * Limit the number of emitBlocking() calls that may process `event`
   * concurrently. Calls above the limit wait until a slot is released.
   *
   * @param {string} event
   * @param {number} count
   *
   * @returns {Emitter}
   */
  maxParallel(event, count) {
    this._parallel[event] = count;

    return this;
  }

  /**
   * Run the blocking listeners of `event` sequentially, then emit the event
   * to the regular listeners.
   *
   * The returned promise rejects when a blocking listener throws or rejects
   * (regular listeners are not notified in that case) or when emit() itself
   * throws. The parallel slot taken for this call is released exactly once
   * on every path.
   *
   * @param {string} event
   * @param {*} args
   *
   * @returns {Promise}
   */
  emitBlocking(event, ...args) {
    const release = () => {
      this._cleanupListeners(event)._removeParallel(event);
    };

    // The two-argument form of then() is deliberate: a chained catch() would
    // also run when emit() throws inside the success handler and release the
    // slot a second time.
    return this._waitAllowParallel(event)
      .then(() => this._dispatch(event, args))
      .then(
        () => {
          release();
          this.emit(event, ...args);
        },
        error => {
          release();

          throw error;
        }
      );
  }

  /**
   * Register a blocking listener that is invoked on every emitBlocking().
   *
   * @param {string} event
   * @param {function} listener
   * @param {number} priority Higher priorities are invoked first
   *
   * @returns {Emitter}
   */
  onBlocking(event, listener, priority = Emitter.DEFAULT_PRIORITY) {
    return this._pushListener(event, listener, priority, 'on');
  }

  /**
   * Register a blocking listener that is invoked at most once.
   *
   * @param {string} event
   * @param {function} listener
   * @param {number} priority Higher priorities are invoked first
   *
   * @returns {Emitter}
   */
  onceBlocking(event, listener, priority = Emitter.DEFAULT_PRIORITY) {
    return this._pushListener(event, listener, priority, 'once');
  }

  /**
   * Release one parallel slot of `event`. The counter never drops below 0.
   *
   * @param {string} event
   *
   * @returns {Emitter}
   *
   * @private
   */
  _removeParallel(event) {
    if (this._processing[event] > 0) {
      this._processing[event]--;
    }

    return this;
  }

  /**
   * Take one parallel slot of `event`.
   *
   * @param {string} event
   *
   * @returns {Emitter}
   *
   * @private
   */
  _addParallel(event) {
    this._processing[event] = (this._processing[event] || 0) + 1;

    return this;
  }

  /**
   * Tell whether another emitBlocking() of `event` may start right now.
   *
   * @param {string} event
   *
   * @returns {boolean} true when no limit is set, when nothing was processed
   *                    for the event yet, or when the counter is below the limit
   *
   * @private
   */
  _allowParallel(event) {
    return !(event in this._parallel)
      || !(event in this._processing)
      || this._processing[event] < this._parallel[event];
  }

  /**
   * Take a parallel slot of `event`, polling until one is available.
   *
   * @param {string} event
   * @param {number} interval Polling interval in milliseconds
   *
   * @returns {Promise} Resolved once the slot has been taken
   *
   * @private
   */
  _waitAllowParallel(event, interval = 100) {
    if (this._allowParallel(event)) {
      this._addParallel(event);

      return Promise.resolve();
    }

    return new Promise(resolve => {
      const timer = setInterval(() => {
        if (!this._allowParallel(event)) {
          return;
        }

        this._addParallel(event);
        clearInterval(timer);
        process.nextTick(() => resolve());
      }, interval);
    });
  }

  /**
   * Insert a listener into the queue of `event`, keeping the queue ordered
   * by descending priority. Listeners of equal priority keep their
   * registration order.
   *
   * @param {string} event
   * @param {function} listener
   * @param {number} priority
   * @param {string} method Either "on" or "once"
   *
   * @returns {Emitter}
   *
   * @throws {TypeError} When listener is not a function
   *
   * @private
   */
  _pushListener(event, listener, priority, method) {
    if (typeof listener !== 'function') {
      throw new TypeError('The "listener" argument must be of type function');
    }

    if (!(event in this._blockingListeners)) {
      this._blockingListeners[event] = [];
    }

    const queue = this._blockingListeners[event];
    let position = queue.length;

    // Walk back over every entry with a strictly lower priority; inserting
    // there keeps the order stable without relying on Array#sort stability.
    while (position > 0 && queue[position - 1].priority < priority) {
      position--;
    }

    queue.splice(position, 0, { listener, priority, method });

    return this;
  }

  /**
   * Drop empty slots from the listener queue of `event`.
   *
   * @param {string} event
   *
   * @returns {Emitter}
   *
   * @private
   */
  _cleanupListeners(event) {
    if (event in this._blockingListeners) {
      this._blockingListeners[event] = this._blockingListeners[event].filter(Boolean);
    }

    return this;
  }

  /**
   * Remove a "once" entry from the live queue of `event`.
   *
   * @param {string} event
   * @param {Object} entry
   *
   * @returns {boolean} false when the entry is no longer registered, i.e. it
   *                    was already consumed by another dispatch
   *
   * @private
   */
  _consumeOnce(event, entry) {
    const live = this._blockingListeners[event];
    const position = live ? live.indexOf(entry) : -1;

    if (position === -1) {
      return false;
    }

    live.splice(position, 1);

    return true;
  }

  /**
   * Invoke the blocking listeners of `event` in order, starting at `_i`.
   *
   * The dispatch works on a snapshot of the queue taken when it starts, so
   * listeners added or removed meanwhile (including by concurrent dispatches)
   * cannot shift its position. "once" listeners are unregistered before they
   * are invoked and are skipped when another dispatch already consumed them.
   *
   * @param {string} event
   * @param {*} args
   * @param {number} _i Index of the first listener to invoke
   * @param {Array} _queue Snapshot to continue with (internal use)
   *
   * @returns {Promise}
   *
   * @private
   */
  _dispatch(event, args, _i = 0, _queue = null) {
    const queue = _queue
      || (this._blockingListeners[event] || []).filter(Boolean);

    for (let index = _i; index < queue.length; index++) {
      const entry = queue[index];

      if (entry.method === 'once' && !this._consumeOnce(event, entry)) {
        continue;
      }

      const result = entry.listener(...args);

      // Any thenable is awaited, not only native Promise instances.
      if (result != null && typeof result.then === 'function') {
        return Promise.resolve(result)
          .then(() => this._dispatch(event, args, index + 1, queue));
      }
    }

    return Promise.resolve();
  }

  /**
   * Priority used when a blocking listener is registered without one.
   *
   * @returns {number}
   */
  static get DEFAULT_PRIORITY() {
    return 0;
  }
}
module.exports = Emitter;