Skip to content

Commit a1894f1

Browse files
committed
fix: use single websocket for bi-di communication
1 parent b5f1159 commit a1894f1

File tree

3 files changed

+42
-38
lines changed

3 files changed

+42
-38
lines changed

examples/realtime-agents/src/index.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ interface Env {
2424
}
2525

2626
export class RealtimeVoiceAgent extends Agent<Env> {
27-
realtimePipelineComponents = this.createRealtimePipeline;
28-
29-
createRealtimePipeline() {
27+
realtimePipelineComponents = () => {
3028
// RealtimeKit transport for audio I/O
3129
const rtk = new RealtimeKitTransport(
3230
this.env.RTK_MEETING_ID || "default-meeting",
@@ -47,7 +45,7 @@ export class RealtimeVoiceAgent extends Agent<Env> {
4745
const tts = new ElevenLabsTTS(this.env.ELEVENLABS_API_KEY);
4846

4947
return [rtk, stt, this, tts, rtk];
50-
}
48+
};
5149

5250
/**
5351
* Handle incoming transcribed text and generate intelligent responses

packages/agents/src/index.ts

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
processNDJSONStream,
3737
REALTIME_AGENTS_SERVICE,
3838
RealtimeKitTransport,
39+
TextProcessor,
3940
type RealtimePipelineComponent
4041
} from "./realtime-agent";
4142
import { randomUUID } from "node:crypto";
@@ -356,10 +357,11 @@ export class Agent<
356357
return DataKind.Text;
357358
}
358359

359-
schema() {
360+
schema(): { name: string; type: string; [K: string]: any } {
360361
return {
361362
name: this._ParentClass.name,
362-
type: "agent"
363+
type: "agent",
364+
internal_sdk: true
363365
};
364366
}
365367

@@ -1781,25 +1783,17 @@ export class Agent<
17811783
const { request } = getCurrentAgent();
17821784
if (!request) throw new Error("request is required");
17831785

1784-
console.log("initializing realtime pipeline");
17851786
const requestUrl = new URL(request.url);
17861787
const agentId = this.name;
17871788
const agentName = camelCaseToKebabCase(this._ParentClass.name);
17881789
const agentURL = `${requestUrl.host}/agents/${agentName}/${agentId}/realtime`;
17891790

1790-
const elements: { name: string; [K: string]: any }[] = [
1791-
{
1792-
name: "ws_out",
1793-
type: "websocket_out",
1794-
internal_sdk: true,
1795-
url: `wss://${agentURL}/ws`
1796-
},
1797-
{
1798-
name: "ws_in_text",
1799-
internal_sdk: true,
1800-
type: "websocket_in"
1801-
}
1802-
];
1791+
// We need to check if the components array have any instance of
1792+
// Agent or TextProcessor, in that case we need to split the components
1793+
// into two layers, where the first layers end at that component
1794+
// and the second layer starts from that component. The Agent/TextProcessor
1795+
// are websocket element, we will be using bidirectional websocket so only
1796+
// one websocket element will be acting as input and output both.
18031797

18041798
const layers: { id: number; name: string; elements: string[] }[] = [
18051799
{
@@ -1808,32 +1802,41 @@ export class Agent<
18081802
elements: []
18091803
}
18101804
];
1805+
let elements: { name: string; [K: string]: any }[] = [];
18111806

18121807
for (const component of components) {
1813-
if (component instanceof Agent) {
1814-
layers[layers.length - 1].elements.push("ws_out");
1808+
let schema = component.schema();
1809+
if (component instanceof Agent || component instanceof TextProcessor) {
1810+
if (component instanceof Agent) {
1811+
schema.type = "websocket";
1812+
schema.url = `wss://${agentURL}`;
1813+
}
1814+
layers[layers.length - 1].elements.push(schema.name);
1815+
18151816
layers.push({
18161817
id: layers.length + 1,
18171818
name: `default-${layers.length + 1}`,
1818-
elements: ["ws_in_text"]
1819+
elements: []
18191820
});
1820-
} else {
1821-
if (elements.filter((e) => e.name === component.name).length === 0) {
1822-
if (component instanceof RealtimeKitTransport)
1823-
elements.push({
1824-
...component.schema(),
1825-
worker_url: `https://${agentURL}`
1826-
});
1827-
else elements.push(component.schema());
1828-
}
1829-
layers[layers.length - 1].elements.push(component.name);
18301821
}
1822+
1823+
if (component instanceof RealtimeKitTransport) {
1824+
schema.worker_url = `https://${agentURL}`;
1825+
}
1826+
elements.push(schema);
1827+
layers[layers.length - 1].elements.push(schema.name);
18311828
}
18321829

1830+
elements = elements.filter(
1831+
(v, idx, arr) => idx === arr.findIndex((v1) => v1.name === v.name)
1832+
);
1833+
1834+
console.log("layers", layers, "elements", elements);
1835+
18331836
const response = await fetch(
18341837
`${CLOUDFLARE_BASE}/client/v4/accounts/${CF_ACCOUNT_ID}/realtime/agents/pipeline`,
1838+
// `${REALTIME_AGENTS_SERVICE}/pipeline`,
18351839
{
1836-
// const response = await fetch(`${REALTIME_AGENTS_SERVICE}/pipeline`, {
18371840
method: "POST",
18381841
headers: {
18391842
Authorization: `Bearer ${CF_API_TOKEN}`,
@@ -1878,8 +1881,7 @@ export class Agent<
18781881
// check if instance is already started
18791882
if (this.realtimePipelineRunning)
18801883
throw new Error("agent is already running");
1881-
const components = this.realtimePipelineComponents!();
1882-
await this.initRealtimePipeline(components);
1884+
await this.initRealtimePipeline(this.realtimePipelineComponents!());
18831885

18841886
const startResponse = await fetch(
18851887
`${REALTIME_AGENTS_SERVICE}/pipeline?authToken=${this._rtk_authToken}`,

packages/agents/src/realtime-agent.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { type Connection } from "./";
22
import RealtimeKitClient from "@cloudflare/realtimekit";
33

44
export const REALTIME_AGENTS_SERVICE = "https://agents.realtime.cloudflare.com";
5-
// export const REALTIME_AGENTS_SERVICE = "https://spotty-plums-start.loca.lt";
5+
// export const REALTIME_AGENTS_SERVICE = "https://curly-radios-eat.loca.lt";
66
export const CLOUDFLARE_BASE = "https://api.cloudflare.com";
77

88
export enum DataKind {
@@ -98,6 +98,7 @@ export class RealtimeKitTransport implements RealtimePipelineComponent {
9898
name: this.name,
9999
type: "rtk",
100100
meeting_id: this.meetingId,
101+
auth_token: this.authToken,
101102
filters: this.filters
102103
};
103104
}
@@ -188,6 +189,8 @@ export class ElevenLabsTTS implements RealtimePipelineComponent {
188189
export abstract class TextProcessor implements RealtimePipelineComponent {
189190
constructor() {}
190191

192+
abstract get url(): string;
193+
191194
abstract onRealtimeTranscript(
192195
text: string,
193196
reply: (response: string) => void
@@ -208,7 +211,8 @@ export abstract class TextProcessor implements RealtimePipelineComponent {
208211
schema() {
209212
return {
210213
name: this.name,
211-
type: "text_processor"
214+
type: "text_processor",
215+
url: this.url
212216
};
213217
}
214218
}

0 commit comments

Comments
 (0)