👁 Image
README
¶
otelkafka
OpenTelemetry instrumentation for confluent-kafka-go.
Installation
go get -u github.com/jurabek/otelkafka
Usage
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/jurabek/otelkafka"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// main demonstrates a produce/consume round trip using the instrumented
// producer and consumer from otelkafka.
func main() {
	// Context whose trace information is injected into the outgoing message.
	// In a real application this would be the context carrying the active span.
	ctx := context.Background()

	// Create a new Kafka producer
	producer, err := otelkafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	// Produce a message
	topic := "my-topic"
	message := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("Hello, World!"),
	}

	// Inject the context into the message before producing it so that it is
	// propagated along with the message.
	otel.GetTextMapPropagator().Inject(ctx, otelkafka.NewMessageCarrier(message))
	if err := producer.Produce(message, nil); err != nil {
		log.Fatalf("Failed to produce message: %v", err)
	}

	// Wait for the message to be delivered
	producer.Flush(15 * 1000)

	// Create a new Kafka consumer
	consumer, err := otelkafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	// Subscribe to the topic
	if err := consumer.Subscribe(topic, nil); err != nil {
		log.Fatalf("Failed to subscribe to topic: %v", err)
	}

	// Consume messages
	for {
		msg, err := consumer.ReadMessage(-1)
		if err != nil {
			log.Fatalf("Failed to read message: %v", err)
		}
		fmt.Printf("Message: %s\n", msg.Value)
	}
}
Metrics list
The table below lists the metrics collected by the instrumentation, which can be exported using an OpenTelemetry metric exporter.
| Name | Description | Type | Attributes |
|---|---|---|---|
| messaging.client.sent.messages | The total number of messages sent by the producer. | Counter | more attributes |
| messaging.client.consumed.messages | The total number of messages received by the consumer. | Counter | more attributes |
| messaging.client.operation.duration | The duration of the messaging operation initiated by a producer or consumer client, filtered by messaging.system.operation.name. | Histogram | more attributes |
👁 Image
Documentation
¶
Overview ¶
Package otelkafka instruments the github.com/confluentinc/confluent-kafka-go/v2 package and supports function-based Producer and Consumer.
The consumer's span will be created as a child of the producer's span.
Context propagation only works with Kafka version 0.11.0.0 or later, which supports record headers. (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html)
Based on: https://github.com/DataDog/dd-trace-go/tree/main/contrib/confluentinc/confluent-kafka-go/kafka.v2
Index ¶
- func Version() string
- type Consumer
- type MessageCarrier
- type Option
- func WithCustomAttributeInjector(fn func(msg *kafka.Message) []attribute.KeyValue) Option
- func WithMessageCarrier(carrier func(*kafka.Message) propagation.TextMapCarrier) Option
- func WithMeterProvider(meterProvider metric.MeterProvider) Option
- func WithPropagators(propagators propagation.TextMapPropagator) Option
- func WithTracerProvider(provider trace.TracerProvider) Option
- type Producer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Consumer ¶
func WrapConsumer ¶
WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafka.Message) *MessageCarrier
func (MessageCarrier) Get ¶
func (c MessageCarrier) Get(key string) string
func (MessageCarrier) Keys ¶
func (c MessageCarrier) Keys() []string
func (MessageCarrier) Set ¶
func (c MessageCarrier) Set(key string, value string)
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option interface used for setting optional config properties.
func WithMessageCarrier ¶ added in v1.0.1
func WithMessageCarrier(carrier func(*kafka.Message) propagation.TextMapCarrier) Option
func WithMeterProvider ¶
func WithMeterProvider(meterProvider metric.MeterProvider) Option
func WithPropagators ¶
func WithPropagators(propagators propagation.TextMapPropagator) Option
WithPropagators specifies propagators to use for extracting information from Kafka messages. If none are specified, global ones will be used.
func WithTracerProvider ¶
func WithTracerProvider(provider trace.TracerProvider) Option
WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.
