Ana içeriğe geç

4.3. Distributed Transactions#

Dağıtık mimarilerde birden çok transaction bloğunun tek bir transaction gibi yönetilmesi olayıdır. Mimaride temel teknoloji olarak BPMN ve Kafka kullanılmaktadır.

Yetenekleri#

  • Özelleştirilmiş bpmn süreçleri(blackbox) kullanılarak hızlıca süreç tasarımı yapılabilmesi sağlanmaktadır.
  • Bpmn Log Producer uygulaması ile süreç adımlarında ve adım geçişlerinde loglama yapılabilmektedir.
  • Adımlar arasında payload taşınabilmektedir.
  • BusinessKey gibi süreç için gerekli değerler otomatik olarak adımlar arasında dolaştırılmaktadır.
  • TransactionalEntity nesnelerinde süreç içerisinde nesnede değişikilk yapıldığında nesne kilitlenir ve süreç dışarısından değiştirilmesi engellenebilmektedir.
  • Procsys uygulamasından sürecin hangi adımında olduğu takip edilebilir ve geçilen adımlarda basılan loglar görülebilir.

Procsys ile Distributed Transaction Süreci Oluşturma#

Distributed transaction tasarımı için procsys uygulaması kullanılabilir. Aşağıda çizilmiş örnek bir süreç bulunmaktadır. Sürecin bpmn xml'ine hvl-javalt-samples projesinden ulaşılabilir. distributedTransactionSample.bpmn20.xml

Örnekte görüldüğü üzere her bir adımın Run ve Rollback kısımları bulunmaktadır. Bu adımlar için aşağıda bahsedilecek olan hazır tasarlanmış süreçler Call Activity olarak kullanılabilir.

Run Süreç Adımı#

Süreç tasarımında Called Activity nesnesi ile kullanılacak Run adımı sürecinin detayları aşağıdaki gibidir.

Run Özellikleri

distributedTransactionServicePart.xml

Run süreç adımı için aşağıdaki propertylere dikkat edilerek distributed transaction sürecimize yeni bir adım eklenebilir.

Exclusive: Call activity'nin async olarak çalışması için gereklidir. Eğer retry kullanılacaksa bu değerin true olması gerekmektedir.

Called element type: key değeri veya Id değeri seçilebilir, örnekte key seçilmiştir buna karşılık Called element alanına çağırmak istediğimiz activity'nin key'i olan distributedTransactionServicePart kullanılmıştır.

Called element: distributedTransactionServicePart yazılmalıdır. Önceden tasarlanmış Run sürecinin key bilgisidir.

Out parameters: Called activity bitişinde tekrar ana sürece geri dönecek olan parametreleri belirtir. Örneğimizde bu değer payload'dır.

In parameters: Called activity içerisinde kullanılacak parametre bilgisidir. serviceRunTopic parametresi ayarlanmalıdır. Bu parametre süreç adımında hangi kafka topic'ini tetikleyeceğini belirlemektedir.

Inherit variables in sub process: Ana süreçteki değişkenlerin Called activity içerisine aktarılmasını sağlar. Bu değer true olmalıdır.

Inherit business key: Ana sürecin business key değerinin Called activity içerisine aktarılmasını sağlar. Bu değer true olmalıdır.

Rollback Süreç Adımı#

distributedTransactionRollbackPart.xml

Süreç tasarımında Called Activity nesnesi ile kullanılacak Rollback adımı sürecinin detayları Called element dışında aynıdır.

Called element: distributedTransactionRollbackPart yazılmalıdır. Önceden tasarlanmış Rollback sürecinin key bilgisidir.

Rollback adımını tetiklemek için de Boundary Error Event kullanılmalıdır. Detayları aşağıdaki gibi olmalıdır.

Transactional Entity Kullanımı#

Süreç başladıktan sonra süreç içerisinde kullanılacak entity'nin başka bir uygulama tarafından değiştirilmesi engellenmesi gerekebilir. Örneğin bir personel güncelleme süreci ilerlerken, o sırada ekrandan ya da farklı bir uygulamadan işlem yapılan personele müdahale edilmemesi gerekir. Aksi durumda istenmeyen sonuçlar oluşabilir.

Gradle Dependencies

api (
        [group: 'tr.com.havelsan.framework.bpmn', name: 'hvl-bpmn-dt-data-jpa-provider']
)

Bpmn süreçlerinde kilitlenmesi istenen entity HvlTransactionalHardDeleteEntity veya HvlTransactionalSoftDeleteEntity nesnelerinden türemelidir.

HvlBpmnDTSampleEntity.java

@Entity
@Table(name = "BPMN_SAMPLE")
public class HvlBpmnDTSampleEntity extends HvlTransactionalHardDeleteEntity {

    @Column(name = "NAME")
    private String name;

    @Column(name = "SURNAME")
    private String surname;

    //GETTER SETTER
}

Entity ayarlaması yapıldıktan sonra transactional bpmn contexinin oluşması için persist işlemlerinin yapıldığı methodun başına @HvlBpmnTransactional anotasyonu eklenmelidir. Aksi durumda context oluşmayacağı için entity kilitleme işlemi yapılamayacaktır.

Örnek Kod

@Transactional
@KafkaListener(topics = "service1Topic")
@HvlBpmnTransactional(type = HvlBpmTransactionalType.PARAMETER)
public void runService1Logic(HashMap<String, Object> payload) {

        .
        .
        .

    final HvlBpmnDTSampleEntity bpmnDTSampleEntity = new HvlBpmnDTSampleEntity();
    bpmnDTSampleEntity.setName("Arda");
    bpmnDTSampleEntity.setSurname("Cuhadaroglu");
    final HvlBpmnDTSampleEntity savedEntity = sampleDTRepository.save(bpmnDTSampleEntity);

        .
        .
        .
}

Distributed Transactions için Kafka Listener#

Distributed transaction'daki her bir adım için bir kafka kafka listener olmalıdır. Bu kafka listeneler içerisinde o adımda yapılması gereken işlemlerin yapılması gerekmektedir. Kafka'nın detaylı kullanımı için Hvl Kafka Altyapısı sayfası kullanılabilir.

Örnek Kod

@Transactional
@KafkaListener(topics = "service2Topic")
@HvlBpmnTransactional(type = HvlBpmTransactionalType.PARAMETER)
public void runService2Logic(HashMap<String, Object> payload) {
    System.out.println("SERVICE 2 RUNNN!!");

    final String businessKey = String.valueOf(payload.get(HvlBpmnCommonConstant.BUSINESS_KEY_KEY));
    final String processInstanceId = String.valueOf(payload.get(HvlBpmnCommonConstant.PROCESS_INSTANCE_ID_KEY));

    bpmnLogProducerService.log(HvlBpmnLogModelBuilder.create(businessKey, processInstanceId, "SERVICE 2 RUNNN!!", HvlBpmnLogLevel.INFO).build());

    final Integer entityId = (Integer) payload.get("service1EntityId");
    final HvlBpmnDTSampleEntity sampleEntity = sampleDTRepository.getById(Long.valueOf(entityId));
    sampleEntity.setName("Arda2");
    sampleDTRepository.update(sampleEntity);

    payload.put("service2Resultkey", "Service 2 Result Value");

    final HvlBpmnTransactionalEventModel transactionalEventModel
            = new HvlBpmnTransactionalEventModel(HvlBpmnTransactionalEventType.COMPLETE_STEP, businessKey, payload);
    eventNotifier.notifySync(new HvlBpmnTransactionalEvent(transactionalEventModel));
}

Örnek kod incelendiğinde methodun sonunda

final HvlBpmnTransactionalEventModel transactionalEventModel
            = new HvlBpmnTransactionalEventModel(HvlBpmnTransactionalEventType.COMPLETE_STEP, businessKey, payload);
eventNotifier.notifySync(new HvlBpmnTransactionalEvent(transactionalEventModel));

satırları görülür. Bu satırlar ile işlemin tamamlandığını ve sonraki adıma geçilebilir olduğu belirtilmektedir.

Method içerisinde hata alınması durumunda tip olarak HvlBpmnTransactionalEventType.START_ROLLBACK kullanılmalıdır. Bu durumda distributed transaction surecindeki rollback adımları çalışmaya başlayacaktır. Ele alınan rollback adımlarında ise HvlBpmnTransactionalEventType.COMPLETE_ROLLBACK tipi kullanılmaktadır.

Örnek Kod

@Transactional
@KafkaListener(topics = "service2RollbackTopic")
@HvlBpmnTransactional(type = HvlBpmTransactionalType.PARAMETER)
public void runService2RollbackLogic(HashMap<String, Object> payload) {
    System.out.println("SERVICE 2 ROLLBACK!!");

    final String businessKey = String.valueOf(payload.get(HvlBpmnCommonConstant.BUSINESS_KEY_KEY));
    final String processInstanceId = String.valueOf(payload.get(HvlBpmnCommonConstant.PROCESS_INSTANCE_ID_KEY));

    bpmnLogProducerService.log(HvlBpmnLogModelBuilder.create(businessKey, processInstanceId, "SERVICE 2 ROLLBACK!!", HvlBpmnLogLevel.INFO).build());

    payload.put("service2RollbackResultkey", "Service 2 Rollback Result Value");

    final HvlBpmnTransactionalEventModel transactionalEventModel
            = new HvlBpmnTransactionalEventModel(HvlBpmnTransactionalEventType.COMPLETE_ROLLBACK, businessKey, payload);
    eventNotifier.notifySync(new HvlBpmnTransactionalEvent(transactionalEventModel));
}

Örnek kodun tamamına hvl-javalt-samples projesinden ulaşılabilir.

@HvlBpmnTransactional Nedir ve Nasıl Kullanılır?#

Methodun başına eklenen bu anotasyon sayesinde method parametresinden veya request header'ından sürece ait business key bilgisi alınabilir.

HvlBpmnTransactional.java

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HvlBpmnTransactional {

    HvlBpmTransactionalType type();

    String parameterField() default "payload";

    Class<?> parameterClass() default HashMap.class;

}

Anotasyon kullanımında aşağıdaki gibi 2 çeşit type bilgisi bulunmaktadır.

HvlBpmTransactionalType.java

public enum HvlBpmTransactionalType {

    PARAMETER,
    REQUEST

}

PARAMETER: Business key değerinin method parametresinde taşındığı durumlarda bu tip kullanılmalıdır. Default parametre adı payload ve default beklenen tip HashMap'tir. Contextin düzgün bir şekilde oluşabilmesi için payload içerisinde businessKey key'i bulunmalıdır.

@HvlBpmnTransactional(type = HvlBpmTransactionalType.PARAMETER)
public void runService1Logic(HashMap<String, Object> payload) {
        .
        .
        .
}

Eğer business key değeri direkt olarak businessKey isimli bir String ile taşındığı durumda anotasyon aşağıdaki şekilde kullanılmalıdır.

@HvlBpmnTransactional(type = HvlBpmTransactionalType.PARAMETER, parameterField = "businessKey", parameterClass = String.class)
public void runService1Logic(String businessKey) {
        .
        .
        .
}

REQUEST: Business key değerinin request header'ında taşınması durumunda bu tip kullanılmalıdır. Distributed transactionı yöneten methotta parametre taşınmadığı durumlarda bu tip kullanılabilir.

@HvlBpmnTransactional(type = HvlBpmTransactionalType.REQUEST)
public void runService1Logic() {
        .
        .
        .
}

Bpmn Log Producer#

Distributed transactions adımlarında yapılan işlemlerin loglanması için Bpmn Log Producer paketi kullanılmalıdır. Daha sonra loglanan veriler Procsys uygulamasından takip edilebilmektedir.

Gradle Dependencies

api (
        [group: 'tr.com.havelsan.framework.bpmn', name: 'hvl-bpmn-log-provider']
)

Bağımlılık eklendikten sonra HvlBpmnLogProducerService servisi inject edilmelidir.

@Autowired
private HvlBpmnLogProviderService bpmnLogProviderService;
Örnek Kod

@Transactional
@KafkaListener(topics = "service1Topic")
@HvlBpmnTransactional(type = HvlBpmTransactionalType.PARAMETER)
public void runService1Logic(HashMap<String, Object> payload) {
    System.out.println("SERVICE 1 RUNNN!!");

    final String businessKey = String.valueOf(payload.get(HvlBpmnCommonConstant.BUSINESS_KEY_KEY));
    final String processInstanceId = String.valueOf(payload.get(HvlBpmnCommonConstant.PROCESS_INSTANCE_ID_KEY));

    bpmnLogProviderService.log(HvlBpmnLogModelBuilder.create(businessKey, processInstanceId, "SERVICE 1 RUNNN!!", HvlBpmnLogLevel.INFO).build());

        .
        .
        .
}

Loglama seviyesi olarak örnekte HvlBpmnLogLevel.INFO kullanılmıştır. Bunun dışında farklı loglama seviyeleri de kullanılabilir.

public enum HvlBpmnLogLevel {

    INFO,
    WARNING,
    ERROR

}

Distributed Transaction Sürecini Başlatma#

Distributed transaction süreci aşağıdaki gibi başlatılabilir. Sürece start verebilmek için HvlBpmnTransactionalRestService rest servisi inject edilmelidir.

Örnek Kod

@Service
public class HvlBpmnDTServiceImpl implements HvlBpmnDTService {

    private final HvlBpmnTransactionalOperationalRestService bpmnTransactionalOperationalRestService;

    public HvlBpmnDTServiceImpl(HvlBpmnTransactionalOperationalRestService bpmnTransactionalOperationalRestService) {
        this.bpmnTransactionalOperationalRestService = bpmnTransactionalOperationalRestService;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void startProcess() {
        final HashMap<String, Object> payload = new HashMap<>();
        payload.put("name", "Arda Çuhadaroğlu");
        final HashMap<String, Object> map = new HashMap<>();
        map.put(HvlBpmnTransactionalServiceConstant.DT_SERVICE_VARIABLE_PAYLOAD_KEY, payload);
//        map.put(HvlBpmnTransactionalServiceConstant.DT_SERVICE_VARIABLE_RETRY_CYCLE_KEY, "R20/PT2M");
        bpmnTransactionalOperationalRestService.startProcessInstanceByKey("distributedTransaction", map).getBody();
    }

}