![]() |
VOOZH | about |
Moonlit enables teams to design automated workflows and AI-driven processes through flexible orchestration blocks. But when workflows must rely on live enterprise data like CRM, ERP, finance, analytics, or cloud applications, developers often resort to custom backend logic or scheduled syncs. This increases maintenance overhead and restricts workflows to stale data snapshots.
CData Connect AI solves this problem by exposing real-time access to hundreds of enterprise systems through its secure Model Context Protocol (MCP) Server. Moonlit workflows can interact with live data using a lightweight Claude-based agent service that forwards workflow questions to CData Connect AI.
This guide outlines the steps required to configure the Claude MCP agent, deploy a FastAPI endpoint for live tool access, expose the agent to Moonlit, and build workflows that operate on real-time data through CData Connect AI.
Before you begin, you'll need a few credentials mentioned below:
Connectivity to Amazon Athena from Moonlit is made possible through CData Connect AI Remote MCP. To interact with Amazon Athena data from Moonlit, we start by creating and configuring a Amazon Athena connection in CData Connect AI.
To authorize Amazon Athena requests, provide the credentials for an administrator account or for an IAM user with custom permissions: Set to the access key Id. Set to the secret access key.
Note: Though you can connect as the AWS account administrator, it is recommended to use IAM user credentials to access AWS services.
To obtain the credentials for an IAM user, follow the steps below:
To obtain the credentials for your AWS root account, follow the steps below:
If you are using the CData Data Provider for Amazon Athena 2018 from an EC2 Instance and have an IAM Role assigned to the instance, you can use the IAM Role to authenticate. To do so, set to true and leave and empty. The CData Data Provider for Amazon Athena 2018 will automatically obtain your IAM Role credentials and authenticate with them.
In many situations it may be preferable to use an IAM role for authentication instead of the direct security credentials of an AWS root user. An AWS role may be used instead by specifying the . This will cause the CData Data Provider for Amazon Athena 2018 to attempt to retrieve credentials for the specified role. If you are connecting to AWS (instead of already being connected such as on an EC2 instance), you must additionally specify the and of an IAM user to assume the role for. Roles may not be used when specifying the and of an AWS root user.
For users and roles that require Multi-factor Authentication, specify the and connection properties. This will cause the CData Data Provider for Amazon Athena 2018 to submit the MFA credentials in a request to retrieve temporary authentication credentials. Note that the duration of the temporary credentials may be controlled via the (default 3600 seconds).
In addition to the and properties, specify , and . Set to the region where your Amazon Athena data is hosted. Set to a folder in S3 where you would like to store the results of queries.
If is not set in the connection, the data provider connects to the default database set in Amazon Athena.
π Configuring a connection (Salesforce is shown)A Personal Access Token (PAT) is used to authenticate the connection to Connect AI from Moonlit. It is best practice to create a separate PAT for each service to maintain granularity of access.
Begin by cloning the official CData MCP agent repository and setting up the required Python environment to prepare your project for integration.
git clone https://github.com/CDataSoftware/connect-ai-claude-agent.git cd connect-ai-claude-agent
(Optional but recommended):
python -m venv .venv source .venv/bin/activate # macOS/Linux .venv\Scripts\activate # Windows
Install dependencies:
pip install -r requirements.txt pip install fastapi uvicorn python-dotenv pydantic
Copy and configure the .env file:
cp .env.example .env
Update values:
ANTHROPIC_API_KEY=your_anthropic_key_here CDATA_EMAIL=your_email CDATA_ACCESS_TOKEN=your_cdata_pat MCP_SERVER_URL=https://mcp.cloud.cdata.com/mcp/
Replace your agent_chatbot.py with the full version below :
#!/usr/bin/env python3
import os
import json
import base64
import asyncio
from typing import Optional, Dict, Any, List
import requests
from dotenv import load_dotenv
from anthropic import Anthropic
# Load environment variables from .env file
load_dotenv()
class MCPClient:
"""Client for interacting with CData Connect AI MCP server over HTTP."""
def __init__(self, server_url: str, email: Optional[str] = None, access_token: Optional[str] = None):
self.server_url = server_url.rstrip("/")
self.session = requests.Session()
# Default headers for MCP JSON-RPC over SSE
self.session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
}
)
if email and access_token:
# Basic authentication: email:personal_access_token
credentials = f"{email}:{access_token}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
self.session.headers.update(
{
"Authorization": f"Basic {encoded_credentials}",
}
)
def _parse_sse_response(self, response_text: str) -> dict:
"""Parse Server-Sent Events (SSE) response."""
# SSE format: event: message\n data: {...}\n\n
for line in response_text.split("\n"):
if line.startswith("data: "):
data_json = line[6:] # Remove 'data: ' prefix
return json.loads(data_json)
raise ValueError("No data found in SSE response")
def list_tools(self) -> list:
"""Get available tools from the MCP server."""
response = self.session.post(
self.server_url,
json={
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": 1,
},
)
response.raise_for_status()
result = self._parse_sse_response(response.text)
return result.get("result", {}).get("tools", [])
def call_tool(self, tool_name: str, arguments: dict) -> dict:
"""Call a tool on the MCP server."""
response = self.session.post(
self.server_url,
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
"id": 2,
},
)
response.raise_for_status()
result = self._parse_sse_response(response.text)
return result.get("result", {})
class MCPAgentChatbot:
"""
AI agent chatbot for CData Connect AI using Anthropic's HTTP API.
- Loads tools from the CData MCP server
- Maps them to Anthropic 'tools' spec
- Orchestrates tool_use / tool_result loop without Claude Code CLI
"""
def __init__(self, mcp_server_url: str, email: Optional[str] = None, access_token: Optional[str] = None):
# Anthropic client (HTTP only, no CLI)
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
raise RuntimeError("ANTHROPIC_API_KEY is not set in the environment/.env")
self.client = Anthropic(api_key=api_key)
# Hard-coded model: do NOT use ANTHROPIC_MODEL from env
self.model = "claude-3-haiku-20240307"
print(f"Using Anthropic model: {self.model} (hard-coded, env ANTHROPIC_MODEL ignored)")
# MCP client
self.mcp_client = MCPClient(mcp_server_url, email, access_token)
print("Connecting to CData Connect AI MCP server...")
self.mcp_tools_list = self.mcp_client.list_tools()
print(f"Loaded {len(self.mcp_tools_list)} tools from MCP server")
if self.mcp_tools_list:
print(" Available tools:")
for tool_info in self.mcp_tools_list[:5]:
print(f" - {tool_info.get('name', 'unknown')}")
if len(self.mcp_tools_list) > 5:
print(f" ... and {len(self.mcp_tools_list) - 5} more")
# Convert MCP tools to Anthropic tools spec
self.tools_for_claude = self._create_tools_for_claude()
# Stronger instructions for tool use and final answers
self.system_prompt = (
"You are a helpful assistant with access to CData Connect AI tools over MCP. "
"Use these tools to answer questions about data sources, connections, tables, and queries. "
"When tools fail or return errors, always explain clearly what went wrong instead of "
"continuing to call tools. After at most 3 tool calls, you MUST respond with the best "
"possible final answer using whatever data or error messages you have."
)
def _create_tools_for_claude(self) -> List[Dict[str, Any]]:
"""Convert MCP tool definitions into Anthropic 'tools' format."""
tools = []
for t in self.mcp_tools_list:
name = t.get("name")
if not name:
continue
tools.append(
{
"name": name,
"description": t.get("description", ""),
"input_schema": t.get("inputSchema", {"type": "object", "properties": {}}),
}
)
return tools
def _run_agent_sync(self, user_message: str) -> str:
"""
Synchronous agent loop using Anthropic tools API.
We run this in a thread from async contexts.
"""
messages: List[Dict[str, Any]] = [
{
"role": "user",
"content": user_message,
}
]
# Safety: avoid infinite loops
max_tool_rounds = 6
# Keep track of the last tool result so we can surface real errors
last_tool_result_text: Optional[str] = None
for _ in range(max_tool_rounds):
try:
response = self.client.messages.create(
model=self.model,
max_tokens=1024,
system=self.system_prompt,
tools=self.tools_for_claude,
messages=messages,
)
except Exception as e:
# Surface Anthropic errors (e.g., bad model) as a normal answer
return (
f"Error when calling Anthropic model '{self.model}': {e}. "
"Check that the model name is valid for your API key."
)
# Extract tool_use contents, if any
tool_uses = [
block
for block in response.content
if getattr(block, "type", None) == "tool_use"
]
# If no tools requested, return the assistant's text reply
if not tool_uses:
texts = [
getattr(block, "text", "")
for block in response.content
if getattr(block, "type", None) == "text"
]
texts = [t for t in texts if t]
if texts:
# Include model tag to make debugging easier
return f"[model={self.model}] " + "\n\n".join(texts)
# No tools and no text β dump raw response for debugging
return f"[model={self.model}] Raw response:\n" + json.dumps(
response.model_dump(), indent=2
)
# Add the tool_use content as an assistant message
messages.append(
{
"role": "assistant",
"content": response.content,
}
)
# For each requested tool, call the MCP server and send tool_result back
tool_result_messages: List[Dict[str, Any]] = []
for tu in tool_uses:
tool_name = getattr(tu, "name", None)
tool_use_id = getattr(tu, "id", None)
tool_input = getattr(tu, "input", {}) or {}
try:
result = self.mcp_client.call_tool(tool_name, tool_input)
result_text = json.dumps(result, indent=2)
except Exception as e:
result_text = f"Error calling tool {tool_name}: {e}"
# Remember the last tool result for better error messages later
last_tool_result_text = result_text
tool_result_messages.append(
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": result_text,
}
],
}
)
messages.extend(tool_result_messages)
# Fallback if we exit loop without a final answer β but show last tool result
if last_tool_result_text:
return (
f"[model={self.model}] I was unable to produce a final natural-language answer "
f"after using tools. Here is the last tool result I received:\n\n"
f"{last_tool_result_text}"
)
else:
return f"[model={self.model}] I was unable to produce a final answer after using tools."
async def chat_once(self, user_message: str) -> str:
"""Async wrapper around the synchronous agent loop."""
return await asyncio.to_thread(self._run_agent_sync, user_message)
async def interactive_mode(chatbot: MCPAgentChatbot):
"""Run the chatbot in interactive CLI mode (uses same HTTP-only agent)."""
print("\nChatbot ready! Type 'quit' to exit.\n")
while True:
try:
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
response = await chatbot.chat_once(user_input)
print(f"\nAssistant: {response}\n")
except KeyboardInterrupt:
print("\nGoodbye!")
break
except Exception as e:
print(f"Error: {e}\n")
async def main():
"""Run the chatbot in interactive mode (for local testing)."""
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL", "https://mcp.cloud.cdata.com/mcp/")
CDATA_EMAIL = os.environ.get("CDATA_EMAIL")
CDATA_ACCESS_TOKEN = os.environ.get("CDATA_ACCESS_TOKEN")
print("=== CData Connect AI Agent Chatbot ===")
print(f"MCP Server: {MCP_SERVER_URL}\n")
if not CDATA_EMAIL or not CDATA_ACCESS_TOKEN:
print("CData credentials not found.")
print("\nThis app requires your CData email and personal access token.")
print("\nTo configure:")
print(" set [email protected]")
print(" set CDATA_ACCESS_TOKEN=your_personal_access_token\n")
return
try:
chatbot = MCPAgentChatbot(MCP_SERVER_URL, CDATA_EMAIL, CDATA_ACCESS_TOKEN)
except Exception as e:
print(f"Error connecting to MCP server or Anthropic: {e}")
return
await interactive_mode(chatbot)
if __name__ == "__main__":
asyncio.run(main())
This implementation:
Below is the full api_server.py, which exposes the agent over a POST /ask endpoint :
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agent_chatbot import MCPAgentChatbot
app = FastAPI()
# Environment variables (also loaded in agent_chatbot via load_dotenv)
CDATA_EMAIL = os.getenv("CDATA_EMAIL")
CDATA_ACCESS_TOKEN = os.getenv("CDATA_ACCESS_TOKEN")
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL", "https://mcp.cloud.cdata.com/mcp/")
# Instantiate chatbot at startup
try:
chatbot = MCPAgentChatbot(
mcp_server_url=MCP_SERVER_URL,
email=CDATA_EMAIL,
access_token=CDATA_ACCESS_TOKEN,
)
except Exception as e:
# Fail fast if configuration is broken
raise RuntimeError(f"Failed to initialize MCPAgentChatbot: {e}")
class AskRequest(BaseModel):
question: str
class AskResponse(BaseModel):
answer: str
@app.post("/ask", response_model=AskResponse)
async def ask(req: AskRequest):
try:
answer = await chatbot.chat_once(req.question)
return AskResponse(answer=answer)
except Exception as e:
# Convert unexpected backend errors into a 500 with a readable message
raise HTTPException(status_code=500, detail=f"Agent error: {e}")
uvicorn api_server:app --host 0.0.0.0 --port 8000
http://localhost:8000/docs
To allow Moonlit or any external application to communicate with your FastAPI agent, you need to make your local server accessible over the internet. ngrok creates a secure public URL that tunnels directly to your running FastAPI service.
uvicorn api_server:app --host 0.0.0.0 --port 8000π Run the FastAPI Server
ngrok http 8000π Start an ngrok Tunnel
This ensures your Moonlit workflow can reach your agent service reliably during development.
Add a Text Input: UserQuestion
π Text Input ConfigurationConfigure:
Add Text Output: {{ custom_api_request_1.answer }}
π Text Output ConfigurationIn Moonlit's Test panel, run:
" What are the tools available in CData Connect AI MCP Server." π Testing the APPThe workflow will:
Moonlit and CData Connect AI together enable powerful AI-driven workflows where agents have live access to enterprise data and act intelligently without sync pipelines or manual integration logic.
Start your free trial today to see how CData can empower Moonlit with live, secure access to hundreds of external systems.
Learn more about CData Connect AI or sign up for free trial access:
Free Trial