URL: https://docs.datadoghq.com/data_observability/jobs_monitoring/openlineage/

Custom Jobs using OpenLineage


Custom jobs using OpenLineage is in Preview.

Overview

Custom jobs use the OpenLineage standard to send job and lineage events to Datadog. With custom jobs, you can:

  • Detect failing and long-running jobs
  • Pinpoint and resolve the root cause of failed and long-running jobs
  • Understand upstream dependencies and downstream data consumers with data lineage

Use custom jobs when you need to:

  • Capture lineage from systems Datadog doesn’t integrate with natively, such as in-house tools or custom ETL scripts
  • Emit lineage events for jobs or orchestrators where a native Datadog integration isn’t available

Note: To centralize configuration and avoid distributing API keys to every application, you can set up the Datadog Agent as an OpenLineage proxy.

Prerequisites

  • A Datadog API key. See API and Application Keys.
  • Your Datadog site URL. The examples on this page use datadoghq.com; replace the hostname with the intake endpoint for your site.

Step 1: Send a START event

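Use one of the following options to send OpenLineage events to Datadog: the HTTP API with curl, the OpenLineage Python client with an HTTP transport, or the OpenLineage Python client with the Datadog transport.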

Note: Datadog requires the jobType Job Facet to process run events.

To also see lineage edges between your job and its datasets, include inputs and outputs in your event. Dataset namespaces must match the format Datadog expects for each platform. See Dataset naming conventions.

Send a raw OpenLineage RunEvent as JSON to Datadog’s intake endpoint.

curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
 -H "Authorization: Bearer <DD_API_KEY>" \
 -H "Content-Type: application/json" \
 -d '{
 "eventTime": "2024-01-01T10:00:00Z",
 "eventType": "START",
 "run": { "runId": "<RUN_UUID>" },
 "job": {
 "namespace": "<YOUR_NAMESPACE>",
 "name": "<YOUR_JOB_NAME>",
 "facets": {
 "jobType": {
 "_producer": "<YOUR_PRODUCER_ID>",
 "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
 "processingType": "BATCH",
 "integration": "custom",
 "jobType": "JOB"
 }
 }
 },
 "inputs": [
 {
 "namespace": "postgres://demo-db.example.com:5432",
 "name": "orders.public.orders"
 }
 ],
 "outputs": [
 {
 "namespace": "snowflake://myorg-myaccount",
 "name": "ANALYTICS.PUBLIC.ORDERS"
 }
 ],
 "producer": "<YOUR_PRODUCER_ID>"
 }'

Use the OpenLineage Python client with a manually specified HTTP transport.

from datetime import datetime
import uuid
from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job
client = OpenLineageClient(
 url="https://data-obs-intake.datadoghq.com",
 options=OpenLineageClientOptions(api_key="<DD_API_KEY>")
)
event = RunEvent(
 eventType=RunState.START,
 eventTime=datetime.utcnow().isoformat(),
 run=Run(runId=str(uuid.uuid4())),
 job=Job(
 namespace="<YOUR_NAMESPACE>",
 name="<YOUR_JOB_NAME>",
 facets={
 "jobType": job_type_job.JobTypeJobFacet(
 processingType="BATCH",
 integration="custom",
 jobType="JOB"
 )
 }
 ),
 inputs=[
 InputDataset(
 namespace="postgres://demo-db.example.com:5432",
 name="orders.public.orders"
 )
 ],
 outputs=[
 OutputDataset(
 namespace="snowflake://myorg-myaccount",
 name="ANALYTICS.PUBLIC.ORDERS"
 )
 ],
 producer="<YOUR_PRODUCER_ID>"
)
client.emit(event)

In OpenLineage 1.37.0+, use the Datadog transport for automatic configuration and optimized event delivery.

from datetime import datetime
import uuid
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job
from openlineage.client.transport.datadog import DatadogConfig, DatadogTransport
config = DatadogConfig(
 apiKey="<DD_API_KEY>",
 site="datadoghq.com" # Change if using a different Datadog site
)
client = OpenLineageClient(transport=DatadogTransport(config))
event = RunEvent(
 eventType=RunState.START,
 eventTime=datetime.utcnow().isoformat(),
 run=Run(runId=str(uuid.uuid4())),
 job=Job(
 namespace="<YOUR_NAMESPACE>",
 name="<YOUR_JOB_NAME>",
 facets={
 "jobType": job_type_job.JobTypeJobFacet(
 processingType="BATCH",
 integration="custom",
 jobType="JOB"
 )
 }
 ),
 inputs=[
 InputDataset(
 namespace="postgres://demo-db.example.com:5432",
 name="orders.public.orders"
 )
 ],
 outputs=[
 OutputDataset(
 namespace="snowflake://myorg-myaccount",
 name="ANALYTICS.PUBLIC.ORDERS"
 )
 ],
 producer="<YOUR_PRODUCER_ID>"
)
client.emit(event)

You can also configure the Datadog transport with environment variables instead of DatadogConfig. Set the variables in your shell:

export DD_API_KEY=<DD_API_KEY>
export DD_SITE=datadoghq.com
export OPENLINEAGE__TRANSPORT__TYPE=datadog

Then create the client in Python:

from openlineage.client import OpenLineageClient
client = OpenLineageClient.from_environment()
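
If neither curl nor the OpenLineage client fits your environment, the START event from Step 1 can also be built and posted with only the Python standard library. The following is a minimal sketch, not an official client: the endpoint, headers, and facet fields are copied from the curl example, while the helper names (build_start_event, build_request, send_event) are hypothetical and the 10-second timeout is an arbitrary choice.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Endpoint and facet schema URL are copied from the curl example in Step 1.
INTAKE_URL = "https://data-obs-intake.datadoghq.com/api/v1/lineage"
JOB_TYPE_SCHEMA = "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json"


def build_start_event(namespace, name, producer, run_id=None):
    """Return a START RunEvent as a plain dict (hypothetical helper)."""
    return {
        # Timezone-aware UTC timestamp in ISO 8601 format.
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "eventType": "START",
        # Keep this runId: later events for the same run must reuse it.
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {
            "namespace": namespace,
            "name": name,
            "facets": {
                "jobType": {
                    "_producer": producer,
                    "_schemaURL": JOB_TYPE_SCHEMA,
                    "processingType": "BATCH",
                    "integration": "custom",
                    "jobType": "JOB",
                }
            },
        },
        "producer": producer,
    }


def build_request(event, api_key, url=INTAKE_URL):
    """Wrap an event dict in a POST request with the same headers as curl."""
    return urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def send_event(event, api_key):
    """Send the event and return the HTTP status code."""
    request = build_request(event, api_key)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Calling send_event(build_start_event("<YOUR_NAMESPACE>", "<YOUR_JOB_NAME>", "<YOUR_PRODUCER_ID>"), "<DD_API_KEY>") performs the request. Building the request separately from sending it lets you inspect or log the payload first.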

Step 2: Send a RUNNING event (optional)

Note: This step is optional. RUNNING events let you see a job’s status before it finishes. If you only need to capture job completion status, skip to Step 3.

While the job is in progress, send a RUNNING event to track it in Datadog’s Jobs Monitoring. Use the same runId from the START event.

curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
 -H "Authorization: Bearer <DD_API_KEY>" \
 -H "Content-Type: application/json" \
 -d '{
 "eventTime": "2024-01-01T10:02:00Z",
 "eventType": "RUNNING",
 "run": { "runId": "<RUN_UUID>" },
 "job": {
 "namespace": "<YOUR_NAMESPACE>",
 "name": "<YOUR_JOB_NAME>",
 "facets": {
 "jobType": {
 "_producer": "<YOUR_PRODUCER_ID>",
 "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
 "processingType": "BATCH",
 "integration": "custom",
 "jobType": "JOB"
 }
 }
 },
 "producer": "<YOUR_PRODUCER_ID>"
 }'
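
With the OpenLineage Python client (HTTP or Datadog transport), reuse the client created in Step 1:

from datetime import datetime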
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run
from openlineage.client.facet_v2 import job_type_job
running_event = RunEvent(
 eventType=RunState.RUNNING,
 eventTime=datetime.utcnow().isoformat(),
 run=Run(runId="<RUN_UUID>"), # same runId as START
 job=Job(
 namespace="<YOUR_NAMESPACE>",
 name="<YOUR_JOB_NAME>",
 facets={
 "jobType": job_type_job.JobTypeJobFacet(
 processingType="BATCH",
 integration="custom",
 jobType="JOB"
 )
 }
 ),
 producer="<YOUR_PRODUCER_ID>"
)
client.emit(running_event)

Step 3: Send a COMPLETE or FAIL event

When the job finishes, send a COMPLETE or FAIL event using the same runId from the START event.

Success (curl)

curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
 -H "Authorization: Bearer <DD_API_KEY>" \
 -H "Content-Type: application/json" \
 -d '{
 "eventTime": "2024-01-01T10:05:00Z",
 "eventType": "COMPLETE",
 "run": { "runId": "<RUN_UUID>" },
 "job": {
 "namespace": "<YOUR_NAMESPACE>",
 "name": "<YOUR_JOB_NAME>",
 "facets": {
 "jobType": {
 "_producer": "<YOUR_PRODUCER_ID>",
 "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
 "processingType": "BATCH",
 "integration": "custom",
 "jobType": "JOB"
 }
 }
 },
 "producer": "<YOUR_PRODUCER_ID>"
 }'

Failure (curl)

curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
 -H "Authorization: Bearer <DD_API_KEY>" \
 -H "Content-Type: application/json" \
 -d '{
 "eventTime": "2024-01-01T10:05:00Z",
 "eventType": "FAIL",
 "run": {
 "runId": "<RUN_UUID>",
 "facets": {
 "errorMessage": {
 "_producer": "<YOUR_PRODUCER_ID>",
 "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ErrorMessageRunFacet.json",
 "message": "Job failed: division by zero",
 "programmingLanguage": "Python",
 "stackTrace": "Traceback (most recent call last):\n File \"job.py\", line 42, in run\n result = total / count\nZeroDivisionError: division by zero"
 }
 }
 },
 "job": {
 "namespace": "<YOUR_NAMESPACE>",
 "name": "<YOUR_JOB_NAME>",
 "facets": {
 "jobType": {
 "_producer": "<YOUR_PRODUCER_ID>",
 "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
 "processingType": "BATCH",
 "integration": "custom",
 "jobType": "JOB"
 }
 }
 },
 "producer": "<YOUR_PRODUCER_ID>"
 }'

Success (Python client)

from datetime import datetime
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run
from openlineage.client.facet_v2 import job_type_job
complete_event = RunEvent(
 eventType=RunState.COMPLETE,
 eventTime=datetime.utcnow().isoformat(),
 run=Run(runId="<RUN_UUID>"), # same runId as START
 job=Job(
 namespace="<YOUR_NAMESPACE>",
 name="<YOUR_JOB_NAME>",
 facets={
 "jobType": job_type_job.JobTypeJobFacet(
 processingType="BATCH",
 integration="custom",
 jobType="JOB"
 )
 }
 ),
 producer="<YOUR_PRODUCER_ID>"
)
client.emit(complete_event)

Failure (Python client)

from datetime import datetime
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run
from openlineage.client.facet_v2 import job_type_job, error_message_run
fail_event = RunEvent(
 eventType=RunState.FAIL,
 eventTime=datetime.utcnow().isoformat(),
 run=Run(
 runId="<RUN_UUID>", # same runId as START
 facets={
 "errorMessage": error_message_run.ErrorMessageRunFacet(
 message="Job failed: division by zero",
 programmingLanguage="Python",
 stackTrace="Traceback (most recent call last):\n File \"job.py\", line 42, in run\n result = total / count\nZeroDivisionError: division by zero"
 )
 }
 ),
 job=Job(
 namespace="<YOUR_NAMESPACE>",
 name="<YOUR_JOB_NAME>",
 facets={
 "jobType": job_type_job.JobTypeJobFacet(
 processingType="BATCH",
 integration="custom",
 jobType="JOB"
 )
 }
 ),
 producer="<YOUR_PRODUCER_ID>"
)
client.emit(fail_event)
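
Steps 1 to 3 can be combined into one wrapper that generates a runId once and reuses it for every event of the run. The following is a sketch under stated assumptions: run_with_lineage and make_event are hypothetical helper names, events are plain dicts shaped like the curl payloads above, and emit is any callable you supply that delivers one event dict (for example, a function that POSTs it to the intake endpoint).

```python
import traceback
import uuid
from datetime import datetime, timezone

# Schema URLs copied from the curl examples above.
JOB_TYPE_SCHEMA = "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json"
ERROR_SCHEMA = "https://openlineage.io/spec/facets/1-0-1/ErrorMessageRunFacet.json"


def make_event(event_type, run_id, namespace, name, producer, run_facets=None):
    """Build one RunEvent dict; every event carries the required jobType facet."""
    run = {"runId": run_id}
    if run_facets:
        run["facets"] = run_facets
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "eventType": event_type,
        "run": run,
        "job": {
            "namespace": namespace,
            "name": name,
            "facets": {
                "jobType": {
                    "_producer": producer,
                    "_schemaURL": JOB_TYPE_SCHEMA,
                    "processingType": "BATCH",
                    "integration": "custom",
                    "jobType": "JOB",
                }
            },
        },
        "producer": producer,
    }


def run_with_lineage(job_fn, emit, namespace, name, producer):
    """Run job_fn, emitting START and then COMPLETE or FAIL with one runId."""
    run_id = str(uuid.uuid4())  # generated once, shared by all events
    emit(make_event("START", run_id, namespace, name, producer))
    try:
        result = job_fn()
    except Exception as exc:
        error_facet = {
            "errorMessage": {
                "_producer": producer,
                "_schemaURL": ERROR_SCHEMA,
                "message": str(exc),
                "programmingLanguage": "Python",
                "stackTrace": traceback.format_exc(),
            }
        }
        emit(make_event("FAIL", run_id, namespace, name, producer, error_facet))
        raise  # the caller still sees the original exception
    emit(make_event("COMPLETE", run_id, namespace, name, producer))
    return result
```

Re-raising after the FAIL event keeps the job's own error handling unchanged; the wrapper only observes the outcome.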

Step 4: Verify in Datadog

After sending your events, check the following:

  • Jobs Monitoring: Your job run appears with start time, duration, and status.
  • Lineage graph: If you included inputs or outputs in your event, your job appears as a node connected to the dataset nodes.

Dataset naming conventions

To connect your custom job’s lineage to datasets already tracked by Datadog’s native integrations, include inputs and outputs in your event using the exact namespace and name that Datadog expects for that platform. For example, referencing a Snowflake table in your custom job’s outputs with the correct namespace and name links it to the existing dataset node in the lineage graph.

Datadog resolves datasets into a hierarchy of account, database, schema, and table. If a name has fewer parts than expected (for example, database.table instead of database.schema.table), Datadog falls back to the nearest higher-order node in the lineage graph.

Platform    Namespace                                       Name
BigQuery    bigquery                                        {project}.{dataset}.{table}
Snowflake   snowflake://{org}-{account}                     {database}.{schema}.{table}
Redshift    redshift://{aws_account_id}:{region}:{cluster}  {database}.{schema}.{table}
PostgreSQL  postgres://{host}:{port}                        {database}.{schema}.{table}
Databricks  databricks://{workspace-url}                    {database}.{schema}.{table}
Trino       trino://{host}:{port}                           {catalog}.{schema}.{table}
AWS Glue    arn:aws:glue:{region}:{accountId}               {database}.{table}
S3          s3://{bucket}                                   {path}

For platforms not listed here, follow the OpenLineage naming conventions.

The following example shows a job reading from a PostgreSQL table and writing to a Snowflake table:

"inputs": [
 {
 "namespace": "postgres://db.example.com:5432",
 "name": "mydb.public.raw_orders"
 }
],
"outputs": [
 {
 "namespace": "snowflake://myorg-myaccount",
 "name": "ANALYTICS.PUBLIC.ORDERS"
 }
]
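
Because a mistyped namespace or name prevents the dataset from linking to an existing node, it can help to build these entries from one set of helpers rather than by hand. The sketch below covers four rows of the table above; the function names are hypothetical, and the formats are taken directly from the table.

```python
def snowflake_dataset(org, account, database, schema, table):
    """Dataset entry for a Snowflake table, per the naming table above."""
    return {
        "namespace": f"snowflake://{org}-{account}",
        "name": f"{database}.{schema}.{table}",
    }


def postgres_dataset(host, port, database, schema, table):
    """Dataset entry for a PostgreSQL table."""
    return {
        "namespace": f"postgres://{host}:{port}",
        "name": f"{database}.{schema}.{table}",
    }


def bigquery_dataset(project, dataset, table):
    """Dataset entry for a BigQuery table; the namespace is a fixed string."""
    return {"namespace": "bigquery", "name": f"{project}.{dataset}.{table}"}


def s3_dataset(bucket, path):
    """Dataset entry for an S3 object or prefix."""
    return {"namespace": f"s3://{bucket}", "name": path}


# The same inputs and outputs as the JSON example above.
inputs = [postgres_dataset("db.example.com", 5432, "mydb", "public", "raw_orders")]
outputs = [snowflake_dataset("myorg", "myaccount", "ANALYTICS", "PUBLIC", "ORDERS")]
```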

Note: If a dataset namespace is not recognized, Datadog still creates a lineage node for it but does not surface it in the Data Observability product. Use a recognized namespace format to have datasets appear in the catalog and lineage graph.

Supported facets

Facets are structured metadata attached to OpenLineage events. Each facet requires _producer (a URI identifying the system that produced it) and _schemaURL (a URI referencing its JSON schema).
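
When events are built as plain dicts, a small helper can guarantee that no facet is sent without these two keys. This is an illustrative sketch: make_facet is a hypothetical name, not part of any OpenLineage client, and the producer URI is a placeholder.

```python
def make_facet(producer, schema_url, **fields):
    """Return a facet dict with the two required metadata keys set."""
    if not producer or not schema_url:
        raise ValueError("a facet needs both _producer and _schemaURL")
    return {"_producer": producer, "_schemaURL": schema_url, **fields}


# The jobType facet from the examples on this page, built with the helper.
job_type = make_facet(
    "https://example.com/my-etl",  # placeholder producer URI
    "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
    processingType="BATCH",
    integration="custom",
    jobType="JOB",
)
```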

JobTypeJobFacet

The jobType job facet is required. It determines how Datadog classifies and displays the job.

integration values

Use custom for custom jobs. The other values in the table below are used by Datadog’s native integrations, and using them for custom jobs may produce unexpected behavior. In particular, SPARK prevents span generation.

Value       Platform
custom      Custom or unsupported platforms
SPARK       Apache Spark (native integration only; do not use for custom jobs)
AIRFLOW     Apache Airflow
DBT         dbt
BIGQUERY    Google BigQuery
SNOWFLAKE   Snowflake
TRINO       Trino
ICEBERG     Apache Iceberg
TABLEAU     Tableau

processingType values

BATCH or STREAMING.

jobType values

Common values include JOB, TASK, DAG, MODEL, COMMAND, and QUERY.

Note: If jobType is set to QUERY, Datadog does not generate lineage nodes for the job.
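
The rules in this section can be checked locally before an event is sent. The following sketch returns a list of warnings for a jobType facet dict; check_job_type_facet is a hypothetical helper, the checks mirror only what this page states, and values are compared exactly as written in the tables (whether Datadog treats them case-insensitively is not covered here).

```python
# Values used by Datadog's native integrations, from the table above.
NATIVE_INTEGRATIONS = {
    "SPARK", "AIRFLOW", "DBT", "BIGQUERY",
    "SNOWFLAKE", "TRINO", "ICEBERG", "TABLEAU",
}
PROCESSING_TYPES = {"BATCH", "STREAMING"}


def check_job_type_facet(facet):
    """Return warning strings for a jobType facet dict (empty list if none)."""
    warnings = []
    integration = facet.get("integration")
    if integration == "SPARK":
        warnings.append("integration SPARK prevents span generation")
    elif integration in NATIVE_INTEGRATIONS:
        warnings.append(
            f"integration {integration} is used by a native integration"
        )
    if facet.get("processingType") not in PROCESSING_TYPES:
        warnings.append("processingType should be BATCH or STREAMING")
    if facet.get("jobType") == "QUERY":
        warnings.append("jobType QUERY produces no lineage nodes for the job")
    return warnings
```

For example, a facet with integration set to custom, processingType set to BATCH, and jobType set to JOB yields an empty list.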

Other supported facets

Facet           What Datadog does
parent          Creates parent-child job hierarchy in the lineage graph
errorMessage    Generates error spans with error.message and error.stack tags
tags            Adds span tags to the run; _dd.ol_service value maps to the Datadog service name
sql             Parses and masks the SQL query; generates query events
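
As an illustration of the parent facet, the sketch below builds the run object for a child job that points at its parent run. It assumes the standard OpenLineage ParentRunFacet shape (a run with its runId and a job with its namespace and name) attached under run.facets.parent. The helper names are hypothetical, and the schema version in the URL is an assumption to verify against the OpenLineage spec.

```python
import uuid

# Assumed schema version; check the OpenLineage spec for the current one.
PARENT_SCHEMA = "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json"


def parent_facet(producer, parent_run_id, parent_namespace, parent_name):
    """Build a parent run facet that identifies the parent run and job."""
    return {
        "_producer": producer,
        "_schemaURL": PARENT_SCHEMA,
        "run": {"runId": parent_run_id},
        "job": {"namespace": parent_namespace, "name": parent_name},
    }


def child_run(producer, parent_run_id, parent_namespace, parent_name):
    """Return the `run` object for a child job linked to its parent run."""
    return {
        "runId": str(uuid.uuid4()),  # the child gets its own runId
        "facets": {
            "parent": parent_facet(
                producer, parent_run_id, parent_namespace, parent_name
            )
        },
    }
```

The returned dict replaces the "run" value in a child job's events; the parent's own events keep using the parent runId unchanged.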
