본문 바로가기
MSA

[MSA][Saga] 스프링 부트를 사용한 오케스트레이션 사가 패턴

by Real Iron 2023. 12. 22.

스프링 부트를 사용한 오케스트레이션 사가 패턴

개요:

이 튜토리얼에서는 Spring Boot를 사용한 Orchestration Saga Pattern의 간단한 구현을 보여드리고 싶습니다  .

수년에 걸쳐 마이크로서비스는 매우 인기를 얻었습니다. 마이크로서비스는 분산 시스템입니다. 더 작고 모듈식이며 배포 및 확장이 쉽습니다. 단일 마이크로서비스 애플리케이션을 개발하는 것이 흥미로울 수 있습니다! 하지만 여러 마이크로서비스에 걸쳐 있는 비즈니스 트랜잭션을 처리하는 것은 재미가 없습니다! 애플리케이션 워크플로/작업을 완료하려면 여러 마이크로서비스가 함께 작동해야 할 수도 있습니다.

 

이 기사에서는 분산 시스템의 트랜잭션/데이터 일관성을 처리하는 것이 얼마나 어려울 수 있는지와 오케스트레이션 사가 패턴이 우리에게 어떻게 도움이 될 수 있는지 살펴보겠습니다  .

간단한 거래:

비즈니스 규칙에 따르면 사용자가 주문할 때 제품 가격이 사용자의 신용 한도/잔액 내에 있고 해당 제품에 대한 재고가 있는 경우 주문이 이행된다고 가정해 보겠습니다. 그렇지 않으면 이행되지 않습니다. 정말 단순해 보입니다. 이는 모놀리식 애플리케이션에서 구현하기가 매우 쉽습니다. 전체 작업 흐름은 하나의 단일 트랜잭션으로 간주될 수 있습니다. 모든 것이 하나의 DB에 있으면 커밋/롤백이 쉽습니다. 여러 데이터베이스가 있는 분산 시스템을 사용하면 매우 복잡해집니다! 이를 구현하는 방법을 알아보기 위해 먼저 아키텍처를 살펴보겠습니다.

자체 DB를 갖춘 아래의 마이크로서비스가 있습니다.

  • 주문 서비스
  • 결제 서비스
  • 재고 서비스

주문 서비스가 새로운 주문에 대한 요청을 받으면 결제 서비스 및 재고 서비스에 확인해야 합니다. 결제 금액과 재고 금액을 차감하고 최종적으로 주문을 이행합니다! 결제 금액을 차감했는데 재고가 없으면 어떻게 되나요? 롤백하는 방법? 여러 데이터베이스가 관련되어 있으면 어렵습니다.

사가 패턴:

여러 마이크로서비스에 걸쳐 있는 각 비즈니스 트랜잭션은 마이크로서비스별 로컬 트랜잭션으로 분할되며 순서대로 실행되어 비즈니스 워크플로를 완료합니다. 사가(Saga)라고 합니다. 2가지 방법으로 구현할 수 있습니다.

  • 안무 접근법
  • 오케스트레이션 접근 방식

이번 글에서는 오케스트레이션 기반의 사가(saga)에 대해 논의할 것입니다. 안무 기반 사가에 대한 자세한 내용은 여기를 확인하세요 .

오케스트레이션 사가 패턴:

이 패턴에서는 모든 마이크로서비스 간의 모든 트랜잭션을 조정하는 별도의 서비스인 오케스트레이터를 갖게 됩니다. 문제가 없으면 주문 요청이 완료된 것으로 표시되고, 그렇지 않으면 취소된 것으로 표시됩니다.

이것을 어떻게 구현할 수 있는지 살펴보겠습니다. 우리의 샘플 아키텍처는 대략 다음과 같습니다.!

  • 이 데모에서 오케스트레이터와 다른 서비스 간의 통신은 이를 상태 비저장으로 만들기 위한 비차단 비동기 방식의 간단한 HTTP입니다.
  • 이 통신에 Kafka 주제를 사용할 수도 있습니다. 이를 위해서는 상태 저장 스타일에 더 가까운 분산/수집 패턴을 사용해야 합니다 .

일반적인 DTO:

  • 먼저 아래와 같이 Spring boot 다중 모듈 Maven 프로젝트를 생성합니다.

재고 서비스:

오케스트레이터가 조정하는 각 마이크로서비스에는 각 엔터티에 대해 최소 2개의 엔드포인트가 있어야 합니다. 하나는 공제이고 다른 하나는 거래 재설정입니다. 예를 들어. 먼저 재고를 공제하고 나중에 결제 시스템에서 잔고가 부족하다는 사실을 알게 되면 재고를 다시 추가해야 합니다.

참고 : 몇 가지 제품 ID에 대한 일부 재고를 보관하기 위해 지도를 DB로 사용했습니다.

@Service
public class InventoryService {

    private Map<Integer, Integer> productInventoryMap;

    @PostConstruct
    private void init(){
        this.productInventoryMap = new HashMap<>();
        this.productInventoryMap.put(1, 5);
        this.productInventoryMap.put(2, 5);
        this.productInventoryMap.put(3, 5);
    }

    public InventoryResponseDTO deductInventory(final InventoryRequestDTO requestDTO){
        int quantity = this.productInventoryMap.getOrDefault(requestDTO.getProductId(), 0);
        InventoryResponseDTO responseDTO = new InventoryResponseDTO();
        responseDTO.setOrderId(requestDTO.getOrderId());
        responseDTO.setUserId(requestDTO.getUserId());
        responseDTO.setProductId(requestDTO.getProductId());
        responseDTO.setStatus(InventoryStatus.UNAVAILABLE);
        if(quantity > 0){
            responseDTO.setStatus(InventoryStatus.AVAILABLE);
            this.productInventoryMap.put(requestDTO.getProductId(), quantity - 1);
        }
        return responseDTO;
    }

    public void addInventory(final InventoryRequestDTO requestDTO){
        this.productInventoryMap
                .computeIfPresent(requestDTO.getProductId(), (k, v) -> v + 1);
    }

}
  • 제어 장치
@RestController
@RequestMapping("inventory")
public class InventoryController {

    @Autowired
    private InventoryService service;

    @PostMapping("/deduct")
    public InventoryResponseDTO deduct(@RequestBody final InventoryRequestDTO requestDTO){
        return this.service.deductInventory(requestDTO);
    }

    @PostMapping("/add")
    public void add(@RequestBody final InventoryRequestDTO requestDTO){
        this.service.addInventory(requestDTO);
    }

}

결제 서비스:

또한 Inventory-service와 같은 2개의 엔드포인트를 노출합니다. 중요한 수업만 보여드립니다. 자세한 내용은 이 기사 끝에 있는 전체 프로젝트 소스 코드에 대한 github 링크를 확인하세요.

@Service
public class PaymentService {

    private Map<Integer, Double> userBalanceMap;

    @PostConstruct
    private void init(){
        this.userBalanceMap = new HashMap<>();
        this.userBalanceMap.put(1, 1000d);
        this.userBalanceMap.put(2, 1000d);
        this.userBalanceMap.put(3, 1000d);
    }

    public PaymentResponseDTO debit(final PaymentRequestDTO requestDTO){
        double balance = this.userBalanceMap.getOrDefault(requestDTO.getUserId(), 0d);
        PaymentResponseDTO responseDTO = new PaymentResponseDTO();
        responseDTO.setAmount(requestDTO.getAmount());
        responseDTO.setUserId(requestDTO.getUserId());
        responseDTO.setOrderId(requestDTO.getOrderId());
        responseDTO.setStatus(PaymentStatus.PAYMENT_REJECTED);
        if(balance >= requestDTO.getAmount()){
            responseDTO.setStatus(PaymentStatus.PAYMENT_APPROVED);
            this.userBalanceMap.put(requestDTO.getUserId(), balance - requestDTO.getAmount());
        }
        return responseDTO;
    }

    public void credit(final PaymentRequestDTO requestDTO){
        this.userBalanceMap.computeIfPresent(requestDTO.getUserId(), (k, v) -> v + requestDTO.getAmount());
    }

}
  • 제어 장치
@RestController
@RequestMapping("payment")
public class PaymentController {

    @Autowired
    private PaymentService service;

    @PostMapping("/debit")
    public PaymentResponseDTO debit(@RequestBody PaymentRequestDTO requestDTO){
        return this.service.debit(requestDTO);
    }

    @PostMapping("/credit")
    public void credit(@RequestBody PaymentRequestDTO requestDTO){
        this.service.credit(requestDTO);
    }

}

주문 서비스:

우리의 주문 서비스는 주문 생성 명령을 수신하고 스프링 부트 카프카 바인더를 사용하여 주문 생성 이벤트를 발생시킵니다. 또한 주문 업데이트 채널/kafka 주제를 수신 하고 주문 상태를 업데이트합니다.

  • 제어 장치
@RestController
@RequestMapping("order")
public class OrderController {

    @Autowired
    private OrderService service;

    @PostMapping("/create")
    public PurchaseOrder createOrder(@RequestBody OrderRequestDTO requestDTO){
        requestDTO.setOrderId(UUID.randomUUID());
        return this.service.createOrder(requestDTO);
    }

    @GetMapping("/all")
    public List<OrderResponseDTO> getOrders(){
        return this.service.getAll();
    }

}
  • 서비스
@Service
public class OrderService {

    // product price map
    private static final Map<Integer, Double> PRODUCT_PRICE =  Map.of(
            1, 100d,
            2, 200d,
            3, 300d
    );

    @Autowired
    private PurchaseOrderRepository purchaseOrderRepository;

    @Autowired
    private FluxSink<OrchestratorRequestDTO> sink;

    public PurchaseOrder createOrder(OrderRequestDTO orderRequestDTO){
        PurchaseOrder purchaseOrder = this.purchaseOrderRepository.save(this.dtoToEntity(orderRequestDTO));
        this.sink.next(this.getOrchestratorRequestDTO(orderRequestDTO));
        return purchaseOrder;
    }

    public List<OrderResponseDTO> getAll() {
        return this.purchaseOrderRepository.findAll()
                .stream()
                .map(this::entityToDto)
                .collect(Collectors.toList());
    }

    private PurchaseOrder dtoToEntity(final OrderRequestDTO dto){
        PurchaseOrder purchaseOrder = new PurchaseOrder();
        purchaseOrder.setId(dto.getOrderId());
        purchaseOrder.setProductId(dto.getProductId());
        purchaseOrder.setUserId(dto.getUserId());
        purchaseOrder.setStatus(OrderStatus.ORDER_CREATED);
        purchaseOrder.setPrice(PRODUCT_PRICE.get(purchaseOrder.getProductId()));
        return purchaseOrder;
    }

    private OrderResponseDTO entityToDto(final PurchaseOrder purchaseOrder){
        OrderResponseDTO dto = new OrderResponseDTO();
        dto.setOrderId(purchaseOrder.getId());
        dto.setProductId(purchaseOrder.getProductId());
        dto.setUserId(purchaseOrder.getUserId());
        dto.setStatus(purchaseOrder.getStatus());
        dto.setAmount(purchaseOrder.getPrice());
        return dto;
    }

    public OrchestratorRequestDTO getOrchestratorRequestDTO(OrderRequestDTO orderRequestDTO){
        OrchestratorRequestDTO requestDTO = new OrchestratorRequestDTO();
        requestDTO.setUserId(orderRequestDTO.getUserId());
        requestDTO.setAmount(PRODUCT_PRICE.get(orderRequestDTO.getProductId()));
        requestDTO.setOrderId(orderRequestDTO.getOrderId());
        requestDTO.setProductId(orderRequestDTO.getProductId());
        return requestDTO;
    }

}

오더 오케스트레이터:

이는 모든 거래를 조정하는 역할을 하는 마이크로서비스입니다. 주문이 생성된 Topic을 청취합니다. 새로운 주문이 생성되면 결제 서비스/재고 서비스 등 각 서비스에 대해 즉시 별도의 요청을 작성하고 응답을 검증합니다. 정상이면 주문을 이행합니다. 그 중 하나라도 없으면 해당 순서를 취소합니다. 또한 마이크로서비스에서 발생한 로컬 트랜잭션을 재설정하려고 시도합니다.

우리는 모든 로컬 트랜잭션을 하나의 단일 워크플로우로 간주합니다. 워크플로에는 여러 워크플로 단계가 포함됩니다.

  • 워크플로 단계
public interface WorkflowStep {

    WorkflowStepStatus getStatus();
    Mono<Boolean> process();
    Mono<Boolean> revert();

}
  • 작업 흐름
public interface Workflow {

    List<WorkflowStep> getSteps();

}
  • 우리의 경우 주문 워크플로에는 2단계가 있습니다. 각 구현에서는 로컬 트랜잭션 수행 방법과 재설정 방법을 알아야 합니다.
  • 재고 단계
public class InventoryStep implements WorkflowStep {

    private final WebClient webClient;
    private final InventoryRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;

    public InventoryStep(WebClient webClient, InventoryRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }

    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }

    @Override
    public Mono<Boolean> process() {
        return this.webClient
                .post()
                .uri("/inventory/deduct")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(InventoryResponseDTO.class)
                .map(r -> r.getStatus().equals(InventoryStatus.AVAILABLE))
                .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }

    @Override
    public Mono<Boolean> revert() {
        return this.webClient
                    .post()
                    .uri("/inventory/add")
                    .body(BodyInserters.fromValue(this.requestDTO))
                    .retrieve()
                    .bodyToMono(Void.class)
                    .map(r ->true)
                    .onErrorReturn(false);
    }
}
  • 결제 단계
public class PaymentStep implements WorkflowStep {

    private final WebClient webClient;
    private final PaymentRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;

    public PaymentStep(WebClient webClient, PaymentRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }

    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }

    @Override
    public Mono<Boolean> process() {
        return this.webClient
                    .post()
                    .uri("/payment/debit")
                    .body(BodyInserters.fromValue(this.requestDTO))
                    .retrieve()
                    .bodyToMono(PaymentResponseDTO.class)
                    .map(r -> r.getStatus().equals(PaymentStatus.PAYMENT_APPROVED))
                    .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }

    @Override
    public Mono<Boolean> revert() {
        return this.webClient
                .post()
                .uri("/payment/credit")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(Void.class)
                .map(r -> true)
                .onErrorReturn(false);
    }

}
  • 서비스 / 코디네이터
@Service
public class OrchestratorService {

    @Autowired
    @Qualifier("payment")
    private WebClient paymentClient;

    @Autowired
    @Qualifier("inventory")
    private WebClient inventoryClient;

    public Mono<OrchestratorResponseDTO> orderProduct(final OrchestratorRequestDTO requestDTO){
        Workflow orderWorkflow = this.getOrderWorkflow(requestDTO);
        return Flux.fromStream(() -> orderWorkflow.getSteps().stream())
                .flatMap(WorkflowStep::process)
                .handle(((aBoolean, synchronousSink) -> {
                    if(aBoolean)
                        synchronousSink.next(true);
                    else
                        synchronousSink.error(new WorkflowException("create order failed!"));
                }))
                .then(Mono.fromCallable(() -> getResponseDTO(requestDTO, OrderStatus.ORDER_COMPLETED)))
                .onErrorResume(ex -> this.revertOrder(orderWorkflow, requestDTO));

    }

    private Mono<OrchestratorResponseDTO> revertOrder(final Workflow workflow, final OrchestratorRequestDTO requestDTO){
        return Flux.fromStream(() -> workflow.getSteps().stream())
                .filter(wf -> wf.getStatus().equals(WorkflowStepStatus.COMPLETE))
                .flatMap(WorkflowStep::revert)
                .retry(3)
                .then(Mono.just(this.getResponseDTO(requestDTO, OrderStatus.ORDER_CANCELLED)));
    }

    private Workflow getOrderWorkflow(OrchestratorRequestDTO requestDTO){
        WorkflowStep paymentStep = new PaymentStep(this.paymentClient, this.getPaymentRequestDTO(requestDTO));
        WorkflowStep inventoryStep = new InventoryStep(this.inventoryClient, this.getInventoryRequestDTO(requestDTO));
        return new OrderWorkflow(List.of(paymentStep, inventoryStep));
    }

    private OrchestratorResponseDTO getResponseDTO(OrchestratorRequestDTO requestDTO, OrderStatus status){
        OrchestratorResponseDTO responseDTO = new OrchestratorResponseDTO();
        responseDTO.setOrderId(requestDTO.getOrderId());
        responseDTO.setAmount(requestDTO.getAmount());
        responseDTO.setProductId(requestDTO.getProductId());
        responseDTO.setUserId(requestDTO.getUserId());
        responseDTO.setStatus(status);
        return responseDTO;
    }

    private PaymentRequestDTO getPaymentRequestDTO(OrchestratorRequestDTO requestDTO){
        PaymentRequestDTO paymentRequestDTO = new PaymentRequestDTO();
        paymentRequestDTO.setUserId(requestDTO.getUserId());
        paymentRequestDTO.setAmount(requestDTO.getAmount());
        paymentRequestDTO.setOrderId(requestDTO.getOrderId());
        return paymentRequestDTO;
    }

    private InventoryRequestDTO getInventoryRequestDTO(OrchestratorRequestDTO requestDTO){
        InventoryRequestDTO inventoryRequestDTO = new InventoryRequestDTO();
        inventoryRequestDTO.setUserId(requestDTO.getUserId());
        inventoryRequestDTO.setProductId(requestDTO.getProductId());
        inventoryRequestDTO.setOrderId(requestDTO.getOrderId());
        return inventoryRequestDTO;
    }

}

여기서는 높은 수준의 세부정보만 제공했습니다. 전체 소스를 보려면 여기를 확인하세요 .

오케스트레이션 사가 패턴 – 데모:

  • 모든 서비스가 실행되면 POST 요청을 보내 주문을 생성합니다. 주문 생성 상태가 표시됩니다.
    • 사용자 1은 $300의 제품 ID 3을 주문하려고 합니다.
    • 사용자의 신용 한도는 $1000입니다.
  • 4개의 요청을 보냈습니다. 그래서 3가지 요청이 충족되었습니다. 사용자에게는 100달러만 남게 되고 우리는 4번째 주문을 이행할 수 없기 때문에 4번째 주문이 아닙니다. 그래서 결제 서비스가 거부되었을 것입니다.
  • 이 사용 가능한 잔액이 $100인 사용자 1은 가격이 $100에 불과하므로 제품 ID 1을 구입할 수 있습니다.

요약:

Spring Boot를 사용하여 Orchestration Saga 패턴을 성공적으로 시연할 수 있었습니다 . 일반적으로 모든 마이크로서비스 간에 트랜잭션을 처리하고 데이터 일관성을 유지하는 것은 어렵습니다. 결제, 재고, 사기 확인, 배송 확인 등과 같은 여러 서비스가 관련되는 경우 코디네이터 없이 여러 단계를 거쳐 복잡한 작업 흐름을 관리하는 것은 매우 어려울 것입니다. 오케스트레이션을 위한 별도의 서비스를 도입함으로써 주문 서비스는 이러한 책임에서 해방됩니다.

여기에서 프로젝트 소스 코드를 확인하세요 .

마이크로서비스 패턴에 대해 자세히 알아보세요.

행복한 코딩