1.8. Message Queue#
Uygulamalarda message queue yapısının kullanmasını sağlayan altyapı bileşenidir. Kafka desteği sağlanmaktadır.
Kafka#
Neden bu altyapıyı tercih etmeliyim:
- Spring kafka ile tamamen entegre çalışmaktadır. Herhangi bir kısıtlama yapılmamıştır.
- Transaction yönetimi için iyileştirmeler ve kullanım kolaylığı sağlanmıştır.
- Kafka konfigüratif health check mekanizması barındırmaktadır.
- Kafkayı konfigüratif olarak devreye alıp, devreden çıkarma mekanizması sağlanmaktadır.
- Retry edilmeyecek exceptionların konfigüratif olarak ayarlanabilmesine olanak sağlar.
- Konfigüratif olarak tanımlanan exception'lara göre Dead Letter Queue'lar oluşturulmasına olanak sağlar.
- Bağlantı kopması ve sunucu çökmeleri sonucu veri kaybı durumlarının sıfıra indirildiği çözüm yöntemi sağlar.
Kafka ürünü ve çalışma yapısı ile ilgili detaylı bilgiye Kafka sayfasından ulaşılabilir.
hvl-infra altında bulunan 'application-kafka.yml' dosyasıyla konfigure edilebilmektedir.
YML Konfigürasyonları#
hvl:
core:
kafka:
support:
enabled: ${KAFKA_SUPPORT_ENABLED:false}
consumer:
retries: ${KAFKA_CONSUMER_RETRIES:5}
non-retriable-exceptions: ${KAFKA_NON_RETRIABLE_EXCEPTIONS:java.lang.IllegalArgumentException}
exception-to-dlt-mapping:
java.lang.NullPointerException: NPE
log-interceptor:
enabled: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_ENABLED:false}
index-name: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_INDEX_NAME:kafka-logs-${spring.application.name}}
topic-name: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_TOPIC_NAME:javalt-kafka-message-logs}
producer:
jpa:
enabled: ${KAFKA_PRODUCER_JPA_ENABLED:true}
job-duration: ${KAFKA_PRODUCER_JPA_JOB_DURATION:PT5M}
page-size: ${KAFKA_PRODUCER_JPA_PAGE_SIZE:1000}
chunk-size: ${KAFKA_PRODUCER_JPA_CHUNK_SIZE:100}
log-interceptor:
enabled: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_ENABLED:false}
index-name: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_INDEX_NAME:kafka-logs-${spring.application.name}}
topic-name: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_TOPIC_NAME:javalt-kafka-message-logs}
management:
health:
kafka:
enabled: ${KAFKA_HEALTH_CHECK_ENABLED:true}
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_ADDRESS:hvlkafka:19092,hvlkafka2:29092,hvlkafka3:39092}
consumer:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
group-id: ${KAFKA_CONSUMER_GROUP_ID:${spring.application.name}}
isolation-level: read-committed
properties:
retry.backoff.ms: ${KAFKA_RETRY_BACKOFF_MS:1000}
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: tr.com.havelsan.javarch.kafka.json.deserializer.HvlKafkaJsonDeserializer
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:3}
producer:
client-id: ${spring.application.name}:${random.uuid}
value-serializer: tr.com.havelsan.javarch.kafka.json.serizalizer.HvlKafkaJsonSerializer
# acks: all
retries: ${KAFKA_PRODUCER_RETRIES:}
transaction-id-prefix: ${KAFKA_PRODUCER_TRANSACTIONAL_ID_PREFIX:${spring.kafka.producer.client-id}-tx_}
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
management.health.kafka.enabled: Uygulama açılışında kafka'ya bağlantıyı kontrol eder.
hvl.core.kafka.support.enabled: Kafka altyapısını devreye sokar.
hvl.core.kafka.consumer.retries: Hata alınması durumunda kaç kere retry edileceği bilgisi.
hvl.core.kafka.consumer.non-retriable-exceptions: Retry edilmeden direkt olarak Dead Letter Queue(DLT)'ya atılacak exception listesi bilgisi. Yukarıdaki örnekte IllegalArgumentException alınması durumda retry edilmeceği ayarlanmıştır.
hvl.core.kafka.consumer.exception-to-dlt-mapping: Exception tipine göre oluşturulacak DLT mappingi. Yukarıdaki örnekte; consumer tarafından NullPointerException alınırsa, 5 kere tekrar denenip başarılı olamadığı durumda [TOPIC_NAME]_NPE.DLT şeklinde bir DLT topic oluşturulacak ve başarısız mesaj buraya yazılacaktır.
hvl.core.kafka.consumer.log-interceptor.enabled: Kafka consumer'ları tarafından tüketilen mesajların loglanmasını aktif eder. Loglar kafkaya yazılır, oradan fluentd ile elastic üzerine aktarılır.
hvl.core.kafka.consumer.log-interceptor.index-name: Elasticsearch'e aktarılan logların hangi index üzerinde saklanacağı bilgisidir.
hvl.core.kafka.consumer.log-interceptor.topic-name: Consumer loglarının gönderileceği kafka topic ismi bilgisidir.
hvl.core.kafka.producer.jpa.enabled: Kafka'ya mesaj gönderirken veritabanı desteğini devreye alır. Böylece veri kaybının önüne geçilmiş olur. Bunu kullanmak için hvl-kafka
paketi yerine hvl-kafka-jpa
paketi kullanılmalıdır.
hvl.core.kafka.producer.jpa.job-duration: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenme aralığıdır. ISO 8601 formatı kullanılabilir. Varsayılan PT5M; 5 dakikada bir çalışması anlamına gelir.
hvl.core.kafka.producer.jpa.page-size: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenirken veritabanından çekilen veri miktarıdır.
hvl.core.kafka.producer.jpa.chunk-size: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenirken kullanılan buffer miktarıdır.
hvl.core.kafka.producer.jpa.job-interval-second: Veritabanına yazılan hatalı kafka mesajlarının işlenmesi için gerekli job'ın saniye cinsinden çalışma aralığı süresidir.
hvl.core.kafka.producer.log-interceptor.enabled: Kafka'ya gönderilen mesajların loglanmasını aktif eder. Loglar kafkaya yazılır, oradan fluentd ile elastic üzerine aktarılır.
hvl.core.kafka.producer.log-interceptor.index-name: Elasticsearch'e aktarılan logların hangi index üzerinde saklanacağı bilgisidir.
hvl.core.kafka.producer.log-interceptor.topic-name: Kafkaya gönderilen mesajların loglarının gönderileceği kafka topic ismi bilgisidir.
spring.kafka.consumer.group-id: Consumer'ların group id bilgisidir. Bu bilgi ile kafka topiclerine register olurlar. Her consumer için farklı group id verilmek istenirse @KafkaListener anotasyonu içerisinde özelleştirilebilir. Consumer başlığı altında detaylı örnek verilecektir.
spring.kafka.consumer.properties.retry.backoff.ms: Her bir tekrar deneme(retry) arasında beklenilecek süre. ( milisaniye cinsinden)
spring.kafka.listener.concurrency: Her bir consumer'ın thread sayısı. Örnek olarak concurreny 3 ise, her bir consumer'dan 3 adet oluşturulur gibi düşünülebilir.
spring.kafka.producer.properties.transactional.id: Transaction için kullanılacak id prefix bilgisidir. Transaction devreye alınırsa bu alan doldurulmak zorundadır.
Producer#
Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, Kafkaya mesaj göndermek için alttaki gibi basit bir kod kullanılabilir. Mesajları gönderirken HvlEventNotifier kullanılmalıdır. İsteğe göre sync veya async mesaj gönderimi yapılabilir. Key, topic, partition gibi bilgilerin gonderimi sağlanabilir.
@Autowired
private final HvlEventNotifier eventNotifier;
public void send() {
.
.
eventNotifier.notifySync(exampleEvent);
.
.
}
HvlEventNotifier.java
package tr.com.havelsan.javarch.kafka.notifier;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import tr.com.havelsan.javarch.event.commons.HvlEvent;
import tr.com.havelsan.javarch.kafka.exception.HvlKafkaProducerException;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* {@link KafkaTemplate} kullanilarak kafka uzerinden event firlatmayi saglayan siniftir.
*
* @author acuhadaroglu
*/
public interface HvlEventNotifier {
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi'
* @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
* @return the completable future
* @see SendResult
*/
<E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Notify async completable future.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param key the key
* @param event the event
* @param callback the callback
* @return the completable future
*/
<E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotBlank String key,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Notify async completable future.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param partition the partition
* @param key the key
* @param event the event
* @param callback the callback
* @return the completable future
*/
<E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotNull Integer partition,
@NotBlank String key,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Notify async completable future.
*
* @param producerRecord the producer record
* @param callback the callback
* @return the completable future
*/
CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotNull ProducerRecord<String, Object> producerRecord,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi' CompletableFuture
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
* @see SendResult
*/
<E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName, @NotNull E event)
throws HvlKafkaProducerException;
/**
* Notify sync send result.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param key the key
* @param event the event
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
*/
<E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName,
@NotBlank String key,
@NotNull E event) throws HvlKafkaProducerException;
/**
* Notify sync send result.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param partition the partition
* @param key the key
* @param event the event
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
*/
<E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName,
@NotNull Integer partition,
@NotBlank String key,
@NotNull E event) throws HvlKafkaProducerException;
/**
* Notify sync send result.
*
* @param producerRecord the producer record
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
*/
SendResult<String, Object> notifySync(@NotNull ProducerRecord<String, Object> producerRecord)
throws HvlKafkaProducerException;
/**
* {@link HvlEvent#getTopic()} ile belirtilen topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
*
* @param <HE> the type parameter
* @param event Gonderilecek event ve topic bilgisi'
* @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
* @return the completable future
* @see CompletableFuture
*/
<HE extends HvlEvent<?>> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotNull HE event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi'
* @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
* @return the completable future
* @see CompletableFuture
* @deprecated use {@link #notifyAsync(HvlEvent, Consumer)} instead of this by setting topic in event.
*/
@Deprecated(since = "3.2.0", forRemoval = true)
<E extends HvlEvent<?>> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* {@link HvlEvent#getTopic()} ile belirtilen topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param event Gonderilecek event ve topic bilgisi CompletableFuture
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
* @see CompletableFuture
*/
<E extends HvlEvent<?>> SendResult<String, Object> notifySync(@NotNull E event) throws HvlKafkaProducerException;
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi' CompletableFuture
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
* @see CompletableFuture
* @deprecated use {@link #notifyAsync(HvlEvent, Consumer)} instead of this by setting topic in event.
*/
@Deprecated(since = "3.2.0", forRemoval = true)
<E extends HvlEvent<?>> SendResult<String, Object> notifySync(@NotBlank String topicName, @NotNull E event)
throws HvlKafkaProducerException;
}
Veritabanı Destekli Producer#
Kafka'ya mesaj gönderilirken, kafka'nın çökmesi, network problemleri ve ardından uygulama çökmesi gibi durumlara karşı veritabanı destekli kafka mesajı gönderme yapılabilir. Sadece hatalı mesajları veritabanında tutmaktadır ve bu sayede, veritabanında kafka mesajları ayrıca yer tutmayacaktır. Hatalı mesajlar da işlendikten sonra veritabanından temizlenmektedir. Bundan dolayı performans kaybı minimum seviyede olacaktır.
Konfigürasyon#
hvl-kafka
paketi yerine hvl-kafka-jpa
paketi kullanılmalıdır.
hvl-infra altında bulunan 'application-hvl-scheduling.yml' dosyasıyla konfigure edilebilmektedir.
Liquibase kullanılıyorsa gerekli tabloların oluşması için aşağıdaki şekilde liquibase root yaml dosyasına ekleme yapılmalıdır.
hvl-javalt-samples projesinde örnek uygulamaya module/kafka/kafka-jpa
pathi takip edilerek ulaşılabilir.
Kafka Transaction#
Eğer kafka transaction aktif ise transaction başlatılmak istenen metodun başına @Transactional
veya eksenin sağladığı @HvlTransactionalRollbackForCheckedException
anotasyonunu eklemek yeterli olacaktır.
@Autowired
private final HvlEventNotifier eventNotifier;
@HvlTransactionalRollbackForCheckedException
public void send() {
.
.
eventNotifier.notifySync(exampleEvent);
eventNotifier.notifySync(exampleEvent2);
eventNotifier.notifySync(exampleEvent3);
.
.
}
Consumer#
Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, aşağıda kafkadan gelecek mesajları okuyan basit bir consumer örneği verilmiştir. topics alanında verilen değer hangi kafka topic'ine register olacağı bilgisidir. Bunun yanında consumer group id, error handler, concurrecy gibi bilgiler de manuel olarak buradan konfigüre edilebilir. Buradan verilecek değerler default olarak altyapıdan gelen ve yml'dan gelen bilgileri yok sayar. Özetle, @KafkaListener içerisinden verilen değerler en üst önceliklidir.
@KafkaListener(topics = EVENT_LOG_TOPIC)
public void consumeEventLog(ExampleEvent event) {
//do stuff
}
Kafka Docker-Compose Uygulaması#
Kafka docker compose örneklerine javalt framework'ün sağladığı infra projesi üzerinden ulaşılabilir.
Kafka docker compose: kafka-docker-compose.yml
Kafka cluster docker compose: kafka-cluster-docker-compose.yml