Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| /** | |
| This file is a part of fetch-event-source package (as of v2.0.1) | |
| https://github.com/Azure/fetch-event-source/blob/v2.0.1/src/parse.ts | |
| Full package can be used after it is made compatible with nodejs: | |
| https://github.com/Azure/fetch-event-source/issues/20 | |
| Below is the fetch-event-source package license: | |
| MIT License | |
| Copyright (c) Microsoft Corporation. | |
| Permission is hereby granted, free of charge, to any person obtaining a copy | |
| of this software and associated documentation files (the "Software"), to deal | |
| in the Software without restriction, including without limitation the rights | |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| copies of the Software, and to permit persons to whom the Software is | |
| furnished to do so, subject to the following conditions: | |
| The above copyright notice and this permission notice shall be included in all | |
| copies or substantial portions of the Software. | |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| SOFTWARE | |
| */ | |
| /** | |
| * Represents a message sent in an event stream | |
| * https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format | |
| */ | |
| export interface EventSourceMessage { | |
| /** The event ID to set the EventSource object's last event ID value. */ | |
| id: string; | |
| /** A string identifying the type of event described. */ | |
| event: string; | |
| /** The event data */ | |
| data: string; | |
| /** The reconnection interval (in milliseconds) to wait before retrying the connection */ | |
| retry?: number; | |
| } | |
| /** | |
| * Converts a ReadableStream into a callback pattern. | |
| * @param stream The input ReadableStream. | |
| * @param onChunk A function that will be called on each new byte chunk in the stream. | |
| * @returns {Promise<void>} A promise that will be resolved when the stream closes. | |
| */ | |
| export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) { | |
| const reader = stream.getReader(); | |
| let result: ReadableStreamReadResult<Uint8Array>; | |
| while (!(result = await reader.read()).done) { | |
| onChunk(result.value); | |
| } | |
| } | |
| const enum ControlChars { | |
| NewLine = 10, | |
| CarriageReturn = 13, | |
| Space = 32, | |
| Colon = 58, | |
| } | |
| /** | |
| * Parses arbitary byte chunks into EventSource line buffers. | |
| * Each line should be of the format "field: value" and ends with \r, \n, or \r\n. | |
| * @param onLine A function that will be called on each new EventSource line. | |
| * @returns A function that should be called for each incoming byte chunk. | |
| */ | |
| export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void) { | |
| let buffer: Uint8Array | undefined; | |
| let position: number; // current read position | |
| let fieldLength: number; // length of the `field` portion of the line | |
| let discardTrailingNewline = false; | |
| // return a function that can process each incoming byte chunk: | |
| return function onChunk(arr: Uint8Array) { | |
| if (buffer === undefined) { | |
| buffer = arr; | |
| position = 0; | |
| fieldLength = -1; | |
| } else { | |
| // we're still parsing the old line. Append the new bytes into buffer: | |
| buffer = concat(buffer, arr); | |
| } | |
| const bufLength = buffer.length; | |
| let lineStart = 0; // index where the current line starts | |
| while (position < bufLength) { | |
| if (discardTrailingNewline) { | |
| if (buffer[position] === ControlChars.NewLine) { | |
| lineStart = ++position; // skip to next char | |
| } | |
| discardTrailingNewline = false; | |
| } | |
| // start looking forward till the end of line: | |
| let lineEnd = -1; // index of the \r or \n char | |
| for (; position < bufLength && lineEnd === -1; ++position) { | |
| switch (buffer[position]) { | |
| case ControlChars.Colon: | |
| if (fieldLength === -1) { // first colon in line | |
| fieldLength = position - lineStart; | |
| } | |
| break; | |
| // @ts-ignore:7029 \r case below should fallthrough to \n: | |
| case ControlChars.CarriageReturn: | |
| discardTrailingNewline = true; | |
| case ControlChars.NewLine: | |
| lineEnd = position; | |
| break; | |
| } | |
| } | |
| if (lineEnd === -1) { | |
| // We reached the end of the buffer but the line hasn't ended. | |
| // Wait for the next arr and then continue parsing: | |
| break; | |
| } | |
| // we've reached the line end, send it out: | |
| onLine(buffer.subarray(lineStart, lineEnd), fieldLength); | |
| lineStart = position; // we're now on the next line | |
| fieldLength = -1; | |
| } | |
| if (lineStart === bufLength) { | |
| buffer = undefined; // we've finished reading it | |
| } else if (lineStart !== 0) { | |
| // Create a new view into buffer beginning at lineStart so we don't | |
| // need to copy over the previous lines when we get the new arr: | |
| buffer = buffer.subarray(lineStart); | |
| position -= lineStart; | |
| } | |
| } | |
| } | |
| /** | |
| * Parses line buffers into EventSourceMessages. | |
| * @param onId A function that will be called on each `id` field. | |
| * @param onRetry A function that will be called on each `retry` field. | |
| * @param onMessage A function that will be called on each message. | |
| * @returns A function that should be called for each incoming line buffer. | |
| */ | |
| export function getMessages( | |
| onId: (id: string) => void, | |
| onRetry: (retry: number) => void, | |
| onMessage?: (msg: EventSourceMessage) => void | |
| ) { | |
| let message = newMessage(); | |
| const decoder = new TextDecoder(); | |
| // return a function that can process each incoming line buffer: | |
| return function onLine(line: Uint8Array, fieldLength: number) { | |
| if (line.length === 0) { | |
| // empty line denotes end of message. Trigger the callback and start a new message: | |
| onMessage?.(message); | |
| message = newMessage(); | |
| } else if (fieldLength > 0) { // exclude comments and lines with no values | |
| // line is of format "<field>:<value>" or "<field>: <value>" | |
| // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation | |
| const field = decoder.decode(line.subarray(0, fieldLength)); | |
| const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1); | |
| const value = decoder.decode(line.subarray(valueOffset)); | |
| switch (field) { | |
| case 'data': | |
| // if this message already has data, append the new value to the old. | |
| // otherwise, just set to the new value: | |
| message.data = message.data | |
| ? message.data + '\n' + value | |
| : value; // otherwise, | |
| break; | |
| case 'event': | |
| message.event = value; | |
| break; | |
| case 'id': | |
| onId(message.id = value); | |
| break; | |
| case 'retry': | |
| const retry = parseInt(value, 10); | |
| if (!isNaN(retry)) { // per spec, ignore non-integers | |
| onRetry(message.retry = retry); | |
| } | |
| break; | |
| } | |
| } | |
| } | |
| } | |
| function concat(a: Uint8Array, b: Uint8Array) { | |
| const res = new Uint8Array(a.length + b.length); | |
| res.set(a); | |
| res.set(b, a.length); | |
| return res; | |
| } | |
| function newMessage(): EventSourceMessage { | |
| // data, event, and id must be initialized to empty strings: | |
| // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation | |
| // retry should be initialized to undefined so we return a consistent shape | |
| // to the js engine all the time: https://mathiasbynens.be/notes/shapes-ics#takeaways | |
| return { | |
| data: '', | |
| event: '', | |
| id: '', | |
| retry: undefined, | |
| }; | |
| } | |