stomp-handler.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. import { BYTE } from './byte.js';
  2. import { FrameImpl } from './frame-impl.js';
  3. import { Parser } from './parser.js';
  4. import { StompSocketState, } from './types.js';
  5. import { Versions } from './versions.js';
  6. import { augmentWebsocket } from './augment-websocket.js';
  7. /**
  8. * The STOMP protocol handler
  9. *
  10. * Part of `@stomp/stompjs`.
  11. *
  12. * @internal
  13. */
  14. export class StompHandler {
  15. constructor(_client, _webSocket, config) {
  16. this._client = _client;
  17. this._webSocket = _webSocket;
  18. this._connected = false;
  19. this._serverFrameHandlers = {
  20. // [CONNECTED Frame](https://stomp.github.com/stomp-specification-1.2.html#CONNECTED_Frame)
  21. CONNECTED: frame => {
  22. this.debug(`connected to server ${frame.headers.server}`);
  23. this._connected = true;
  24. this._connectedVersion = frame.headers.version;
  25. // STOMP version 1.2 needs header values to be escaped
  26. if (this._connectedVersion === Versions.V1_2) {
  27. this._escapeHeaderValues = true;
  28. }
  29. this._setupHeartbeat(frame.headers);
  30. this.onConnect(frame);
  31. },
  32. // [MESSAGE Frame](https://stomp.github.com/stomp-specification-1.2.html#MESSAGE)
  33. MESSAGE: frame => {
  34. // the callback is registered when the client calls
  35. // `subscribe()`.
  36. // If there is no registered subscription for the received message,
  37. // the default `onUnhandledMessage` callback is used that the client can set.
  38. // This is useful for subscriptions that are automatically created
  39. // on the browser side (e.g. [RabbitMQ's temporary
  40. // queues](https://www.rabbitmq.com/stomp.html)).
  41. const subscription = frame.headers.subscription;
  42. const onReceive = this._subscriptions[subscription] || this.onUnhandledMessage;
  43. // bless the frame to be a Message
  44. const message = frame;
  45. const client = this;
  46. const messageId = this._connectedVersion === Versions.V1_2
  47. ? message.headers.ack
  48. : message.headers['message-id'];
  49. // add `ack()` and `nack()` methods directly to the returned frame
  50. // so that a simple call to `message.ack()` can acknowledge the message.
  51. message.ack = (headers = {}) => {
  52. return client.ack(messageId, subscription, headers);
  53. };
  54. message.nack = (headers = {}) => {
  55. return client.nack(messageId, subscription, headers);
  56. };
  57. onReceive(message);
  58. },
  59. // [RECEIPT Frame](https://stomp.github.com/stomp-specification-1.2.html#RECEIPT)
  60. RECEIPT: frame => {
  61. const callback = this._receiptWatchers[frame.headers['receipt-id']];
  62. if (callback) {
  63. callback(frame);
  64. // Server will acknowledge only once, remove the callback
  65. delete this._receiptWatchers[frame.headers['receipt-id']];
  66. }
  67. else {
  68. this.onUnhandledReceipt(frame);
  69. }
  70. },
  71. // [ERROR Frame](https://stomp.github.com/stomp-specification-1.2.html#ERROR)
  72. ERROR: frame => {
  73. this.onStompError(frame);
  74. },
  75. };
  76. // used to index subscribers
  77. this._counter = 0;
  78. // subscription callbacks indexed by subscriber's ID
  79. this._subscriptions = {};
  80. // receipt-watchers indexed by receipts-ids
  81. this._receiptWatchers = {};
  82. this._partialData = '';
  83. this._escapeHeaderValues = false;
  84. this._lastServerActivityTS = Date.now();
  85. this.debug = config.debug;
  86. this.stompVersions = config.stompVersions;
  87. this.connectHeaders = config.connectHeaders;
  88. this.disconnectHeaders = config.disconnectHeaders;
  89. this.heartbeatIncoming = config.heartbeatIncoming;
  90. this.heartbeatOutgoing = config.heartbeatOutgoing;
  91. this.splitLargeFrames = config.splitLargeFrames;
  92. this.maxWebSocketChunkSize = config.maxWebSocketChunkSize;
  93. this.forceBinaryWSFrames = config.forceBinaryWSFrames;
  94. this.logRawCommunication = config.logRawCommunication;
  95. this.appendMissingNULLonIncoming = config.appendMissingNULLonIncoming;
  96. this.discardWebsocketOnCommFailure = config.discardWebsocketOnCommFailure;
  97. this.onConnect = config.onConnect;
  98. this.onDisconnect = config.onDisconnect;
  99. this.onStompError = config.onStompError;
  100. this.onWebSocketClose = config.onWebSocketClose;
  101. this.onWebSocketError = config.onWebSocketError;
  102. this.onUnhandledMessage = config.onUnhandledMessage;
  103. this.onUnhandledReceipt = config.onUnhandledReceipt;
  104. this.onUnhandledFrame = config.onUnhandledFrame;
  105. }
  106. get connectedVersion() {
  107. return this._connectedVersion;
  108. }
  109. get connected() {
  110. return this._connected;
  111. }
  112. start() {
  113. const parser = new Parser(
  114. // On Frame
  115. rawFrame => {
  116. const frame = FrameImpl.fromRawFrame(rawFrame, this._escapeHeaderValues);
  117. // if this.logRawCommunication is set, the rawChunk is logged at this._webSocket.onmessage
  118. if (!this.logRawCommunication) {
  119. this.debug(`<<< ${frame}`);
  120. }
  121. const serverFrameHandler = this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
  122. serverFrameHandler(frame);
  123. },
  124. // On Incoming Ping
  125. () => {
  126. this.debug('<<< PONG');
  127. });
  128. this._webSocket.onmessage = (evt) => {
  129. this.debug('Received data');
  130. this._lastServerActivityTS = Date.now();
  131. if (this.logRawCommunication) {
  132. const rawChunkAsString = evt.data instanceof ArrayBuffer
  133. ? new TextDecoder().decode(evt.data)
  134. : evt.data;
  135. this.debug(`<<< ${rawChunkAsString}`);
  136. }
  137. parser.parseChunk(evt.data, this.appendMissingNULLonIncoming);
  138. };
  139. this._webSocket.onclose = (closeEvent) => {
  140. this.debug(`Connection closed to ${this._webSocket.url}`);
  141. this._cleanUp();
  142. this.onWebSocketClose(closeEvent);
  143. };
  144. this._webSocket.onerror = (errorEvent) => {
  145. this.onWebSocketError(errorEvent);
  146. };
  147. this._webSocket.onopen = () => {
  148. // Clone before updating
  149. const connectHeaders = Object.assign({}, this.connectHeaders);
  150. this.debug('Web Socket Opened...');
  151. connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
  152. connectHeaders['heart-beat'] = [
  153. this.heartbeatOutgoing,
  154. this.heartbeatIncoming,
  155. ].join(',');
  156. this._transmit({ command: 'CONNECT', headers: connectHeaders });
  157. };
  158. }
  159. _setupHeartbeat(headers) {
  160. if (headers.version !== Versions.V1_1 &&
  161. headers.version !== Versions.V1_2) {
  162. return;
  163. }
  164. // It is valid for the server to not send this header
  165. // https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
  166. if (!headers['heart-beat']) {
  167. return;
  168. }
  169. // heart-beat header received from the server looks like:
  170. //
  171. // heart-beat: sx, sy
  172. const [serverOutgoing, serverIncoming] = headers['heart-beat']
  173. .split(',')
  174. .map((v) => parseInt(v, 10));
  175. if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
  176. const ttl = Math.max(this.heartbeatOutgoing, serverIncoming);
  177. this.debug(`send PING every ${ttl}ms`);
  178. this._pinger = setInterval(() => {
  179. if (this._webSocket.readyState === StompSocketState.OPEN) {
  180. this._webSocket.send(BYTE.LF);
  181. this.debug('>>> PING');
  182. }
  183. }, ttl);
  184. }
  185. if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
  186. const ttl = Math.max(this.heartbeatIncoming, serverOutgoing);
  187. this.debug(`check PONG every ${ttl}ms`);
  188. this._ponger = setInterval(() => {
  189. const delta = Date.now() - this._lastServerActivityTS;
  190. // We wait twice the TTL to be flexible on window's setInterval calls
  191. if (delta > ttl * 2) {
  192. this.debug(`did not receive server activity for the last ${delta}ms`);
  193. this._closeOrDiscardWebsocket();
  194. }
  195. }, ttl);
  196. }
  197. }
  198. _closeOrDiscardWebsocket() {
  199. if (this.discardWebsocketOnCommFailure) {
  200. this.debug('Discarding websocket, the underlying socket may linger for a while');
  201. this.discardWebsocket();
  202. }
  203. else {
  204. this.debug('Issuing close on the websocket');
  205. this._closeWebsocket();
  206. }
  207. }
  208. forceDisconnect() {
  209. if (this._webSocket) {
  210. if (this._webSocket.readyState === StompSocketState.CONNECTING ||
  211. this._webSocket.readyState === StompSocketState.OPEN) {
  212. this._closeOrDiscardWebsocket();
  213. }
  214. }
  215. }
  216. _closeWebsocket() {
  217. this._webSocket.onmessage = () => { }; // ignore messages
  218. this._webSocket.close();
  219. }
  220. discardWebsocket() {
  221. if (typeof this._webSocket.terminate !== 'function') {
  222. augmentWebsocket(this._webSocket, (msg) => this.debug(msg));
  223. }
  224. // @ts-ignore - this method will be there at this stage
  225. this._webSocket.terminate();
  226. }
  227. _transmit(params) {
  228. const { command, headers, body, binaryBody, skipContentLengthHeader } = params;
  229. const frame = new FrameImpl({
  230. command,
  231. headers,
  232. body,
  233. binaryBody,
  234. escapeHeaderValues: this._escapeHeaderValues,
  235. skipContentLengthHeader,
  236. });
  237. let rawChunk = frame.serialize();
  238. if (this.logRawCommunication) {
  239. this.debug(`>>> ${rawChunk}`);
  240. }
  241. else {
  242. this.debug(`>>> ${frame}`);
  243. }
  244. if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
  245. rawChunk = new TextEncoder().encode(rawChunk);
  246. }
  247. if (typeof rawChunk !== 'string' || !this.splitLargeFrames) {
  248. this._webSocket.send(rawChunk);
  249. }
  250. else {
  251. let out = rawChunk;
  252. while (out.length > 0) {
  253. const chunk = out.substring(0, this.maxWebSocketChunkSize);
  254. out = out.substring(this.maxWebSocketChunkSize);
  255. this._webSocket.send(chunk);
  256. this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
  257. }
  258. }
  259. }
  260. dispose() {
  261. if (this.connected) {
  262. try {
  263. // clone before updating
  264. const disconnectHeaders = Object.assign({}, this.disconnectHeaders);
  265. if (!disconnectHeaders.receipt) {
  266. disconnectHeaders.receipt = `close-${this._counter++}`;
  267. }
  268. this.watchForReceipt(disconnectHeaders.receipt, frame => {
  269. this._closeWebsocket();
  270. this._cleanUp();
  271. this.onDisconnect(frame);
  272. });
  273. this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
  274. }
  275. catch (error) {
  276. this.debug(`Ignoring error during disconnect ${error}`);
  277. }
  278. }
  279. else {
  280. if (this._webSocket.readyState === StompSocketState.CONNECTING ||
  281. this._webSocket.readyState === StompSocketState.OPEN) {
  282. this._closeWebsocket();
  283. }
  284. }
  285. }
  286. _cleanUp() {
  287. this._connected = false;
  288. if (this._pinger) {
  289. clearInterval(this._pinger);
  290. this._pinger = undefined;
  291. }
  292. if (this._ponger) {
  293. clearInterval(this._ponger);
  294. this._ponger = undefined;
  295. }
  296. }
  297. publish(params) {
  298. const { destination, headers, body, binaryBody, skipContentLengthHeader } = params;
  299. const hdrs = Object.assign({ destination }, headers);
  300. this._transmit({
  301. command: 'SEND',
  302. headers: hdrs,
  303. body,
  304. binaryBody,
  305. skipContentLengthHeader,
  306. });
  307. }
  308. watchForReceipt(receiptId, callback) {
  309. this._receiptWatchers[receiptId] = callback;
  310. }
  311. subscribe(destination, callback, headers = {}) {
  312. headers = Object.assign({}, headers);
  313. if (!headers.id) {
  314. headers.id = `sub-${this._counter++}`;
  315. }
  316. headers.destination = destination;
  317. this._subscriptions[headers.id] = callback;
  318. this._transmit({ command: 'SUBSCRIBE', headers });
  319. const client = this;
  320. return {
  321. id: headers.id,
  322. unsubscribe(hdrs) {
  323. return client.unsubscribe(headers.id, hdrs);
  324. },
  325. };
  326. }
  327. unsubscribe(id, headers = {}) {
  328. headers = Object.assign({}, headers);
  329. delete this._subscriptions[id];
  330. headers.id = id;
  331. this._transmit({ command: 'UNSUBSCRIBE', headers });
  332. }
  333. begin(transactionId) {
  334. const txId = transactionId || `tx-${this._counter++}`;
  335. this._transmit({
  336. command: 'BEGIN',
  337. headers: {
  338. transaction: txId,
  339. },
  340. });
  341. const client = this;
  342. return {
  343. id: txId,
  344. commit() {
  345. client.commit(txId);
  346. },
  347. abort() {
  348. client.abort(txId);
  349. },
  350. };
  351. }
  352. commit(transactionId) {
  353. this._transmit({
  354. command: 'COMMIT',
  355. headers: {
  356. transaction: transactionId,
  357. },
  358. });
  359. }
  360. abort(transactionId) {
  361. this._transmit({
  362. command: 'ABORT',
  363. headers: {
  364. transaction: transactionId,
  365. },
  366. });
  367. }
  368. ack(messageId, subscriptionId, headers = {}) {
  369. headers = Object.assign({}, headers);
  370. if (this._connectedVersion === Versions.V1_2) {
  371. headers.id = messageId;
  372. }
  373. else {
  374. headers['message-id'] = messageId;
  375. }
  376. headers.subscription = subscriptionId;
  377. this._transmit({ command: 'ACK', headers });
  378. }
  379. nack(messageId, subscriptionId, headers = {}) {
  380. headers = Object.assign({}, headers);
  381. if (this._connectedVersion === Versions.V1_2) {
  382. headers.id = messageId;
  383. }
  384. else {
  385. headers['message-id'] = messageId;
  386. }
  387. headers.subscription = subscriptionId;
  388. return this._transmit({ command: 'NACK', headers });
  389. }
  390. }
  391. //# sourceMappingURL=stomp-handler.js.map