Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions packages/web-api/src/WebClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,11 @@ describe('WebClient', () => {
ok: true,
ts: '123.123',
})
.post('/api/chat.stopStream', { channel: 'C0123456789', ts: '123.123', markdown_text: 'nice!' })
.post('/api/chat.stopStream', {
channel: 'C0123456789',
ts: '123.123',
chunks: JSON.stringify([{ type: 'markdown_text', text: 'nice!' }]),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: I made commit fc5ed1a which updates this test to support chunks instead of markdown_text because the streaming helper will now convert all markdown text into a chunk.

})
.reply(200, {
ok: true,
});
Expand Down Expand Up @@ -1279,7 +1283,7 @@ describe('WebClient', () => {
const scope = nock('https://slack.com')
.post('/api/chat.startStream', {
channel: 'C0123456789',
markdown_text: '**this messag',
chunks: JSON.stringify([{ type: 'markdown_text', text: '**this messag' }]),
recipient_team_id: 'T0123456789',
recipient_user_id: 'U0123456789',
thread_ts: '123.000',
Expand All @@ -1290,7 +1294,7 @@ describe('WebClient', () => {
})
.post('/api/chat.appendStream', {
channel: 'C0123456789',
markdown_text: 'e is bold!',
chunks: JSON.stringify([{ type: 'markdown_text', text: 'e is bold!' }]),
token: 'xoxb-updated-1',
ts: '123.123',
})
Expand All @@ -1300,7 +1304,7 @@ describe('WebClient', () => {
.post('/api/chat.stopStream', {
blocks: JSON.stringify([contextActionsBlock]),
channel: 'C0123456789',
markdown_text: '**',
chunks: JSON.stringify([{ type: 'markdown_text', text: '**' }]),
token: 'xoxb-updated-2',
ts: '123.123',
})
Expand Down Expand Up @@ -1370,7 +1374,11 @@ describe('WebClient', () => {
ok: true,
ts: '123.123',
})
.post('/api/chat.stopStream', { channel: 'C0123456789', ts: '123.123', markdown_text: 'nice!' })
.post('/api/chat.stopStream', {
channel: 'C0123456789',
ts: '123.123',
chunks: JSON.stringify([{ type: 'markdown_text', text: 'nice!' }]),
})
.reply(200, {
ok: true,
});
Expand Down
63 changes: 41 additions & 22 deletions packages/web-api/src/chat-stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Logger } from '@slack/logger';
import type { AnyChunk } from '@slack/types';
import type { ChatAppendStreamArguments, ChatStartStreamArguments, ChatStopStreamArguments } from './types/request';
import type { ChatAppendStreamResponse, ChatStartStreamResponse, ChatStopStreamResponse } from './types/response';
import type WebClient from './WebClient';
Expand All @@ -13,19 +14,12 @@ export interface ChatStreamerOptions {

export class ChatStreamer {
private buffer = '';

private client: WebClient;

private logger: Logger;

private options: Required<ChatStreamerOptions>;

private state: 'starting' | 'in_progress' | 'completed';

private streamArgs: ChatStartStreamArguments;

private streamTs: string | undefined;

private token: string | undefined;

/**
Expand Down Expand Up @@ -86,12 +80,15 @@ export class ChatStreamer {
if (this.state === 'completed') {
throw new Error(`failed to append stream: stream state is ${this.state}`);
}
if (args.token) {
this.token = args.token;
const { markdown_text, chunks, ...opts } = args;
if (opts.token) {
this.token = opts.token;
}
this.buffer += args.markdown_text;
if (this.buffer.length >= this.options.buffer_size) {
return await this.flushBuffer(args);
if (markdown_text) {
this.buffer += markdown_text;
}
if (this.buffer.length >= this.options.buffer_size || chunks) {
return await this.flushBuffer({ chunks, ...opts });
}
const details = {
bufferLength: this.buffer.length,
Expand Down Expand Up @@ -127,11 +124,12 @@ export class ChatStreamer {
if (this.state === 'completed') {
throw new Error(`failed to stop stream: stream state is ${this.state}`);
}
if (args?.token) {
this.token = args.token;
const { markdown_text, chunks, ...opts } = args ?? {};
if (opts.token) {
this.token = opts.token;
}
if (args?.markdown_text) {
this.buffer += args.markdown_text;
if (markdown_text) {
this.buffer += markdown_text;
}
if (!this.streamTs) {
const response = await this.client.chat.startStream({
Expand All @@ -144,12 +142,22 @@ export class ChatStreamer {
this.streamTs = response.ts;
this.state = 'in_progress';
}
const chunksToFlush: AnyChunk[] = [];
if (this.buffer.length > 0) {
chunksToFlush.push({
type: 'markdown_text',
text: this.buffer,
});
}
if (chunks) {
chunksToFlush.push(...chunks);
}
const response = await this.client.chat.stopStream({
token: this.token,
channel: this.streamArgs.channel,
ts: this.streamTs,
...args,
markdown_text: this.buffer,
chunks: chunksToFlush,
...opts,
});
this.state = 'completed';
return response;
Expand All @@ -158,12 +166,23 @@ export class ChatStreamer {
private async flushBuffer(
args: Omit<ChatStartStreamArguments | ChatAppendStreamArguments, 'channel' | 'ts'>,
): Promise<ChatStartStreamResponse | ChatAppendStreamResponse> {
const { chunks, ...opts } = args ?? {};
const chunksToFlush: AnyChunk[] = [];
if (this.buffer.length > 0) {
chunksToFlush.push({
type: 'markdown_text',
text: this.buffer,
});
}
if (chunks) {
chunksToFlush.push(...chunks);
}
if (!this.streamTs) {
const response = await this.client.chat.startStream({
...this.streamArgs,
token: this.token,
...args,
markdown_text: this.buffer,
chunks: chunksToFlush,
...opts,
});
this.buffer = '';
this.streamTs = response.ts;
Expand All @@ -174,8 +193,8 @@ export class ChatStreamer {
token: this.token,
channel: this.streamArgs.channel,
ts: this.streamTs,
...args,
markdown_text: this.buffer,
chunks: chunksToFlush,
...opts,
});
this.buffer = '';
return response;
Expand Down
18 changes: 17 additions & 1 deletion packages/web-api/test/types/methods/chat.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,23 @@ expectAssignable<Parameters<typeof web.chat.startStream>>([
{
channel: 'C1234',
thread_ts: '1234.56',
markdown_text: 'hello',
chunks: [
{
type: 'markdown_text',
text: 'Hello world',
},
{
type: 'plan_update',
title: 'Analyzing request',
},
{
type: 'task_update',
id: 'task-1',
title: 'Processing request',
status: 'in_progress',
details: 'Working on it...',
},
],
recipient_team_id: 'T1234',
recipient_user_id: 'U1234',
},
Expand Down
Loading