-
Kafka (1)이커머스 devops 2025. 10. 16. 17:56
Kafka 설치
1. aws EC2 / ubuntu 서버 생성

2. kafka 설치
- kafka 설치하기 위해서는 jdk17 이상 필요
$ sudo apt update $ sudo apt install openjdk-17-jdk $ java -version
$ wget https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz $ tar -xzf kafka_2.13-4.0.0.tgz #압축풀기
$ export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"- kafka 실행 시 메모리 낮추기
$ sudo dd if=/dev/zero of=/swapfile bs=128M count=16 # 2기가 파일 생성 $ sudo chmod 600 /swapfile # 파일 권한 부여 $ sudo mkswap /swapfile # 파일 > swap 공간 형태로 전환 $ sudo swapon /swapfile # swap 활성화- swap 활용해 메모리 늘리기
$ sudo vi /etc/fstab # fstab 파일에 추가하고 저장 /swapfile swap swap defaults 0 0- 시스템 부팅 할 때마다 자동으로 활성화되도록 파일시스템 수정
$ free
$ vi config/server.properties
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" $ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties초기 로그 폴더 셋팅
# 포그라운드 kafka 실행 $ bin/kafka-server-start.sh config/server.properties
- ctrl + c : kafka server 종료
# 백그라운드 kafka 실행 $ bin/kafka-server-start.sh -daemon config/server.properties # kafka 서버 실행 중인지 확인 $ sudo lsof -i:9092 # kafka 서버 종료 $ bin/kafka-server-stop.shKafka 기본 구성

- producer : 카프카에 메시지(데이터)를 전달하는 주체
- consumer : 카프카의 메시지(데이터)를 처리하는 주체
- topic : 카프카에 넣을 메시지의 종류를 구분하는 개념
토픽 생성하기 / 조회하기 / 삭제하기
# 토픽 생성 $ bin/kafka-topics.sh \--bootstrap-server localhost:9092 \--create \--topic email.send[2025-10-16 06:18:05,106] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)
kafka 서버 시작하지 않으면 이런 에러가 뜬다... 바보...

# 토픽 조회 $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 토픽 상세 조회 $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic email.send # 토픽 삭제 $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic email.sendKafka에 메시지 넣기 / Kafka에서 메시지 조회하기
# email.send 토픽 생성 $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic email.send # 특정 토픽 메시지 작성 $ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic email.send # 특정 토픽 메시지 전체 조회 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic email.send --from-beginning- kafka는 메시지를 읽고 제거하는 방식이 아니라 저장되어 있는 메시지를 읽기만 하는 방식


- 새로운 창을 열고 메시지 추가해도 실시간으로 조회 가능하다
메시지를 어디까지 읽었는지 기억하고 , 그다음 메시지부 터처리하기 (Consumer Group, Offset)
- 컨슈머 : 카프카의 메시지를 처리하는 주체
- 컨슈머 그룹 : 1개 이상의 컨슈머를 하나의 그룹으로 묶은 단위
- 오프셋 : 메시지의 순서를 나타내는 고유 번호 (0부터 시작)

- 토픽에 저장되어 있는 여러 메시지는 메시지의 순서를 나타내는 고유 번호인 오프셋을 가지고 있다
- 컨슈머 그룹은 어디까지 메시지를 읽었는지에 대한 정보를 알고 있다
- current-offset : 다음에 읽을 오프셋 번호
# 컨슈머 그룹 활용해 메시지 조회 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic email.send --from-beginning --group email-send-group # 컨슈머 그룹 생성 확인 $ bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --list # 컨슈머 그룹 세부 정보 조회 $ bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --group email-send-group --describe- --group email-send-group : email-send-group이라는 컨슈머 그룹이 없다면 그룹을 생성하고 해당 그룹의 메시지를 읽는다
- --from-beginning + --group 옵션 : 컨슈머 그룹의 오프셋 기록이 없으면 첫 메시지부터 읽고 기록이 있다면 그 이후 오프셋부터 메시지를 읽는다
- 실제 서비스에서 똑같은 요청을 중복해서 여러 번 처리하면 안 된다 그래서 반드시 컨슈머 그룹을 활용해서 메시지를 읽어야 한다
Spring Boot에 Kakfa 연결

- 스프링부트 프로젝트 생성

- application.yml 수정
@RestController @RequestMapping("/api/emails") public class EmailController { private final EmailService emailService; public EmailController(EmailService emailService) { this.emailService = emailService; } @PostMapping public ResponseEntity<String> sendEmail(@RequestBody SendEmailRequestDto sendEmailRequestDto) { emailService.sendEmail(sendEmailRequestDto); return ResponseEntity.ok("Email sent successfully"); } }@Service public class EmailService { private final KafkaTemplate<String, String> kafkaTemplate; public EmailService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendEmail(SendEmailRequestDto request) { EmailsendMessage emailsendMessage = new EmailsendMessage(request.getFrom(), request.getTo(), request.getSubject(), request.getBody()); this.kafkaTemplate.send("email.send", toJsonString(emailsendMessage)); } private String toJsonString(Object object) { ObjectMapper objectMapper = new ObjectMapper(); try { String message = objectMapper.writeValueAsString(object); return message; } catch (JsonProcessingException e) { throw new RuntimeException("JSON 직렬화 실패"); } } }public class EmailsendMessage { private String from; private String to; private String subject; private String body; public EmailsendMessage(String from, String to, String subject, String body) { this.from = from; this.to = to; this.subject = subject; this.body = body; } public String getFrom() { return from; } public String getTo() { return to; } public String getSubject() { return subject; } public String getBody() { return body; } }public class SendEmailRequestDto { private String from; private String to; private String subject; private String body; public String getFrom() { return from; } public String getTo() { return to; } public String getSubject() { return subject; } public String getBody() { return body; } }

확인 완료 Spring Boot로 Kafka에서 메시지 조회하기 (Consumer)

- 스프링부트 프로젝트 생성
server: port: 0 spring: kafka: bootstrap-servers: 54.180.85.20:9092 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest- application.yml 수정
@Service public class EmailSendConsumer { @KafkaListener( topics = "email.send", groupId = "email-send-group" ) public void consume(String message) { System.out.println("kafka로부터 받아온 message : " + message); EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message); // 이메일 발송 로직 생략 System.out.println("이메일 발송 완료"); } }public class EmailSendMessage { private String from; private String to; private String subject; private String body; public EmailSendMessage() { } public EmailSendMessage(String from, String to, String subject, String body) { this.from = from; this.to = to; this.subject = subject; this.body = body; } public static EmailSendMessage fromJson(String json) { try { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(json, EmailSendMessage.class); } catch (JsonProcessingException e) { throw new RuntimeException("Json parsing error", e); } } public String getFrom() { return from; } public String getTo() { return to; } public String getSubject() { return subject; } public String getBody() { return body; } }


Spring Boot로 Kafka에서 처리에 실패한 메시지를 재시도(Retry) 하도록
@Service public class EmailSendConsumer { @KafkaListener( topics = "email.send", groupId = "email-send-group" ) @RetryableTopic( attempts = "5", backoff = @Backoff(delay = 5000, multiplier = 2) ) public void consume(String message) { System.out.println("kafka로부터 받아온 message : " + message); EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message); if (emailSendMessage.getTo().equals("fail@naver.com")) { System.out.println("잘못된 이메일 주소로 인해 발송 실패"); throw new RuntimeException("잘못된 이메일 주소로 인해 발송 실패"); } // 이메일 발송 로직 생략 try { Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException("이메일 발송 실패"); } System.out.println("이메일 발송 완료"); } }
재시도 실패한 메 시지를 따로 보관 및 사후처리
- Dead Letter Topic : DLT는 나중에 관리자가 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 토픽
- 실패한 메시지를 DLT 토픽에 저장해 놓기 때문에 실패한 메시지 유실 방지
- 사후 실패 원인 분석 가능
- 실패한 메시지를 수동으로 처리 가능
@Service public class EmailSendConsumer { @KafkaListener( topics = "email.send", groupId = "email-send-group" ) @RetryableTopic( attempts = "5", backoff = @Backoff(delay = 5000, multiplier = 2), dltTopicSuffix = ".dlt" ) public void consume(String message) { ... } }
DLT 저장 메시지를 사후 처리 방식
- 실패 메시지를 로그 시스템에 전송해 장애 원인을 추적할 수 있도록 한다
- DLT에 메시지가 저장되자마자 수동으로 대처할 수 있게 알림을 설정한다
- 알림을 받은 관리자는 로그에 쌓인 내용을 보고 장애 원인을 분석하고 , 그에 맞게 메시지를 수동으로 처리한다
- 메시지를 원래 토픽으로 다시 보내기 (장애가 해결된 경우)
- 메시지 폐기하기
- 잘못된 메시지 내용이 kafka에 들어가지 않게 producer 검증 로직 보완하기
@Service public class EmailSendDltConsumer { @KafkaListener( topics = "email.send.dlt", groupId = "email-send-dlt-group" ) public void consume(String message) { // 로그 시스템 전송 System.out.println("로그 시스템 전송 : " + message); // 알림 발송 System.out.println("Slack 알림 발송"); } }
728x90'이커머스 devops' 카테고리의 다른 글
Jenkins - CI/CD (1) (0) 2025.11.03 kafka (3) (0) 2025.10.17 kafka (2) (0) 2025.10.17