Custom jobs use the OpenLineage standard to send job and lineage events to Datadog. With custom jobs, you can:
Use custom jobs when you need to:
Note: To centralize configuration and avoid distributing API keys to every application, you can set up the Datadog Agent as an OpenLineage proxy.
The examples on this page use datadoghq.com; replace the hostname with the intake endpoint for your site.
START event
Use one of the following options to send OpenLineage events to Datadog:
Note: Datadog requires the jobType Job Facet to process run events.
To also see lineage edges between your job and its datasets, include inputs and outputs in your event. Dataset namespaces must match the format Datadog expects for each platform. See Dataset naming conventions.
Send a raw OpenLineage RunEvent as JSON to Datadog’s intake endpoint.
curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
-H "Authorization: Bearer <DD_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"eventTime": "2024-01-01T10:00:00Z",
"eventType": "START",
"run": { "runId": "<RUN_UUID>" },
"job": {
"namespace": "<YOUR_NAMESPACE>",
"name": "<YOUR_JOB_NAME>",
"facets": {
"jobType": {
"_producer": "<YOUR_PRODUCER_ID>",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
"processingType": "BATCH",
"integration": "custom",
"jobType": "JOB"
}
}
},
"inputs": [
{
"namespace": "postgres://demo-db.example.com:5432",
"name": "orders.public.orders"
}
],
"outputs": [
{
"namespace": "snowflake://myorg-myaccount",
"name": "ANALYTICS.PUBLIC.ORDERS"
}
],
"producer": "<YOUR_PRODUCER_ID>"
}'
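You can assemble the same request with the Python standard library if you prefer not to depend on the OpenLineage client. The following is a minimal sketch that mirrors the curl payload above. `build_start_event` and `build_lineage_request` are hypothetical helpers, and the intake URL is the one used on this page. The network call is left commented out.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Intake URL from the curl example; replace the hostname for your Datadog site.
INTAKE_URL = "https://data-obs-intake.datadoghq.com/api/v1/lineage"
JOB_TYPE_SCHEMA = "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json"


def build_start_event(run_id, namespace, name, producer, inputs=(), outputs=()):
    """Build a START RunEvent dict shaped like the curl payload (hypothetical helper)."""
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),  # timezone-aware timestamp
        "eventType": "START",
        "run": {"runId": run_id},
        "job": {
            "namespace": namespace,
            "name": name,
            "facets": {
                # Datadog requires the jobType job facet to process run events.
                "jobType": {
                    "_producer": producer,
                    "_schemaURL": JOB_TYPE_SCHEMA,
                    "processingType": "BATCH",
                    "integration": "custom",
                    "jobType": "JOB",
                }
            },
        },
        "inputs": [dict(d) for d in inputs],
        "outputs": [dict(d) for d in outputs],
        "producer": producer,
    }


def build_lineage_request(event, api_key, url=INTAKE_URL):
    """Wrap an event in a POST request carrying the same headers as the curl example."""
    return urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    event = build_start_event(
        run_id=str(uuid.uuid4()),
        namespace="<YOUR_NAMESPACE>",
        name="<YOUR_JOB_NAME>",
        producer="<YOUR_PRODUCER_ID>",
        inputs=[{"namespace": "postgres://demo-db.example.com:5432", "name": "orders.public.orders"}],
        outputs=[{"namespace": "snowflake://myorg-myaccount", "name": "ANALYTICS.PUBLIC.ORDERS"}],
    )
    request = build_lineage_request(event, api_key="<DD_API_KEY>")
    # Uncomment to send the event:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status)
```

Keep the generated runId: the RUNNING, COMPLETE, and FAIL events for this run must reuse it.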
Use the OpenLineage Python client with a manually specified HTTP transport.
from datetime import datetime
import uuid
from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job
client = OpenLineageClient(
url="https://data-obs-intake.datadoghq.com",
options=OpenLineageClientOptions(api_key="<DD_API_KEY>")
)
event = RunEvent(
eventType=RunState.START,
eventTime=datetime.now().astimezone().isoformat(),  # timezone-aware ISO 8601 timestamp
run=Run(runId=str(uuid.uuid4())),
job=Job(
namespace="<YOUR_NAMESPACE>",
name="<YOUR_JOB_NAME>",
facets={
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH",
integration="custom",
jobType="JOB"
)
}
),
inputs=[
InputDataset(
namespace="postgres://demo-db.example.com:5432",
name="orders.public.orders"
)
],
outputs=[
OutputDataset(
namespace="snowflake://myorg-myaccount",
name="ANALYTICS.PUBLIC.ORDERS"
)
],
producer="<YOUR_PRODUCER_ID>"
)
client.emit(event)
In OpenLineage 1.37.0+, use the Datadog transport for automatic configuration and optimized event delivery.
from datetime import datetime
import uuid
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job
from openlineage.client.transport.datadog import DatadogConfig, DatadogTransport
config = DatadogConfig(
apiKey="<DD_API_KEY>",
site="datadoghq.com" # Change if using a different Datadog site
)
client = OpenLineageClient(transport=DatadogTransport(config))
event = RunEvent(
eventType=RunState.START,
eventTime=datetime.now().astimezone().isoformat(),  # timezone-aware ISO 8601 timestamp
run=Run(runId=str(uuid.uuid4())),
job=Job(
namespace="<YOUR_NAMESPACE>",
name="<YOUR_JOB_NAME>",
facets={
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH",
integration="custom",
jobType="JOB"
)
}
),
inputs=[
InputDataset(
namespace="postgres://demo-db.example.com:5432",
name="orders.public.orders"
)
],
outputs=[
OutputDataset(
namespace="snowflake://myorg-myaccount",
name="ANALYTICS.PUBLIC.ORDERS"
)
],
producer="<YOUR_PRODUCER_ID>"
)
client.emit(event)
You can also configure the Datadog transport with environment variables instead of DatadogConfig:
export DD_API_KEY=<DD_API_KEY>
export DD_SITE=datadoghq.com
export OPENLINEAGE__TRANSPORT__TYPE=datadog
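Then create the client in Python; it reads the transport configuration from these environment variables:
client = OpenLineageClient.from_environment()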
RUNNING event (optional)
Note: This step is optional. RUNNING events let you see a job’s status before it finishes. If you only need to capture the job’s final status, skip to the COMPLETE or FAIL event step.
While the job is in progress, send a RUNNING event to track it in Datadog’s Jobs Monitoring. Use the same runId from the START event.
curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
-H "Authorization: Bearer <DD_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"eventTime": "2024-01-01T10:02:00Z",
"eventType": "RUNNING",
"run": { "runId": "<RUN_UUID>" },
"job": {
"namespace": "<YOUR_NAMESPACE>",
"name": "<YOUR_JOB_NAME>",
"facets": {
"jobType": {
"_producer": "<YOUR_PRODUCER_ID>",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
"processingType": "BATCH",
"integration": "custom",
"jobType": "JOB"
}
}
},
"producer": "<YOUR_PRODUCER_ID>"
}'
from datetime import datetime
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run
from openlineage.client.facet_v2 import job_type_job
running_event = RunEvent(
eventType=RunState.RUNNING,
eventTime=datetime.now().astimezone().isoformat(),  # timezone-aware ISO 8601 timestamp
run=Run(runId="<RUN_UUID>"), # same runId as START
job=Job(
namespace="<YOUR_NAMESPACE>",
name="<YOUR_JOB_NAME>",
facets={
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH",
integration="custom",
jobType="JOB"
)
}
),
producer="<YOUR_PRODUCER_ID>"
)
client.emit(running_event)
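Every event for a run must carry the same runId. When one process sends all events, you can keep the UUID in a variable. If START and the later events come from separate processes, one option is to derive the runId deterministically. Datadog does not prescribe this technique; it is an assumption of this sketch. The sketch uses `uuid.uuid5` over the job namespace, job name, and a per-run key. `deterministic_run_id` is a hypothetical helper.

```python
import uuid


def deterministic_run_id(namespace: str, job_name: str, run_key: str) -> str:
    """Derive a stable version-5 UUID for one run of a job (hypothetical helper).

    run_key must uniquely identify the run, for example its scheduled start time.
    Calling this with the same arguments in the process that sends START and in
    the process that sends COMPLETE or FAIL yields the same runId.
    """
    name = f"{namespace}/{job_name}/{run_key}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, name))


start_run_id = deterministic_run_id("<YOUR_NAMESPACE>", "<YOUR_JOB_NAME>", "2024-01-01T10:00:00Z")
finish_run_id = deterministic_run_id("<YOUR_NAMESPACE>", "<YOUR_JOB_NAME>", "2024-01-01T10:00:00Z")
assert start_run_id == finish_run_id  # same inputs, same runId
```

The trade-off is that two runs with the same key collide. Include something unique per attempt (such as a retry number) in `run_key` if reruns are possible.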
COMPLETE or FAIL event
When the job finishes, send a COMPLETE or FAIL event using the same runId from the START event.
Success
curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
-H "Authorization: Bearer <DD_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"eventTime": "2024-01-01T10:05:00Z",
"eventType": "COMPLETE",
"run": { "runId": "<RUN_UUID>" },
"job": {
"namespace": "<YOUR_NAMESPACE>",
"name": "<YOUR_JOB_NAME>",
"facets": {
"jobType": {
"_producer": "<YOUR_PRODUCER_ID>",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
"processingType": "BATCH",
"integration": "custom",
"jobType": "JOB"
}
}
},
"producer": "<YOUR_PRODUCER_ID>"
}'
Failure
curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
-H "Authorization: Bearer <DD_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"eventTime": "2024-01-01T10:05:00Z",
"eventType": "FAIL",
"run": {
"runId": "<RUN_UUID>",
"facets": {
"errorMessage": {
"_producer": "<YOUR_PRODUCER_ID>",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ErrorMessageRunFacet.json",
"message": "Job failed: division by zero",
"programmingLanguage": "Python",
"stackTrace": "Traceback (most recent call last):\n File \"job.py\", line 42, in run\n result = total / count\nZeroDivisionError: division by zero"
}
}
},
"job": {
"namespace": "<YOUR_NAMESPACE>",
"name": "<YOUR_JOB_NAME>",
"facets": {
"jobType": {
"_producer": "<YOUR_PRODUCER_ID>",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
"processingType": "BATCH",
"integration": "custom",
"jobType": "JOB"
}
}
},
"producer": "<YOUR_PRODUCER_ID>"
}'
Success
from datetime import datetime
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run
from openlineage.client.facet_v2 import job_type_job
complete_event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now().astimezone().isoformat(),  # timezone-aware ISO 8601 timestamp
run=Run(runId="<RUN_UUID>"), # same runId as START
job=Job(
namespace="<YOUR_NAMESPACE>",
name="<YOUR_JOB_NAME>",
facets={
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH",
integration="custom",
jobType="JOB"
)
}
),
producer="<YOUR_PRODUCER_ID>"
)
client.emit(complete_event)
Failure
from datetime import datetime
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run
from openlineage.client.facet_v2 import job_type_job, error_message_run
fail_event = RunEvent(
eventType=RunState.FAIL,
eventTime=datetime.now().astimezone().isoformat(),  # timezone-aware ISO 8601 timestamp
run=Run(
runId="<RUN_UUID>", # same runId as START
facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
message="Job failed: division by zero",
programmingLanguage="Python",
stackTrace="Traceback (most recent call last):\n File \"job.py\", line 42, in run\n result = total / count\nZeroDivisionError: division by zero"
)
}
),
job=Job(
namespace="<YOUR_NAMESPACE>",
name="<YOUR_JOB_NAME>",
facets={
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH",
integration="custom",
jobType="JOB"
)
}
),
producer="<YOUR_PRODUCER_ID>"
)
client.emit(fail_event)
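The START, optional RUNNING, and COMPLETE or FAIL steps can be wrapped so the runId is generated once and the final event always matches the job outcome. The following is a minimal standard-library sketch, not a Datadog or OpenLineage API:

- `tracked_run` and `_event` are hypothetical.
- `emit` is any function that sends an event dict, for example one that POSTs to the intake endpoint as in the curl examples.
- The errorMessage facet is filled from the caught exception.

```python
import traceback
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone

JOB_TYPE_SCHEMA = "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json"
ERROR_SCHEMA = "https://openlineage.io/spec/facets/1-0-1/ErrorMessageRunFacet.json"


def _event(event_type, run_id, namespace, name, producer, run_facets=None):
    """Build a RunEvent dict shaped like the curl examples on this page."""
    run = {"runId": run_id}
    if run_facets:
        run["facets"] = run_facets
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "eventType": event_type,
        "run": run,
        "job": {
            "namespace": namespace,
            "name": name,
            "facets": {
                "jobType": {
                    "_producer": producer,
                    "_schemaURL": JOB_TYPE_SCHEMA,
                    "processingType": "BATCH",
                    "integration": "custom",
                    "jobType": "JOB",
                }
            },
        },
        "producer": producer,
    }


@contextmanager
def tracked_run(emit, namespace, name, producer):
    """Emit START on entry, then COMPLETE or FAIL on exit, all with one runId.

    Yields a zero-argument function that sends an optional RUNNING event.
    """
    run_id = str(uuid.uuid4())
    emit(_event("START", run_id, namespace, name, producer))
    try:
        yield lambda: emit(_event("RUNNING", run_id, namespace, name, producer))
    except Exception as exc:
        error_facet = {
            "errorMessage": {
                "_producer": producer,
                "_schemaURL": ERROR_SCHEMA,
                "message": f"Job failed: {exc}",
                "programmingLanguage": "Python",
                "stackTrace": traceback.format_exc(),
            }
        }
        emit(_event("FAIL", run_id, namespace, name, producer, run_facets=error_facet))
        raise  # re-raise so the caller still sees the failure
    else:
        emit(_event("COMPLETE", run_id, namespace, name, producer))


# Usage: collect events in a list here; in practice, pass a function that sends them.
sent_events = []
with tracked_run(sent_events.append, "<YOUR_NAMESPACE>", "<YOUR_JOB_NAME>", "<YOUR_PRODUCER_ID>") as running:
    running()  # optional RUNNING update while the job works
```

Re-raising after FAIL is a deliberate choice. It keeps the job's own error handling and exit code unchanged, so lineage reporting never hides a failure.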
After sending your events, check the following:
If you included inputs or outputs in your event, your job appears as a node connected to the dataset nodes.
To connect your custom job’s lineage to datasets already tracked by Datadog’s native integrations, include inputs and outputs in your event using the exact namespace and name that Datadog expects for that platform. For example, referencing a Snowflake table in your custom job’s outputs with the correct namespace and name links it to the existing dataset node in the lineage graph.
Datadog resolves datasets into a hierarchy of account, database, schema, and table. If a name has fewer parts than expected (for example, database.table instead of database.schema.table), Datadog falls back to the nearest higher-order node in the lineage graph.
| Platform | Namespace | Name |
|---|---|---|
| BigQuery | bigquery | {project}.{dataset}.{table} |
| Snowflake | snowflake://{org}-{account} | {database}.{schema}.{table} |
| Redshift | redshift://{aws_account_id}:{region}:{cluster} | {database}.{schema}.{table} |
| PostgreSQL | postgres://{host}:{port} | {database}.{schema}.{table} |
| Databricks | databricks://{workspace-url} | {database}.{schema}.{table} |
| Trino | trino://{host}:{port} | {catalog}.{schema}.{table} |
| AWS Glue | arn:aws:glue:{region}:{accountId} | {database}.{table} |
| S3 | s3://{bucket} | {path} |
For platforms not listed here, follow the OpenLineage naming conventions.
The following example shows a job reading from a PostgreSQL table and writing to a Snowflake table:
"inputs": [
{
"namespace": "postgres://db.example.com:5432",
"name": "mydb.public.raw_orders"
}
],
"outputs": [
{
"namespace": "snowflake://myorg-myaccount",
"name": "ANALYTICS.PUBLIC.ORDERS"
}
]
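The templates in the table can also be filled in programmatically. The following sketch reproduces the example above. `DATASET_FORMATS` and `dataset` are hypothetical names, and the platform keys are labels chosen for this sketch. The Databricks `{workspace-url}` placeholder is renamed `workspace_url` so it can be passed as a Python keyword argument.

```python
# Namespace and name templates copied from the dataset naming table.
DATASET_FORMATS = {
    "bigquery": ("bigquery", "{project}.{dataset}.{table}"),
    "snowflake": ("snowflake://{org}-{account}", "{database}.{schema}.{table}"),
    "redshift": ("redshift://{aws_account_id}:{region}:{cluster}", "{database}.{schema}.{table}"),
    "postgres": ("postgres://{host}:{port}", "{database}.{schema}.{table}"),
    "databricks": ("databricks://{workspace_url}", "{database}.{schema}.{table}"),
    "trino": ("trino://{host}:{port}", "{catalog}.{schema}.{table}"),
    "glue": ("arn:aws:glue:{region}:{accountId}", "{database}.{table}"),
    "s3": ("s3://{bucket}", "{path}"),
}


def dataset(platform: str, **parts) -> dict:
    """Return an OpenLineage dataset reference for a platform.

    Raises KeyError if the platform is unknown or a template part is missing.
    """
    namespace_tpl, name_tpl = DATASET_FORMATS[platform]
    return {"namespace": namespace_tpl.format(**parts), "name": name_tpl.format(**parts)}


inputs = [dataset("postgres", host="db.example.com", port=5432,
                  database="mydb", schema="public", table="raw_orders")]
outputs = [dataset("snowflake", org="myorg", account="myaccount",
                   database="ANALYTICS", schema="PUBLIC", table="ORDERS")]
```

Raising on a missing part is intentional. Silently emitting a shorter name would make Datadog fall back to a higher-order node instead of the intended table.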
Note: If a dataset namespace is not recognized, Datadog still creates a lineage node for it but does not surface it in the Data Observability product. Use a recognized namespace format to have datasets appear in the catalog and lineage graph.
Facets are structured metadata attached to OpenLineage events. Each facet requires _producer (a URI identifying the system that produced it) and _schemaURL (a URI referencing its JSON schema).
JobTypeJobFacet
The jobType job facet is required. It determines how Datadog classifies and displays the job.
integration values
Use custom for custom jobs. The other values below are used by Datadog’s native integrations, and using them for custom jobs may produce unexpected behavior. In particular, SPARK prevents span generation.
| Value | Platform |
|---|---|
| custom | Custom or unsupported platforms |
| SPARK | Apache Spark (native integration only; do not use for custom jobs) |
| AIRFLOW | Apache Airflow |
| DBT | dbt |
| BIGQUERY | Google BigQuery |
| SNOWFLAKE | Snowflake |
| TRINO | Trino |
| ICEBERG | Apache Iceberg |
| TABLEAU | Tableau |
processingType values
Use BATCH or STREAMING.
jobType values
Common values include JOB, TASK, DAG, MODEL, COMMAND, and QUERY.
Note: If jobType is set to QUERY, Datadog does not generate lineage nodes for the job.
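You can check the rules in this section before sending an event. The following is a minimal sketch; `check_job_type_facet` is hypothetical. It treats all five fields used in the examples on this page as mandatory, which may be stricter than Datadog itself. It reports problems rather than raising.

```python
PROCESSING_TYPES = {"BATCH", "STREAMING"}
# integration values reserved for Datadog's native integrations (from the table above).
NATIVE_INTEGRATIONS = {"SPARK", "AIRFLOW", "DBT", "BIGQUERY", "SNOWFLAKE", "TRINO", "ICEBERG", "TABLEAU"}


def check_job_type_facet(facet: dict) -> list[str]:
    """Return human-readable problems with a jobType facet; an empty list means none found."""
    problems = []
    for key in ("_producer", "_schemaURL", "processingType", "integration", "jobType"):
        if not facet.get(key):
            problems.append(f"missing {key}")
    if facet.get("processingType") and facet["processingType"] not in PROCESSING_TYPES:
        problems.append("processingType must be BATCH or STREAMING")
    if facet.get("integration") in NATIVE_INTEGRATIONS:
        problems.append(f"integration {facet['integration']} is reserved for native integrations; use custom")
    if facet.get("jobType") == "QUERY":
        problems.append("jobType QUERY: Datadog does not generate lineage nodes for this job")
    return problems


facet = {
    "_producer": "<YOUR_PRODUCER_ID>",
    "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
    "processingType": "BATCH",
    "integration": "custom",
    "jobType": "JOB",
}
for problem in check_job_type_facet(facet):
    print(problem)
```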
Datadog also processes the following facets when they are present:
| Facet | What Datadog does |
|---|---|
| parent | Creates a parent-child job hierarchy in the lineage graph |
| errorMessage | Generates error spans with error.message and error.stack tags |
| tags | Adds span tags to the run; the _dd.ol_service tag value maps to the Datadog service name |
| sql | Parses and masks the SQL query and generates query events |
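These facets attach to different parts of the event. In OpenLineage, parent and errorMessage are run facets (under run.facets) and sql is a job facet (under job.facets). This sketch also places tags on the run, matching "Adds span tags to the run" above, and it assumes key/value tag entries.

The helper names and the `orders-etl` service name are hypothetical. The `<..._SCHEMA_URL>` placeholders must be replaced with the current URLs from the OpenLineage specification.

```python
# Placeholders: look up the current schema URLs in the OpenLineage specification.
PARENT_SCHEMA_URL = "<PARENT_RUN_FACET_SCHEMA_URL>"
TAGS_SCHEMA_URL = "<TAGS_RUN_FACET_SCHEMA_URL>"
SQL_SCHEMA_URL = "<SQL_JOB_FACET_SCHEMA_URL>"


def parent_facet(producer, parent_run_id, parent_namespace, parent_name):
    """Run facet linking this run to a run of its parent job."""
    return {
        "_producer": producer,
        "_schemaURL": PARENT_SCHEMA_URL,
        "run": {"runId": parent_run_id},
        "job": {"namespace": parent_namespace, "name": parent_name},
    }


def tags_facet(producer, tags):
    """Run facet carrying key/value tags, such as _dd.ol_service for the service name."""
    return {
        "_producer": producer,
        "_schemaURL": TAGS_SCHEMA_URL,
        "tags": [{"key": key, "value": value} for key, value in tags.items()],
    }


def sql_facet(producer, query):
    """Job facet carrying the SQL text that Datadog parses and masks."""
    return {"_producer": producer, "_schemaURL": SQL_SCHEMA_URL, "query": query}


producer = "<YOUR_PRODUCER_ID>"
run = {
    "runId": "<RUN_UUID>",
    "facets": {
        "parent": parent_facet(producer, "<PARENT_RUN_UUID>", "<YOUR_NAMESPACE>", "<PARENT_JOB_NAME>"),
        "tags": tags_facet(producer, {"_dd.ol_service": "orders-etl"}),  # hypothetical service name
    },
}
job_facets = {"sql": sql_facet(producer, "SELECT * FROM orders WHERE id = 42")}
```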