stomp-handler.ts 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. import { BYTE } from './byte.js';
  2. import { Client } from './client.js';
  3. import { FrameImpl } from './frame-impl.js';
  4. import { IMessage } from './i-message.js';
  5. import { ITransaction } from './i-transaction.js';
  6. import { Parser } from './parser.js';
  7. import { StompHeaders } from './stomp-headers.js';
  8. import { StompSubscription } from './stomp-subscription.js';
  9. import {
  10. closeEventCallbackType,
  11. debugFnType,
  12. frameCallbackType,
  13. IPublishParams,
  14. IStompSocket,
  15. IStompSocketMessageEvent,
  16. IStomptHandlerConfig,
  17. messageCallbackType,
  18. StompSocketState,
  19. wsErrorCallbackType,
  20. } from './types.js';
  21. import { Versions } from './versions.js';
  22. import { augmentWebsocket } from './augment-websocket.js';
  23. /**
  24. * The STOMP protocol handler
  25. *
  26. * Part of `@stomp/stompjs`.
  27. *
  28. * @internal
  29. */
  30. export class StompHandler {
  31. public debug: debugFnType;
  32. public stompVersions: Versions;
  33. public connectHeaders: StompHeaders;
  34. public disconnectHeaders: StompHeaders;
  35. public heartbeatIncoming: number;
  36. public heartbeatOutgoing: number;
  37. public onUnhandledMessage: messageCallbackType;
  38. public onUnhandledReceipt: frameCallbackType;
  39. public onUnhandledFrame: frameCallbackType;
  40. public onConnect: frameCallbackType;
  41. public onDisconnect: frameCallbackType;
  42. public onStompError: frameCallbackType;
  43. public onWebSocketClose: closeEventCallbackType;
  44. public onWebSocketError: wsErrorCallbackType;
  45. public logRawCommunication: boolean;
  46. public splitLargeFrames: boolean;
  47. public maxWebSocketChunkSize: number;
  48. public forceBinaryWSFrames: boolean;
  49. public appendMissingNULLonIncoming: boolean;
  50. public discardWebsocketOnCommFailure: boolean;
  51. get connectedVersion(): string | undefined {
  52. return this._connectedVersion;
  53. }
  54. private _connectedVersion: string | undefined;
  55. get connected(): boolean {
  56. return this._connected;
  57. }
  58. private _connected: boolean = false;
  59. private readonly _subscriptions: { [key: string]: messageCallbackType };
  60. private readonly _receiptWatchers: { [key: string]: frameCallbackType };
  61. private _partialData: string;
  62. private _escapeHeaderValues: boolean;
  63. private _counter: number;
  64. private _pinger: any;
  65. private _ponger: any;
  66. private _lastServerActivityTS: number;
  67. constructor(
  68. private _client: Client,
  69. public _webSocket: IStompSocket,
  70. config: IStomptHandlerConfig
  71. ) {
  72. // used to index subscribers
  73. this._counter = 0;
  74. // subscription callbacks indexed by subscriber's ID
  75. this._subscriptions = {};
  76. // receipt-watchers indexed by receipts-ids
  77. this._receiptWatchers = {};
  78. this._partialData = '';
  79. this._escapeHeaderValues = false;
  80. this._lastServerActivityTS = Date.now();
  81. this.debug = config.debug;
  82. this.stompVersions = config.stompVersions;
  83. this.connectHeaders = config.connectHeaders;
  84. this.disconnectHeaders = config.disconnectHeaders;
  85. this.heartbeatIncoming = config.heartbeatIncoming;
  86. this.heartbeatOutgoing = config.heartbeatOutgoing;
  87. this.splitLargeFrames = config.splitLargeFrames;
  88. this.maxWebSocketChunkSize = config.maxWebSocketChunkSize;
  89. this.forceBinaryWSFrames = config.forceBinaryWSFrames;
  90. this.logRawCommunication = config.logRawCommunication;
  91. this.appendMissingNULLonIncoming = config.appendMissingNULLonIncoming;
  92. this.discardWebsocketOnCommFailure = config.discardWebsocketOnCommFailure;
  93. this.onConnect = config.onConnect;
  94. this.onDisconnect = config.onDisconnect;
  95. this.onStompError = config.onStompError;
  96. this.onWebSocketClose = config.onWebSocketClose;
  97. this.onWebSocketError = config.onWebSocketError;
  98. this.onUnhandledMessage = config.onUnhandledMessage;
  99. this.onUnhandledReceipt = config.onUnhandledReceipt;
  100. this.onUnhandledFrame = config.onUnhandledFrame;
  101. }
  102. public start(): void {
  103. const parser = new Parser(
  104. // On Frame
  105. rawFrame => {
  106. const frame = FrameImpl.fromRawFrame(
  107. rawFrame,
  108. this._escapeHeaderValues
  109. );
  110. // if this.logRawCommunication is set, the rawChunk is logged at this._webSocket.onmessage
  111. if (!this.logRawCommunication) {
  112. this.debug(`<<< ${frame}`);
  113. }
  114. const serverFrameHandler =
  115. this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
  116. serverFrameHandler(frame);
  117. },
  118. // On Incoming Ping
  119. () => {
  120. this.debug('<<< PONG');
  121. }
  122. );
  123. this._webSocket.onmessage = (evt: IStompSocketMessageEvent) => {
  124. this.debug('Received data');
  125. this._lastServerActivityTS = Date.now();
  126. if (this.logRawCommunication) {
  127. const rawChunkAsString =
  128. evt.data instanceof ArrayBuffer
  129. ? new TextDecoder().decode(evt.data)
  130. : evt.data;
  131. this.debug(`<<< ${rawChunkAsString}`);
  132. }
  133. parser.parseChunk(
  134. evt.data as string | ArrayBuffer,
  135. this.appendMissingNULLonIncoming
  136. );
  137. };
  138. this._webSocket.onclose = (closeEvent): void => {
  139. this.debug(`Connection closed to ${this._webSocket.url}`);
  140. this._cleanUp();
  141. this.onWebSocketClose(closeEvent);
  142. };
  143. this._webSocket.onerror = (errorEvent): void => {
  144. this.onWebSocketError(errorEvent);
  145. };
  146. this._webSocket.onopen = () => {
  147. // Clone before updating
  148. const connectHeaders = (Object as any).assign({}, this.connectHeaders);
  149. this.debug('Web Socket Opened...');
  150. connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
  151. connectHeaders['heart-beat'] = [
  152. this.heartbeatOutgoing,
  153. this.heartbeatIncoming,
  154. ].join(',');
  155. this._transmit({ command: 'CONNECT', headers: connectHeaders });
  156. };
  157. }
  158. private readonly _serverFrameHandlers: {
  159. [key: string]: frameCallbackType;
  160. } = {
  161. // [CONNECTED Frame](https://stomp.github.com/stomp-specification-1.2.html#CONNECTED_Frame)
  162. CONNECTED: frame => {
  163. this.debug(`connected to server ${frame.headers.server}`);
  164. this._connected = true;
  165. this._connectedVersion = frame.headers.version;
  166. // STOMP version 1.2 needs header values to be escaped
  167. if (this._connectedVersion === Versions.V1_2) {
  168. this._escapeHeaderValues = true;
  169. }
  170. this._setupHeartbeat(frame.headers);
  171. this.onConnect(frame);
  172. },
  173. // [MESSAGE Frame](https://stomp.github.com/stomp-specification-1.2.html#MESSAGE)
  174. MESSAGE: frame => {
  175. // the callback is registered when the client calls
  176. // `subscribe()`.
  177. // If there is no registered subscription for the received message,
  178. // the default `onUnhandledMessage` callback is used that the client can set.
  179. // This is useful for subscriptions that are automatically created
  180. // on the browser side (e.g. [RabbitMQ's temporary
  181. // queues](https://www.rabbitmq.com/stomp.html)).
  182. const subscription = frame.headers.subscription;
  183. const onReceive =
  184. this._subscriptions[subscription] || this.onUnhandledMessage;
  185. // bless the frame to be a Message
  186. const message = frame as IMessage;
  187. const client = this;
  188. const messageId =
  189. this._connectedVersion === Versions.V1_2
  190. ? message.headers.ack
  191. : message.headers['message-id'];
  192. // add `ack()` and `nack()` methods directly to the returned frame
  193. // so that a simple call to `message.ack()` can acknowledge the message.
  194. message.ack = (headers: StompHeaders = {}): void => {
  195. return client.ack(messageId, subscription, headers);
  196. };
  197. message.nack = (headers: StompHeaders = {}): void => {
  198. return client.nack(messageId, subscription, headers);
  199. };
  200. onReceive(message);
  201. },
  202. // [RECEIPT Frame](https://stomp.github.com/stomp-specification-1.2.html#RECEIPT)
  203. RECEIPT: frame => {
  204. const callback = this._receiptWatchers[frame.headers['receipt-id']];
  205. if (callback) {
  206. callback(frame);
  207. // Server will acknowledge only once, remove the callback
  208. delete this._receiptWatchers[frame.headers['receipt-id']];
  209. } else {
  210. this.onUnhandledReceipt(frame);
  211. }
  212. },
  213. // [ERROR Frame](https://stomp.github.com/stomp-specification-1.2.html#ERROR)
  214. ERROR: frame => {
  215. this.onStompError(frame);
  216. },
  217. };
  218. private _setupHeartbeat(headers: StompHeaders): void {
  219. if (
  220. headers.version !== Versions.V1_1 &&
  221. headers.version !== Versions.V1_2
  222. ) {
  223. return;
  224. }
  225. // It is valid for the server to not send this header
  226. // https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
  227. if (!headers['heart-beat']) {
  228. return;
  229. }
  230. // heart-beat header received from the server looks like:
  231. //
  232. // heart-beat: sx, sy
  233. const [serverOutgoing, serverIncoming] = headers['heart-beat']
  234. .split(',')
  235. .map((v: string) => parseInt(v, 10));
  236. if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
  237. const ttl: number = Math.max(this.heartbeatOutgoing, serverIncoming);
  238. this.debug(`send PING every ${ttl}ms`);
  239. this._pinger = setInterval(() => {
  240. if (this._webSocket.readyState === StompSocketState.OPEN) {
  241. this._webSocket.send(BYTE.LF);
  242. this.debug('>>> PING');
  243. }
  244. }, ttl);
  245. }
  246. if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
  247. const ttl: number = Math.max(this.heartbeatIncoming, serverOutgoing);
  248. this.debug(`check PONG every ${ttl}ms`);
  249. this._ponger = setInterval(() => {
  250. const delta = Date.now() - this._lastServerActivityTS;
  251. // We wait twice the TTL to be flexible on window's setInterval calls
  252. if (delta > ttl * 2) {
  253. this.debug(`did not receive server activity for the last ${delta}ms`);
  254. this._closeOrDiscardWebsocket();
  255. }
  256. }, ttl);
  257. }
  258. }
  259. private _closeOrDiscardWebsocket() {
  260. if (this.discardWebsocketOnCommFailure) {
  261. this.debug(
  262. 'Discarding websocket, the underlying socket may linger for a while'
  263. );
  264. this.discardWebsocket();
  265. } else {
  266. this.debug('Issuing close on the websocket');
  267. this._closeWebsocket();
  268. }
  269. }
  270. public forceDisconnect() {
  271. if (this._webSocket) {
  272. if (
  273. this._webSocket.readyState === StompSocketState.CONNECTING ||
  274. this._webSocket.readyState === StompSocketState.OPEN
  275. ) {
  276. this._closeOrDiscardWebsocket();
  277. }
  278. }
  279. }
  280. public _closeWebsocket() {
  281. this._webSocket.onmessage = () => {}; // ignore messages
  282. this._webSocket.close();
  283. }
  284. public discardWebsocket() {
  285. if (typeof this._webSocket.terminate !== 'function') {
  286. augmentWebsocket(this._webSocket, (msg: string) => this.debug(msg));
  287. }
  288. // @ts-ignore - this method will be there at this stage
  289. this._webSocket.terminate();
  290. }
  291. private _transmit(params: {
  292. command: string;
  293. headers?: StompHeaders;
  294. body?: string;
  295. binaryBody?: Uint8Array;
  296. skipContentLengthHeader?: boolean;
  297. }): void {
  298. const { command, headers, body, binaryBody, skipContentLengthHeader } =
  299. params;
  300. const frame = new FrameImpl({
  301. command,
  302. headers,
  303. body,
  304. binaryBody,
  305. escapeHeaderValues: this._escapeHeaderValues,
  306. skipContentLengthHeader,
  307. });
  308. let rawChunk = frame.serialize();
  309. if (this.logRawCommunication) {
  310. this.debug(`>>> ${rawChunk}`);
  311. } else {
  312. this.debug(`>>> ${frame}`);
  313. }
  314. if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
  315. rawChunk = new TextEncoder().encode(rawChunk);
  316. }
  317. if (typeof rawChunk !== 'string' || !this.splitLargeFrames) {
  318. this._webSocket.send(rawChunk);
  319. } else {
  320. let out = rawChunk as string;
  321. while (out.length > 0) {
  322. const chunk = out.substring(0, this.maxWebSocketChunkSize);
  323. out = out.substring(this.maxWebSocketChunkSize);
  324. this._webSocket.send(chunk);
  325. this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
  326. }
  327. }
  328. }
  329. public dispose(): void {
  330. if (this.connected) {
  331. try {
  332. // clone before updating
  333. const disconnectHeaders = (Object as any).assign(
  334. {},
  335. this.disconnectHeaders
  336. );
  337. if (!disconnectHeaders.receipt) {
  338. disconnectHeaders.receipt = `close-${this._counter++}`;
  339. }
  340. this.watchForReceipt(disconnectHeaders.receipt, frame => {
  341. this._closeWebsocket();
  342. this._cleanUp();
  343. this.onDisconnect(frame);
  344. });
  345. this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
  346. } catch (error) {
  347. this.debug(`Ignoring error during disconnect ${error}`);
  348. }
  349. } else {
  350. if (
  351. this._webSocket.readyState === StompSocketState.CONNECTING ||
  352. this._webSocket.readyState === StompSocketState.OPEN
  353. ) {
  354. this._closeWebsocket();
  355. }
  356. }
  357. }
  358. private _cleanUp() {
  359. this._connected = false;
  360. if (this._pinger) {
  361. clearInterval(this._pinger);
  362. this._pinger = undefined;
  363. }
  364. if (this._ponger) {
  365. clearInterval(this._ponger);
  366. this._ponger = undefined;
  367. }
  368. }
  369. public publish(params: IPublishParams): void {
  370. const { destination, headers, body, binaryBody, skipContentLengthHeader } =
  371. params;
  372. const hdrs: StompHeaders = (Object as any).assign({ destination }, headers);
  373. this._transmit({
  374. command: 'SEND',
  375. headers: hdrs,
  376. body,
  377. binaryBody,
  378. skipContentLengthHeader,
  379. });
  380. }
  381. public watchForReceipt(receiptId: string, callback: frameCallbackType): void {
  382. this._receiptWatchers[receiptId] = callback;
  383. }
  384. public subscribe(
  385. destination: string,
  386. callback: messageCallbackType,
  387. headers: StompHeaders = {}
  388. ): StompSubscription {
  389. headers = (Object as any).assign({}, headers);
  390. if (!headers.id) {
  391. headers.id = `sub-${this._counter++}`;
  392. }
  393. headers.destination = destination;
  394. this._subscriptions[headers.id] = callback;
  395. this._transmit({ command: 'SUBSCRIBE', headers });
  396. const client = this;
  397. return {
  398. id: headers.id,
  399. unsubscribe(hdrs) {
  400. return client.unsubscribe(headers.id, hdrs);
  401. },
  402. };
  403. }
  404. public unsubscribe(id: string, headers: StompHeaders = {}): void {
  405. headers = (Object as any).assign({}, headers);
  406. delete this._subscriptions[id];
  407. headers.id = id;
  408. this._transmit({ command: 'UNSUBSCRIBE', headers });
  409. }
  410. public begin(transactionId: string): ITransaction {
  411. const txId = transactionId || `tx-${this._counter++}`;
  412. this._transmit({
  413. command: 'BEGIN',
  414. headers: {
  415. transaction: txId,
  416. },
  417. });
  418. const client = this;
  419. return {
  420. id: txId,
  421. commit(): void {
  422. client.commit(txId);
  423. },
  424. abort(): void {
  425. client.abort(txId);
  426. },
  427. };
  428. }
  429. public commit(transactionId: string): void {
  430. this._transmit({
  431. command: 'COMMIT',
  432. headers: {
  433. transaction: transactionId,
  434. },
  435. });
  436. }
  437. public abort(transactionId: string): void {
  438. this._transmit({
  439. command: 'ABORT',
  440. headers: {
  441. transaction: transactionId,
  442. },
  443. });
  444. }
  445. public ack(
  446. messageId: string,
  447. subscriptionId: string,
  448. headers: StompHeaders = {}
  449. ): void {
  450. headers = (Object as any).assign({}, headers);
  451. if (this._connectedVersion === Versions.V1_2) {
  452. headers.id = messageId;
  453. } else {
  454. headers['message-id'] = messageId;
  455. }
  456. headers.subscription = subscriptionId;
  457. this._transmit({ command: 'ACK', headers });
  458. }
  459. public nack(
  460. messageId: string,
  461. subscriptionId: string,
  462. headers: StompHeaders = {}
  463. ): void {
  464. headers = (Object as any).assign({}, headers);
  465. if (this._connectedVersion === Versions.V1_2) {
  466. headers.id = messageId;
  467. } else {
  468. headers['message-id'] = messageId;
  469. }
  470. headers.subscription = subscriptionId;
  471. return this._transmit({ command: 'NACK', headers });
  472. }
  473. }