URL: https://dzone.com/articles/migrate-data-across-kafka-cluster-using-mirrormake

Migrate Data Across Kafka Cluster Using mirrormaker2 in Strimzi

In this article, we discuss a use case where data from one Kafka cluster has to be migrated to another Kafka cluster, using MirrorMaker 2.

Updated May 18, 2021 · Tutorial

In this article, we will discuss a use case where data from one Kafka cluster has to be migrated to another Kafka cluster. Here the target is a Strimzi-managed cluster and the source is a standalone Kafka cluster. The target is the cluster the data is copied to, and the source is the cluster we copy/migrate data from. I have an earlier article on using MirrorMaker version 1 with Apache Kafka clusters. This article is about MirrorMaker 2, which has more features than MirrorMaker 1.

At the time of writing this article, the latest version of strimzi is 0.22.1 and can be downloaded from here.

I have installed strimzi on minikube version: v1.19.0. Standalone Kafka is installed on a different laptop with RHEL 8 OS. Also, I am using a simple Kafka producer to produce messages to the source Kafka cluster.

So let's begin the proof of concept.

1. Source Kafka (standalone) configuration: To allow connections from external clients and from MirrorMaker, set advertised.listeners in [KAFKA_HOME]/config/server.properties to an address that is reachable from outside the host. Then start the ZooKeeper and Kafka nodes. Here I have only one ZooKeeper node and one Kafka node.

Shell

# [KAFKA_HOME]/config/server.properties
advertised.listeners=PLAINTEXT://192.168.1.25:9092
listeners=PLAINTEXT://0.0.0.0:9092

# create a topic myTestTopic
# [KAFKA_HOME]/bin
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic myTestTopic



2. Produce messages to the source Kafka cluster: I used ProducerKafka, available in my GitHub repo. You can also use [KAFKA_HOME]/bin/kafka-console-producer.sh to produce messages to the source.
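
For a quick test without the Java producer, the console producer can be fed from a small loop. The following is only a sketch: it assumes KAFKA_HOME points at the Kafka installation and reuses the broker address and topic from step 1. The function names gen_messages and produce_test_messages are made up for illustration, and the "message: N" payload is an assumption modelled on the consumer output shown in step 6.

```shell
# Print N numbered test messages, one per line: "message: 0" ... "message: N-1".
gen_messages() {
  i=0
  while [ "$i" -lt "$1" ]; do
    echo "message: $i"
    i=$((i + 1))
  done
}

# Pipe the generated messages into the console producer on the source cluster.
# KAFKA_HOME, the broker address and the topic name are assumptions from step 1.
produce_test_messages() {
  gen_messages "$1" | "${KAFKA_HOME:?set KAFKA_HOME first}/bin/kafka-console-producer.sh" \
    --bootstrap-server 192.168.1.25:9092 \
    --topic myTestTopic
}

# Usage (run on a machine that can reach the source broker):
#   produce_test_messages 10
```

On older Kafka releases the console producer may only accept --broker-list in place of --bootstrap-server, so check the tool's help output if the option is rejected.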

3. Strimzi setup: Follow the upstream Strimzi documentation for a detailed setup on minikube. The commands below summarize the steps I followed.

Shell

$ minikube start --cpus 3 --memory 10000 -p strimzi0221

$ kubectl create ns kafka

$ kubectl config set-context --current --namespace=kafka

$ sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

$ kubectl create -f install/cluster-operator

$ kubectl get deployments
NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
strimzi-cluster-operator   1/1     1            1           6m36s

# strimzi-0.22.1/examples/kafka/kafka-ephemeral-single-replica.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.7.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.7"
      inter.broker.protocol.version: "2.7"
    storage:
      type: ephemeral
    livenessProbe:
      initialDelaySeconds: 35
      timeoutSeconds: 35
    readinessProbe:
      initialDelaySeconds: 35
      timeoutSeconds: 35
  zookeeper:
    replicas: 1
    livenessProbe:
      initialDelaySeconds: 35
      timeoutSeconds: 35
    readinessProbe:
      initialDelaySeconds: 35
      timeoutSeconds: 35
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

$ kubectl create -f examples/kafka/kafka-ephemeral-single.yaml

# setup is ready
$ kubectl get pods
NAME                                         READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-98c779b75-j84mt   3/3     Running   0          99s
my-cluster-kafka-0                           1/1     Running   0          2m37s
my-cluster-zookeeper-0                       1/1     Running   0          5m28s
strimzi-cluster-operator-957688b5c-dzbl7     1/1     Running   0          8m48s



4. MirrorMaker 2 configuration: The distribution itself ships example MirrorMaker YAML files under strimzi-0.22.1/examples/mirror-maker. We will make a copy of kafka-mirror-maker-2-custom-replication-policy.yaml and modify the copy.

YAML

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
spec:
  version: 2.7.0
  replicas: 1
  connectCluster: "my-target-cluster"
  clusters:
  - alias: "my-source-cluster"
    bootstrapServers: 192.168.1.25:9092
  - alias: "my-target-cluster"
    bootstrapServers: my-cluster-kafka-bootstrap:9092
    config:
      config.storage.replication.factor: 1
      offset.storage.replication.factor: 1
      status.storage.replication.factor: 1
  mirrors:
  - sourceCluster: "my-source-cluster"
    targetCluster: "my-target-cluster"
    sourceConnector:
      config:
        replication.factor: 1
        offset-syncs.topic.replication.factor: 1
        sync.topic.acls.enabled: "false"
        replication.policy.separator: ""
        replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy"
    heartbeatConnector:
      config:
        heartbeats.topic.replication.factor: 1
    checkpointConnector:
      config:
        checkpoints.topic.replication.factor: 1
        replication.policy.separator: ""
        replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy"
    topicsPattern: ".*"
    groupsPattern: ".*"
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  readinessProbe:
    initialDelaySeconds: 25
    timeoutSeconds: 25
  livenessProbe:
    initialDelaySeconds: 25
    timeoutSeconds: 25



The important configurations are:

my-source-cluster: Here we provide the bootstrap server URL of the source Kafka cluster in the bootstrapServers property.

my-target-cluster: Here we provide the bootstrap server URL of the target Kafka cluster in the bootstrapServers property. The target Kafka node is installed using Strimzi in minikube, so I set it to my-cluster-kafka-bootstrap, the service listening on port 9092.

Shell

$ kubectl get svc
NAME                                 TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
my-cluster-kafka-bootstrap           ClusterIP   10.99.60.80      <none>        9091/TCP,9092/TCP,9093/TCP   18h
my-cluster-kafka-brokers             ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   18h
my-cluster-zookeeper-client          ClusterIP   10.103.165.80    <none>        2181/TCP                     19h
my-cluster-zookeeper-nodes           ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   19h
my-mirror-maker-2-mirrormaker2-api   ClusterIP   10.109.246.111   <none>        8083/TCP                     16h



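replication.policy.class: We set it to io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy because we want the topics on the target Kafka node to keep the same names as on the source Kafka node. With MirrorMaker 2's default policy, mirrored topics are instead prefixed with the source cluster alias.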
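
To make the naming difference concrete, here is a small illustrative model of how the target topic name comes out under each policy. It is not MirrorMaker code: the function target_topic_name and the policy labels "default" and "identity" are invented for this sketch, and "." is used as the separator for the default case because that is MirrorMaker 2's default separator.

```shell
# Illustrative model only: how the topic name on the target is derived.
# Usage: target_topic_name <policy> <source-alias> <separator> <topic>
target_topic_name() {
  policy=$1
  src_alias=$2
  separator=$3
  topic=$4
  case "$policy" in
    default)  printf '%s%s%s\n' "$src_alias" "$separator" "$topic" ;;  # alias-prefixed name
    identity) printf '%s\n' "$topic" ;;                                # name kept unchanged
    *)        echo "unknown policy: $policy" >&2; return 1 ;;
  esac
}

target_topic_name default  my-source-cluster "." myTestTopic   # prints my-source-cluster.myTestTopic
target_topic_name identity my-source-cluster ""  myTestTopic   # prints myTestTopic
```

For a migration, the second form is usually what you want, since consumers can be pointed at the target cluster without changing the topic names they subscribe to.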

We have also set the readiness and liveness probes with explicit initial delays and timeouts so that they can be tuned if needed.

5. Apply MirrorMaker 2 configuration: Apply the MirrorMaker 2 YAML file, then check the topics in the target Kafka node.

Shell

$ cd examples/mirror-maker/

$ kubectl apply -f kafka-mirror-maker-2-custom-replication-policy-modify.yaml

$ kubectl get KafkaMirrorMaker2
NAME                DESIRED REPLICAS   READY
my-mirror-maker-2   1                  1

$ kubectl get pods
NAME                                             READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-98c779b75-j84mt       3/3     Running   0          3m50s
my-cluster-kafka-0                               1/1     Running   0          4m48s
my-cluster-zookeeper-0                           1/1     Running   0          7m39s
my-mirror-maker-2-mirrormaker2-d5465d47d-k2dfz   1/1     Running   0          78s
strimzi-cluster-operator-957688b5c-dzbl7         1/1     Running   0          10m

$ kubectl get kt
NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           1                    True
heartbeats                                                                                         my-cluster   1            1                    True
mirrormaker2-cluster-configs                                                                       my-cluster   1            1                    True
mirrormaker2-cluster-offsets                                                                       my-cluster   25           1                    True
mirrormaker2-cluster-status                                                                        my-cluster   5            1                    True
my-source-cluster.checkpoints.internal                                                             my-cluster   1            1                    True
mytesttopic---ad8c4a4e03129cbd9ddc2900dfe8a763fb122ce7                                             my-cluster   3            1                    True
mytopic---c55e57fe2546a33f9e603caf57165db4072e827e                                                 my-cluster   1            1                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            1                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            1                    True


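Note that kubectl get kt lists KafkaTopic resources, and the Topic Operator gives a resource a lowercase name with a hash suffix (mytesttopic---ad8c...) when the real topic name, such as myTestTopic, is not a valid Kubernetes resource name. To check for the topic under its real name, you can ask the broker directly. The sketch below assumes the resource and pod names used in this article; topic_listed and check_mirrored_topic are hypothetical helper names.

```shell
# Succeeds if the exact topic name appears in a newline-separated list on stdin.
topic_listed() {
  grep -Fxq -- "$1"
}

# Wait until the operator reports the KafkaMirrorMaker2 resource as Ready,
# then ask the target broker itself for its topic list.
check_mirrored_topic() {
  kubectl wait kafkamirrormaker2/my-mirror-maker-2 --for=condition=Ready --timeout=300s &&
    kubectl exec my-cluster-kafka-0 -- /opt/kafka/bin/kafka-topics.sh \
      --list --bootstrap-server 0.0.0.0:9092 | topic_listed "$1"
}

# Usage:
#   check_mirrored_topic myTestTopic && echo "myTestTopic is present on the target"
```

The match is case-sensitive and exact, so the lowercase resource name mytesttopic would not count as a hit for myTestTopic.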

6. Consume messages from Target Kafka node i.e. Strimzi on Kubernetes.

Shell

$ kubectl exec my-cluster-kafka-0 -it -- /opt/kafka/bin/kafka-topics.sh --describe --topic myTestTopic --bootstrap-server 0.0.0.0:9092
Topic: myTestTopic    PartitionCount: 3    ReplicationFactor: 1    Configs: message.format.version=2.7-IV2
    Topic: myTestTopic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: myTestTopic    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: myTestTopic    Partition: 2    Leader: 0    Replicas: 0    Isr: 0

$ kubectl exec my-cluster-kafka-0 -it -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 0.0.0.0:9092 --topic myTestTopic --group Group1 --from-beginning
message: 0
message: 1
message: 2
message: 3
message: 4
message: 5
message: 6
message: 7
message: 8
message: 9

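Beyond reading the messages by eye, a rough way to confirm that everything was copied is to compare the summed end offsets of the topic on both clusters. This is a sketch under several assumptions: the topic is freshly created on both sides, nothing has been removed by retention or compaction, no transactional producers are involved, and the Kafka 2.7 form of GetOffsetShell (with --broker-list) is available. The helper names sum_end_offsets, source_total and target_total are hypothetical.

```shell
# Sum the end offsets from GetOffsetShell output lines of the form
# "<topic>:<partition>:<offset>"; prints 0 for empty input.
sum_end_offsets() {
  awk -F: '{ total += $NF } END { print total + 0 }'
}

# End-offset total for myTestTopic on the source broker.
# Assumes KAFKA_HOME is set on the machine running this.
source_total() {
  "${KAFKA_HOME:?set KAFKA_HOME first}/bin/kafka-run-class.sh" kafka.tools.GetOffsetShell \
    --broker-list 192.168.1.25:9092 --topic myTestTopic --time -1 | sum_end_offsets
}

# End-offset total for the mirrored topic, queried from inside the Strimzi broker pod.
target_total() {
  kubectl exec my-cluster-kafka-0 -- /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list 0.0.0.0:9092 --topic myTestTopic --time -1 | sum_end_offsets
}

# Usage:
#   [ "$(source_total)" = "$(target_total)" ] && echo "counts match"
```

If the totals differ, give MirrorMaker 2 a little time to catch up before treating it as a failure, since mirroring is asynchronous.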


That's it, guys. I think you will find this article interesting and helpful!
