Ana içeriğe geç

1.9. Message Queue#

Uygulamalarda message queue yapısının kullanmasını sağlayan altyapı bileşenidir. Kafka desteği sağlanmaktadır.

Kafka#

Neden bu altyapıyı tercih etmeliyim:

  • Spring kafka ile tamamen entegre çalışmaktadır. Herhangi bir kısıtlama yapılmamıştır.
  • Transaction yönetimi için iyileştirmeler ve kullanım kolaylığı sağlanmıştır.
  • Kafka konfigüratif health check mekanizması barındırmaktadır.
  • Kafkayı konfigüratif olarak devreye alıp, devreden çıkarma mekanizması sağlanmaktadır.
  • Retry edilmeyecek exceptionların konfigüratif olarak ayarlanabilmesine olanak sağlar.
  • Konfigüratif olarak tanımlanan exception'lara göre Dead Letter Queue'lar oluşturulmasına olanak sağlar.
  • Bağlantı kopması ve sunucu çökmeleri sonucu veri kaybı durumlarının sıfıra indirildiği çözüm yöntemi sağlar.
  • Bağlantı problemleri gibi durumlar için sonsuza kadar tekrar edilebilmeyi sağlayacak exception tanımları yapılabilir. Tekrar özellikleri konfigüratiftir.

Kafka ürünü ve çalışma yapısı ile ilgili detaylı bilgiye Kafka sayfasından ulaşılabilir.


drawing Nasıl konfigure edilir?

hvl-infra altında bulunan 'application-kafka.yml' dosyasıyla konfigure edilebilmektedir.


drawing Uygulamaya nasıl eklenir?
group: 'tr.com.havelsan.framework.mq', name: 'hvl-kafka'

YML Konfigürasyonları#

hvl:
  core:
    kafka:
      support:
        enabled: ${KAFKA_SUPPORT_ENABLED:false}
      consumer:
        retries: ${KAFKA_CONSUMER_RETRIES:5}
        non-retryable-exceptions: ${KAFKA_NON_RETRYABLE_EXCEPTIONS:java.lang.IllegalArgumentException}
        exception-to-dlt-mapping:
          java.lang.NullPointerException: NPE
        retryable-exception-properties:
          exceptions: ${KAFKA_RETRYABLE_EXCEPTIONS:java.net.ConnectException}
          backoff:
            interval-ms: ${KAFKA_RETRYABLE_BACKOFF_INTERVAL_MS:2000}
            multiplier: ${KAFKA_RETRYABLE_BACKOFF_MULTIPLIER:1.5}
            max-interval-ms: ${KAFKA_RETRYABLE_BACKOFF_MAX_INTERVAL_MS:60000}
        log-interceptor:
          enabled: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_ENABLED:false}
          index-name: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_INDEX_NAME:kafka-logs-${spring.application.name}}
          topic-name: ${KAFKA_CONSUMER_LOG_INTERCEPTOR_TOPIC_NAME:javalt-kafka-message-logs}
        inbox:
          enabled: ${KAFKA_CONSUMER_INBOX_ENABLED:true}
          cleaner:
            enabled: ${KAFKA_CONSUMER_INBOX_CLEANER_ENABLED:false}
            job-cron: ${KAFKA_CONSUMER_INBOX_CLEANER_JOB_CRON:0 0 0 ? * SUN *}
            page-size: ${KAFKA_CONSUMER_INBOX_CLEANER_PAGE_SIZE:1000}
            ttl-hours: ${KAFKA_CONSUMER_INBOX_CLEANER_TTL_HOURS:168}
      producer:
        jpa:
          enabled: ${KAFKA_PRODUCER_JPA_ENABLED:true}
          job-duration: ${KAFKA_PRODUCER_JPA_JOB_DURATION:PT5M}
          page-size: ${KAFKA_PRODUCER_JPA_PAGE_SIZE:1000}
          chunk-size: ${KAFKA_PRODUCER_JPA_CHUNK_SIZE:100}
        outbox:
          job-duration: ${KAFKA_PRODUCER_OUTBOX_JOB_DURATION:PT5S}
          page-size: ${KAFKA_PRODUCER_OUTBOX_PAGE_SIZE:1000}
          chunk-size: ${KAFKA_PRODUCER_OUTBOX_CHUNK_SIZE:100}
          max-retry: ${KAFKA_PRODUCER_OUTBOX_MAX_RETRY:5}
          cleaner:
            enabled: ${KAFKA_PRODUCER_OUTBOX_CLEANER_ENABLED:false}
            job-cron: ${KAFKA_PRODUCER_OUTBOX_CLEANER_JOB_CRON:0 0 0 ? * SUN *}
            page-size: ${KAFKA_PRODUCER_OUTBOX_CLEANER_PAGE_SIZE:1000}
            ttl-hours: ${KAFKA_PRODUCER_OUTBOX_CLEANER_TTL_HOURS:168}
        log-interceptor:
          enabled: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_ENABLED:false}
          index-name: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_INDEX_NAME:kafka-logs-${spring.application.name}}
          topic-name: ${KAFKA_PRODUCER_LOG_INTERCEPTOR_TOPIC_NAME:javalt-kafka-message-logs}
management:
  health:
    kafka:
      enabled: ${KAFKA_HEALTH_CHECK_ENABLED:true}
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_ADDRESS:hvlkafka:19092,hvlkafka2:29092,hvlkafka3:39092}
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      group-id: ${KAFKA_CONSUMER_GROUP_ID:${spring.application.name}}
      isolation-level: read-committed
      properties:
        retry.backoff.ms: ${KAFKA_RETRY_BACKOFF_MS:1000}
        value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        spring.deserializer.value.delegate.class: tr.com.havelsan.javarch.kafka.json.deserializer.HvlKafkaJsonDeserializer
    listener:
      concurrency: ${KAFKA_LISTENER_CONCURRENCY:3}
    producer:
      client-id: ${spring.application.name}:${random.uuid}
      value-serializer: tr.com.havelsan.javarch.kafka.json.serizalizer.HvlKafkaJsonSerializer
      #      acks: all
      retries: ${KAFKA_PRODUCER_RETRIES:}
      transaction-id-prefix: ${KAFKA_PRODUCER_TRANSACTIONAL_ID_PREFIX:${spring.kafka.producer.client-id}-tx_}
    security:
      protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}

management.health.kafka.enabled: Uygulama açılışında kafka'ya bağlantıyı kontrol eder.

hvl.core.kafka.support.enabled: Kafka altyapısını devreye sokar.

hvl.core.kafka.consumer.retries: Hata alınması durumunda kaç kere retry edileceği bilgisi.

hvl.core.kafka.consumer.non-retryable-exceptions: Retry edilmeden direkt olarak Dead Letter Queue(DLT)'ya atılacak exception listesi bilgisi. Yukarıdaki örnekte IllegalArgumentException alınması durumda retry edilmeceği ayarlanmıştır.

hvl.core.kafka.consumer.exception-to-dlt-mapping: Exception tipine göre oluşturulacak DLT mappingi. Yukarıdaki örnekte; consumer tarafından NullPointerException alınırsa, 5 kere tekrar denenip başarılı olamadığı durumda [TOPIC_NAME]_NPE.DLT şeklinde bir DLT topic oluşturulacak ve başarısız mesaj buraya yazılacaktır.

hvl.core.kafka.consumer.retryable-exception-properties.exceptions: Consume sırasında verilen exception sınıflarının oluşması durumunda aynı mesaj sürekli belirli aralıklarla consume edilmeye devam edecektir. Aralıklar backoff property'si altından ayarlanabilmektedir.

hvl.core.kafka.consumer.retryable-exception-properties.backoff.interval-ms: Consume sırasında retryable exceptionlardan biri alınırsa tekrardan önceki ilk bekleme süresini belirtir.

hvl.core.kafka.consumer.retryable-exception-properties.backoff.multiplier: Consume sırasında retryable exceptionlardan biri alınırsa ilk tekrardan sonraki beklemeler için bekleme süresi çarpanıdır.

hvl.core.kafka.consumer.retryable-exception-properties.backoff.max-interval-ms: Consume sırasında retryable exceptionlardan biri alınırsa artan tekrar sürelerinin maksimum değeridir. Bekleme süresi bu değerden daha büyük olamaz.

hvl.core.kafka.consumer.log-interceptor.enabled: Kafka consumer'ları tarafından tüketilen mesajların loglanmasını aktif eder. Loglar kafkaya yazılır, oradan fluentd ile elastic üzerine aktarılır.

hvl.core.kafka.consumer.log-interceptor.index-name: Elasticsearch'e aktarılan logların hangi index üzerinde saklanacağı bilgisidir.

hvl.core.kafka.consumer.log-interceptor.topic-name: Consumer loglarının gönderileceği kafka topic ismi bilgisidir.

hvl.core.kafka.consumer.inbox.enabled: Kafka consumer inbox özelliğini aktif eder. Bu özellik sayesinde tüketilen mesajlar veritabanında saklanarak yeniden işleme durumlarında idempotent davranış sağlanır.

hvl.core.kafka.consumer.inbox.cleaner.enabled: Kafka consumer inbox temizleme işlemini aktif eder. Bu özellik sayesinde eski inbox kayıtları otomatik olarak temizlenir.

hvl.core.kafka.consumer.inbox.cleaner.job-cron: Inbox temizleme işlemi için çalışacak cron ifadesidir. Varsayılan olarak her pazar günü gece yarısı çalışır.

hvl.core.kafka.consumer.inbox.cleaner.page-size: Inbox temizleme işlemi sırasında veritabanından bir seferde çekilecek maksimum kayıt sayısıdır.

hvl.core.kafka.consumer.inbox.cleaner.ttl-hours: Inbox kayıtlarının kaç saat saklanacağını belirler. Varsayılan olarak 168 saat (7 gün) sonra kayıtlar silinir.

hvl.core.kafka.producer.jpa.enabled: Kafka'ya mesaj gönderirken veritabanı desteğini devreye alır. Böylece veri kaybının önüne geçilmiş olur. Bunu kullanmak için hvl-kafka paketi yerine hvl-kafka-jpa paketi kullanılmalıdır.

hvl.core.kafka.producer.jpa.job-duration: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenme aralığıdır. ISO 8601 formatı kullanılabilir. Varsayılan PT5M; 5 dakikada bir çalışması anlamına gelir.

hvl.core.kafka.producer.jpa.page-size: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenirken veritabanından çekilen veri miktarıdır.

hvl.core.kafka.producer.jpa.chunk-size: Kafka işlemleri sırasında kesinti yaşandığında veritabanına atılan kayıtların işlenirken kullanılan buffer miktarıdır.

hvl.core.kafka.producer.outbox.job-duration: Outbox kayıtlarının işlenme aralığıdır. ISO 8601 formatı kullanılabilir. Varsayılan PT5S; 5 saniyede bir çalışması anlamına gelir.

hvl.core.kafka.producer.outbox.page-size: Outbox işlemi sırasında veritabanından bir seferde çekilecek maksimum kayıt sayısıdır.

hvl.core.kafka.producer.outbox.chunk-size: Outbox işlemi sırasında kullanılan buffer miktarıdır.

hvl.core.kafka.producer.outbox.max-retry: Outbox kayıtlarının gönderilmesi sırasında maksimum deneme sayısıdır.

hvl.core.kafka.producer.outbox.cleaner.enabled: Outbox temizleme işlemini aktif eder. Bu özellik sayesinde eski outbox kayıtları otomatik olarak temizlenir.

hvl.core.kafka.producer.outbox.cleaner.job-cron: Outbox temizleme işlemi için çalışacak cron ifadesidir. Varsayılan olarak her pazar günü gece yarısı çalışır.

hvl.core.kafka.producer.outbox.cleaner.page-size: Outbox temizleme işlemi sırasında veritabanından bir seferde çekilecek maksimum kayıt sayısıdır.

hvl.core.kafka.producer.outbox.cleaner.ttl-hours: Outbox kayıtlarının kaç saat saklanacağını belirler. Varsayılan olarak 168 saat (7 gün) sonra kayıtlar silinir.

hvl.core.kafka.producer.log-interceptor.enabled: Kafka'ya gönderilen mesajların loglanmasını aktif eder. Loglar kafkaya yazılır, oradan fluentd ile elastic üzerine aktarılır.

hvl.core.kafka.producer.log-interceptor.index-name: Elasticsearch'e aktarılan logların hangi index üzerinde saklanacağı bilgisidir.

hvl.core.kafka.producer.log-interceptor.topic-name: Kafkaya gönderilen mesajların loglarının gönderileceği kafka topic ismi bilgisidir.

spring.kafka.consumer.group-id: Consumer'ların group id bilgisidir. Bu bilgi ile kafka topiclerine register olurlar. Her consumer için farklı group id verilmek istenirse @KafkaListener anotasyonu içerisinde özelleştirilebilir. Consumer başlığı altında detaylı örnek verilecektir.

spring.kafka.consumer.properties.retry.backoff.ms: Her bir tekrar deneme(retry) arasında beklenilecek süre. ( milisaniye cinsinden)

spring.kafka.listener.concurrency: Her bir consumer'ın thread sayısı. Örnek olarak concurreny 3 ise, her bir consumer'dan 3 adet oluşturulur gibi düşünülebilir.

spring.kafka.producer.properties.transactional.id: Transaction için kullanılacak id prefix bilgisidir. Transaction devreye alınırsa bu alan doldurulmak zorundadır.

Producer#

Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, Kafkaya mesaj göndermek için alttaki gibi basit bir kod kullanılabilir. Mesajları gönderirken HvlEventNotifier kullanılmalıdır. İsteğe göre sync veya async mesaj gönderimi yapılabilir. Key, topic, partition gibi bilgilerin gonderimi sağlanabilir.

@Autowired
private final HvlEventNotifier eventNotifier;

public void send() {
    .
    .
    eventNotifier.notifySync(exampleEvent);
    .
    .
}

HvlEventNotifier.java

package tr.com.havelsan.javarch.kafka.notifier;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import tr.com.havelsan.javarch.event.commons.HvlEvent;
import tr.com.havelsan.javarch.kafka.exception.HvlKafkaProducerException;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * {@link KafkaTemplate} kullanilarak kafka uzerinden event firlatmayi saglayan siniftir.
 *
 * @author acuhadaroglu
 */
public interface HvlEventNotifier {

    /**
     * Ozel bir topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
     *
     * @param <E>       the type parameter
     * @param topicName topic ismi
     * @param event     Gonderilecek event ve topic bilgisi'
     * @param callback  async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
     * @return the completable future
     * @see SendResult
     */
    <E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
            @NotBlank String topicName,
            @NotNull E event,
            @NotNull Consumer<SendResult<String, Object>> callback);

    /**
     * Notify async completable future.
     *
     * @param <E>       the type parameter
     * @param topicName the topic name
     * @param key       the key
     * @param event     the event
     * @param callback  the callback
     * @return the completable future
     */
    <E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
            @NotBlank String topicName,
            @NotBlank String key,
            @NotNull E event,
            @NotNull Consumer<SendResult<String, Object>> callback);

    /**
     * Notify async completable future.
     *
     * @param <E>       the type parameter
     * @param topicName the topic name
     * @param partition the partition
     * @param key       the key
     * @param event     the event
     * @param callback  the callback
     * @return the completable future
     */
    <E extends Serializable> CompletableFuture<SendResult<String, Object>> notifyAsync(
            @NotBlank String topicName,
            @NotNull Integer partition,
            @NotBlank String key,
            @NotNull E event,
            @NotNull Consumer<SendResult<String, Object>> callback);

    /**
     * Notify async completable future.
     *
     * @param producerRecord the producer record
     * @param callback       the callback
     * @return the completable future
     */
    CompletableFuture<SendResult<String, Object>> notifyAsync(
            @NotNull ProducerRecord<String, Object> producerRecord,
            @NotNull Consumer<SendResult<String, Object>> callback);

    /**
     * Ozel bir topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
     *
     * @param <E>       the type parameter
     * @param topicName topic ismi
     * @param event     Gonderilecek event ve topic bilgisi' CompletableFuture
     * @return the send result
     * @throws HvlKafkaProducerException the hvl kafka producer exception
     * @see SendResult
     */
    <E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName, @NotNull E event)
            throws HvlKafkaProducerException;


    /**
     * Notify sync send result.
     *
     * @param <E>       the type parameter
     * @param topicName the topic name
     * @param key       the key
     * @param event     the event
     * @return the send result
     * @throws HvlKafkaProducerException the hvl kafka producer exception
     */
    <E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName,
                                                                   @NotBlank String key,
                                                                   @NotNull E event) throws HvlKafkaProducerException;

    /**
     * Notify sync send result.
     *
     * @param <E>       the type parameter
     * @param topicName the topic name
     * @param partition the partition
     * @param key       the key
     * @param event     the event
     * @return the send result
     * @throws HvlKafkaProducerException the hvl kafka producer exception
     */
    <E extends Serializable> SendResult<String, Object> notifySync(@NotBlank String topicName,
                                                                   @NotNull Integer partition,
                                                                   @NotBlank String key,
                                                                   @NotNull E event) throws HvlKafkaProducerException;

    /**
     * Notify sync send result.
     *
     * @param producerRecord the producer record
     * @return the send result
     * @throws HvlKafkaProducerException the hvl kafka producer exception
     */
    SendResult<String, Object> notifySync(@NotNull ProducerRecord<String, Object> producerRecord)
            throws HvlKafkaProducerException;

    /**
     * {@link HvlEvent#getTopic()} ile belirtilen topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
     *
     * @param <HE>     the type parameter
     * @param event    Gonderilecek event ve topic bilgisi'
     * @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
     * @return the completable future
     * @see CompletableFuture
     */
    <HE extends HvlEvent<?>> CompletableFuture<SendResult<String, Object>> notifyAsync(
            @NotNull HE event,
            @NotNull Consumer<SendResult<String, Object>> callback);

    /**
     * Ozel bir topic'e {@link HvlEvent} tipindeki event'i async olarak gondermek amaciyla kullanilir
     *
     * @param <E>       the type parameter
     * @param topicName topic ismi
     * @param event     Gonderilecek event ve topic bilgisi'
     * @param callback  async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi CompletableFuture
     * @return the completable future
     * @see CompletableFuture
     * @deprecated use {@link #notifyAsync(HvlEvent, Consumer)} instead of this by setting topic in event.
     */
    @Deprecated(since = "3.2.0", forRemoval = true)
    <E extends HvlEvent<?>> CompletableFuture<SendResult<String, Object>> notifyAsync(
            @NotBlank String topicName,
            @NotNull E event,
            @NotNull Consumer<SendResult<String, Object>> callback);

    /**
     * {@link HvlEvent#getTopic()} ile belirtilen topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
     *
     * @param <E>   the type parameter
     * @param event Gonderilecek event ve topic bilgisi CompletableFuture
     * @return the send result
     * @throws HvlKafkaProducerException the hvl kafka producer exception
     * @see CompletableFuture
     */
    <E extends HvlEvent<?>> SendResult<String, Object> notifySync(@NotNull E event) throws HvlKafkaProducerException;

    /**
     * Ozel bir topic'e {@link HvlEvent} tipindeki event'i sync olarak gondermek amaciyla kullanilir
     *
     * @param <E>       the type parameter
     * @param topicName topic ismi
     * @param event     Gonderilecek event ve topic bilgisi' CompletableFuture
     * @return the send result
     * @throws HvlKafkaProducerException the hvl kafka producer exception
     * @see CompletableFuture
     * @deprecated use {@link #notifySync(String, Serializable)} instead of this by setting topic in event.
     */
    @Deprecated(since = "3.2.0", forRemoval = true)
    <E extends HvlEvent<?>> SendResult<String, Object> notifySync(@NotBlank String topicName, @NotNull E event)
            throws HvlKafkaProducerException;

}

Veritabanı Destekli Producer (Safe Box)#

Kafka'ya mesaj gönderilirken, kafka'nın çökmesi, network problemleri ve ardından uygulama çökmesi gibi durumlara karşı veritabanı destekli kafka mesajı gönderme yapılabilir. Sadece hatalı mesajları veritabanında tutmaktadır ve bu sayede, veritabanında kafka mesajları ayrıca yer tutmayacaktır. Hatalı mesajlar da işlendikten sonra veritabanından temizlenmektedir. Bundan dolayı performans kaybı minimum seviyede olacaktır.

Konfigürasyon#

hvl-kafka paketi yerine hvl-kafka-jpa paketi kullanılmalıdır.

hvl-infra altında bulunan 'application-hvl-scheduling.yml' dosyasıyla konfigure edilebilmektedir.

Liquibase kullanılıyorsa gerekli tabloların oluşması için aşağıdaki şekilde liquibase root yaml dosyasına ekleme yapılmalıdır.

databaseChangeLog:
  - include:
      file: liquibase/hvl-kafka-jpa/changelog-root.yaml

hvl-javalt-samples projesinde örnek uygulamaya module/kafka/kafka-jpa pathi takip edilerek ulaşılabilir.

Outbox Producer#

Outbox Producer, Kafka mesajlarının güvenli bir şekilde iletilmesini sağlayan, veritabanı destekli bir mesajlaşma mekanizmasıdır. Bu yapı sayesinde:

  • Mesajlar önce veritabanında bir hvl_kafka_outbox_message tablosuna kaydedilir ve sonra Kafka'ya gönderilir, böylece veri kaybı önlenir.
  • Uygulama çökmesi veya Kafka'ya bağlantı sorunları gibi durumlarda bile mesajların kaybolmaması garanti edilir.
  • Periyodik çalışan bir iş ile bekleyen mesajlar otomatik olarak işlenir ve Kafka'ya gönderilir.
  • İşlenmiş eski mesajlar için otomatik temizleme mekanizması sağlanır.
  • Veri tutarlılığı için veritabanı işlemleri ile mesaj gönderimi arasında transactional bütünlük sağlanır.

Bu yaklaşım, özellikle yüksek güvenilirlik gerektiren sistemlerde veri kaybını önlemek için kullanılır ve "at-least-once" mesaj iletim garantisi sunar.

Konfigürasyon#

hvl-kafka paketi yerine hvl-kafka-box paketi kullanılmalıdır.

hvl-infra altında bulunan application-hvl-scheduling.yml, application-kafka.yml dosyasıyla konfigure edilebilmektedir. hvl.core.kafka.producer.outbox ile başlayan konfigürasyonlar incelenebilir.

Liquibase kullanılıyorsa gerekli tabloların oluşması için aşağıdaki şekilde liquibase root yaml dosyasına ekleme yapılmalıdır.

databaseChangeLog:
  - include:
      file: liquibase/hvl-kafka-box/changelog-root.yaml

hvl-javalt-samples projesinde örnek uygulamaya module/kafka/kafka-box pathi takip edilerek ulaşılabilir.

Kullanım#

Bu sistemde mesajların kafkaya iletilmesi için öncelikle hvl_kafka_outbox_message yazılması gerekmektedir. Bunun için HvlKafkaOutboxService sınıfı kullanılmalıdır. Bu sınıf inject edilerek içerisindeki publish metodları kullanılarak outbox tablosuna kayıt yazılabilir.

final HvlKafkaOutboxModel<HvlKafkaOutboxEvent> kafkaOutboxModel = HvlKafkaOutboxModel.builder()
        .withTopic(topic)
        .withKey("test")
        .withPartition(0)
        .withMessage(new HvlKafkaBoxSampleEvent(requestId, userId, "Arda", "Çuhadaroğlu"))
        .build();
kafkaOutboxService.publish(kafkaOutboxModel);

hvl_kafka_outbox_message tablosuna yazılan mesajların işlenip kafkaya gönderilmesi için arka planda bir iş çalışmaktadır. Bu iş varsayılan olarak 5 saniyede bir çalışmakta olup application-kafka.yml dosyasındaki hvl.core.kafka.producer.outbox.job-duration konfigürasyonu ile süresi değiştirilebilmektedir.

hvl_kafka_outbox_message tablosunda çok veri birikme durumu için arka planda bir temizleyici işi çalışmaktadır. Bu iş vasıtasıyla tabloda durumu SENT olan kayıtların temizlenmesi sağlanmaktadır.

Kafka Transaction#

Eğer kafka transaction aktif ise transaction başlatılmak istenen metodun başına @Transactional veya eksenin sağladığı @HvlTransactionalRollbackForCheckedException anotasyonunu eklemek yeterli olacaktır.

@Autowired
private final HvlEventNotifier eventNotifier;

@HvlTransactionalRollbackForCheckedException
public void send() {
    .
    .
    eventNotifier.notifySync(exampleEvent);
    eventNotifier.notifySync(exampleEvent2);
    eventNotifier.notifySync(exampleEvent3);
    .
    .
}

Consumer#

Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, aşağıda kafkadan gelecek mesajları okuyan basit bir consumer örneği verilmiştir. topics alanında verilen değer hangi kafka topic'ine register olacağı bilgisidir. Bunun yanında consumer group id, error handler, concurrecy gibi bilgiler de manuel olarak buradan konfigüre edilebilir. Buradan verilecek değerler default olarak altyapıdan gelen ve yml'dan gelen bilgileri yok sayar. Özetle, @KafkaListener içerisinden verilen değerler en üst önceliklidir.

@KafkaListener(topics = EVENT_LOG_TOPIC)
public void consumeEventLog(ExampleEvent event) {
    //do stuff
}

Farklı Bir Model Kullanarak Consume Etme İşlemi#

Produce edilen modelin tüm alanlarına ihtiyaç duyulmaması durumunda daha küçük modeller oluşturularak, consume işlemi o model ile yapılabilmektedir.

Uyarı

Oluşturulacak daha küçük modelin değişken isimleri diğer model ile aynı olmalıdır.

Java tip değişim işleminin gerçekleşebilmesi için json deserializer sınıfında kullanılmak üzere type resolver tanımlanması gerekmektedir. Aşağıdaki örnekte görüldüğü üzere public static ve örnekteki parametrelere sahip bir method tanımlanmalıdır.

public class HvlKafkaSampleCommonConfiguration extends HvlBaseConfiguration {

    public static JavaType typeResolver(String topic, byte[] data, Headers headers) {
        if (topic.equals(HvlKafkaSampleConstant.SIMPLE_TOPIC_NAME)) {
            return SimpleType.constructUnsafe(HvlKafkaSampleSimpleModel.class);
        }
        return null;
    }
}

Type resolver tanımlaması yapıldıktan sonra application.yml dosyasına aşağıdaki konfigürasyonun eklenmesi ve tanımlanan type resolver metodunun adreslenmesi gerekmektedir.

spring:
  kafka:
    consumer:
      properties:
        spring.json.value.type.method: tr.com.havelsan.javarch.samples.kafka.common.configuration.HvlKafkaSampleCommonConfiguration.typeResolver

Bu işlemler yapıldıktan sonra aşağıdaki örnekteki gibi consumer methodu yazılabilir.

@HvlTransactionalRollbackForCheckedException
@KafkaListener(topics = HvlKafkaSampleConstant.SIMPLE_TOPIC_NAME)
public void consumeAsSimpleModel(HvlKafkaSampleSimpleModel kafkaModel) {
    LOGGER.info("Message simple model consumed!" + kafkaModel.toString());
}

Kafka örneği ve detaylı kullanım ile ilgili tam bir örnek kod için hvl-javalt-samples projesindeki örnek incelenebilir.

Inbox Consumer#

Inbox Consumer, Kafka mesajlarının güvenilir bir şekilde işlenmesini sağlayan veritabanı destekli bir yapıdır. Bu mekanizma, tüketilen mesajları öncelikle veritabanında hvl_kafka_inbox_message tablosunda saklar ve böylece mesajların yalnızca bir kez işlenmesini (idempotency) garanti eder. Sistem çökmeleri veya yeniden başlatılma durumlarında aynı mesajın tekrar işlenmesini önler, duplikasyon sorunlarını ortadan kaldırır. Otomatik temizleme özelliği sayesinde işlenmiş eski kayıtlar belirli bir süre sonra sistemden temizlenir ve veritabanı kaynaklarının verimli kullanılması sağlanır.

Konfigürasyon#

hvl-kafka paketi yerine hvl-kafka-box paketi kullanılmalıdır.

hvl-infra altında bulunan application-hvl-scheduling.yml, application-kafka.yml dosyasıyla konfigure edilebilmektedir. hvl.core.kafka.consumer.inbox ile başlayan konfigürasyonlar incelenebilir.

Liquibase kullanılıyorsa gerekli tabloların oluşması için aşağıdaki şekilde liquibase root yaml dosyasına ekleme yapılmalıdır.

databaseChangeLog:
  - include:
      file: liquibase/hvl-kafka-box/changelog-root.yaml

hvl-javalt-samples projesinde örnek uygulamaya module/kafka/kafka-box pathi takip edilerek ulaşılabilir.

Kullanım#

Bu sistemde mesajların inbox pattern ile tüketilebilmesi için 2 yöntem sunulmaktadır.

  1. HvlKafkaInboxConsumer kullanımı.

    Bu yöntem, arka plandan sağlanan bir consumer üzerinden sağlanır. Bu yöntem arka planda yapılan işler ve consume sırasında yapılan işler için otomatik olarak tek bir transaction oluşturur. Hata durumunda rollback yapılır.

  2. @HvlKafkaInboxListener anotasyonu kullanımı.

    Bu yöntemde aspect kullanılır. Transactional olarak davranması için @HvlTransactionalRollbackForCheckedException anotasyonu da kullanılmalıdır.

Bu 2 yöntemin de kullanımı aşağıdaki örnekteki gibi gösterilmiştir.

@Component
public class HvlKafkaBoxSampleConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(HvlKafkaBoxSampleConsumer.class);

    private final HvlKafkaSampleRepository kafkaSampleRepository;

    private final HvlKafkaInboxConsumer<String, HvlKafkaBoxSampleEvent> kafkaInboxConsumer;

    public HvlKafkaBoxSampleConsumer(HvlKafkaSampleRepository kafkaSampleRepository,
                                     HvlKafkaInboxConsumer<String, HvlKafkaBoxSampleEvent> kafkaInboxConsumer) {
        this.kafkaSampleRepository = kafkaSampleRepository;
        this.kafkaInboxConsumer = kafkaInboxConsumer;

        this.kafkaInboxConsumer.subscribe("kafka_sample_box_topic0", this::consume);
    }

    @HvlTransactionalRollbackForCheckedException
    @HvlKafkaInboxListener(topics = "kafka_sample_box_topic1")
    public void consumeWithAnnotation(ConsumerRecord<String, HvlKafkaBoxSampleEvent> consumerRecord) {
        consume(consumerRecord);
    }

    private void consume(ConsumerRecord<String, HvlKafkaBoxSampleEvent> consumerRecord) {
        final HvlKafkaBoxSampleEvent value = consumerRecord.value();
        LOGGER.info("Message consumed! {}", value.toString());

        kafkaSampleRepository.save(new HvlKafkaSampleEntity(value.getId(), value.getName(), consumerRecord.topic()));
    }
}

Uyarı

Performans için HvlKafkaInboxConsumer kullanımı tavsiye edilir.

hvl_kafka_inbox_message tablosunda çok veri birikme durumu için arka planda bir temizleyici işi çalışmaktadır.

Kafka Docker-Compose Uygulaması#

Kafka docker compose örneklerine javalt framework'ün sağladığı infra projesi üzerinden ulaşılabilir.

Kafka docker compose: kafka-docker-compose.yml

Kafka cluster docker compose: kafka-cluster-docker-compose.yml