
URL: https://www.javacodegeeks.com/2016/05/apache-storm-configure-kafkabolt-flux.html

Apache Storm: How to configure KafkaBolt with Flux - Java Code Geeks


Flux is a mini framework that helps you define and deploy a Storm topology.

Flux provides various wrappers that help you define the required stream(s) and initialize your Bolts and Spouts: it can call constructors with or without arguments, and it automatically invokes custom configuration methods via reflection.
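The reflection-based approach described above can be sketched in a few lines of plain Java: create an object from a class name plus optional constructor arguments, then call named "config methods" on it. This is a simplified illustration of the idea, not Flux's actual implementation; the class `MiniFlux` and its helpers are hypothetical, and real Flux additionally resolves `ref` entries, converts argument types and chooses overloads far more carefully.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical, simplified sketch of Flux-style reflective wiring.
// Not Flux's real code: no ref resolution, no type conversion, no primitives.
public class MiniFlux {

    // True if every argument is an instance of the matching parameter type.
    static boolean matches(Class<?>[] paramTypes, Object[] args) {
        if (paramTypes.length != args.length) {
            return false;
        }
        for (int i = 0; i < args.length; i++) {
            if (!paramTypes[i].isInstance(args[i])) {
                return false;
            }
        }
        return true;
    }

    // Like a Flux component: "className" plus optional "constructorArgs".
    static Object instantiate(String className, Object... constructorArgs) {
        try {
            for (Constructor<?> ctor : Class.forName(className).getConstructors()) {
                if (matches(ctor.getParameterTypes(), constructorArgs)) {
                    return ctor.newInstance(constructorArgs);
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + className, e);
        }
        throw new IllegalArgumentException("No matching constructor in " + className);
    }

    // Like a Flux "configMethods" entry: method "name" plus "args".
    static Object invoke(Object target, String methodName, Object... args) {
        try {
            for (Method m : target.getClass().getMethods()) {
                if (m.getName().equals(methodName) && matches(m.getParameterTypes(), args)) {
                    return m.invoke(target, args);
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot call " + methodName, e);
        }
        throw new IllegalArgumentException("No matching method " + methodName);
    }

    public static void main(String[] args) {
        // No-arg constructor followed by a config method call.
        Properties props = (Properties) instantiate("java.util.Properties");
        invoke(props, "put", "acks", "1");
        System.out.println(props.getProperty("acks")); // prints 1

        // Constructor with one argument.
        Object topic = instantiate("java.lang.StringBuilder", "myTopicName");
        System.out.println(topic); // prints myTopicName
    }
}
```

Matching on `isInstance` keeps the sketch short but means primitive parameters (such as `int`) are never matched; a real framework needs explicit type coercion for those.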

To use Flux, you only need to add it as a dependency in your “pom.xml”, describe your topology in a single YAML file (see the Flux examples), and then use Flux as the main class to deploy the topology to a Storm cluster (or to run it locally as a test).

In order to initialize a KafkaBolt, the following steps are needed:

  1. Define a “topicSelector” and pass it via the “withTopicSelector” method
  2. Define a “kafkaMapper” and pass it via the “withTupleToKafkaMapper” method
  3. Define a “kafkaProducerProps” and pass it via the “withProducerProperties” method
  4. Initialize “org.apache.storm.kafka.bolt.KafkaBolt” with the above configuration
  5. Include the above KafkaBolt as part of a stream
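A detail worth knowing for step 2: when `FieldNameBasedTupleToKafkaMapper` is created without constructor arguments (as the “kafkaMapper” component is in the YAML below), it reads the Kafka key from a tuple field named `key` and the message from a field named `message`, so the upstream component feeding the bolt must emit fields with those names (a two-argument constructor lets you pick other names). The stdlib-only sketch below mimics that behaviour with a `Map` standing in for a Storm `Tuple`; the class `FieldNameMapperSketch` is hypothetical and is not the Storm class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that mimics FieldNameBasedTupleToKafkaMapper's
// field lookup; a Map plays the role of a Storm Tuple. Not the Storm class.
public class FieldNameMapperSketch {
    static final String DEFAULT_KEY_FIELD = "key";
    static final String DEFAULT_MESSAGE_FIELD = "message";

    private final String keyField;
    private final String messageField;

    // No-arg form: the default field names "key" and "message".
    FieldNameMapperSketch() {
        this(DEFAULT_KEY_FIELD, DEFAULT_MESSAGE_FIELD);
    }

    // Custom field names, for tuples that use different ones.
    FieldNameMapperSketch(String keyField, String messageField) {
        this.keyField = keyField;
        this.messageField = messageField;
    }

    // A missing key field simply yields a null key.
    Object keyFrom(Map<String, Object> tuple) {
        return tuple.get(keyField);
    }

    // The message field is required; this sketch throws if it is absent.
    Object messageFrom(Map<String, Object> tuple) {
        if (!tuple.containsKey(messageField)) {
            throw new IllegalArgumentException("Tuple has no field '" + messageField + "'");
        }
        return tuple.get(messageField);
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("key", "user-42");
        tuple.put("message", "hello kafka");

        FieldNameMapperSketch mapper = new FieldNameMapperSketch();
        System.out.println(mapper.keyFrom(tuple) + " -> " + mapper.messageFrom(tuple));
        // prints: user-42 -> hello kafka
    }
}
```

In practice this means the spout or bolt referenced as “from” in the stream definition should declare its output fields accordingly.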

Minimal Flux configuration example for KafkaBolt:

components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"

  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "myTopicName"

  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"

  - id: "kafkaProducerProps"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "localhost:9092"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"

bolts:
  - id: "bolt-kafka"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 1
    configMethods:
      - name: "withProducerProperties"
        args: [ref: "kafkaProducerProps"]
      - name: "withTopicSelector"
        args: [ref: "topicSelector"]
      - name: "withTupleToKafkaMapper"
        args: [ref: "kafkaMapper"]

streams:
  - name: "spout --> kafkaBolt"
    from: "spout-1"
    to: "bolt-kafka"
    grouping:
      type: LOCAL_OR_SHUFFLE
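For comparison, the “kafkaProducerProps” component above amounts to roughly the following plain Java: a `java.util.Properties` object filled by four `put` calls. The class and method names are illustrative only, and the broker address and serializers are the same placeholder values used in the YAML.

```java
import java.util.Properties;

// Illustrative plain-Java equivalent of the "kafkaProducerProps" Flux component.
// Class and method names are hypothetical; values mirror the YAML placeholders.
public class KafkaProducerPropsSketch {

    static Properties kafkaProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka broker(s) to connect to
        props.put("acks", "1");                           // wait for the partition leader only
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = kafkaProducerProps();
        System.out.println(props.getProperty("bootstrap.servers")); // prints localhost:9092
    }
}
```

With storm-kafka on the classpath, an object like this is what the bolt's `withProducerProperties` method receives; Flux simply builds it from the YAML instead of from Java code.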

For a full working configuration example, check the complete topology YAML linked in the original post, together with its usage instructions.

Example command to deploy your topology on Storm:

storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml
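If deployments are launched from Java (for example from an integration test), the command above can be assembled as an argument list for `ProcessBuilder`. The helper `fluxCommand` below is hypothetical; it uses the Flux options shown in this article (`--remote`, `--c key=value`) plus Flux's `--local` option for local mode, and it only builds the list without starting any process.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that assembles a "storm jar ... org.apache.storm.flux.Flux" command.
// It does not run anything; pass the result to ProcessBuilder to execute it.
public class FluxCommand {

    static List<String> fluxCommand(String jar, String yaml, boolean remote, String nimbusHost) {
        List<String> cmd = new ArrayList<>(List.of("storm", "jar", jar, "org.apache.storm.flux.Flux"));
        if (remote) {
            cmd.add("--remote");
            // Extra Storm config as key=value, as in the article's example command.
            cmd.add("--c");
            cmd.add("nimbus.host=" + nimbusHost);
        } else {
            cmd.add("--local"); // run in a local, in-process cluster instead
        }
        cmd.add(yaml); // topology definition file, placed last as in the example
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = fluxCommand(
                "target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar",
                "src/test/resources/flux/topology_kafka.yaml",
                true, "192.168.1.200");
        System.out.println(String.join(" ", cmd));
        // To actually run it: new ProcessBuilder(cmd).inheritIO().start();
    }
}
```

Keeping the command as a list rather than a single string avoids shell quoting issues when a path contains spaces.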

The Flux configuration for KafkaSpout is already covered in the official Flux examples. Flux is a really helpful framework that eliminates the custom code otherwise required to define and initialize a topology.


Adrianos Dadis
May 6th, 2016 (Last Updated: May 4th, 2016)


Adrianos works as a senior software engineer in the telecoms business domain. He is particularly interested in enterprise integration, multi-tier architecture and middleware services, and mainly works with WebLogic, JBoss, Java EE, Spring, Drools, Oracle SOA Suite and various ESBs.