![]() |
VOOZH | about |
Dataiku is a collaborative data science and AI platform that enables teams to design, deploy, and manage machine learning and generative AI projects within a governed environment. It's Agent and GenAI framework allows users to build intelligent agents that can analyze, generate, and act on data through custom workflows and model orchestration.
By integrating Dataiku with CData Connect AI through the built-in MCP (Model Context Protocol) Server, these agents gain secure, real-time access to live Elasticsearch data. The integration bridges Dataiku's agent execution environment with CData's governed enterprise connectivity layer, allowing every query or instruction to run safely against authorized data sources without manual exports or staging.
This article demonstrates how to configure Elasticsearch connectivity in Connect AI, prepare a Python code environment in Dataiku with MCP support, and create an agent that queries and interacts with live Elasticsearch data directly from within Dataiku.
Accessing and integrating live data from Elasticsearch has never been easier with CData. Customers rely on CData connectivity to:
Users frequently integrate Elasticsearch data with analytics tools such as Crystal Reports, Power BI, and Excel, and leverage our tools to enable a single, federated access layer to all of their data sources, including Elasticsearch.
For more information on CData's Elasticsearch solutions, check out our Knowledge Base article: CData Elasticsearch Driver Features & Differentiators.
Connectivity to Elasticsearch from Dataiku is made possible through CData Connect AI's Remote MCP Server. To interact with Elasticsearch data from Dataiku, you start by creating and configuring a Elasticsearch connection in CData Connect AI.
Set the Server and Port connection properties to connect. To authenticate, set the User and Password properties, PKI (public key infrastructure) properties, or both. To use PKI, set the SSLClientCert, SSLClientCertType, SSLClientCertSubject, and SSLClientCertPassword properties.
The data provider uses X-Pack Security for TLS/SSL and authentication. To connect over TLS/SSL, prefix the Server value with 'https://'. Note: TLS/SSL and client authentication must be enabled on X-Pack to use PKI.
Once the data provider is connected, X-Pack will then perform user authentication and grant role permissions based on the realms you have configured.
π Configuring a connection (Salesforce is shown)A Personal Access Token (PAT) is used to authenticate the connection to Connect AI from Dataiku. It is best practice to create a separate PAT for each integration to maintain granular access control
With the Elasticsearch connection configured and a PAT generated, Dataiku can now connect to Elasticsearch data through the Connect AI.
A dedicated python code environment in Dataiku provides the runtime support needed for MCP-based communication. To enable Dataiku Agents to connect to CData Connect AI, create a Python environment and install the MCP client dependencies required for agent-to-server interaction.
The Dataiku Agent serves as the bridge between the Dataiku workspace and Connect AI. To enable this connection, create a custom code-based agent, assign it the configured Python environment, and embed your Connect AI credentials to allow the agent to query and interact with live Elasticsearch data.
import os
import base64
from typing import Dict, Any, List
from dataiku.llm.python import BaseLLM
from langchain_mcp_adapters.client import MultiServerMCPClient
# ---------- Persistent MCP client (cached between calls) ----------
_MCP_CLIENT = None
def _get_mcp_client() -> MultiServerMCPClient:
"""Create (or reuse) a MultiServerMCPClient to CData Cloud MCP."""
global _MCP_CLIENT
if _MCP_CLIENT is not None:
return _MCP_CLIENT
# Set creds via env/project variables ideally
EMAIL = os.getenv("CDATA_EMAIL", "YOUR_EMAIL")
PAT = os.getenv("CDATA_PAT", "YOUR_PAT")
BASE_URL = "https://mcp.cloud.cdata.com/mcp"
if not EMAIL or PAT == "YOUR_PAT":
raise ValueError("Set CDATA_EMAIL and CDATA_PAT as env variables or inline in the code.")
token = base64.b64encode(f"{EMAIL}:{PAT}".encode()).decode()
headers = {"Authorization": f"Basic {token}"}
_MCP_CLIENT = MultiServerMCPClient(
connections={
"cdata": {
"transport": "streamable_http",
"url": BASE_URL,
"headers": headers,
}
}
)
return _MCP_CLIENT
def _pick_tool(tools, names: List[str]):
L = [n.lower() for n in names]
return next((t for t in tools if t.name.lower() in L), None)
async def _route(prompt: str) -> str:
"""
Simple intent router:
- 'list connections' / 'list catalogs' -> getCatalogs
- 'sql: ...' or 'query: ...' -> queryData
- otherwise -> help text
"""
client = _get_mcp_client()
tools = await client.get_tools()
p = prompt.strip()
low = p.lower()
# 1) List connections (catalogs)
if "list connections" in low or "list catalogs" in low:
t = _pick_tool(tools, ["getCatalogs", "listCatalogs"])
if not t:
return "No 'getCatalogs' tool found on the MCP server."
res = await t.ainvoke({})
return str(res)[:4000]
# 2) Run SQL
if low.startswith("sql:") or low.startswith("query:"):
sql = p.split(":", 1)[1].strip()
t = _pick_tool(tools, ["queryData", "sqlQuery", "runQuery", "query"])
if not t:
return "No query-capable tool (queryData/sqlQuery) found on the MCP server."
try:
res = await t.ainvoke({"query": sql})
return str(res)[:4000]
except Exception as e:
return f"Query failed: {e}"
# 3) Help
return (
"Connected to CData MCP
"
"Say **'list connections'** to view available sources, or run a SQL like:
"
" sql: SELECT * FROM [Salesforce1].[SYS].[Connections] LIMIT 5
"
"Remember to use bracket quoting for catalog/schema/table names."
)
class MyLLM(BaseLLM):
async def aprocess(self, query: Dict[str, Any], settings: Dict[str, Any], trace: Any):
# Extract last user message from the Quick Test payload
prompt = ""
try:
prompt = (query.get("messages") or [])[-1].get("content", "")
except Exception:
prompt = ""
try:
reply = await _route(prompt)
except Exception as e:
reply = f"Error: {e}"
# The template expects a dict with a 'text' key
return {"text": reply}
{
"messages": [
{
"role": "user",
"content": "list connections"
}
],
"context": {}
}
Switch to the Chat tab and try prompting like, "List all connections". The chat output will show a list of connection catalogs.
π Chat: listing catalogs and running queriesTo access hundreds of SaaS, Big Data, and NoSQL sources from your AI agents, try CData Connect AI today.
Learn more about CData Connect AI or sign up for free trial access:
Free Trial