Spring Kafka Örnek Yml Dosyası#
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_ADDRESS:http://localhost:9092,http://localhost:9093,http://localhost:9094}
admin:
properties:
bootstrap.servers: ${KAFKA_BOOTSTRAP_ADDRESS:http://localhost:9092,http://localhost:9093,http://localhost:9094}
consumer:
group-id: ${KAFKA_CONSUMER_GROUP_ID:hvl-event-logger}
properties:
allow.auto.create.topics: true
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: tr.com.havelsan.nf.kafka.json.deserializer.HvlKafkaJsonDeserializer
spring.json.trusted.packages: '*'
retry.backoff.ms: 1000
enable-auto-commit: true
listener:
concurrency: 3
ack-mode: record
missing-topics-fatal: false
producer:
properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: tr.com.havelsan.nf.kafka.json.serizalizer.HvlKafkaJsonSerializer
max.in.flight.requests.per.connection: 1 #retries > 1 ise bu config 1 olmalı.
retries: 5
security:
protocol: PLAINTEXT
Clustered Kafka docker-compose örnek:
Kafka Docker Compose Örnek
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: hvlzookeeper
# network_mode: hvl_core_network
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
# volumes:
# - ../../../.data/zookeeper/data:/var/lib/zookeeper/data
# - ../../../.data/zookeeper/datalog:/var/lib/zookeeper/datalog
kafka:
image: confluentinc/cp-kafka:latest
container_name: hvlkafka
# network_mode: hvl_core_network
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-172.17.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: hvlzookeeper:2181
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
ports:
- 9092:9092
- 19092:19092
# volumes:
# - ../../../.data/kafka/data:/var/lib/kafka/data
# - ../../../.data/kafka/datalog:/var/lib/kafka/datalog
depends_on:
- zookeeper
kafka2:
image: confluentinc/cp-kafka:latest
container_name: hvlkafka2
# network_mode: hvl_core_network
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka2:29092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-172.17.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: hvlzookeeper:2181
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
ports:
- 9093:9093
- 29092:29092
# volumes:
# - ../../../.data/kafka2/data:/var/lib/kafka/data
# - ../../../.data/kafka2/datalog:/var/lib/kafka/datalog
depends_on:
- zookeeper
kafka3:
image: confluentinc/cp-kafka:latest
container_name: hvlkafka3
# network_mode: hvl_core_network
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka3:39092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-172.17.0.1}:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: hvlzookeeper:2181
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
ports:
- 9094:9094
- 39092:39092
# volumes:
# - ../../../.data/kafka3/data:/var/lib/kafka/data
# - ../../../.data/kafka3/datalog:/var/lib/kafka/datalog
depends_on:
- zookeeper
KRaft#
2.8.0 versiyonu sonrasında Kafka'yı Zookeeper olmadan kullanmak mümkün hale gelmiştir.
KAFKA_NODE_ID: Her broker bir identifier numarasına ihtiyaç duyar. KAFKA_PROCESS_ROLES: Cluster içindeki node’ların rollerini belirler. KRaft modda controller’lara ihtiyaç vardır. KAFKA_CONTROLLER_QUORUM_VOTERS: Oylama için nodeid@servername:port formatındaki adreslerdir.
Clustered Zookeeperless Kafka docker-compose örnek:
services:
kafka1:
extends:
service: kafka_cluster_image
file: images/kafka-images-docker-compose.yml
container_name: hvlkafka
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-docker.local}:9092
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@hvlkafka:19093,2@hvlkafka2:29093,3@hvlkafka3:39093'
KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka:19092,CONTROLLER://hvlkafka:19093,LISTENER_DOCKER_EXTERNAL://0.0.0.0:9092
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
ports:
- 9092:9092
- 19092:19092
volumes:
- ./update_run.sh:/tmp/update_run.sh
- /vol1/kafka-data:/var/lib/kafka/data
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
kafka2:
extends:
service: kafka_cluster_image
file: images/kafka-images-docker-compose.yml
container_name: hvlkafka2
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka2:29092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-docker.local}:9093
KAFKA_NODE_ID: 2
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@hvlkafka:19093,2@hvlkafka2:29093,3@hvlkafka3:39093'
KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka2:29092,CONTROLLER://hvlkafka2:29093,LISTENER_DOCKER_EXTERNAL://0.0.0.0:9093
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
ports:
- 9093:9093
- 29092:29092
volumes:
- ./update_run.sh:/tmp/update_run.sh
- /vol2/kafka-data:/var/lib/kafka/data
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
kafka3:
extends:
service: kafka_cluster_image
file: images/kafka-images-docker-compose.yml
container_name: hvlkafka3
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka3:39092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-docker.local}:9094
KAFKA_NODE_ID: 3
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@hvlkafka:19093,2@hvlkafka2:29093,3@hvlkafka3:39093'
KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://hvlkafka3:39092,CONTROLLER://hvlkafka3:39093,LISTENER_DOCKER_EXTERNAL://0.0.0.0:9094
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
ports:
- 9094:9094
- 39092:39092
volumes:
- ./update_run.sh:/tmp/update_run.sh
- /vol3/kafka-data:/var/lib/kafka/data
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
#!/bin/sh
# Docker workaround: Remove check for KAFKA_ZOOKEEPER_CONNECT parameter
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
# Docker workaround: Ignore cub zk-ready
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
# KRaft required step: Format the storage directory with a new cluster ID
#echo "kafka-storage format --ignore-formatted -t $(kafka-storage random-uuid) -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
echo "kafka-storage format --ignore-formatted -t 'CLH9YvVSQCuotPCYI3mTIA' -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
#./bin/kafka-storage format --config ./etc/kafka/kraft/server.properties --cluster-id m1Ze6AjGRwqarkcxJscgyQ