URL: https://dzone.com/articles/ways-to-stop-amp-resume-your-kafka-producerconsume

Ways To Stop and Resume Your Kafka Producer/Consumer at Run-Time

In this blog, the reader will learn how to stop and resume a Kafka consumer or producer at runtime using two distinct methods.

By May. 09, 23 · Tutorial

Imagine you are running a Kafka cluster, and suddenly you need to perform maintenance on one of your Kafka consumers or producers. What do you do? In this blog, we will explore how to stop and resume a Kafka consumer or producer at runtime in a Spring Boot application.

Kafka has become an indispensable building block for streaming data pipelines due to its high throughput, fault tolerance, and scalability, which make it an excellent option for processing large volumes of data in real time. Additionally, it offers the significant advantage of supporting several programming languages, including Java, Python, Kotlin, Rust, and others.

We will explore two distinct methods: one uses REST service endpoints backed by a Spring Integration control bus, while the other uses Spring Boot Actuator endpoints.

Tech Stack

  • Spring Boot
  • Spring Integration
  • Kafka Cluster (running in Docker)
  • Java 17 (or 8)

Demo Scene

Let’s begin by creating a Kafka producer. Here I am using Spring Integration to create it. As I have mentioned in my previous blogs, Spring Integration is one of the most powerful modules that Spring has introduced; it follows a message-driven approach backed by Enterprise Integration Patterns.

ProducerIntegrationConfig.java

Java
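import java.time.Duration;
import java.util.Date;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.support.GenericMessage;

@Configuration
public class ProducerIntegrationConfig {

    private final KafkaProperties kafkaProperties;
    private final String kafkaTopic;

    public ProducerIntegrationConfig(KafkaProperties kafkaProperties,
                                     @Value("${app.topic-name}") String kafkaTopic) {
        this.kafkaProperties = kafkaProperties;
        this.kafkaTopic = kafkaTopic;
    }

    // Polls every 5 seconds and emits the current date as the message payload.
    // The endpoint id "kafkaProducerBean" is what the control bus addresses later.
    @Bean
    public IntegrationFlow producerIntegrationFlow() {
        return IntegrationFlow.from(() -> new GenericMessage<>(""),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(5)))
                                .id("kafkaProducerBean"))
                .transform(message -> new Date().toString())
                .log()
                .channel("to-kafka-producer-template")
                .get();
    }

    // Publishes everything arriving on the channel to the configured topic.
    @Bean
    public IntegrationFlow kafkaProducerTemplate(KafkaTemplate<?, ?> kafkaTemplate) {
        kafkaTemplate.setDefaultTopic(this.kafkaTopic);
        return IntegrationFlow.from("to-kafka-producer-template")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate))
                .get();
    }
}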


application.yml

YAML
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3

app:
  topic-name: demo-topic


Now let's create a simple stream processor (by configuring the binders). I am using Spring Cloud Stream to create it.

StreamConsumer.java

Java
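import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class StreamConsumer {

    // Bound to the "myConsumer-in-0" binding; logs every record it receives.
    @Bean
    public Consumer<KStream<?, String>> myConsumer() {
        return input ->
                input.foreach((key, value) -> log.debug("Key: {} Value: {}", key, value));
    }
}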


application.yml

YAML
spring:
  application:
    name: processor-demo

  cloud:
    stream:
      bindings:
        myConsumer-in-0:
          destination: demo-topic
          # must match a name defined under spring.cloud.stream.binders
          binder: kstream
          group: processor-group
      kafka:
        streams:
          binder:
            brokers: localhost:9092

      binders:
        kstream:
          type: kstream
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: localhost:9092


With REST Service Endpoints…

Spring Integration allows us to control and monitor the messaging endpoints that we created. This can be done in two steps.

Step 1: Create a Control Bus Message Channel, Define a Flow, and Finally, a Gateway Function

ProducerIntegrationConfig.java (modify the producer’s integration config)

Java
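    // Add this channel to the integration config
    @Bean
    public MessageChannel controlChannel() {
        return MessageChannels.direct().get();
    }
    // Flow named in the step title but absent from the listing: routes the channel to a control bus (method name is illustrative)
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlow.from("controlChannel").controlBus().get();
    }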


Step 2: Call the Above Function in a REST Controller

ProducerDemoController.java

Java
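import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerDemoController {

    private final MessageChannel controlChannel;

    public ProducerDemoController(@Qualifier("controlChannel") MessageChannel controlChannel) {
        this.controlChannel = controlChannel;
    }

    // Asks the control bus to stop the polling endpoint with id "kafkaProducerBean".
    @GetMapping("/stopProducer")
    public void stopProducer() {
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.stop()"));
    }

    // Asks the control bus to start the same endpoint again.
    @GetMapping("/startProducer")
    public void startProducer() {
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.start()"));
    }
}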
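
To make the idea behind those command strings concrete, here is a toy, Spring-free model of what happens to a payload such as "@kafkaProducerBean.stop()": the bean named after the @ is looked up and the requested lifecycle method is called on it. This is only a sketch of the concept and not how Spring Integration implements it (the control bus in the versions used here evaluates the payload as a SpEL expression). The names ToyControlBus, ToyEndpoint, and Startable are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy stand-in for a control bus. NOT Spring Integration's implementation. */
final class ToyControlBus {

    /** Minimal start/stop contract for anything the toy bus can manage. */
    interface Startable {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Hypothetical endpoint that only tracks whether it is running. */
    static final class ToyEndpoint implements Startable {
        private boolean running = true;
        public void start() { running = true; }
        public void stop() { running = false; }
        public boolean isRunning() { return running; }
    }

    // Accepts only commands shaped like "@beanName.start()" or "@beanName.stop()".
    private static final Pattern COMMAND =
            Pattern.compile("@(\\w+)\\.(start|stop)\\(\\)");

    private final Map<String, Startable> beans = new HashMap<>();

    void register(String name, Startable bean) {
        beans.put(name, bean);
    }

    /** Looks up the named bean and invokes the requested lifecycle method. */
    void send(String command) {
        Matcher m = COMMAND.matcher(command);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported command: " + command);
        }
        Startable bean = beans.get(m.group(1));
        if (bean == null) {
            throw new IllegalArgumentException("No bean named " + m.group(1));
        }
        if (m.group(2).equals("start")) {
            bean.start();
        } else {
            bean.stop();
        }
    }

    public static void main(String[] args) {
        ToyControlBus bus = new ToyControlBus();
        ToyEndpoint producer = new ToyEndpoint();
        bus.register("kafkaProducerBean", producer);

        bus.send("@kafkaProducerBean.stop()");
        System.out.println("running after stop: " + producer.isRunning());

        bus.send("@kafkaProducerBean.start()");
        System.out.println("running after start: " + producer.isRunning());
    }
}
```

In the real application the registry role is played by the Spring application context, and the id given to the polling endpoint ("kafkaProducerBean") is what makes it addressable from the command string.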


Stop and Resume the Producer Through REST Endpoint

http://localhost:8080/startProducer
http://localhost:8080/stopProducer

With Spring Boot’s Actuator Endpoints…

Add the below block to expose the bindings through actuator endpoints.

application.yml

YAML
management:
  endpoints:
    web:
      exposure:
        include:
          - bindings


Stop and Resume the Consumer Function Through Actuator Endpoint

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0
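
If you would rather trigger the same calls from Java (for example, from an operations tool), the curl commands above can be sketched with the JDK's built-in HTTP client. This is a minimal sketch, assuming Java 11 or later and that the consumer application listens on localhost:8282 as in the curl commands; the class name BindingStateRequests and its method names are hypothetical. The GET request relies on the bindings endpoint also serving the current state of a single binding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds requests equivalent to the curl commands for the bindings endpoint. */
final class BindingStateRequests {

    // Assumption: host and port are taken from the curl commands in this article.
    static final String BASE_URL = "http://localhost:8282/actuator/bindings/";

    /** Builds a POST asking the bindings endpoint to move a binding to the given state. */
    static HttpRequest changeState(String bindingName, String state) {
        String json = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder(URI.create(BASE_URL + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** Builds a GET that reads the current details of a single binding. */
    static HttpRequest queryState(String bindingName) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + bindingName))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest stop = changeState("myConsumer-in-0", "STOPPED");
        HttpRequest start = changeState("myConsumer-in-0", "STARTED");

        // Sending requires the consumer application to be running, e.g.:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(stop, java.net.http.HttpResponse.BodyHandlers.ofString());
        System.out.println(stop.method() + " " + stop.uri());
        System.out.println(start.method() + " " + start.uri());
    }
}
```

Building the request separately from sending it keeps the sketch runnable without a live application; in practice you would send it and check the response status before assuming the binding changed state.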

Summary

In this blog, we discussed how to stop and resume a Kafka consumer or producer at runtime using REST endpoints and Spring Boot Actuator. Being able to stop and resume Kafka producers and consumers without redeploying them is useful during maintenance and helps keep real-time data pipelines running smoothly.

The source code can be found on my GitHub. Also, you can reach out to me on LinkedIn for any questions or suggestions.

That’s all for now. Happy Learning!
