[과제]
9주차의 과제는 다음과 같았다
- docker 를 이용해 kafka 를 설치 및 실행하고 애플리케이션과 연결 후 카프카 consumer, producer 를 연동 및 테스트
- 카프카의 발행이 실패하는 것을 방지하기 위해 Transactional Outbox Pattern를 적용
카프카의 발행이 실패한 케이스에 대한 재처리를 구현 ( Scheduler or BatchProcess )
그동안에 숱하게 들어왔던.. 카프카는 신이야 ! 에서 드디어 카프카를 연동해보고 구현해보는 주차가 다가왔다
사실은 실무에서도 카프카를 사용하고 있지만 내가 직접 연동해보고 어떠한 전략을 사용할지는 고민해본적은 없어서
기초적인 개념은 이해하고 있지만 제대로 실습을 해본다는 느낌으로 과제를 진행했다
[과제 해결 과정]
도커 컴포즈에 카프카 설정을 추가해주어 도커를 띄웠을때 카프카가 같이 뜨도록 설정해 주었다
kafka:
image: 'bitnami/kafka:latest'
container_name: kafka
restart: always
depends_on:
- zookeeper
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_OPTS=-Djava.security.auth.login.config=/dev/null
- KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE=false
- KAFKA_SASL_ENABLED_MECHANISMS=
- KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
ports:
- '9092:9092'
networks:
- app-network
* networks 빼먹지 말기
그리고 producer를 설정한 후 바로 consumer를 연동하여 바로 consume 되는지 통합테스트를 통하여 확인하였다
@SpringBootTest
@ContextConfiguration(classes = ServerApplication.class)
@EmbeddedKafka(partitions = 1, topics = { "test-topic" }) // 테스트용 Kafka 브로커 실행
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) // 테스트 후 컨텍스트 초기화
public class KafkaIntegrationTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static Consumer<String, String> consumer;
@BeforeAll
public static void setup(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "true", embeddedKafkaBroker);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
consumerProps, new StringDeserializer(), new StringDeserializer());
consumer = consumerFactory.createConsumer();
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "test-topic");
}
@Test
public void testKafkaProducerAndConsumer() {
String testMessage = "Integration Test Message";
//Kafka에 메시지 전송
kafkaTemplate.send("test-topic", testMessage);
//Kafka Consumer가 메시지를 수신할 때까지 기다림
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
ConsumerRecord<String, String> received = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
assertThat(received.value()).isEqualTo(testMessage);
});
}
@AfterAll
public static void cleanup() {
if (consumer != null) {
consumer.close();
}
}
}
Transactional Outbox Pattern
데이터베이스의 Outbox 테이블에 이벤트 데이터를 먼저 저장한 후, 별도의 프로세스 또는 폴링 메커니즘를 통해 메시지 브로커(Kafka)로 전송하는 방식이다
이를 통해 데이터 일관성을 유지하면서도 비동기 이벤트 처리를 보장할 수 있다
이는 해당 순서로 구현하였다
- 애플리케이션 이벤트퍼블리셔를 통해서 주문 시 이벤트를 발행한다 (이전 주차와 동일)
- 애플리케이션 이벤트 리스너를 통해 해당 이벤트를 받아서 아웃박스 테이블에 저장한다
- @TransactionalEventListener(phase = BEFORE_COMMIT) 를 통해 커밋되기 전에 저장
- 카프카 메세지를 발행한다
- @TransactionalEventListener(phase = AFTER_COMMIT) 를 통해 커밋 된 이후에 카프카 메세지 발행
- 카프카 consumer 에서 소비할때 아웃박스 테이블에 있던 해당 row의 상태를 발행 상태로 상태값을 update 해준다
- 스케줄러를 통해 아웃박스 테이블의 상태값이 일정시간 동안 계속 변경되지 않은 메세지들은 다시한번 카프카 메세지 발행을 해주어 재누락된 메세지가 없도록 처리한다
[알게된 점]
이번 과제로 Transactional Outbox Pattern 전략을 구현해보며 메시지 발행 이벤트의 실패에 대한 대응을 직접 경험해볼 수 있었다 다음에는 DLQ도 직접 구현해 보고 싶다
카프카 .. 만능이지만 공부해야 할 것이 참많다... 더열심히 해야겠다
이번주 KPT 회고
Keep : [유지해야 할 좋은 점]
과제를 꾸준히 잘 제출하고 있다는 점..!
Problem : [개선이 필요한 점]
피드백이 생각보다 짧아서 이번주차를 내가 잘해낸건지 잘 확신이 안선다.. 피드백이 조금 아쉽다
Try : [새롭게 시도할 점]
카프카에 대해서 좀더 공부해 보아야겠다!
🎉 수료생 추천 할인 혜택 안내 🎉
항해 플러스에 합류하고 싶은데 수강료가 부담되시나요? 🤔
수료생 추천 할인 혜택으로 20만 원 할인을 받을 수 있다는 사실, 알고 계셨나요? 💡
💳 할인 적용 방법
1️⃣ 결제 페이지로 이동
2️⃣ 할인 코드 입력란에 zzy6ui 입력
3️⃣ 추가 할인 20만 원 바로 적용!
놓치지 말고 특별한 혜택 꼭 챙겨가세요! 🚀🌟
'🔔[항해99] > WIL' 카테고리의 다른 글
[항해플러스 백엔드후기] 10주차 최종 회고 - 또다른 항해의 시작 (0) | 2025.03.05 |
---|---|
[항해플러스 백엔드후기] 8주차 회고 - 쿼리튜닝 - ("적절한" + 인덱스) = 0 (0) | 2025.02.17 |
[항해플러스 백엔드후기] 7주차 회고 - 오쪼쪼.. 나의 임시 보관함..캐싱 (0) | 2025.02.13 |
[항해플러스 백엔드후기] 6주차 회고 - 하루하루가 고비 (0) | 2025.02.03 |
[항해플러스 백엔드후기] 5주차/챕터2 회고 - 시작이 반이다 (1) | 2025.01.17 |