Ana içeriğe geç

Kafka Transactions#

Spring Boot ve Spring Kafka ile çalışırken Kafka’nın transaction kabiliyetini devreye almak için aşağıdaki property tanımını yapmak yeterlidir.

spring.kafka.producer.transaction-id-prefix=tx-

transaction-id-prefix tanımı sayesinde Spring Boot Kafka AutoConfiguration’ı bir KafkaTransactionManager bean’i tanımlamaktadır. transaction-id-prefix tanımının yanı sıra Kafka transaction’ları ile çalışırken aşağıdaki property tanımlarını yapmak da önem arz etmektedir. Bunun yanında broker tarafından yönetilen transaction state log topic'i için default replication factor 3'dür.

spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.isolation-level=read_committed

enable-auto-commit=false olarak set edildiğinde container, offset bilgisini Kafka Transaction’a ancak MessageListener başarılı biçimde sonlandığı vakit gönderecektir. enable-auto-commit=true olma durumunda ise consumer offset bilgisi transaction’ın durumuna bakılmaksızın periyodik olarak gönderilmektedir. auto.commit.interval.ms property’si milliseconds düzeyinde bu periyodu yönetmeyi sağlar.

isolation-level=read_committed ise MessageListener, yani consumer’ların sadece commit’lenmiş transaction içerisinden veya transaction olmaksızın gönderilmiş mesajları okumasına olanak tanır. Kafka Broker read_committed durumunda abort/rollback olmuş transaction’a ait gönderilmiş mesajların consumer tarafından okunmasına izin vermez.

Kafka transaction kabiliyetinin devreye alınması ile birlikte artık KafkaTemplate kullanarak yapılacak olan send işlemlerinin aktif bir transaction içerisinde yapılması zorunlu hale gelmiştir. Bunun için Spring’in dekleratif transaction yönetim kabiliyetinden yararlanılabilir. @Transactional anotasyonunu metot veya sınıf düzeyinde kullanabilirsiniz.

Spring transaction yönetim altyapısının en önemli temel kabiliyetlerinden birisi “transaction senkronizasyon” kabiliyetidir. Bu kabiliyet sayesinde uygulamadaki transactional metotlar içerisinde bir takım işlemleri hemen o an değil, aktif transaction sonlanırken (commit, rollback veya her ikisi durumunda da) çalışmalarını sağlayabiliriz. Ancak KafkaTransactionManager bu transaction senkronizasyon kabiliyetini default olarak devre dışı bırakmıştır. Buna neden olarak da Kafka transaction’larının ve KafkaTransactionManager’ın genellikle JDBC DataSourceTransactionManager gibi datastore tabanlı başka bir TransactionManager ile birlikte kullanılması gösterilmektedir. Bu birlikte kullanım da Spring Data projesinin sunduğu ChainedTransactionManager vasıtası ile olmalıdır. Spring Kafka projelerinde ChainedTransactionManager olarak ChainedKafkaTransactionManager alt sınıfı kullanılmalıdır.

Kafka Streams#

Kafka Streams, Kafka’da depolanan verilerin anlık olarak işlenmesi ve analiz edilmesi için geliştirilmiş bir client kütüphanesidir. Girdi ve çıktı verilerinin Kafka’da saklandığı, ölçeklenebilir, esnek, hataya dayanıklı, dağıtık uygulamalar ve mikro servisler geliştirmek için kullanılır. Kafka Streams API, temel işinize güç katacak gerçek zamanlı uygulamalar oluşturmanıza imkan tanır. Akan veri üzerinden hesaplamalar yapmamızı sağlar. Arkasındaki Kafka Cluster ile entegre çalışır.

Aktarılan verilerin şifrelenmesini destekler. Kimlik doğrulama ve yetkilendirmeyi destekler

Kafka Security#

Kafkada authentication ve authorization yapmak mümkündür. Bunun dışında SSL ile veri şifrelemesini de destekler.

Authentication#

  • SSL Authentication
  • SASL Authentication
    • SASL PLAINTEXT
    • SASL SCRAM
    • SASL GSSAPI (Kerberos)

Authorization (ACL)#

Erişim kullanıcı bazlı kısıtlanmak istenirse yetkilendirme yapılabilir.

kafka-acl --topic test --producer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice

Sonuç:

Adding ACLs for resource \Topic:test:

User:alice has Allow permission for operations: Describe from hosts: *

User:alice has Allow permission for operations: Write from hosts: *

Adding ACLs for resource \Cluster:kafka-cluster:

User:alice has Allow permission for operations: Create from hosts: *