🔔[항해99]/WIL

[항해플러스 백엔드후기] 9주차 회고 - 카프카는 신이야

디카페인라떼 2025. 2. 24. 00:14

 

 

[과제]

9주차의 과제는 다음과 같았다 

 

 - docker 를 이용해 kafka 를 설치 및 실행하고 애플리케이션과 연결 후 카프카 consumer, producer 를 연동 및 테스트
 - 카프카의 발행이 실패하는 것을 방지하기 위해 Transactional Outbox Pattern를 적용
   카프카의 발행이 실패한 케이스에 대한 재처리를 구현 ( Scheduler or BatchProcess )

 

 

그동안에 숱하게 들어왔던.. 카프카는 신이야 ! 에서 드디어 카프카를 연동해보고 구현해보는 주차가 다가왔다

사실은 실무에서도 카프카를 사용하고 있지만 내가 직접 연동해보고 어떠한 전략을 사용할지는 고민해본적은 없어서 
기초적인 개념은 이해하고 있지만 제대로 실습을 해본다는 느낌으로 과제를 진행했다


[과제 해결 과정]

도커 컴포즈에 카프카 설정을 추가해주어 도커를 띄웠을때 카프카가 같이 뜨도록 설정해 주었다

  kafka:
    image: 'bitnami/kafka:latest'
    container_name: kafka
    restart: always
    depends_on:
      - zookeeper
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_OPTS=-Djava.security.auth.login.config=/dev/null
      - KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE=false
      - KAFKA_SASL_ENABLED_MECHANISMS=
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    ports:
      - '9092:9092'
    networks:
      - app-network

* networks 빼먹지 말기

 

그리고 producer를 설정한 후 바로 consumer를 연동하여 바로 consume 되는지 통합테스트를 통하여 확인하였다

 

@SpringBootTest
@ContextConfiguration(classes = ServerApplication.class)
@EmbeddedKafka(partitions = 1, topics = { "test-topic" })  // 테스트용 Kafka 브로커 실행
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) // 테스트 후 컨텍스트 초기화
public class KafkaIntegrationTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Consumer<String, String> consumer;

    @BeforeAll
    public static void setup(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "true", embeddedKafkaBroker);
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerProps, new StringDeserializer(), new StringDeserializer());

        consumer = consumerFactory.createConsumer();
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "test-topic");
    }

    @Test
    public void testKafkaProducerAndConsumer() {
        String testMessage = "Integration Test Message";

        //Kafka에 메시지 전송
        kafkaTemplate.send("test-topic", testMessage);

        //Kafka Consumer가 메시지를 수신할 때까지 기다림
        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            ConsumerRecord<String, String> received = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
            assertThat(received.value()).isEqualTo(testMessage);
        });
    }

    @AfterAll
    public static void cleanup() {
        if (consumer != null) {
            consumer.close();
        }
    }
}

 


Transactional Outbox Pattern

데이터베이스의 Outbox 테이블에 이벤트 데이터를 먼저 저장한 후, 별도의 프로세스 또는 폴링 메커니즘를 통해 메시지 브로커(Kafka)로 전송하는 방식이다

이를 통해 데이터 일관성을 유지하면서도 비동기 이벤트 처리를 보장할 수 있다

 

이는 해당 순서로 구현하였다 

  1. 애플리케이션 이벤트퍼블리셔를 통해서 주문 시 이벤트를 발행한다 (이전 주차와 동일)
  2. 애플리케이션 이벤트 리스너를 통해 해당 이벤트를 받아서 아웃박스 테이블에 저장한다 
    •     @TransactionalEventListener(phase = BEFORE_COMMIT) 를 통해 커밋되기 전에 저장
  3. 카프카 메세지를 발행한다 
    • @TransactionalEventListener(phase = AFTER_COMMIT) 를 통해 커밋 된 이후에 카프카 메세지 발행
  4. 카프카 consumer 에서 소비할때 아웃박스 테이블에 있던 해당 row의 상태를 발행 상태로 상태값을 update 해준다
  5. 스케줄러를 통해 아웃박스 테이블의 상태값이 일정시간 동안 계속 변경되지 않은 메세지들은 다시한번 카프카 메세지 발행을 해주어 재누락된 메세지가 없도록 처리한다

[알게된 점]

이번 과제로 Transactional Outbox Pattern 전략을 구현해보며 메시지 발행 이벤트의 실패에 대한 대응을 직접 경험해볼 수 있었다 다음에는 DLQ도 직접 구현해 보고 싶다

카프카 .. 만능이지만 공부해야 할 것이 참많다... 더열심히 해야겠다

 


이번주 KPT 회고

Keep : [유지해야 할 좋은 점]

과제를 꾸준히 잘 제출하고 있다는 점..! 

Problem : [개선이 필요한 점]

피드백이 생각보다 짧아서 이번주차를 내가 잘해낸건지 잘 확신이 안선다.. 피드백이 조금 아쉽다

Try : [새롭게 시도할 점]

카프카에 대해서 좀더 공부해 보아야겠다!


 


🎉 수료생 추천 할인 혜택 안내 🎉

항해 플러스에 합류하고 싶은데 수강료가 부담되시나요? 🤔
수료생 추천 할인 혜택으로 20만 원 할인을 받을 수 있다는 사실, 알고 계셨나요? 💡

💳 할인 적용 방법
1️⃣ 결제 페이지로 이동
2️⃣ 할인 코드 입력란에 zzy6ui 입력
3️⃣ 추가 할인 20만 원 바로 적용!

놓치지 말고 특별한 혜택 꼭 챙겨가세요! 🚀🌟