1.9. Message Queue#
Uygulamalarda message queue yapısının kullanmasını sağlayan altyapı bileşenidir. Kafka desteği sağlanmaktadır.
Kafka#
Neden bu altyapıyı tercih etmeliyim:
- Spring kafka ile tamamen entegre çalışmaktadır. Herhangi bir kısıtlama yapılmamıştır.
- Transaction yönetimi için iyileştirmeler ve kullanım kolaylığı sağlanmıştır.
- Kafka konfigüratif health check mekanizması barındırmaktadır.
- Kafkayı konfigüratif olarak devreye alıp, devreden çıkarma mekanizması sağlanmaktadır.
- Retry edilmeyecek exceptionların konfigüratif olarak ayarlanabilmesine olanak sağlar.
- Konfigüratif olarak tanımlanan exception'lara göre Dead Letter Queue'lar oluşturulmasına olanak sağlar.
- Bağlantı kopması ve sunucu çökmeleri sonucu veri kaybı durumlarının sıfıra indirildiği çözüm yöntemi sağlar.
- Bağlantı problemleri gibi durumlar için sonsuza kadar tekrar edilebilmeyi sağlayacak exception tanımları yapılabilir. Tekrar özellikleri konfigüratiftir.
Kafka ürünü ve çalışma yapısı ile ilgili detaylı bilgiye Kafka sayfasından ulaşılabilir.
hvl-infra altında bulunan 'application-kafka.yml' dosyasıyla konfigure edilebilmektedir.
YML Konfigürasyonları#
hvl:
core:
kafka:
support:
enabled: ${KAFKA_SUPPORT_ENABLED:false}
consumer:
retries: ${KAFKA_CONSUMER_RETRIES:5}
non-retryable-exceptions: ${KAFKA_NON_RETRYABLE_EXCEPTIONS:java.lang.IllegalArgumentException}
exception-to-dlt-mapping:
java.lang.NullPointerException: NPE
retryable-exception-properties:
exceptions: ${KAFKA_RETRYABLE_EXCEPTIONS:java.net.ConnectException}
backoff:
interval-ms: ${KAFKA_RETRYABLE_BACKOFF_INTERVAL_MS:2000}
multiplier: ${KAFKA_RETRYABLE_BACKOFF_MULTIPLIER:1.5}
max-interval-ms: ${KAFKA_RETRYABLE_BACKOFF_MAX_INTERVAL_MS:60000}
log-interceptor:
enabled: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_ENABLED:false}
index-name: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_INDEX_NAME:kafka-logs-${spring.application.name}}
topic-name: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_TOPIC_NAME:javalt-kafka-message-logs}
inbox:
enabled: ${KAFKA_CONSUMER_INBOX_ENABLED:true}
cleaner:
enabled: ${KAFKA_CONSUMER_INBOX_CLEANER_ENABLED:false}
job-cron: ${KAFKA_CONSUMER_INBOX_CLEANER_JOB_CRON:0 0 0 ? * SUN *}
page-size: ${KAFKA_CONSUMER_INBOX_CLEANER_PAGE_SIZE:1000}
ttl-hours: ${KAFKA_CONSUMER_INBOX_CLEANER_TTL_HOURS:168}
producer:
jpa:
enabled: ${KAFKA_PRODUCER_JPA_ENABLED:true}
job-duration: ${KAFKA_PRODUCER_JPA_JOB_DURATION:PT5M}
page-size: ${KAFKA_PRODUCER_JPA_PAGE_SIZE:1000}
chunk-size: ${KAFKA_PRODUCER_JPA_CHUNK_SIZE:100}
outbox:
job-duration: ${KAFKA_PRODUCER_OUTBOX_JOB_DURATION:PT5S}
page-size: ${KAFKA_PRODUCER_OUTBOX_PAGE_SIZE:1000}
chunk-size: ${KAFKA_PRODUCER_OUTBOX_CHUNK_SIZE:100}
max-retry: ${KAFKA_PRODUCER_OUTBOX_MAX_RETRY:5}
cleaner:
enabled: ${KAFKA_PRODUCER_OUTBOX_CLEANER_ENABLED:false}
job-cron: ${KAFKA_PRODUCER_OUTBOX_CLEANER_JOB_CRON:0 0 0 ? * SUN *}
page-size: ${KAFKA_PRODUCER_OUTBOX_CLEANER_PAGE_SIZE:1000}
ttl-hours: ${KAFKA_PRODUCER_OUTBOX_CLEANER_TTL_HOURS:168}
log-interceptor:
enabled: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_ENABLED:false}
index-name: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_INDEX_NAME:kafka-logs-${spring.application.name}}
topic-name: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_TOPIC_NAME:javalt-kafka-message-logs}
management:
health:
kafka:
enabled: ${KAFKA_HEALTH_CHECK_ENABLED:true}
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_ADDRESS:hvlkafka:19092,hvlkafka2:29092,hvlkafka3:39092}
consumer:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
group-id: ${KAFKA_CONSUMER_GROUP_ID:${spring.application.name}}
isolation-level: read-committed
properties:
retry.backoff.ms: ${KAFKA_RETRY_BACKOFF_MS:1000}
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: tr.com.havelsan.javarch.kafka.json.deserializer.HvlKafkaJsonDeserializer
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:3}
producer:
client-id: ${spring.application.name}:${random.uuid}
value-serializer: tr.com.havelsan.javarch.kafka.json.serizalizer.HvlKafkaJsonSerializer
# acks: all
retries: ${KAFKA_PRODUCER_RETRIES:}
transaction-id-prefix: ${KAFKA_PRODUCER_TRANSACTIONAL_ID_PREFIX:${spring.kafka.producer.client-id}-tx_}
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
management.health.kafka.enabled: Uygulama açılışında kafka'ya bağlantıyı kontrol eder.
hvl.core.kafka.support.enabled: Kafka altyapısını devreye sokar.
hvl.core.kafka.consumer.retries: Hata alınması durumunda kaç kere retry edileceği bilgisi.
hvl.core.kafka.consumer.non-retryable-exceptions: Retry edilmeden direkt olarak Dead Letter Queue(DLT)'ya atılacak exception listesi bilgisi. Yukarıdaki örnekte IllegalArgumentException alınması durumda retry edilmeceği ayarlanmıştır.
hvl.core.kafka.consumer.exception-to-dlt-mapping: Exception tipine göre oluşturulacak DLT mappingi. Yukarıdaki örnekte; consumer tarafından NullPointerException alınırsa, 5 kere tekrar denenip başarılı olamadığı durumda [TOPIC_NAME]_NPE.DLT şeklinde bir DLT topic oluşturulacak ve başarısız mesaj buraya yazılacaktır.
hvl.core.kafka.consumer.retryable-exception-properties.exceptions: Consume sırasında verilen exception sınıflarının oluşması durumunda aynı mesaj sürekli belirli aralıklarla consume edilmeye devam edecektir. Aralıklar backoff
property'si altından ayarlanabilmektedir.
hvl.core.kafka.consumer.retryable-exception-properties.backoff.interval-ms: Consume sırasında retryable exceptionlardan biri alınırsa tekrardan önceki ilk bekleme süresini belirtir.
hvl.core.kafka.consumer.retryable-exception-properties.backoff.multiplier: Consume sırasında retryable exceptionlardan biri alınırsa ilk tekrardan sonraki beklemeler için bekleme süresi çarpanıdır.
hvl.core.kafka.consumer.retryable-exception-properties.backoff.max-interval-ms: Consume sırasında retryable exceptionlardan biri alınırsa artan tekrar sürelerinin maksimum değeridir. Bekleme süresi bu değerden daha büyük olamaz.
hvl.core.kafka.consumer.log-interceptor.enabled: Kafka consumer'ları tarafından tüketilen mesajların loglanmasını aktif eder. Loglar kafkaya yazılır, oradan fluentd ile elastic üzerine aktarılır.
hvl.core.kafka.consumer.log-interceptor.index-name: Elasticsearch'e aktarılan logların hangi index üzerinde saklanacağı bilgisidir.
hvl.core.kafka.consumer.log-interceptor.topic-name: Consumer loglarının gönderileceği kafka topic ismi bilgisidir.
hvl.core.kafka.consumer.inbox.enabled: Kafka consumer inbox özelliğini aktif eder. Bu özellik sayesinde tüketilen mesajlar veritabanında saklanarak yeniden işleme durumlarında idempotent davranış sağlanır.
hvl.core.kafka.consumer.inbox.cleaner.enabled: Kafka consumer inbox temizleme işlemini aktif eder. Bu özellik sayesinde eski inbox kayıtları otomatik olarak temizlenir.
hvl.core.kafka.consumer.inbox.cleaner.job-cron: Inbox temizleme işlemi için çalışacak cron ifadesidir. Varsayılan olarak her pazar günü gece yarısı çalışır.
hvl.core.kafka.consumer.inbox.cleaner.page-size: Inbox temizleme işlemi sırasında veritabanından bir seferde çekilecek maksimum kayıt sayısıdır.
hvl.core.kafka.consumer.inbox.cleaner.ttl-hours: Inbox kayıtlarının kaç saat saklanacağını belirler. Varsayılan olarak 168 saat (7 gün) sonra kayıtlar silinir.
hvl.core.kafka.producer.jpa.enabled: Kafka'ya mesaj gönderirken veritabanı desteğini devreye alır. Böylece veri kaybının önüne geçilmiş olur. Bunu kullanmak için hvl-kafka
paketi yerine hvl-kafka-jpa
paketi kullanılmalıdır.
hvl.core.kafka.producer.jpa.job-duration: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenme aralığıdır. ISO 8601 formatı kullanılabilir. Varsayılan PT5M; 5 dakikada bir çalışması anlamına gelir.
hvl.core.kafka.producer.jpa.page-size: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenirken veritabanından çekilen veri miktarıdır.
hvl.core.kafka.producer.jpa.chunk-size: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenirken kullanılan buffer miktarıdır.
hvl.core.kafka.producer.outbox.job-duration: Outbox kayıtlarının işlenme aralığıdır. ISO 8601 formatı kullanılabilir. Varsayılan PT5S; 5 saniyede bir çalışması anlamına gelir.
hvl.core.kafka.producer.outbox.page-size: Outbox işlemi sırasında veritabanından bir seferde çekilecek maksimum kayıt sayısıdır.
hvl.core.kafka.producer.outbox.chunk-size: Outbox işlemi sırasında kullanılan buffer miktarıdır.
hvl.core.kafka.producer.outbox.max-retry: Outbox kayıtlarının gönderilmesi sırasında maksimum deneme sayısıdır.
hvl.core.kafka.producer.outbox.cleaner.enabled: Outbox temizleme işlemini aktif eder. Bu özellik sayesinde eski outbox kayıtları otomatik olarak temizlenir.
hvl.core.kafka.producer.outbox.cleaner.job-cron: Outbox temizleme işlemi için çalışacak cron ifadesidir. Varsayılan olarak her pazar günü gece yarısı çalışır.
hvl.core.kafka.producer.outbox.cleaner.page-size: Outbox temizleme işlemi sırasında veritabanından bir seferde çekilecek maksimum kayıt sayısıdır.
hvl.core.kafka.producer.outbox.cleaner.ttl-hours: Outbox kayıtlarının kaç saat saklanacağını belirler. Varsayılan olarak 168 saat (7 gün) sonra kayıtlar silinir.
hvl.core.kafka.producer.log-interceptor.enabled: Kafka'ya gönderilen mesajların loglanmasını aktif eder. Loglar kafkaya yazılır, oradan fluentd ile elastic üzerine aktarılır.
hvl.core.kafka.producer.log-interceptor.index-name: Elasticsearch'e aktarılan logların hangi index üzerinde saklanacağı bilgisidir.
hvl.core.kafka.producer.log-interceptor.topic-name: Kafkaya gönderilen mesajların loglarının gönderileceği kafka topic ismi bilgisidir.
spring.kafka.consumer.group-id: Consumer'ların group id bilgisidir. Bu bilgi ile kafka topiclerine register olurlar. Her consumer için farklı group id verilmek istenirse @KafkaListener anotasyonu içerisinde özelleştirilebilir. Consumer başlığı altında detaylı örnek verilecektir.
spring.kafka.consumer.properties.retry.backoff.ms: Her bir tekrar deneme(retry) arasında beklenilecek süre. ( milisaniye cinsinden)
spring.kafka.listener.concurrency: Her bir consumer'ın thread sayısı. Örnek olarak concurreny 3 ise, her bir consumer'dan 3 adet oluşturulur gibi düşünülebilir.
spring.kafka.producer.properties.transactional.id: Transaction için kullanılacak id prefix bilgisidir. Transaction devreye alınırsa bu alan doldurulmak zorundadır.
Producer#
Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, Kafkaya mesaj göndermek için alttaki gibi basit bir kod kullanılabilir. Mesajları gönderirken HvlEventNotifier kullanılmalıdır. İsteğe göre sync veya async mesaj gönderimi yapılabilir. Key, topic, partition gibi bilgilerin gonderimi sağlanabilir.
@Autowired
private final HvlEventNotifier eventNotifier;
public void send() {
.
.
eventNotifier.notifySync(exampleEvent);
.
.
}
HvlEventNotifier.java
package tr.com.havelsan.javarch.kafka.notifier;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import tr.com.havelsan.javarch.event.commons.HvlEvent;
import tr.com.havelsan.javarch.kafka.exception.HvlKafkaProducerException;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* {@link KafkaTemplate} kullanilarak kafka uzerinden event firlatmayi saglayan siniftir.
*
* @author acuhadaroglu
*/
public interface HvlEventNotifier {
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi'
* @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
* @return the completable future
* @see SendResult
*/
<E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Notify async completable future.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param key the key
* @param event the event
* @param callback the callback
* @return the completable future
*/
<E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotBlank String key,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Notify async completable future.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param partition the partition
* @param key the key
* @param event the event
* @param callback the callback
* @return the completable future
*/
<E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotNull Integer partition,
@NotBlank String key,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Notify async completable future.
*
* @param producerRecord the producer record
* @param callback the callback
* @return the completable future
*/
CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotNull ProducerRecord<String, Object> producerRecord,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi' CompletableFuture
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
* @see SendResult
*/
<E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName, @NotNull E event)
throws HvlKafkaProducerException;
/**
* Notify sync send result.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param key the key
* @param event the event
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
*/
<E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName,
@NotBlank String key,
@NotNull E event) throws HvlKafkaProducerException;
/**
* Notify sync send result.
*
* @param <E> the type parameter
* @param topicName the topic name
* @param partition the partition
* @param key the key
* @param event the event
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
*/
<E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName,
@NotNull Integer partition,
@NotBlank String key,
@NotNull E event) throws HvlKafkaProducerException;
/**
* Notify sync send result.
*
* @param producerRecord the producer record
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
*/
SendResult<String, Object> notifySync(@NotNull ProducerRecord<String, Object> producerRecord)
throws HvlKafkaProducerException;
/**
* {@link HvlEvent#getTopic()} ile belirtilen topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
*
* @param <HE> the type parameter
* @param event Gonderilecek event ve topic bilgisi'
* @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
* @return the completable future
* @see CompletableFuture
*/
<HE extends HvlEvent<?>> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotNull HE event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi'
* @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
* @return the completable future
* @see CompletableFuture
* @deprecated use {@link #notifyAsync(HvlEvent, Consumer)} instead of this by setting topic in event.
*/
@Deprecated(since = "3.2.0", forRemoval = true)
<E extends HvlEvent<?>> CompletableFuture<SendResult<String, Object>> notifyAsync(
@NotBlank String topicName,
@NotNull E event,
@NotNull Consumer<SendResult<String, Object>> callback);
/**
* {@link HvlEvent#getTopic()} ile belirtilen topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param event Gonderilecek event ve topic bilgisi CompletableFuture
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
* @see CompletableFuture
*/
<E extends HvlEvent<?>> SendResult<String, Object> notifySync(@NotNull E event) throws HvlKafkaProducerException;
/**
* Ozel bir topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
*
* @param <E> the type parameter
* @param topicName topic ismi
* @param event Gonderilecek event ve topic bilgisi' CompletableFuture
* @return the send result
* @throws HvlKafkaProducerException the hvl kafka producer exception
* @see CompletableFuture
* @deprecated use {@link #notifySync(String, Serializable)} instead of this by setting topic in event.
*/
@Deprecated(since = "3.2.0", forRemoval = true)
<E extends HvlEvent<?>> SendResult<String, Object> notifySync(@NotBlank String topicName, @NotNull E event)
throws HvlKafkaProducerException;
}
Veritabanı Destekli Producer (Safe Box)#
Kafka'ya mesaj gönderilirken, kafka'nın çökmesi, network problemleri ve ardından uygulama çökmesi gibi durumlara karşı veritabanı destekli kafka mesajı gönderme yapılabilir. Sadece hatalı mesajları veritabanında tutmaktadır ve bu sayede, veritabanında kafka mesajları ayrıca yer tutmayacaktır. Hatalı mesajlar da işlendikten sonra veritabanından temizlenmektedir. Bundan dolayı performans kaybı minimum seviyede olacaktır.
Konfigürasyon#
hvl-kafka
paketi yerine hvl-kafka-jpa
paketi kullanılmalıdır.
hvl-infra altında bulunan 'application-hvl-scheduling.yml' dosyasıyla konfigure edilebilmektedir.
Liquibase kullanılıyorsa gerekli tabloların oluşması için aşağıdaki şekilde liquibase root yaml dosyasına ekleme yapılmalıdır.
hvl-javalt-samples projesinde örnek uygulamaya module/kafka/kafka-jpa
pathi takip edilerek ulaşılabilir.
Outbox Producer#
Outbox Producer, Kafka mesajlarının güvenli bir şekilde iletilmesini sağlayan, veritabanı destekli bir mesajlaşma mekanizmasıdır. Bu yapı sayesinde:
- Mesajlar önce veritabanında bir
hvl_kafka_outbox_message
tablosuna kaydedilir ve sonra Kafka'ya gönderilir, böylece veri kaybı önlenir. - Uygulama çökmesi veya Kafka'ya bağlantı sorunları gibi durumlarda bile mesajların kaybolmaması garanti edilir.
- Periyodik çalışan bir iş ile bekleyen mesajlar otomatik olarak işlenir ve Kafka'ya gönderilir.
- İşlenmiş eski mesajlar için otomatik temizleme mekanizması sağlanır.
- Veri tutarlılığı için veritabanı işlemleri ile mesaj gönderimi arasında transactional bütünlük sağlanır.
Bu yaklaşım, özellikle yüksek güvenilirlik gerektiren sistemlerde veri kaybını önlemek için kullanılır ve "at-least-once" mesaj iletim garantisi sunar.
Konfigürasyon#
hvl-kafka
paketi yerine hvl-kafka-box
paketi kullanılmalıdır.
hvl-infra altında bulunan application-hvl-scheduling.yml, application-kafka.yml dosyasıyla konfigure edilebilmektedir. hvl.core.kafka.producer.outbox
ile başlayan konfigürasyonlar incelenebilir.
Liquibase kullanılıyorsa gerekli tabloların oluşması için aşağıdaki şekilde liquibase root yaml dosyasına ekleme yapılmalıdır.
hvl-javalt-samples projesinde örnek uygulamaya module/kafka/kafka-box
pathi takip edilerek ulaşılabilir.
Kullanım#
Bu sistemde mesajların kafkaya iletilmesi için öncelikle hvl_kafka_outbox_message
yazılması gerekmektedir. Bunun için HvlKafkaOutboxService
sınıfı kullanılmalıdır. Bu sınıf inject edilerek içerisindeki publish
metodları kullanılarak outbox tablosuna kayıt yazılabilir.
final HvlKafkaOutboxModel<HvlKafkaOutboxEvent> kafkaOutboxModel = HvlKafkaOutboxModel.builder()
.withTopic(topic)
.withKey("test")
.withPartition(0)
.withMessage(new HvlKafkaBoxSampleEvent(requestId, userId, "Arda", "Çuhadaroğlu"))
.build();
kafkaOutboxService.publish(kafkaOutboxModel);
hvl_kafka_outbox_message
tablosuna yazılan mesajların işlenip kafkaya gönderilmesi için arka planda bir iş çalışmaktadır. Bu iş varsayılan olarak 5 saniyede bir çalışmakta olup application-kafka.yml
dosyasındaki hvl.core.kafka.producer.outbox.job-duration
konfigürasyonu ile süresi değiştirilebilmektedir.
hvl_kafka_outbox_message
tablosunda çok veri birikme durumu için arka planda bir temizleyici işi çalışmaktadır. Bu iş vasıtasıyla tabloda durumu SENT
olan kayıtların temizlenmesi sağlanmaktadır.
Kafka Transaction#
Eğer kafka transaction aktif ise transaction başlatılmak istenen metodun başına @Transactional
veya eksenin sağladığı @HvlTransactionalRollbackForCheckedException
anotasyonunu eklemek yeterli olacaktır.
@Autowired
private final HvlEventNotifier eventNotifier;
@HvlTransactionalRollbackForCheckedException
public void send() {
.
.
eventNotifier.notifySync(exampleEvent);
eventNotifier.notifySync(exampleEvent2);
eventNotifier.notifySync(exampleEvent3);
.
.
}
Consumer#
Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, aşağıda kafkadan gelecek mesajları okuyan basit bir consumer örneği verilmiştir. topics alanında verilen değer hangi kafka topic'ine register olacağı bilgisidir. Bunun yanında consumer group id, error handler, concurrecy gibi bilgiler de manuel olarak buradan konfigüre edilebilir. Buradan verilecek değerler default olarak altyapıdan gelen ve yml'dan gelen bilgileri yok sayar. Özetle, @KafkaListener içerisinden verilen değerler en üst önceliklidir.
@KafkaListener(topics = EVENT_LOG_TOPIC)
public void consumeEventLog(ExampleEvent event) {
//do stuff
}
Farklı Bir Model Kullanarak Consume Etme İşlemi#
Produce edilen modelin tüm alanlarına ihtiyaç duyulmaması durumunda daha küçük modeller oluşturularak, consume işlemi o model ile yapılabilmektedir.
Uyarı
Oluşturulacak daha küçük modelin değişken isimleri diğer model ile aynı olmalıdır.
Java tip değişim işleminin gerçekleşebilmesi için json deserializer
sınıfında kullanılmak üzere type resolver
tanımlanması gerekmektedir. Aşağıdaki örnekte görüldüğü üzere public static
ve örnekteki parametrelere sahip bir method tanımlanmalıdır.
public class HvlKafkaSampleCommonConfiguration extends HvlBaseConfiguration {
public static JavaType typeResolver(String topic, byte[] data, Headers headers) {
if (topic.equals(HvlKafkaSampleConstant.SIMPLE_TOPIC_NAME)) {
return SimpleType.constructUnsafe(HvlKafkaSampleSimpleModel.class);
}
return null;
}
}
Type resolver
tanımlaması yapıldıktan sonra application.yml dosyasına aşağıdaki konfigürasyonun eklenmesi ve tanımlanan type resolver metodunun adreslenmesi gerekmektedir.
spring:
kafka:
consumer:
properties:
spring.json.value.type.method: tr.com.havelsan.javarch.samples.kafka.common.configuration.HvlKafkaSampleCommonConfiguration.typeResolver
Bu işlemler yapıldıktan sonra aşağıdaki örnekteki gibi consumer
methodu yazılabilir.
@HvlTransactionalRollbackForCheckedException
@KafkaListener(topics = HvlKafkaSampleConstant.SIMPLE_TOPIC_NAME)
public void consumeAsSimpleModel(HvlKafkaSampleSimpleModel kafkaModel) {
LOGGER.info("Message simple model consumed!" + kafkaModel.toString());
}
Kafka örneği ve detaylı kullanım ile ilgili tam bir örnek kod için hvl-javalt-samples projesindeki örnek incelenebilir.
Inbox Consumer#
Inbox Consumer, Kafka mesajlarının güvenilir bir şekilde işlenmesini sağlayan veritabanı destekli bir yapıdır. Bu mekanizma, tüketilen mesajları öncelikle veritabanında hvl_kafka_inbox_message
tablosunda saklar ve böylece mesajların yalnızca bir kez işlenmesini (idempotency) garanti eder. Sistem çökmeleri veya yeniden başlatılma durumlarında aynı mesajın tekrar işlenmesini önler, duplikasyon sorunlarını ortadan kaldırır. Otomatik temizleme özelliği sayesinde işlenmiş eski kayıtlar belirli bir süre sonra sistemden temizlenir ve veritabanı kaynaklarının verimli kullanılması sağlanır.
Konfigürasyon#
hvl-kafka
paketi yerine hvl-kafka-box
paketi kullanılmalıdır.
hvl-infra altında bulunan application-hvl-scheduling.yml, application-kafka.yml dosyasıyla konfigure edilebilmektedir. hvl.core.kafka.consumer.inbox
ile başlayan konfigürasyonlar incelenebilir.
Liquibase kullanılıyorsa gerekli tabloların oluşması için aşağıdaki şekilde liquibase root yaml dosyasına ekleme yapılmalıdır.
hvl-javalt-samples projesinde örnek uygulamaya module/kafka/kafka-box
pathi takip edilerek ulaşılabilir.
Kullanım#
Bu sistemde mesajların inbox pattern ile tüketilebilmesi için 2 yöntem sunulmaktadır.
-
HvlKafkaInboxConsumer
kullanımı.Bu yöntem, arka plandan sağlanan bir consumer üzerinden sağlanır. Bu yöntem arka planda yapılan işler ve consume sırasında yapılan işler için otomatik olarak tek bir transaction oluşturur. Hata durumunda rollback yapılır.
-
@HvlKafkaInboxListener
anotasyonu kullanımı.Bu yöntemde aspect kullanılır. Transactional olarak davranması için
@HvlTransactionalRollbackForCheckedException
anotasyonu da kullanılmalıdır.
Bu 2 yöntemin de kullanımı aşağıdaki örnekteki gibi gösterilmiştir.
@Component
public class HvlKafkaBoxSampleConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(HvlKafkaBoxSampleConsumer.class);
private final HvlKafkaSampleRepository kafkaSampleRepository;
private final HvlKafkaInboxConsumer<String, HvlKafkaBoxSampleEvent> kafkaInboxConsumer;
public HvlKafkaBoxSampleConsumer(HvlKafkaSampleRepository kafkaSampleRepository,
HvlKafkaInboxConsumer<String, HvlKafkaBoxSampleEvent> kafkaInboxConsumer) {
this.kafkaSampleRepository = kafkaSampleRepository;
this.kafkaInboxConsumer = kafkaInboxConsumer;
this.kafkaInboxConsumer.subscribe("kafka_sample_box_topic0", this::consume);
}
@HvlTransactionalRollbackForCheckedException
@HvlKafkaInboxListener(topics = "kafka_sample_box_topic1")
public void consumeWithAnnotation(ConsumerRecord<String, HvlKafkaBoxSampleEvent> consumerRecord) {
consume(consumerRecord);
}
private void consume(ConsumerRecord<String, HvlKafkaBoxSampleEvent> consumerRecord) {
final HvlKafkaBoxSampleEvent value = consumerRecord.value();
LOGGER.info("Message consumed! {}", value.toString());
kafkaSampleRepository.save(new HvlKafkaSampleEntity(value.getId(), value.getName(), consumerRecord.topic()));
}
}
Uyarı
Performans için HvlKafkaInboxConsumer
kullanımı tavsiye edilir.
hvl_kafka_inbox_message
tablosunda çok veri birikme durumu için arka planda bir temizleyici işi çalışmaktadır.
Kafka Docker-Compose Uygulaması#
Kafka docker compose örneklerine javalt framework'ün sağladığı infra projesi üzerinden ulaşılabilir.
Kafka docker compose: kafka-docker-compose.yml
Kafka cluster docker compose: kafka-cluster-docker-compose.yml