Pipedream is a cloud-based workflow automation platform that allows developers to connect APIs, automate tasks, and build event-driven workflows using serverless functions. When combined with CData Connect AI Remote MCP, Pipedream can interact with Presto data in real time using natural language, without the need for data replication to a natively supported database.
CData Connect AI offers a dedicated cloud-to-cloud interface for connecting to Presto data. The CData Connect AI Remote MCP Server enables secure communication between Pipedream and Presto, making it possible to ask questions and retrieve data from Presto using Pipedream workflows, all powered by an LLM that intelligently discovers data sources and generates SQL queries on the fly.
This article covers how to build a simple natural language data query workflow in Pipedream to conversationally explore Presto data. The connectivity principles apply to any Pipedream workflow. With Connect AI, workflows and agents can be built with access to live Presto data, plus hundreds of other sources.
Accessing and integrating live data from Trino and Presto SQL engines has never been easier with CData.
Presto and Trino allow users to access a variety of underlying data sources through a single endpoint. When paired with CData connectivity, users get pure, SQL-92 access to their instances, allowing them to integrate business data with a data warehouse or easily access live data directly from their preferred tools, like Power BI and Tableau.
In many cases, CData's live connectivity surpasses the native import functionality available in tools. One customer was unable to use Power BI effectively because of the size of the datasets needed for reporting. When the company implemented the CData Power BI Connector for Presto, they were able to generate reports in real time using the DirectQuery connection mode.
Connectivity to Presto from Pipedream is made possible through CData Connect AI Remote MCP. To interact with Presto from Pipedream, we start by creating and configuring a Presto connection in CData Connect AI.
Set the Server and Port connection properties to connect, in addition to any authentication properties that may be required.
To enable TLS/SSL, set UseSSL to true.
In order to authenticate with LDAP, set the following connection properties:
In order to authenticate with KERBEROS, set the following connection properties:
A Personal Access Token (PAT) is used to authenticate the connection to Connect AI from Pipedream. It is best practice to create a separate PAT for each service to maintain granularity of access.
With the connection configured and a PAT generated, we are ready to connect to Presto from Pipedream.
Store credentials securely as environment variables in Pipedream.
| Variable name | Value |
|---|---|
| CDATA_EMAIL | CData Connect AI login email |
| CDATA_PAT | CData Personal Access Token |
| OPENAI_API_KEY | OpenAI API Key |
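As a minimal sketch of how these variables are consumed, the helper below builds the Basic authorization header from the email and PAT and fails early if either is missing. The function name `buildAuthHeader` is an illustrative assumption; the `email:PAT` format mirrors what the MCP step later in this article does.

```javascript
// Hypothetical helper: builds the Basic auth header from the stored credentials.
function buildAuthHeader(env) {
  const email = env.CDATA_EMAIL;
  const pat = env.CDATA_PAT;
  if (!email || !pat) {
    throw new Error("CDATA_EMAIL and CDATA_PAT must both be set");
  }
  // Basic auth is the base64 encoding of "email:PAT".
  return "Basic " + Buffer.from(email + ":" + pat).toString("base64");
}
```

Inside a code step this would typically be called as `buildAuthHeader(process.env)`, so a missing variable surfaces as a clear error rather than a failed request.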
Add a Node.js code step named LLM. This step extracts the natural language query from the incoming request.
Replace the default code in the step with the following:

    import OpenAI from "openai";

    export default defineComponent({
      async run({ steps }) {
        if (steps.trigger.event.method === "OPTIONS") {
          return { userQuery: null, isOptions: true };
        }
        const body = steps.trigger.event.body;
        const parsed = typeof body === "string" ? JSON.parse(body) : body;
        const userQuery = parsed?.query;
        console.log("USER QUERY:", userQuery);
        if (!userQuery) throw new Error("No query found in request body");
        return { userQuery };
      }
    });

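The extraction logic in this step can also be sketched as a standalone function, which makes it easier to test outside Pipedream. The name `extractQuery` and the explicit error for malformed JSON are assumptions added for illustration; they are not part of the step above.

```javascript
// Hypothetical helper mirroring the LLM step: accepts a raw or pre-parsed body.
function extractQuery(body) {
  let parsed = body;
  if (typeof body === "string") {
    try {
      parsed = JSON.parse(body);
    } catch (e) {
      throw new Error("Request body is not valid JSON");
    }
  }
  // Accept only a non-empty string in the "query" field.
  const query = parsed && typeof parsed.query === "string" ? parsed.query.trim() : "";
  if (!query) throw new Error("No query found in request body");
  return query;
}
```

Calling `extractQuery('{"query": "list all tables"}')` returns the query text, while an empty or missing `query` field raises the same error message the step uses.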
Add a Node.js code step named MCP. This step implements the full agentic MCP flow: it discovers all available connections, selects the most relevant one based on the question, discovers the schema and tables dynamically, generates a SQL query using the LLM, and executes it against Presto data.
The step uses the following CData Connect AI MCP tools in sequence:
| MCP Tool | Purpose |
|---|---|
| getCatalogs | Retrieves all available connections from CData Connect AI |
| getSchemas | Retrieves the database schemas for the selected connection |
| getTables | Retrieves all tables and views for the selected schema |
| queryData | Executes the generated SQL query and returns results |
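Each of these tools is invoked through a JSON-RPC 2.0 `tools/call` request. The sketch below shows the request bodies for the four calls in order, using the same argument names as the MCP step that follows. The helper name `buildToolCall` and the values `Presto1`, `default`, and `orders` are placeholders chosen for illustration.

```javascript
// Builds the JSON-RPC 2.0 body for an MCP "tools/call" request.
function buildToolCall(id, toolName, args) {
  return JSON.stringify({
    jsonrpc: "2.0",
    id: id,
    method: "tools/call",
    params: { name: toolName, arguments: args || {} }
  });
}

// The four calls in sequence; connection, schema, and table names are placeholders.
const calls = [
  buildToolCall(1, "getCatalogs", {}),
  buildToolCall(2, "getSchemas", { connectionName: "Presto1", catalogName: "Presto1" }),
  buildToolCall(3, "getTables", {
    connectionName: "Presto1",
    catalogName: "Presto1",
    schemaName: "default"
  }),
  buildToolCall(4, "queryData", {
    query: "SELECT * FROM Presto1.default.orders LIMIT 10",
    connectionName: "Presto1"
  })
];
```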
Replace the default code in the step with the following:

    import fetch from "node-fetch";
    import OpenAI from "openai";

    export default defineComponent({
      async run({ steps }) {
        const email = process.env.CDATA_EMAIL;
        const pat = process.env.CDATA_PAT;
        const credentials = email + ":" + pat;
        const auth = Buffer.from(credentials).toString("base64");
        const llmOutput = steps.LLM;
        const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
        const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
        const NL = String.fromCharCode(10);
        const CRNL = String.fromCharCode(13) + String.fromCharCode(10);
        const headers = {
          "Content-Type": "application/json",
          "Accept": "application/json, text/event-stream",
          "Authorization": "Basic " + auth
        };

        function parseSSE(raw) {
          try {
            const lines = raw.split(NL);
            for (let i = 0; i < lines.length; i++) {
              const line = lines.at(i);
              const trimmed = line.trim();
              if (trimmed.indexOf("data:") === 0) {
                const jsonStr = trimmed.slice(5).trim();
                if (jsonStr) {
                  const json = JSON.parse(jsonStr);
                  const result = json && json.result;
                  const content = result && result.content;
                  if (Array.isArray(content)) {
                    return {
                      parsed: content.map(function(c) { return c.text || ""; }).join(NL),
                      isError: (result && result.isError) || false,
                      full: json
                    };
                  }
                }
              }
            }
          } catch (e) {
            console.log("SSE parse error:", e.message);
          }
          return { parsed: raw, isError: false, full: null };
        }

        function parseCSV(text) {
          let clean = text || "";
          if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
            clean = clean.slice(1, -1);
          }
          const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
          const ESC_QUOTE = String.fromCharCode(92) + '"';
          const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
          const SINGLE_SLASH = String.fromCharCode(92);
          clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
          const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
          return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
        }

        async function initSession() {
          const res = await fetch(MCP_URL, {
            method: "POST",
            headers: headers,
            body: JSON.stringify({
              jsonrpc: "2.0",
              id: 1,
              method: "initialize",
              params: {
                protocolVersion: "2024-11-05",
                capabilities: {},
                clientInfo: { name: "pipedream", version: "1.0" }
              }
            })
          });
          return res.headers.get("mcp-session-id");
        }

        async function callMCP(id, method, args, sessionId) {
          const reqHeaders = Object.assign({}, headers);
          if (sessionId) {
            Object.assign(reqHeaders, { "mcp-session-id": sessionId });
          }
          const res = await fetch(MCP_URL, {
            method: "POST",
            headers: reqHeaders,
            body: JSON.stringify({
              jsonrpc: "2.0",
              id: id,
              method: "tools/call",
              params: { name: method, arguments: args }
            })
          });
          const raw = await res.text();
          const result = parseSSE(raw);
          result.raw = raw;
          return result;
        }

        const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
        const completions = client.chat.completions;

        const session1 = await initSession();
        const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
        const catalogs = parseCSV(catalogsResult.parsed);

        const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
        const connectionResponse = await completions.create({
          model: "gpt-4o-mini",
          messages: new Array(
            { role: "system", content: systemMsg1 },
            { role: "user", content: userQuery }
          )
        });
        const connectionName = connectionResponse.choices.at(0).message.content.trim();

        const session2 = await initSession();
        const schemasResult = await callMCP(2, "getSchemas", {
          connectionName: connectionName,
          catalogName: connectionName
        }, session2);
        const schemas = parseCSV(schemasResult.parsed);
        const schemaName = schemas.at(0) || "REST";

        const session3 = await initSession();
        const tablesResult = await callMCP(2, "getTables", {
          connectionName: connectionName,
          catalogName: connectionName,
          schemaName: schemaName
        }, session3);
        const tableNames = parseCSV(tablesResult.parsed);

        const queryLower = userQuery.toLowerCase();
        const isListTablesQuery =
          queryLower.indexOf("list") !== -1 ||
          queryLower.indexOf("what tables") !== -1 ||
          queryLower.indexOf("show tables") !== -1;
        if (isListTablesQuery) {
          return {
            success: true,
            connection: connectionName,
            message: "Available tables in " + connectionName + "." + schemaName,
            tables: tableNames
          };
        }

        const tableList = tableNames.map(function(t) {
          return connectionName + "." + schemaName + "." + t;
        }).join(", ");
        const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
        const sqlResponse = await completions.create({
          model: "gpt-4o-mini",
          messages: new Array(
            { role: "system", content: systemMsg2 },
            { role: "user", content: userQuery }
          )
        });
        const sql = sqlResponse.choices.at(0).message.content.trim();
        if (!sql) { return { error: "LLM returned empty SQL" }; }

        const session4 = await initSession();
        const queryResult = await callMCP(2, "queryData", {
          query: sql,
          connectionName: connectionName
        }, session4);

        if (queryResult.full) {
          const content = queryResult.full.result && queryResult.full.result.content;
          if (Array.isArray(content)) {
            try {
              const parsed = JSON.parse(content.at(0).text);
              const results = parsed.results && parsed.results.at(0);
              return {
                sql: sql,
                connection: connectionName,
                data: (results && results.rows) || new Array(),
                schema: (results && results.schema) || new Array(),
                success: true
              };
            } catch (e) {
              return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
            }
          }
        }
        return { sql: sql, connection: connectionName, raw: queryResult.raw };
      }
    });

Note: When pasting into Pipedream, replace llmOutput.return_value.userQuery with steps.LLM.$return_value.userQuery as indicated in the comment on that line.
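To make the response handling in the MCP step easier to follow, here is a condensed sketch of the two parsing stages applied to a made-up reply: unwrapping the SSE `data:` line, then reading the first column of the CSV text. The sample payload and the `CATALOG_NAME` header are assumptions for illustration only; real responses from the MCP server may be shaped differently.

```javascript
// Hypothetical SSE-framed JSON-RPC reply; the real payload may differ.
const sampleRaw = [
  "event: message",
  'data: {"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"CATALOG_NAME\\r\\nPresto1\\r\\nSalesforce1"}]}}',
  ""
].join("\n");

// Returns the text content carried by the first usable "data:" line.
function extractText(raw) {
  for (const line of raw.split("\n")) {
    const trimmed = line.trim();
    if (!trimmed.startsWith("data:")) continue;
    const payload = trimmed.slice(5).trim();
    if (!payload) continue;
    const json = JSON.parse(payload);
    const content = json.result && json.result.content;
    if (Array.isArray(content)) {
      return content.map((c) => c.text || "").join("\n");
    }
  }
  return raw;
}

// Skips the header row and returns the first field of each remaining row.
function firstColumn(csvText) {
  return csvText
    .split(/\r?\n/)
    .filter((l) => l.trim())
    .slice(1)
    .map((l) => l.split(",")[0].trim())
    .filter(Boolean);
}

const names = firstColumn(extractText(sampleRaw));
console.log(names);
```

Like the step itself, this splits rows on commas without handling quoted fields, so values that contain commas would need a proper CSV parser.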
| Key | Value |
|---|---|
| Access-Control-Allow-Origin | * |
| Access-Control-Allow-Methods | POST, OPTIONS |
| Access-Control-Allow-Headers | Content-Type |
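The headers above could be applied in a final code step along the following lines. This is only a sketch: the helper name `buildResponse`, the 204 status for preflight requests, and the idea of passing the result to Pipedream's `$.respond()` are assumptions, since the response step itself is not shown here.

```javascript
// CORS headers taken from the table above.
const CORS_HEADERS = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Methods": "POST, OPTIONS",
  "Access-Control-Allow-Headers": "Content-Type"
};

// Hypothetical helper: shapes the HTTP response for preflight and normal requests.
function buildResponse(isOptions, result) {
  if (isOptions) {
    // Preflight requests need only the CORS headers and an empty body.
    return { status: 204, headers: CORS_HEADERS, body: "" };
  }
  return {
    status: 200,
    headers: Object.assign({ "Content-Type": "application/json" }, CORS_HEADERS),
    body: JSON.stringify(result)
  };
}
```

In a Pipedream code step, the object returned by `buildResponse` would then be handed to `$.respond()`, with `isOptions` taken from the LLM step's output and `result` from the MCP step.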
{
"query": "list all tables"
}
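The same request body can be sent from any HTTP client once the workflow is deployed. The sketch below assumes Node.js 18 or later (for the global `fetch`) and uses a placeholder `endpointUrl` for the workflow's HTTP trigger URL; the function name `askWorkflow` and its optional `fetchImpl` parameter are illustrative.

```javascript
// Hypothetical client: posts a natural language question to the workflow endpoint.
// endpointUrl is a placeholder for the URL of the workflow's HTTP trigger.
async function askWorkflow(endpointUrl, query, fetchImpl) {
  const doFetch = fetchImpl || fetch; // global fetch is available in Node.js 18+
  const res = await doFetch(endpointUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query: query })
  });
  if (!res.ok) throw new Error("Workflow returned HTTP " + res.status);
  return res.json();
}
```

The optional third argument lets a stub stand in for `fetch` when testing the client without a deployed workflow.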
After the test run completes, click each step tab and check the Exports tab to inspect outputs:
| Step | What to look for in exports |
|---|---|
| trigger | body.query - confirms the query was received |
| LLM | userQuery - confirms the query was extracted |
| MCP | connection, sql, data, schema - confirms data was fetched |
| Response | $response.body - the final JSON response |
The Logs tab inside any step shows detailed outputs including the generated SQL, selected connection, and raw MCP responses.
Note: The Response step's Exports tab only shows a summary like { "success": true } with "status 200"; this confirms the workflow ran successfully but does not show the full data.
To see the complete output, including data rows, SQL, and schema, click the MCP step tab, check the Exports tab, and expand $return_value.
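The workflow automatically discovers the available connections, selects the most relevant one for the question, discovers its schema and tables, generates a SQL query with the LLM, and executes it against Presto data.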
The integration uses the following CData Connect AI MCP tools in sequence:
| MCP tool | Purpose |
|---|---|
| getCatalogs | Retrieves all available connections from CData Connect AI |
| getSchemas | Retrieves the database schemas for a specific connection |
| getTables | Retrieves all tables and views for a specific schema |
| queryData | Executes SQL queries and returns results |
The OpenAI LLM acts as the intelligent layer between the natural language question and the CData MCP tools, selecting the right connection, discovering the data structure, and generating accurate SQL queries automatically.
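Because the SQL is model output, a workflow may want to check it before passing it to `queryData`. The following is a conservative sketch of such a guard, not part of the workflow above: the function name `sanitizeSql` and the read-only policy (a single `SELECT` statement) are assumptions, and it will reject valid queries that contain a semicolon inside a string literal.

```javascript
// Hypothetical guard: cleans LLM output and allows only a single SELECT statement.
function sanitizeSql(text) {
  const fence = "`".repeat(3);
  let sql = (text || "").trim();
  if (sql.startsWith(fence)) {
    // Drop the opening fence line (with optional language tag) and the closing fence.
    sql = sql.slice(sql.indexOf("\n") + 1);
    if (sql.endsWith(fence)) sql = sql.slice(0, -fence.length);
  }
  // Remove one trailing semicolon, then reject anything that is not a lone SELECT.
  sql = sql.trim().replace(/;\s*$/, "");
  if (!/^select\b/i.test(sql)) throw new Error("Only SELECT statements are allowed");
  if (sql.indexOf(";") !== -1) throw new Error("Multiple statements are not allowed");
  return sql;
}
```

In the MCP step, this could wrap the value assigned to `sql`, so that a fenced or non-`SELECT` response from the model fails fast instead of reaching Presto.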
Pipedream and CData Connect AI together enable intelligent, AI-driven workflows where natural language queries are automatically translated into live data operations across enterprise systems, without ETL pipelines, data sync jobs, or custom integration logic. This streamlined approach delivers stronger governance, lower operational overhead, and faster, more grounded responses from AI-powered workflows.
Start a free trial today to see how CData Connect AI can empower Pipedream with live, secure access to hundreds of external systems.