Ana içeriğe geç

1.8. Message Broker#

Uygulamalarda message queue yapısının kullanmasını sağlayan altyapı bileşenidir. Kafka desteği sağlanmaktadır.

Kafka#

Neden bu altyapıyı tercih etmeliyim:

  • Spring kafka ile tamamen entegre çalışmaktadır. Herhangi bir kısıtlama yapılmamıştır.
  • Transaction yönetimi için iyileştirmeler ve kullanım kolaylığı sağlanmıştır.
  • Kafka konfigüratif health check mekanizması barındırmaktadır.
  • Kafkayı konfigüratif olarak devreye alıp, devreden çıkarma mekanizması sağlanmaktadır.
  • Retry edilmeyecek exceptionların konfigüratif olarak ayarlanabilmesine olanak sağlar.
  • Konfigüratif olarak tanımlanan exception'lara göre Dead Letter Queue'lar oluşturulmasına olanak sağlar.

Kafka ürünü ve çalışma yapısı ile ilgili detaylı bilgiye Kafka sayfasından ulaşılabilir.


drawing Nasıl konfigure edilir?

hvl-infra altında bulunan 'application-kafka.yml' dosyasıyla konfigure edilebilmektedir.


drawing Uygulamaya nasıl eklenir?
group: 'tr.com.havelsan.framework.mq', name: 'hvl-kafka'

YML Konfigürasyonları#

hvl:
  core:
    kafka:
      healt-check:
        enabled: ${KAFKA_HEALTH_CHECK_ENABLED:true}
        timeout: ${KAFKA_HEALTH_CHECK_TIMEOUT:2000}
      support:
        enabled: ${KAFKA_SUPPORT_ENABLED:false}
      transaction:
        enabled: ${KAFKA_TRANSACTION_ENABLED:false}
      consumer:
        retries: ${KAFKA_CONSUMER_RETRIES:5}
        nonRetriableExceptions: ${KAFKA_NON_RETRIABLE_EXCEPTIONS:java.lang.IllegalArgumentException}
        exceptionToDltMapping:
          java.lang.NullPointerException: NPE

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_ADDRESS:http://localhost:9092,http://localhost:9093,http://localhost:9094}
    consumer:
      group-id: ${KAFKA_CONSUMER_GROUP_ID:hvl-event-logger}
      properties:
        retry.backoff.ms: ${KAFKA_RETRY_BACKOFF_MS:1000}
    listener:
      concurrency: ${KAFKA_LISTENER_CONCURRENCY:3}
    producer:
      properties:
        transactional.id: ${KAFKA_PRODUCER_TRANSACTIONAL_ID:logtx_}
    security:
      protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}

hvl.core.kafka.healt-check.enabled: Uygulama açılışında kafka'ya bağlantıyı kontrol eder. Bağlantı sağlamazsa uygulamayı kapatır.

hvl.core.kafka.support.enabled: Kafka altyapısını devreye sokar.

hvl.core.kafka.transaction.enabled: Kafka transaction'ı devreye sokar.

hvl.core.kafka.consumer.retries: Hata alınması durumunda kaç kere retry edileceği bilgisi.

hvl.core.kafka.consumer.nonRetriableExceptions: Retry edilmeden direkt olarak Dead Letter Queue(DLT)'ya atılacak exception listesi bilgisi. Yukarıdaki örnekte IllegalArgumentException alınması durumda retry edilmeceği ayarlanmıştır.

hvl.core.kafka.consumer.exceptionToDltMapping: Exception tipine göre oluşturulacak DLT mappingi. Yukarıdaki örnekte; consumer tarafından NullPointerException alınırsa, 5 kere tekrar denenip başarılı olamadığı durumda [TOPIC_NAME]_NPE.DLT şeklinde bir DLT topic oluşturulacak ve başarısız mesaj buraya yazılacaktır.

spring.kafka.consumer.group-id: Consumer'ların group id bilgisidir. Bu bilgi ile kafka topiclerine register olurlar. Her consumer için farklı group id verilmek istenirse @KafkaListener anotasyonu içerisinde özelleştirilebilir. Consumer başlığı altında detaylı örnek verilecektir.

spring.kafka.consumer.properties.retry.backoff.ms: Her bir tekrar deneme(retry) arasında beklenilecek süre. (milisaniye cinsinden)

spring.kafka.listener.concurrency: Her bir consumer'ın thread sayısı. Örnek olarak concurreny 3 ise, her bir consumer'dan 3 adet oluşturulur gibi düşünülebilir.

spring.kafka.producer.properties.transactional.id: Transaction için kullanılacak id prefix bilgisidir. Transaction devreye alınırsa bu alan doldurulmak zorundadır.

Producer#

Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, Kafkaya mesaj göndermek için alttaki gibi basit bir kod kullanılabilir. Mesajları gönderirken HvlEventNotifier kullanılmalıdır. İsteğe göre sync veya async mesaj gönderimi yapılabilir.

@Autowired
private final HvlEventNotifier eventNotifier;

public void send() {
    .
    .
    eventNotifier.notifySync(exampleEvent);
    .
    .
}

HvlEventNotifier.java

public interface HvlEventNotifier {

    /**
     * <code>HvlEvent.getName()</code> ile belirtilen topic'e <code>HvlEvent</code> tipindeki event'i async olarak gondermek amaciyla kullanilir
     *
     * @param event    Gonderilecek event ve topic bilgisi'
     * @param callback async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi
     * @return ListenableFuture
     * @see ListenableFuture org.springframework.util.concurrent.ListenableFuture
     */
    <E extends HvlEvent> ListenableFuture<?> notifyAsync(E event, ListenableFutureCallback<SendResult<Integer, String>> callback);

    /**
     * Ozel bir topic'e <code>HvlEvent</code> tipindeki event'i async olarak gondermek amaciyla kullanilir
     *
     * @param topicName topic ismi
     * @param event     Gonderilecek event ve topic bilgisi'
     * @param callback  async cagrinin success ve fail durumlarinda yapilacaklarin belirtildigi callback sinifi
     * @return ListenableFuture
     * @see ListenableFuture org.springframework.util.concurrent.ListenableFuture
     */
    <E extends HvlEvent> ListenableFuture<?> notifyAsync(String topicName, E event, ListenableFutureCallback<SendResult<Integer, String>> callback);

    /**
     * <code>HvlEvent.getName()</code> ile belirtilen topic'e <code>HvlEvent</code> tipindeki event'i sync olarak gondermek amaciyla kullanilir
     *
     * @param event Gonderilecek event ve topic bilgisi'
     * @return ListenableFuture
     * @see ListenableFuture org.springframework.util.concurrent.ListenableFuture
     */
    <E extends HvlEvent> SendResult<Integer, String> notifySync(E event) throws HvlKafkaProducerException;

    /**
     * Ozel bir topic'e <code>HvlEvent</code> tipindeki event'i sync olarak gondermek amaciyla kullanilir
     *
     * @param topicName topic ismi
     * @param event     Gonderilecek event ve topic bilgisi'
     * @return ListenableFuture
     * @see ListenableFuture org.springframework.util.concurrent.ListenableFuture
     */
    <E extends HvlEvent> SendResult<Integer, String> notifySync(String topicName, E event) throws HvlKafkaProducerException;

}

Kafka Transaction#

Eğer kafka transaction aktif ise transaction başlatılmak istenen metodun başına @Transactional anotasyonu eklemek yeterli olacaktır.

@Autowired
private final HvlEventNotifier eventNotifier;

@Transactional
public void send() {
    .
    .
    eventNotifier.notifySync(exampleEvent);
    eventNotifier.notifySync(exampleEvent2);
    eventNotifier.notifySync(exampleEvent3);
    .
    .
}

Consumer#

Bağımlılıklar başlığında belirtilen bağımlılık eklendikten sonra, aşağıda kafkadan gelecek mesajları okuyan basit bir consumer örneği verilmiştir. topics alanında verilen değer hangi kafka topic'ine register olacağı bilgisidir. Bunun yanında consumer group id, error handler, concurrecy gibi bilgiler de manuel olarak buradan konfigüre edilebilir. Buradan verilecek değerler default olarak altyapıdan gelen ve yml'dan gelen bilgileri yok sayar. Özetle, @KafkaListener içerisinden verilen değerler en üst önceliklidir.

@KafkaListener(topics = EVENT_LOG_TOPIC)
public void consumeEventLog(ExampleEvent event) {
    //do stuff
}

Kafka Docker-Compose Uygulaması#

Kafka docker compose örneklerine javalt framework'ün sağladığı infra projesi üzerinden ulaşılabilir.

Kafka docker compose: kafka-docker-compose.yml

Kafka cluster docker compose: kafka-cluster-docker-compose.yml