diff --git a/gulpfile.js b/gulpfile.js index 9a63a7795..5feebdb58 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -105,7 +105,8 @@ gulp.task('test', function() 'test/test-parser.js', 'test/test-properties.js', 'test/test-UA-no-WebRTC.js', - 'test/test-digestAuthentication.js' + 'test/test-digestAuthentication.js', + 'test/test-UA-subscriber-notifier.js' ]; return gulp.src(src) diff --git a/lib/Constants.d.ts b/lib/Constants.d.ts index 2d7527838..31297cc62 100644 --- a/lib/Constants.d.ts +++ b/lib/Constants.d.ts @@ -57,7 +57,7 @@ export declare enum DTMF_TRANSPORT { } export const REASON_PHRASE: Record -export const ALLOWED_METHODS = 'INVITE,ACK,CANCEL,BYE,UPDATE,MESSAGE,OPTIONS,REFER,INFO,NOTIFY' +export const ALLOWED_METHODS = 'INVITE,ACK,CANCEL,BYE,UPDATE,MESSAGE,OPTIONS,REFER,INFO,NOTIFY,SUBSCRIBE' export const ACCEPTED_BODY_TYPES = 'application/sdp, application/dtmf-relay' export const MAX_FORWARDS = 69 export const SESSION_EXPIRES = 90 diff --git a/lib/Constants.js b/lib/Constants.js index 5cb3a88a4..bb10550d3 100644 --- a/lib/Constants.js +++ b/lib/Constants.js @@ -149,7 +149,7 @@ module.exports = { 606 : 'Not Acceptable' }, - ALLOWED_METHODS : 'INVITE,ACK,CANCEL,BYE,UPDATE,MESSAGE,OPTIONS,REFER,INFO,NOTIFY', + ALLOWED_METHODS : 'INVITE,ACK,CANCEL,BYE,UPDATE,MESSAGE,OPTIONS,REFER,INFO,NOTIFY,SUBSCRIBE', ACCEPTED_BODY_TYPES : 'application/sdp, application/dtmf-relay', MAX_FORWARDS : 69, SESSION_EXPIRES : 90, diff --git a/lib/Notifier.d.ts b/lib/Notifier.d.ts new file mode 100644 index 000000000..89ec127bc --- /dev/null +++ b/lib/Notifier.d.ts @@ -0,0 +1,24 @@ +import {EventEmitter} from 'events' +import {SUBSCRIBE, NOTIFY} from './Constants' +import * as Utils from './Utils' + +declare enum NotifierTerminatedCode { + NOTIFY_RESPONSE_TIMEOUT = 0, + NOTIFY_TRANSPORT_ERROR = 1, + NOTIFY_NON_OK_RESPONSE = 2, + NOTIFY_FAILED_AUTHENTICATION = 3, + SEND_FINAL_NOTIFY = 4, + RECEIVE_UNSUBSCRIBE = 5, + SUBSCRIPTION_EXPIRED = 6 +} + +export class Notifier extends EventEmitter { + start(): void; + setActiveState(): void; + notify(body?: string): void; + terminate(body?: string, reason?: string): void; + get state(): string; + get id(): string; + static get C(): typeof NotifierTerminatedCode; + get C(): typeof NotifierTerminatedCode; +} diff --git a/lib/Notifier.js b/lib/Notifier.js new file mode 100644 index 000000000..dbae8e66c --- /dev/null +++ b/lib/Notifier.js @@ -0,0 +1,387 @@ +const EventEmitter = require('events').EventEmitter; +const Logger = require('./Logger'); +const JsSIP_C = require('./Constants'); +const Utils = require('./Utils'); +const Dialog = require('./Dialog'); + +const logger = new Logger('Notifier'); + +/** + * Termination codes. + */ +const C = { + // Termination codes. + NOTIFY_RESPONSE_TIMEOUT : 0, + NOTIFY_TRANSPORT_ERROR : 1, + NOTIFY_NON_OK_RESPONSE : 2, + NOTIFY_FAILED_AUTHENTICATION : 3, + SEND_FINAL_NOTIFY : 4, + RECEIVE_UNSUBSCRIBE : 5, + SUBSCRIPTION_EXPIRED : 6, + + // Notifer states + STATE_PENDING : 0, + STATE_ACTIVE : 1, + STATE_TERMINATED : 2 +}; + +/** + * RFC 6665 Notifier implementation. + */ +module.exports = class Notifier extends EventEmitter +{ + /** + * Expose C object. + */ + static get C() + { + return C; + } + + get C() + { + return C; + } + + /** + * @param {UA} ua - JsSIP User Agent instance. + * @param {IncomingRequest} subscribe - Subscribe request. + * @param {string} contentType - Content-Type header value. + * @param {NotifierOptions} options - Optional parameters. + * @param {Array} extraHeaders - Additional SIP headers. + * @param {string} allowEvents - Allow-Events header value. + * @param {boolean} pending - Set initial dialog state as "pending". + */ + constructor(ua, subscribe, contentType, { extraHeaders, allowEvents, pending }) + { + logger.debug('new'); + + super(); + + if (!subscribe) + { + throw new TypeError('subscribe is undefined'); + } + + if (!subscribe.hasHeader('contact')) + { + throw new TypeError('subscribe - no contact header'); + } + + if (!contentType) + { + throw new TypeError('contentType is undefined'); + } + + this._ua = ua; + this._initial_subscribe = subscribe; + this._expires_timestamp = null; + this._expires_timer = null; + + // Notifier state: pending, active, terminated. Not used: init, resp_wait. + this._state = pending ? C.STATE_PENDING : C.STATE_ACTIVE; + + // Custom session empty object for high level use. + this.data = {}; + + this._dialog = null; + + const eventName = subscribe.getHeader('event'); + + this._content_type = contentType; + this._expires = parseInt(subscribe.getHeader('expires')); + this._headers = Utils.cloneArray(extraHeaders); + this._headers.push(`Event: ${eventName}`); + + // Use contact from extraHeaders or create it. + this._contact = this._headers.find((header) => header.startsWith('Contact')); + if (!this._contact) + { + this._contact = `Contact: ${this._ua._contact.toString()}`; + + this._headers.push(this._contact); + } + + if (allowEvents) + { + this._headers.push(`Allow-Events: ${allowEvents}`); + } + + this._target = subscribe.from.uri.user; + subscribe.to_tag = Utils.newTag(); + + // Create dialog for normal and fetch-subscribe. + const dialog = new Dialog(this, subscribe, 'UAS'); + + this._dialog = dialog; + + if (this._expires > 0) + { + // Set expires timer and time-stamp. + this._setExpiresTimer(); + } + } + + /** + * Get dialog state. + */ + get state() + { + return this._state; + } + + /** + * Get dialog id. + */ + get id() + { + return this._dialog ? this._dialog.id : null; + } + + /** + * Dialog callback. + * Called also for initial subscribe. + * Supported RFC 6665 4.4.3: initial fetch subscribe (with expires: 0). + */ + receiveRequest(request) + { + if (request.method !== JsSIP_C.SUBSCRIBE) + { + request.reply(405); + + return; + } + + if (request.hasHeader('expires')) + { + this._expires = parseInt(request.getHeader('expires')); + } + else + { + // RFC 6665 3.1.1, default expires value. + this._expires = 900; + + logger.debug(`missing Expires header field, default value set: ${this._expires}`); + } + request.reply(200, null, [ `Expires: ${this._expires}`, `${this._contact}` ]); + + const body = request.body; + const content_type = request.getHeader('content-type'); + const is_unsubscribe = this._expires === 0; + + if (!is_unsubscribe) + { + this._setExpiresTimer(); + } + + logger.debug('emit "subscribe"'); + this.emit('subscribe', is_unsubscribe, request, body, content_type); + + if (is_unsubscribe) + { + this._terminateDialog(C.RECEIVE_UNSUBSCRIBE); + } + } + + /** + * User API + */ + /** + * Please call after creating the Notifier instance and setting the event handlers. + */ + start() + { + logger.debug('start()'); + + this.receiveRequest(this._initial_subscribe); + } + + /** + * Switch pending dialog state to active. + */ + setActiveState() + { + logger.debug('setActiveState()'); + + if (this._state === C.STATE_PENDING) + { + this._state = C.STATE_ACTIVE; + } + } + + /** + * Send the initial and subsequent notify request. + * @param {string} body - notify request body. + */ + notify(body=null) + { + logger.debug('notify()'); + + if (this._state === C.STATE_TERMINATED) + { + throw new Error('Cannot send notify in terminated state.'); + } + + let expires = Math.floor((this._expires_timestamp - new Date().getTime()) / 1000); + + if (expires < 0) + { + expires = 0; + } + + this._sendNotify([ `;expires=${expires}` ], body); + } + + /** + * Terminate. (Send the final NOTIFY request). + * + * @param {string} body - Notify message body. + * @param {string} reason - Set Subscription-State reason parameter. + * @param {number} retryAfter - Set Subscription-State retry-after parameter. + */ + terminate(body = null, reason = null, retryAfter = null) + { + logger.debug('terminate()'); + + this._state = C.STATE_TERMINATED; + + const subsStateParameters = []; + + if (reason) + { + subsStateParameters.push(`;reason=${reason}`); + } + + if (retryAfter !== null) + { + subsStateParameters.push(`;retry-after=${retryAfter}`); + } + + this._sendNotify(subsStateParameters, body); + + this._terminateDialog(reason === 'timeout' ? C.SUBSCRIPTION_EXPIRED : C.SEND_FINAL_NOTIFY); + } + + /** + * Private API + */ + /** + * @param {Array} subsStateParams subscription state parameters. + * @param {String} body Notify body + * @param {Array} extraHeaders + */ + _sendNotify(subsStateParameters, body=null, extraHeaders=null) + { + // Prevent send notify after final notify. + if (!this._dialog) + { + logger.warn('final notify has sent'); + + return; + } + + // Build Subscription-State header with parameters. + let subsState = `Subscription-State: ${this._stateToString()}`; + + for (const param of subsStateParameters) + { + subsState += param; + } + + let headers = this._headers.slice(); + + headers.push(subsState); + + if (extraHeaders) + { + headers = headers.concat(extraHeaders); + } + + if (body) + { + headers.push(`Content-Type: ${this._content_type}`); + } + + this._dialog.sendRequest(JsSIP_C.NOTIFY, { + body, + extraHeaders : headers, + eventHandlers : { + onRequestTimeout : () => + { + this._terminateDialog(C.NOTIFY_RESPONSE_TIMEOUT); + }, + onTransportError : () => + { + this._terminateDialog(C.NOTIFY_TRANSPORT_ERROR); + }, + onErrorResponse : (response) => + { + if (response.status_code === 401 || response.status_code === 407) + { + this._terminateDialog(C.NOTIFY_FAILED_AUTHENTICATION); + } + else + { + this._terminateDialog(C.NOTIFY_NON_OK_RESPONSE); + } + }, + onDialogError : () => + { + this._terminateDialog(C.NOTIFY_NON_OK_RESPONSE); + } + } + }); + } + + _terminateDialog(termination_code) + { + if (!this._dialog) + { + return; + } + + this._state = C.STATE_TERMINATED; + clearTimeout(this._expires_timer); + + if (this._dialog) + { + this._dialog.terminate(); + this._dialog = null; + } + + // For SUBSCRIPTION_EXPIRED the 'terminated' event was fired in expiration timer + if (termination_code !== C.SUBSCRIPTION_EXPIRED) + { + logger.debug(`emit "terminated" code=${termination_code}, send final notify=false`); + this.emit('terminated', termination_code, false); + } + } + + _setExpiresTimer() + { + this._expires_timestamp = new Date().getTime() + (this._expires * 1000); + + clearTimeout(this._expires_timer); + this._expires_timer = setTimeout(() => + { + if (!this._dialog) + { + return; + } + + logger.debug(`emit "terminated" code=${C.SUBSCRIPTION_EXPIRED}, send final notify=true`); + this.emit('terminated', C.SUBSCRIPTION_EXPIRED, true); + }, this._expires * 1000); + } + + _stateToString() + { + switch (this._state) + { + case C.STATE_PENDING: return 'pending'; + case C.STATE_ACTIVE: return 'active'; + case C.STATE_TERMINATED: return 'terminated'; + default: throw new TypeError('wrong state value'); + } + } +}; diff --git a/lib/Subscriber.d.ts b/lib/Subscriber.d.ts new file mode 100644 index 000000000..eb87b0948 --- /dev/null +++ b/lib/Subscriber.d.ts @@ -0,0 +1,23 @@ +import {EventEmitter} from 'events' +import {SUBSCRIBE, NOTIFY} from './Constants' +import * as Utils from './Utils' + +declare enum SubscriberTerminatedCode { + SUBSCRIBE_RESPONSE_TIMEOUT = 0, + SUBSCRIBE_TRANSPORT_ERROR = 1, + SUBSCRIBE_NON_OK_RESPONSE = 2, + SUBSCRIBE_BAD_OK_RESPONSE = 3, + SUBSCRIBE_FAILED_AUTHENTICATION = 4, + UNSUBSCRIBE_TIMEOUT = 5, + RECEIVE_FINAL_NOTIFY = 6, + RECEIVE_BAD_NOTIFY = 7 +} + +export class Subscriber extends EventEmitter { + subscribe(body?: string): void; + terminate(body?: string): void; + get state(): string; + get id(): string; + static get C(): typeof SubscriberTerminatedCode; + get C(): typeof SubscriberTerminatedCode; +} diff --git a/lib/Subscriber.js b/lib/Subscriber.js new file mode 100644 index 000000000..31cfc01ac --- /dev/null +++ b/lib/Subscriber.js @@ -0,0 +1,605 @@ +const EventEmitter = require('events').EventEmitter; +const Logger = require('./Logger'); +const JsSIP_C = require('./Constants'); +const Utils = require('./Utils'); +const Grammar = require('./Grammar'); +const SIPMessage = require('./SIPMessage'); +const RequestSender = require('./RequestSender'); +const Dialog = require('./Dialog'); + +const logger = new Logger('Subscriber'); + +/** + * Termination codes. + */ +const C = { + // Termination codes. + SUBSCRIBE_RESPONSE_TIMEOUT : 0, + SUBSCRIBE_TRANSPORT_ERROR : 1, + SUBSCRIBE_NON_OK_RESPONSE : 2, + SUBSCRIBE_BAD_OK_RESPONSE : 3, + SUBSCRIBE_FAILED_AUTHENTICATION : 4, + UNSUBSCRIBE_TIMEOUT : 5, + RECEIVE_FINAL_NOTIFY : 6, + RECEIVE_BAD_NOTIFY : 7, + + // Subscriber states. + STATE_PENDING : 0, + STATE_ACTIVE : 1, + STATE_TERMINATED : 2, + STATE_INIT : 3, + STATE_NOTIFY_WAIT : 4 +}; + +/** + * RFC 6665 Subscriber implementation. + */ +module.exports = class Subscriber extends EventEmitter +{ + /** + * Expose C object. + */ + static get C() + { + return C; + } + + get C() + { + return C; + } + + /** + * @param {UA} ua - reference to JsSIP.UA + * @param {string} target + * @param {string} eventName - Event header value. May end with optional ;id=xxx + * @param {string} accept - Accept header value. + * + * @param {SubscriberOption} options - optional parameters. + * @param {number} expires - Expires header value. Default is 900. + * @param {string} contentType - Content-Type header value. Used for SUBSCRIBE with body + * @param {string} allowEvents - Allow-Events header value. + * @param {RequestParams} params - Will have priority over ua.configuration. + * If set please define: to_uri, to_display_name, from_uri, from_display_name + * @param {Array} extraHeaders - Additional SIP headers. + */ + constructor(ua, target, eventName, accept, { expires, contentType, + allowEvents, params, extraHeaders }) + { + logger.debug('new'); + + super(); + + // Check that arguments are defined + if (!target) + { + throw new TypeError('target is undefined'); + } + + if (!eventName) + { + throw new TypeError('eventName is undefined'); + } + + if (!accept) + { + throw new TypeError('accept is undefined'); + } + + this._ua = ua; + this._target = target; + + if (expires !== 0 && !expires) + { + expires = 900; + } + + this._expires = expires; + + // Used to subscribe with body. + this._content_type = contentType; + + // Set initial subscribe parameters. + this._params = Utils.cloneObject(params); + + if (!this._params.from_uri) + { + this._params.from_uri = this._ua.configuration.uri; + } + + this._params.from_tag = Utils.newTag(); + this._params.to_tag = null; + this._params.call_id = Utils.createRandomToken(20); + + // Create subscribe cseq if not defined custom cseq. + if (this._params.cseq === undefined) + { + this._params.cseq = Math.floor((Math.random() * 10000) + 1); + } + + // Subscriber state. + this._state = C.STATE_INIT; + + // Dialog + this._dialog = null; + + // To refresh subscription. + this._expires_timer = null; + this._expires_timestamp = null; + + // To prevent duplicate terminated call. + this._terminated = false; + + // After send un-subscribe wait final notify limited time. + this._unsubscribe_timeout_timer = null; + + // Custom session empty object for high level use. + this.data = {}; + + const parsed = Grammar.parse(eventName, 'Event'); + + if (parsed === -1) + { + throw new TypeError('eventName - wrong format'); + } + + this._event_name = parsed.event; + this._event_id = parsed.params && parsed.params.id; + + let eventValue = this._event_name; + + if (this._event_id) + { + eventValue += `;id=${this._event_id}`; + } + + this._headers = Utils.cloneArray(extraHeaders); + this._headers = this._headers.concat([ + `Event: ${eventValue}`, + `Expires: ${this._expires}`, + `Accept: ${accept}` + ]); + + if (!this._headers.find((header) => header.startsWith('Contact'))) + { + + const contact = `Contact: ${this._ua._contact.toString()}`; + + this._headers.push(contact); + } + + if (allowEvents) + { + this._headers.push(`Allow-Events: ${allowEvents}`); + } + + // To enqueue subscribes created before receive initial subscribe OK. + this._queue = []; + } + + onRequestTimeout() + { + this._dialogTerminated(C.SUBSCRIBE_RESPONSE_TIMEOUT); + } + + onTransportError() + { + this._dialogTerminated(C.SUBSCRIBE_TRANSPORT_ERROR); + } + + /** + * Dialog callback. + */ + receiveRequest(request) + { + if (request.method !== JsSIP_C.NOTIFY) + { + logger.warn('received non-NOTIFY request'); + request.reply(405); + + return; + } + + // RFC 6665 8.2.1. Check if event header matches. + const event_header = request.parseHeader('Event'); + + if (!event_header) + { + logger.warn('missed Event header'); + request.reply(400); + this._dialogTerminated(C.RECEIVE_BAD_NOTIFY); + + return; + } + + const event_name = event_header.event; + const event_id = event_header.params && event_header.params.id; + + if (event_name !== this._event_name || event_id !== this._event_id) + { + logger.warn('Event header does not match SUBSCRIBE'); + request.reply(489); + this._dialogTerminated(C.RECEIVE_BAD_NOTIFY); + + return; + } + + // Process Subscription-State header. + const subs_state = request.parseHeader('subscription-state'); + + if (!subs_state) + { + logger.warn('missed Subscription-State header'); + request.reply(400); + this._dialogTerminated(C.RECEIVE_BAD_NOTIFY); + + return; + } + + request.reply(200); + + const new_state = this._stateStringToNumber(subs_state.state); + const prev_state = this._state; + + if (prev_state !== C.STATE_TERMINATED && new_state !== C.STATE_TERMINATED) + { + this._state = new_state; + + if (subs_state.expires !== undefined) + { + const expires = subs_state.expires; + const expires_timestamp = new Date().getTime() + (expires * 1000); + const max_time_deviation = 2000; + + // Expiration time is shorter and the difference is not too small. + if (this._expires_timestamp - expires_timestamp > max_time_deviation) + { + logger.debug('update sending re-SUBSCRIBE time'); + + this._scheduleSubscribe(expires); + } + } + } + + if (prev_state !== C.STATE_PENDING && new_state === C.STATE_PENDING) + { + logger.debug('emit "pending"'); + this.emit('pending'); + } + else if (prev_state !== C.STATE_ACTIVE && new_state === C.STATE_ACTIVE) + { + logger.debug('emit "active"'); + this.emit('active'); + } + + const body = request.body; + + // Check if the notify is final. + const is_final = new_state === C.STATE_TERMINATED; + + // Notify event fired only for notify with body. + if (body) + { + const content_type = request.getHeader('content-type'); + + logger.debug('emit "notify"'); + this.emit('notify', is_final, request, body, content_type); + } + + if (is_final) + { + const reason = subs_state.reason; + let retry_after = undefined; + + if (subs_state.params && subs_state.params['retry-after'] !== undefined) + { + retry_after = parseInt(subs_state.params['retry-after']); + } + + this._dialogTerminated(C.RECEIVE_FINAL_NOTIFY, reason, retry_after); + } + } + + /** + * User API + */ + + /** + * Send the initial (non-fetch) and subsequent subscribe. + * @param {string} body - subscribe request body. + */ + subscribe(body = null) + { + logger.debug('subscribe()'); + + if (this._state === C.STATE_INIT) + { + this._sendInitialSubscribe(body, this._headers); + } + else + { + this._sendSubsequentSubscribe(body, this._headers); + } + } + + /** + * terminate. + * Send un-subscribe or fetch-subscribe (with Expires: 0). + * @param {string} body - un-subscribe request body + */ + terminate(body = null) + { + logger.debug('terminate()'); + + // Prevent duplication un-subscribe sending. + if (this._terminated) + { + + return; + } + this._terminated = true; + + // Set header Expires: 0. + const headers = this._headers.map((header) => + { + return header.startsWith('Expires') ? 'Expires: 0' : header; + }); + + if (this._state === C.STATE_INIT) + { + // fetch-subscribe - initial subscribe with Expires: 0. + this._sendInitialSubscribe(body, headers); + } + else + { + this._sendSubsequentSubscribe(body, headers); + } + + // Waiting for the final notify for a while. + const final_notify_timeout = 30000; + + this._unsubscribe_timeout_timer = setTimeout(() => + { + this._dialogTerminated(C.UNSUBSCRIBE_TIMEOUT); + }, final_notify_timeout); + } + + /** + * Get dialog state. + */ + get state() + { + return this._state; + } + + /** + * Get dialog id. + */ + get id() + { + return this._dialog ? this._dialog.id : null; + } + + /** + * Private API. + */ + _sendInitialSubscribe(body, headers) + { + if (body) + { + if (!this._content_type) + { + throw new TypeError('content_type is undefined'); + } + + headers = headers.slice(); + headers.push(`Content-Type: ${this._content_type}`); + } + + this._state = C.STATE_NOTIFY_WAIT; + + const request = new SIPMessage.OutgoingRequest(JsSIP_C.SUBSCRIBE, + this._ua.normalizeTarget(this._target), this._ua, this._params, headers, body); + + const request_sender = new RequestSender(this._ua, request, { + onRequestTimeout : () => + { + this.onRequestTimeout(); + }, + onTransportError : () => + { + this.onTransportError(); + }, + onReceiveResponse : (response) => + { + this._receiveSubscribeResponse(response); + } + }); + + request_sender.send(); + } + + _receiveSubscribeResponse(response) + { + if (response.status_code >= 200 && response.status_code < 300) + { + // Create dialog + if (this._dialog === null) + { + const dialog = new Dialog(this, response, 'UAC'); + + if (dialog.error) + { + // OK response without Contact + logger.warn(dialog.error); + this._dialogTerminated(C.SUBSCRIBE_BAD_OK_RESPONSE); + + return; + } + + this._dialog = dialog; + + logger.debug('emit "accepted"'); + this.emit('accepted'); + + // Subsequent subscribes saved in the queue until dialog created. + for (const subscribe of this._queue) + { + logger.debug('dequeue subscribe'); + + this._sendSubsequentSubscribe(subscribe.body, subscribe.headers); + } + } + + // Check expires value. + let expires_value = response.getHeader('expires'); + + if (expires_value !== 0 && !expires_value) + { + logger.warn('response without Expires header'); + + // RFC 6665 3.1.1 subscribe OK response must contain Expires header. + // Use workaround expires value. + expires_value = '900'; + } + + const expires = parseInt(expires_value); + + if (expires > 0) + { + this._scheduleSubscribe(expires); + } + } + else if (response.status_code === 401 || response.status_code === 407) + { + this._dialogTerminated(C.SUBSCRIBE_FAILED_AUTHENTICATION); + } + else if (response.status_code >= 300) + { + this._dialogTerminated(C.SUBSCRIBE_NON_OK_RESPONSE); + } + } + + _sendSubsequentSubscribe(body, headers) + { + if (this._state === C.STATE_TERMINATED) + { + return; + } + + if (!this._dialog) + { + logger.debug('enqueue subscribe'); + + this._queue.push({ body, headers: headers.slice() }); + + return; + } + + if (body) + { + if (!this._content_type) + { + throw new TypeError('content_type is undefined'); + } + + headers = headers.slice(); + headers.push(`Content-Type: ${this._content_type}`); + } + + this._dialog.sendRequest(JsSIP_C.SUBSCRIBE, { + body, + extraHeaders : headers, + eventHandlers : { + onRequestTimeout : () => + { + this.onRequestTimeout(); + }, + onTransportError : () => + { + this.onTransportError(); + }, + onSuccessResponse : (response) => + { + this._receiveSubscribeResponse(response); + }, + onErrorResponse : (response) => + { + this._receiveSubscribeResponse(response); + }, + onDialogError : (response) => + { + this._receiveSubscribeResponse(response); + } + } + }); + } + + _dialogTerminated(terminationCode, reason = undefined, retryAfter = undefined) + { + // To prevent duplicate emit terminated event. + if (this._state === C.STATE_TERMINATED) + { + return; + } + + this._state = C.STATE_TERMINATED; + + // Clear timers. + clearTimeout(this._expires_timer); + clearTimeout(this._unsubscribe_timeout_timer); + + if (this._dialog) + { + this._dialog.terminate(); + this._dialog = null; + } + + logger.debug(`emit "terminated" code=${terminationCode}`); + this.emit('terminated', terminationCode, reason, retryAfter); + } + + _scheduleSubscribe(expires) + { + /* + If the expires time is less than 140 seconds we do not support Chrome intensive timer throttling mode. + In this case, the re-subcribe is sent 5 seconds before the subscription expiration. + + When Chrome is in intensive timer throttling mode, in the worst case, + the timer will be 60 seconds late. + We give the server 10 seconds to make sure it will execute the command even if it is heavily loaded. + As a result, we order the time no later than 70 seconds before the subscription expiration. + Resulting time calculated as half time interval + (half interval - 70) * random. + + E.g. expires is 140, re-subscribe will be ordered to send in 70 seconds. + expires is 600, re-subscribe will be ordered to send in 300 + (0 .. 230) seconds. + */ + + const timeout = expires >= 140 ? (expires * 1000 / 2) + + Math.floor(((expires / 2) - 70) * 1000 * Math.random()) : (expires * 1000) - 5000; + + this._expires_timestamp = new Date().getTime() + (expires * 1000); + + logger.debug(`next SUBSCRIBE will be sent in ${Math.floor(timeout / 1000)} sec`); + + clearTimeout(this._expires_timer); + this._expires_timer = setTimeout(() => + { + this._expires_timer = null; + this._sendSubsequentSubscribe(null, this._headers); + }, timeout); + } + + _stateStringToNumber(strState) + { + switch (strState) + { + case 'pending': return C.STATE_PENDING; + case 'active': return C.STATE_ACTIVE; + case 'terminated': return C.STATE_TERMINATED; + case 'init': return C.STATE_INIT; + case 'notify_wait': return C.STATE_NOTIFY_WAIT; + default: throw new TypeError('wrong state value'); + } + } +}; diff --git a/lib/UA.d.ts b/lib/UA.d.ts index 6ee4a7881..0fff768d4 100644 --- a/lib/UA.d.ts +++ b/lib/UA.d.ts @@ -5,6 +5,8 @@ import {AnswerOptions, Originator, RTCSession, RTCSessionEventMap, TerminateOpti import {IncomingRequest, IncomingResponse, OutgoingRequest} from './SIPMessage' import {Message, SendMessageOptions} from './Message' import {Registrator} from './Registrator' +import {Notifier} from './Notifier' +import {Subscriber} from './Subscriber' import {URI} from './URI' import {causes} from './Constants' @@ -125,6 +127,7 @@ export type IncomingOptionsListener = (event: IncomingOptionsEvent) => void; export type OutgoingOptionsListener = (event: OutgoingOptionsEvent) => void; export type OptionsListener = IncomingOptionsListener | OutgoingOptionsListener; export type SipEventListener = (event: { event: T; request: IncomingRequest; }) => void +export type SipSubscribeListener = (event: { event: T; request: IncomingRequest; }) => void export interface UAEventMap { connecting: ConnectingListener; @@ -137,6 +140,7 @@ export interface UAEventMap { newRTCSession: RTCSessionListener; newMessage: MessageListener; sipEvent: SipEventListener; + newSubscribe: SipSubscribeListener; newOptions: OptionsListener; } @@ -153,6 +157,38 @@ export interface UAContact { toString(options?: UAContactOptions): string } +export interface RequestParams { + from_uri: URI; + from_display_name?: string; + from_tag: string; + to_uri: URI; + to_display_name?: string; + to_tag?: string; + call_id: string; + cseq: number; +} + +export interface SubscriberParams { + from_uri: URI; + from_display_name?: string; + to_uri: URI; + to_display_name?: string; +} + +export interface SubscriberOptions { + expires?: number; + contentType?: string; + allowEvents?: string; + params?: SubscriberParams; + extraHeaders?: string[]; +} + +export interface NotifierOptions { + allowEvents?: string; + extraHeaders?: string[]; + pending?: boolean; +} + declare enum UAStatus { // UA status codes. STATUS_INIT = 0, @@ -189,6 +225,10 @@ export class UA extends EventEmitter { sendMessage(target: string | URI, body: string, options?: SendMessageOptions): Message; + subscribe(target: string, eventName: string, accept: string, options?: SubscriberOptions): Subscriber; + + notify( subscribe: IncomingRequest, contentType: string, options?: NotifierOptions): Notifier; + terminateSessions(options?: TerminateOptions): void; isRegistered(): boolean; diff --git a/lib/UA.js b/lib/UA.js index c352408a3..a57709fa1 100644 --- a/lib/UA.js +++ b/lib/UA.js @@ -3,6 +3,8 @@ const Logger = require('./Logger'); const JsSIP_C = require('./Constants'); const Registrator = require('./Registrator'); const RTCSession = require('./RTCSession'); +const Subscriber = require('./Subscriber'); +const Notifier = require('./Notifier'); const Message = require('./Message'); const Options = require('./Options'); const Transactions = require('./Transactions'); @@ -262,6 +264,26 @@ module.exports = class UA extends EventEmitter return message; } + /** + * Create subscriber instance + */ + subscribe(target, eventName, accept, options) + { + logger.debug('subscribe()'); + + return new Subscriber(this, target, eventName, accept, options); + } + + /** + * Create notifier instance + */ + notify(subscribe, contentType, options) + { + logger.debug('notify()'); + + return new Notifier(this, subscribe, contentType, options); + } + /** * Send a SIP OPTIONS. * @@ -645,6 +667,15 @@ module.exports = class UA extends EventEmitter message.init_incoming(request); } + else if (method === JsSIP_C.SUBSCRIBE) + { + if (this.listeners('newSubscribe').length === 0) + { + request.reply(405); + + return; + } + } else if (method === JsSIP_C.INVITE) { // Initial INVITE. @@ -731,6 +762,12 @@ module.exports = class UA extends EventEmitter }); request.reply(200); break; + case JsSIP_C.SUBSCRIBE: + this.emit('newSubscribe', { + event : request.event, + request + }); + break; default: request.reply(405); break; diff --git a/test/include/loopSocket.js b/test/include/loopSocket.js new file mode 100644 index 000000000..9a92d9a21 --- /dev/null +++ b/test/include/loopSocket.js @@ -0,0 +1,48 @@ +// LoopSocket send message itself. +// Used P2P logic: message call-id is modified in each leg. +module.exports = class LoopSocket +{ + constructor() + { + this.url = 'ws://localhost:12345'; + this.via_transport = 'WS'; + this.sip_uri = 'sip:localhost:12345;transport=ws'; + } + + connect() + { + setTimeout(() => { this.onconnect(); }, 0); + } + + disconnect() + { + } + + send(message) + { + const message2 = this._modifyCallId(message); + + setTimeout(() => { this.ondata(message2); }, 0); + + return true; + } + + // Call-ID: add or drop word '_second'. + _modifyCallId(message) + { + const ixBegin = message.indexOf('Call-ID'); + const ixEnd = message.indexOf('\r', ixBegin); + let callId = message.substring(ixBegin+9, ixEnd); + + if (callId.endsWith('_second')) + { + callId = callId.substring(0, callId.length - 7); + } + else + { + callId += '_second'; + } + + return `${message.substring(0, ixBegin)}Call-ID: ${callId}${message.substring(ixEnd)}`; + } +}; diff --git a/test/test-UA-subscriber-notifier.js b/test/test-UA-subscriber-notifier.js new file mode 100644 index 000000000..28621fcf9 --- /dev/null +++ b/test/test-UA-subscriber-notifier.js @@ -0,0 +1,177 @@ +require('./include/common'); +const JsSIP = require('../'); +const LoopSocket = require('./include/LoopSocket'); + +module.exports = { + 'subscriber/notifier communication' : function(test) + { + test.expect(39); + + let eventSequence = 0; + + const TARGET = 'ikq'; + const REQUEST_URI = 'sip:ikq@example.com'; + const CONTACT_URI = 'sip:ikq@abcdefabcdef.invalid;transport=ws'; + const SUBSCRIBE_ACCEPT = 'application/text, text/plain'; + const EVENT_NAME = 'weather'; + const CONTENT_TYPE = 'text/plain'; + const WEATHER_REQUEST = 'Please report the weather condition'; + const WEATHER_REPORT = '+20..+24°C, no precipitation, light wind'; + + function createSubscriber(ua) + { + const options = { + expires : 3600, + contentType : CONTENT_TYPE, + params : null + }; + + const subscriber = ua.subscribe(TARGET, EVENT_NAME, SUBSCRIBE_ACCEPT, options); + + subscriber.on('active', () => + { + test.ok(++eventSequence === 6, 'receive notify with subscription-state: active'); + }); + + subscriber.on('notify', (isFinal, notify, body, contType) => + { + eventSequence++; + test.ok(eventSequence === 7 || eventSequence === 11, 'receive notify'); + + test.strictEqual(notify.method, 'NOTIFY'); + test.strictEqual(notify.getHeader('contact'), `<${CONTACT_URI}>`, 'notify contact'); + test.strictEqual(body, WEATHER_REPORT, 'notify body'); + test.strictEqual(contType, CONTENT_TYPE, 'notify content-type'); + + const subsState = notify.parseHeader('subscription-state').state; + + test.ok(subsState === 'pending' || subsState === 'active' || subsState === 'terminated', 'notify subscription-state'); + + // After receiving the first notify, send un-subscribe. + if (eventSequence === 7) + { + test.ok(++eventSequence === 8, 'send un-subscribe'); + + subscriber.terminate(WEATHER_REQUEST); + } + }); + + subscriber.on('terminated', (terminationCode, reason, retryAfter) => + { + test.ok(++eventSequence === 12, 'subscriber terminated'); + test.ok(terminationCode === subscriber.C.RECEIVE_FINAL_NOTIFY); + test.ok(reason === undefined); + test.ok(retryAfter === undefined); + + ua.stop(); + + test.done(); + }); + + subscriber.on('accepted', () => + { + test.ok(++eventSequence === 5, 'initial subscribe accepted'); + }); + + test.ok(++eventSequence === 2, 'send subscribe'); + + subscriber.subscribe(WEATHER_REQUEST); + } + + function createNotifier(ua, subscribe) + { + const notifier = ua.notify(subscribe, CONTENT_TYPE, { pending: false }); + + // Receive subscribe (includes initial) + notifier.on('subscribe', (isUnsubscribe, subs, body, contType) => + { + test.strictEqual(subscribe.method, 'SUBSCRIBE'); + test.strictEqual(subscribe.getHeader('contact'), `<${CONTACT_URI}>`, 'subscribe contact'); + test.strictEqual(subscribe.getHeader('accept'), SUBSCRIBE_ACCEPT, 'subscribe accept'); + test.strictEqual(body, WEATHER_REQUEST, 'subscribe body'); + test.strictEqual(contType, CONTENT_TYPE, 'subscribe content-type'); + + if (isUnsubscribe) + { + test.ok(++eventSequence === 9, 'receive un-subscribe, send final notify'); + + notifier.terminate(WEATHER_REPORT); + } + else + { + test.ok(++eventSequence === 4, 'receive subscribe, send notify'); + + notifier.notify(WEATHER_REPORT); + } + }); + + notifier.on('terminated', (terminationCode, sendFinalNotify) => + { + test.ok(++eventSequence === 10, 'notifier terminated'); + test.ok(!sendFinalNotify, 'final notify sending if subscription expired'); + + if (sendFinalNotify) + { + notifier.terminate(WEATHER_REPORT, 'timeout'); + } + }); + + notifier.start(); + } + + // Start JsSIP UA with loop socket. + const config = + { + sockets : new LoopSocket(), // message sending itself, with modified Call-ID + uri : REQUEST_URI, + contact_uri : CONTACT_URI, + register : false + }; + + const ua = new JsSIP.UA(config); + + // Uncomment to see SIP communication + // JsSIP.debug.enable('JsSIP:*'); + + ua.on('newSubscribe', (e) => + { + test.ok(++eventSequence === 3, 'receive initial subscribe'); + + const subs = e.request; + const ev = subs.parseHeader('event'); + + test.strictEqual(subs.ruri.toString(), REQUEST_URI, 'initial subscribe uri'); + test.strictEqual(ev.event, EVENT_NAME, 'subscribe event'); + + if (ev.event !== EVENT_NAME) + { + subs.reply(489); // "Bad Event" + + return; + } + + const accepts = subs.getHeaders('accept'); + const canUse = accepts && accepts.some((v) => v.includes(CONTENT_TYPE)); + + test.ok(canUse, 'notifier can use subscribe accept header'); + + if (!canUse) + { + subs.reply(406); // "Not Acceptable" + + return; + } + + createNotifier(ua, subs); + }); + + ua.on('connected', () => + { + test.ok(++eventSequence === 1, 'socket connected'); + + createSubscriber(ua); + }); + + ua.start(); + } +};