-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathrag-workflow.ts
More file actions
178 lines (160 loc) · 5.55 KB
/
rag-workflow.ts
File metadata and controls
178 lines (160 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/**
* RAG Workflow — Retrieval-Augmented Generation pipeline
*
* Demonstrates a complete RAG pipeline:
* 1. Index a document (text → embeddings → vector DB)
* 2. Search the index with a user query
* 3. Use search results as context for LLM answer generation
*
* Prerequisites:
* - An LLM integration configured in Conductor
* - A vector DB integration configured (e.g., Pinecone, Weaviate)
*
* Run:
* CONDUCTOR_SERVER_URL=http://localhost:8080 npx ts-node examples/advanced/rag-workflow.ts
*/
import {
OrkesClients,
ConductorWorkflow,
llmIndexTextTask,
llmSearchIndexTask,
llmChatCompleteTask,
Role,
} from "../../src/sdk";
/**
 * Registers the two example workflows, runs the indexing workflow once per
 * sample document, then runs a single question through the query workflow,
 * logging progress to stdout along the way.
 *
 * Integration names and models are read from the environment and fall back
 * to these defaults when the variable is unset:
 *   - LLM_PROVIDER        "openai_integration"
 *   - LLM_MODEL           "gpt-4o"
 *   - EMBEDDING_PROVIDER  "openai_integration"
 *   - EMBEDDING_MODEL     "text-embedding-3-small"
 *   - VECTOR_DB           "pinecone_integration"
 *   - VECTOR_INDEX        "rag-example-index"
 *
 * Errors thrown while *executing* either workflow are logged and skipped so
 * the example still finishes; errors thrown while creating the clients or
 * registering the workflows are not caught here and reject the returned
 * promise. On normal completion the process is terminated with exit code 0.
 */
async function main(): Promise<void> {
  // Caught values are `unknown`: anything can be thrown or used as a
  // rejection reason. Narrow before reading `.message` so a non-Error value
  // is still reported instead of being printed as `undefined`.
  const describeError = (err: unknown): string =>
    err instanceof Error ? err.message : String(err);

  const clients = await OrkesClients.from();
  const workflowClient = clients.getWorkflowClient();

  const llmProvider = process.env.LLM_PROVIDER ?? "openai_integration";
  const llmModel = process.env.LLM_MODEL ?? "gpt-4o";
  const embeddingProvider = process.env.EMBEDDING_PROVIDER ?? "openai_integration";
  const embeddingModel = process.env.EMBEDDING_MODEL ?? "text-embedding-3-small";
  const vectorDb = process.env.VECTOR_DB ?? "pinecone_integration";
  const vectorIndex = process.env.VECTOR_INDEX ?? "rag-example-index";

  // ── 1. Indexing Workflow ──────────────────────────────────────────
  const indexWf = new ConductorWorkflow(workflowClient, "rag_index_workflow")
    .description("Index text documents into vector DB for RAG");

  // The "${workflow.input.*}" values are plain strings passed through to the
  // workflow definition as-is; they are not JavaScript template literals.
  indexWf.add(
    llmIndexTextTask(
      "index_ref",
      vectorDb,
      vectorIndex,
      { provider: embeddingProvider, model: embeddingModel },
      "${workflow.input.text}",
      "${workflow.input.docId}",
      {
        namespace: "${workflow.input.namespace}",
        chunkSize: 500,
        chunkOverlap: 50,
        metadata: {
          source: "${workflow.input.source}",
          category: "${workflow.input.category}",
        },
      }
    )
  );

  indexWf.outputParameters({
    docId: "${workflow.input.docId}",
    indexed: true,
  });

  // NOTE(review): the `true` argument is presumably an overwrite flag so the
  // example can be re-run — confirm against ConductorWorkflow.register.
  await indexWf.register(true);
  console.log("Registered indexing workflow:", indexWf.getName());

  // ── 2. RAG Query Workflow ─────────────────────────────────────────
  const queryWf = new ConductorWorkflow(workflowClient, "rag_query_workflow")
    .description("Search indexed documents and generate answer with LLM");

  // Step 1: Search the vector index
  queryWf.add(
    llmSearchIndexTask(
      "search_ref",
      vectorDb,
      vectorIndex,
      { provider: embeddingProvider, model: embeddingModel },
      "${workflow.input.question}",
      {
        namespace: "${workflow.input.namespace}",
        maxResults: 5,
      }
    )
  );

  // Step 2: Generate answer using search results as context
  queryWf.add(
    llmChatCompleteTask("answer_ref", llmProvider, llmModel, {
      messages: [
        {
          role: Role.SYSTEM,
          // The continuation lines below are deliberately not indented: any
          // leading whitespace would become part of the prompt text. The
          // escaped `\${...}` keeps the placeholder literal in the string
          // instead of interpolating it in JavaScript.
          message: `You are a helpful assistant. Answer the user's question based ONLY on the provided context. If the context doesn't contain relevant information, say so.
Context from documents:
\${search_ref.output.result}`,
        },
        {
          role: Role.USER,
          message: "${workflow.input.question}",
        },
      ],
      temperature: 0.3,
      maxTokens: 500,
    })
  );

  queryWf.outputParameters({
    question: "${workflow.input.question}",
    searchResults: "${search_ref.output.result}",
    answer: "${answer_ref.output.result}",
  });

  await queryWf.register(true);
  console.log("Registered query workflow:", queryWf.getName());

  // ── 3. Execute the pipeline ───────────────────────────────────────
  console.log("\n--- Step 1: Indexing documents ---");
  const documents = [
    {
      docId: "doc-1",
      text: "Conductor is an open-source workflow orchestration engine originally developed at Netflix. It supports complex workflow patterns including fork/join, sub-workflows, and dynamic tasks.",
      source: "docs",
      category: "overview",
    },
    {
      docId: "doc-2",
      text: "The TypeScript SDK for Conductor provides a fluent API for building workflows, task builders for all task types, and a decorator-based worker framework with @worker.",
      source: "docs",
      category: "sdk",
    },
    {
      docId: "doc-3",
      text: "Workers in Conductor poll for tasks, execute business logic, and report results back. The SDK supports concurrency control, adaptive backoff, and metrics collection.",
      source: "docs",
      category: "workers",
    },
  ];

  for (const doc of documents) {
    try {
      const run = await indexWf.execute({
        ...doc,
        namespace: "rag-example",
      });
      console.log(` Indexed ${doc.docId}: ${run.status}`);
    } catch (err) {
      console.log(
        ` Indexing ${doc.docId} skipped (requires vector DB): ${describeError(err)}`
      );
      // Stop after the first failure; the remaining documents are not tried.
      break;
    }
  }

  console.log("\n--- Step 2: Querying ---");
  try {
    const queryRun = await queryWf.execute({
      question: "How do workers function in Conductor?",
      namespace: "rag-example",
    });
    console.log("Status:", queryRun.status);
    console.log("Answer:", (queryRun.output as Record<string, string>)?.answer);
  } catch (err) {
    console.log(
      "Query skipped (requires LLM + vector DB):",
      describeError(err)
    );
  }

  console.log("\nDone.");
  // Exit explicitly instead of waiting for the event loop to drain.
  process.exit(0);
}
/**
 * Last-resort handler for a rejection escaping main(): writes the reason to
 * stderr and terminates the process with exit code 1.
 */
function exitWithFailure(reason: unknown): void {
  console.error(reason);
  process.exit(1);
}

// Script entry point.
main().catch(exitWithFailure);