Agentforce is built for enterprise teams. Its declarative tooling and API-first architecture make it straightforward to compose assistants and automate CRM-centric workflows. But when agents need data that lives outside Salesforce, such as Elasticsearch data, many implementations fall back on periodic syncs or bespoke middleware that replicates external systems into local stores. That adds complexity, creates governance and maintenance overhead, introduces sync delays, and limits the real-time potential of your agents.
CData Connect AI fills this gap with direct, live connectivity to hundreds of enterprise apps, databases, ERPs, and finance platforms. Using CData's remote Model Context Protocol (MCP) Server, Agentforce agents can securely read, write, and act on fresh, contextual data at runtime; no replication required. The result is grounded responses, real-time decisions, and cross-system automations with fewer moving parts and stronger control.
This article outlines the steps required to configure Elasticsearch connectivity in Connect AI, register the MCP server in Agentforce, and build a workflow that queries Elasticsearch data.
Accessing and integrating live data from Elasticsearch has never been easier with CData. Customers rely on CData connectivity to:
Users frequently integrate Elasticsearch data with analytics tools such as Crystal Reports, Power BI, and Excel, and leverage our tools to enable a single, federated access layer to all of their data sources, including Elasticsearch.
For more information on CData's Elasticsearch solutions, check out our Knowledge Base article: CData Elasticsearch Driver Features & Differentiators.
Before you begin, you'll need a few credentials mentioned below:
Note: You can create the base64 encoded version of MCP_AUTH using any Base64 encoding tool.
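As a minimal sketch of that encoding step, the snippet below builds the Base64 value in Python from an email and PAT joined by a colon, which is the same pairing the config.py example in this article uses. The function name encode_mcp_auth and the sample credentials are illustrative placeholders, not part of any CData API.

```python
import base64


def encode_mcp_auth(email: str, pat: str) -> str:
    """Return the Base64 encoding of 'email:pat' for a Basic auth header."""
    return base64.b64encode(f"{email}:{pat}".encode("utf-8")).decode("ascii")


# Placeholder values for illustration only; substitute your own.
mcp_auth = encode_mcp_auth("user@example.com", "example_pat")
authorization_header = f"Basic {mcp_auth}"

# Decoding the value recovers the original "email:pat" pair,
# which is a quick way to check the encoding was done correctly.
decoded = base64.b64decode(mcp_auth).decode("utf-8")
```

Because Base64 is an encoding and not encryption, the resulting value should be handled with the same care as the PAT itself.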
Connectivity to Elasticsearch from Agentforce is made possible through CData Connect AI Remote MCP. To interact with Elasticsearch data from Agentforce, we start by creating and configuring an Elasticsearch connection in CData Connect AI.
Set the Server and Port connection properties to connect. To authenticate, set the User and Password properties, PKI (public key infrastructure) properties, or both. To use PKI, set the SSLClientCert, SSLClientCertType, SSLClientCertSubject, and SSLClientCertPassword properties.
The data provider uses X-Pack Security for TLS/SSL and authentication. To connect over TLS/SSL, prefix the Server value with 'https://'. Note: TLS/SSL and client authentication must be enabled on X-Pack to use PKI.
Once the data provider is connected, X-Pack will then perform user authentication and grant role permissions based on the realms you have configured.
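The property rules above can be sketched as a small validation helper. This is only an illustration of how the properties relate to one another: build_connection_properties is a hypothetical function, not part of Connect AI, and it assumes that all four PKI properties are supplied together. In practice you enter these values in the Connect AI connection form.

```python
from typing import Optional

# PKI properties named in the connection instructions.
PKI_PROPERTIES = (
    "SSLClientCert",
    "SSLClientCertType",
    "SSLClientCertSubject",
    "SSLClientCertPassword",
)


def build_connection_properties(
    server: str,
    port: int,
    user: Optional[str] = None,
    password: Optional[str] = None,
    pki: Optional[dict] = None,
    use_tls: bool = True,
) -> dict:
    """Assemble Elasticsearch connection properties (hypothetical helper)."""
    if use_tls:
        # TLS/SSL connections are signalled by an 'https://' prefix on Server.
        if server.startswith("http://"):
            server = server[len("http://"):]
        if not server.startswith("https://"):
            server = "https://" + server
    props = {"Server": server, "Port": str(port)}

    has_basic = user is not None and password is not None
    if has_basic:
        props["User"] = user
        props["Password"] = password

    if pki:
        if not use_tls:
            raise ValueError("PKI requires a TLS/SSL connection")
        # Assumption: treat all four PKI properties as required together.
        missing = [name for name in PKI_PROPERTIES if name not in pki]
        if missing:
            raise ValueError(f"Missing PKI properties: {', '.join(missing)}")
        props.update({name: pki[name] for name in PKI_PROPERTIES})

    if not has_basic and not pki:
        raise ValueError("Set User and Password, PKI properties, or both")
    return props


# Example with placeholder values (9200 is the default Elasticsearch port).
example = build_connection_properties(
    "es.example.com", 9200, user="elastic", password="example_password"
)
```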
Configuring a connection (Salesforce is shown)
A Personal Access Token (PAT) is used to authenticate the connection to Connect AI from Agentforce. It is best practice to create a separate PAT for each service to maintain granularity of access.
With the connection configured and a PAT generated, we are ready to connect to Elasticsearch data from Agentforce.
Install packages:
pip install requests langchain-mcp-adapters
Create config.py (replace placeholders with your actual values). It should define:
import base64

# --- MCP (CData Connect AI) ---
EMAIL = "your_email@example.com"
PAT = "your_PAT"
MCP_BASE_URL = "https://mcp.cloud.cdata.com/mcp"
MCP_AUTH = base64.b64encode(f"{EMAIL}:{PAT}".encode()).decode()

# --- Salesforce Agentforce ---
SFDC_DOMAIN = "https://your_domain.my.salesforce.com"
SFDC_CLIENT_ID = "your_SFDC_CLIENT_ID"
SFDC_CLIENT_SECRET = "your_SFDC_CLIENT_SECRET"
AGENT_ID = "your_AGENT_ID"
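If you would rather not store secrets in a source file, one possible variation is to read the same values from environment variables. The sketch below is an assumption-laden alternative, not the article's method: load_config and the variable names CDATA_EMAIL and CDATA_PAT are hypothetical, and the remaining names simply mirror the config.py settings.

```python
import base64
import os

# Hypothetical environment variable names; choose whatever fits your setup.
REQUIRED_VARS = (
    "CDATA_EMAIL",
    "CDATA_PAT",
    "SFDC_DOMAIN",
    "SFDC_CLIENT_ID",
    "SFDC_CLIENT_SECRET",
    "AGENT_ID",
)


def load_config(env=None) -> dict:
    """Build the settings used by the script from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    # Same 'email:PAT' Base64 pairing that config.py computes.
    credentials = f"{env['CDATA_EMAIL']}:{env['CDATA_PAT']}"
    return {
        "MCP_BASE_URL": env.get("MCP_BASE_URL", "https://mcp.cloud.cdata.com/mcp"),
        "MCP_AUTH": base64.b64encode(credentials.encode()).decode(),
        "SFDC_DOMAIN": env["SFDC_DOMAIN"],
        "SFDC_CLIENT_ID": env["SFDC_CLIENT_ID"],
        "SFDC_CLIENT_SECRET": env["SFDC_CLIENT_SECRET"],
        "AGENT_ID": env["AGENT_ID"],
    }
```

Passing a plain dictionary as env makes the function easy to exercise without touching the real environment, for example load_config({"CDATA_EMAIL": "user@example.com", ...}).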
The Python script has three main sections:
Open a terminal in the project directory where the script is and run:
python agentforce_script.py
import asyncio
import requests
import time
import uuid
from langchain_mcp_adapters.client import MultiServerMCPClient
from config import SFDC_DOMAIN, SFDC_CLIENT_ID, SFDC_CLIENT_SECRET, AGENT_ID, MCP_BASE_URL, MCP_AUTH

# ---------------- Salesforce / Einstein Agent ---------------- #

def get_salesforce_token():
    """Fetch a fresh Salesforce OAuth token using client credentials."""
    print(" Requesting fresh Salesforce token...")
    token_url = f"{SFDC_DOMAIN}/services/oauth2/token"
    data = {
        "grant_type": "client_credentials",
        "client_id": SFDC_CLIENT_ID,
        "client_secret": SFDC_CLIENT_SECRET,
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    resp = requests.post(token_url, data=data, headers=headers)
    resp.raise_for_status()
    token_data = resp.json()
    print(" Got access token.")
    return token_data["access_token"], token_data["instance_url"]

def start_agent_session(access_token):
    """Start a session with Salesforce Einstein Agent."""
    session_url = f"https://api.salesforce.com/einstein/ai-agent/v1/agents/{AGENT_ID}/sessions"
    payload = {
        "externalSessionKey": str(uuid.uuid4()),
        "instanceConfig": {"endpoint": SFDC_DOMAIN},
        "streamingCapabilities": {"chunkTypes": ["Text"]},
        "bypassUser": True,
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    resp = requests.post(session_url, json=payload, headers=headers)
    resp.raise_for_status()
    session_data = resp.json()
    print(f" Agent session started: {session_data['sessionId']}")
    return session_data["sessionId"]

def post_message_to_agent(access_token, session_id, message_text):
    """Send a message to the Salesforce Einstein Agent session."""
    messages_url = f"https://api.salesforce.com/einstein/ai-agent/v1/sessions/{session_id}/messages"
    payload = {
        "message": {
            # Millisecond timestamp used as the message sequence ID
            "sequenceId": int(time.time() * 1000),
            "type": "Text",
            "text": message_text,
        },
        "variables": [],
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    resp = requests.post(messages_url, json=payload, headers=headers)
    resp.raise_for_status()
    return resp.json()

# ---------------- MCP part ---------------- #

async def query_mcp():
    """Query MCP server for catalogs and a sample query."""
    mcp_client = MultiServerMCPClient(
        connections={
            "default": {
                "transport": "streamable_http",
                "url": MCP_BASE_URL,
                "headers": {"Authorization": f"Basic {MCP_AUTH}"},
            }
        }
    )
    tools = await mcp_client.get_tools()
    print("Discovered MCP tools:", [t.name for t in tools])

    # List catalogs
    get_catalogs_tool = next(t for t in tools if t.name == "getCatalogs")
    catalogs = await get_catalogs_tool.ainvoke({})
    print("Catalogs:", catalogs)

    # Run a sample query; adjust the catalog, schema, and table names
    # to match your own Connect AI connection
    query_tool = next(t for t in tools if t.name == "queryData")
    query_result = await query_tool.ainvoke({
        "query": "SELECT * FROM [Elasticsearch].[PUBLIC].[EMPLOYEES] LIMIT 5"
    })
    print("Query result:", query_result)
    return catalogs, query_result

# ---------------- Main ---------------- #

async def main():
    # Step 1: MCP
    catalogs, query_result = await query_mcp()

    # Step 2: Salesforce token and agent
    access_token, _ = get_salesforce_token()
    session_id = start_agent_session(access_token)

    # Step 3: Post MCP results to agent with instructions
    context_message = (
        "You are a helpful assistant.\n"
        "The following MCP data was retrieved:\n\n"
        f"Catalogs: {catalogs}\n"
        f"Query result: {query_result}\n\n"
        "Please answer the user's question based on this data: "
        "List all available catalogs and summarize the query results."
    )
    response = post_message_to_agent(access_token, session_id, context_message)
    print("\n Agent response:")
    print(response)


if __name__ == "__main__":
    asyncio.run(main())
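The script above looks up MCP tools with next(...), which raises a bare StopIteration if a tool name is not present, and it prints the agent's raw JSON response. The sketch below shows two optional helpers that make both steps a little more forgiving. Both find_tool and extract_agent_text are hypothetical names, and the response shape assumed by extract_agent_text (a "messages" list whose items carry a "message" string) is an assumption you should verify against the payload your agent actually returns.

```python
from types import SimpleNamespace


def find_tool(tools, name):
    """Return the tool whose .name matches, or raise a descriptive error."""
    for tool in tools:
        if tool.name == name:
            return tool
    available = ", ".join(sorted(t.name for t in tools)) or "none"
    raise LookupError(f"MCP tool '{name}' not found. Available tools: {available}")


def extract_agent_text(response):
    """Collect message text from an agent response dict.

    Assumes a payload shaped like {"messages": [{"message": "..."}]} and
    returns an empty string when that shape is absent.
    """
    messages = response.get("messages", []) if isinstance(response, dict) else []
    texts = [m.get("message", "") for m in messages if isinstance(m, dict)]
    return "\n".join(t for t in texts if t)


# Stand-in objects for illustration; real tools come from get_tools().
sample_tools = [SimpleNamespace(name="getCatalogs"), SimpleNamespace(name="queryData")]
query_tool = find_tool(sample_tools, "queryData")

# Stand-in response for illustration; real responses come from the agent.
sample_response = {"messages": [{"type": "Inform", "message": "Two catalogs found."}]}
agent_text = extract_agent_text(sample_response)
```

In the script, find_tool(tools, "getCatalogs") could stand in for the next(...) lookups, and print(extract_agent_text(response)) could follow the existing print(response) call.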
Agentforce lets you orchestrate AI agents and automate CRM workflows. With live connectivity through CData Connect AI and the remote MCP Server, your agents can operate on real-time, contextual data across your stack: CRMs, ERPs, databases, finance platforms, and more.
Start your free trial today to see how CData can empower Agentforce with live, secure access to hundreds of external systems.