ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka (1)
    이커머스 devops 2025. 10. 16. 17:56

    Kafka  설치

    1. aws EC2 / ubuntu 서버 생성

     

     

    2. kafka 설치

    - kafka 설치하기 위해서는 jdk17 이상 필요

    $ sudo apt update
    $ sudo apt install openjdk-17-jdk
    $ java -version

     

     

    $ wget https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
    $ tar -xzf kafka_2.13-4.0.0.tgz #압축풀기

     

     

    $ export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"

    - kafka 실행 시 메모리 낮추기

     

     

    $ sudo dd if=/dev/zero of=/swapfile bs=128M count=16 # 2기가 파일 생성
    $ sudo chmod 600 /swapfile                           # 파일 권한 부여
    $ sudo mkswap /swapfile                              # 파일 > swap 공간 형태로 전환
    $ sudo swapon /swapfile                              # swap 활성화

    - swap 활용해 메모리 늘리기

     

     

    $ sudo vi /etc/fstab
    # fstab 파일에 추가하고 저장
    /swapfile swap swap defaults 0 0

    - 시스템 부팅 할 때마다 자동으로 활성화되도록 파일시스템 수정

     

     

    $ free

     

     

    $ vi config/server.properties

     

     

    $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    $ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

    초기 로그 폴더 셋팅

     

     

    # 포그라운드 kafka 실행
    $ bin/kafka-server-start.sh config/server.properties

    - ctrl + c : kafka server 종료

     

     

    # 백그라운드 kafka 실행
    $ bin/kafka-server-start.sh -daemon config/server.properties 
    
    # kafka 서버 실행 중인지 확인
    $ sudo lsof -i:9092
    
    # kafka 서버 종료
    $ bin/kafka-server-stop.sh

     

     

    Kafka 기본 구성

    • producer : 카프카에 메시지(데이터)를 전달하는 주체
    • consumer : 카프카의 메시지(데이터)를 처리하는 주체
    • topic : 카프카에 넣을 메시지의 종류를 구분하는 개념

     

     

    토픽 생성하기 / 조회하기 / 삭제하기

    # 토픽 생성
    $ bin/kafka-topics.sh \--bootstrap-server localhost:9092 \--create \--topic email.send

     

     

    [2025-10-16 06:18:05,106] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)

    kafka 서버 시작하지 않으면 이런 에러가 뜬다... 바보...

     

     

     

     

    # 토픽 조회
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
    
    # 토픽 상세 조회
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic email.send
    
    # 토픽 삭제
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic email.send

     

     

    Kafka에 메시지 넣기 / Kafka에서 메시지 조회하기

    # email.send 토픽 생성
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic email.send
    
    # 특정 토픽 메시지 작성
    $ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic email.send
    
    # 특정 토픽 메시지 전체 조회
    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic email.send --from-beginning

    - kafka는 메시지를 읽고 제거하는 방식이 아니라 저장되어 있는 메시지를 읽기만 하는 방식

     

     

    - 새로운 창을 열고 메시지 추가해도 실시간으로 조회 가능하다 

     

     

    메시지를 어디까지 읽었는지 기억하고 , 그다음 메시지부 터처리하기 (Consumer Group, Offset)

    • 컨슈머 : 카프카의 메시지를 처리하는 주체
    • 컨슈머 그룹 : 1개 이상의 컨슈머를 하나의 그룹으로 묶은 단위
    • 오프셋 : 메시지의 순서를 나타내는 고유 번호 (0부터 시작)

     

     

    • 토픽에 저장되어 있는 여러 메시지는 메시지의 순서를 나타내는 고유 번호인 오프셋을 가지고 있다
    • 컨슈머 그룹은 어디까지 메시지를 읽었는지에 대한 정보를 알고 있다
      • current-offset : 다음에 읽을 오프셋 번호

     

     

    # 컨슈머 그룹 활용해 메시지 조회
    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic email.send --from-beginning --group email-send-group
    
    # 컨슈머 그룹 생성 확인
    $ bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --list
    
    # 컨슈머 그룹 세부 정보 조회
    $ bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --group email-send-group --describe
    • --group email-send-group : email-send-group이라는 컨슈머 그룹이 없다면 그룹을 생성하고 해당 그룹의 메시지를 읽는다
    • --from-beginning + --group 옵션 : 컨슈머 그룹의 오프셋 기록이 없으면 첫 메시지부터 읽고 기록이 있다면 그 이후 오프셋부터 메시지를 읽는다
    • 실제 서비스에서 똑같은 요청을 중복해서 여러 번 처리하면 안 된다 그래서 반드시 컨슈머 그룹을 활용해서 메시지를 읽어야 한다

     

     

    Spring Boot에 Kakfa 연결

    - 스프링부트 프로젝트 생성

     

     

    - application.yml 수정

     

     

    @RestController
    @RequestMapping("/api/emails")
    public class EmailController {
        private final EmailService emailService;
    
        public EmailController(EmailService emailService) {
            this.emailService = emailService;
        }
    
        @PostMapping
        public ResponseEntity<String> sendEmail(@RequestBody SendEmailRequestDto sendEmailRequestDto) {
            emailService.sendEmail(sendEmailRequestDto);
            return ResponseEntity.ok("Email sent successfully");
        }
    }
    @Service
    public class EmailService {
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public EmailService(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public void sendEmail(SendEmailRequestDto request) {
            EmailsendMessage emailsendMessage = new EmailsendMessage(request.getFrom(), request.getTo(), request.getSubject(), request.getBody());
            this.kafkaTemplate.send("email.send", toJsonString(emailsendMessage));
        }
    
        private String toJsonString(Object object) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                String message = objectMapper.writeValueAsString(object);
                return message;
            } catch (JsonProcessingException e) {
                throw new RuntimeException("JSON 직렬화 실패");
            }
        }
    }
    public class EmailsendMessage {
        private String from;
        private String to;
        private String subject;
        private String body;
    
        public EmailsendMessage(String from, String to, String subject, String body) {
            this.from = from;
            this.to = to;
            this.subject = subject;
            this.body = body;
        }
    
        public String getFrom() {
            return from;
        }
    
        public String getTo() {
            return to;
        }
    
        public String getSubject() {
            return subject;
        }
    
        public String getBody() {
            return body;
        }
    }
    public class SendEmailRequestDto {
        private String from;
        private String to;
        private String subject;
        private String body;
    
        public String getFrom() {
            return from;
        }
    
        public String getTo() {
            return to;
        }
    
        public String getSubject() {
            return subject;
        }
    
        public String getBody() {
            return body;
        }
    }

     

     

    확인 완료

     

     

    Spring Boot로 Kafka에서 메시지 조회하기 (Consumer)

    - 스프링부트 프로젝트 생성

     

     

    server:
      port: 0
    
    spring:
      kafka:
        bootstrap-servers: 54.180.85.20:9092
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
          auto-offset-reset: earliest

    - application.yml 수정

     

     

    @Service
    public class EmailSendConsumer {
        @KafkaListener(
                topics = "email.send",
                groupId = "email-send-group"
        )
        public void consume(String message) {
            System.out.println("kafka로부터 받아온 message : " + message);
            EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);
            
            // 이메일 발송 로직 생략
    
            System.out.println("이메일 발송 완료");
        }
    }
    public class EmailSendMessage {
        private String from;
        private String to;
        private String subject;
        private String body;
    
        public EmailSendMessage() {
        }
    
        public EmailSendMessage(String from, String to, String subject, String body) {
            this.from = from;
            this.to = to;
            this.subject = subject;
            this.body = body;
        }
    
        public static EmailSendMessage fromJson(String json) {
            try {
                ObjectMapper mapper = new ObjectMapper();
                return mapper.readValue(json, EmailSendMessage.class);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Json parsing error", e);
            }
        }
    
        public String getFrom() {
            return from;
        }
    
        public String getTo() {
            return to;
        }
    
        public String getSubject() {
            return subject;
        }
    
        public String getBody() {
            return body;
        }
    }

     

     

     

     

    Spring Boot로 Kafka에서 처리에 실패한 메시지를 재시도(Retry) 하도록

    @Service
    public class EmailSendConsumer {
        @KafkaListener(
                topics = "email.send",
                groupId = "email-send-group"
        )
        @RetryableTopic(
                attempts = "5",
                backoff = @Backoff(delay = 5000, multiplier = 2)
        )
        public void consume(String message) {
            System.out.println("kafka로부터 받아온 message : " + message);
            EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);
    
            if (emailSendMessage.getTo().equals("fail@naver.com")) {
                System.out.println("잘못된 이메일 주소로 인해 발송 실패");
                throw new RuntimeException("잘못된 이메일 주소로 인해 발송 실패");
            }
            
            // 이메일 발송 로직 생략
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException("이메일 발송 실패");
            }
    
            System.out.println("이메일 발송 완료");
        }
    }

     

     

    재시도 실패한 메 시지를 따로 보관 및 사후처리

    • Dead Letter Topic : DLT는 나중에 관리자가 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 토픽
      • 실패한 메시지를 DLT 토픽에 저장해 놓기 때문에 실패한 메시지 유실 방지
      • 사후 실패 원인 분석 가능
      • 실패한 메시지를 수동으로 처리 가능

     

     

    @Service
    public class EmailSendConsumer {
        @KafkaListener(
                topics = "email.send",
                groupId = "email-send-group"
        )
        @RetryableTopic(
                attempts = "5",
                backoff = @Backoff(delay = 5000, multiplier = 2),
                dltTopicSuffix = ".dlt"
        )
        public void consume(String message) {
            ...
        }
    }

     

     

    DLT 저장 메시지를 사후 처리 방식

    • 실패 메시지를 로그 시스템에 전송해 장애 원인을 추적할 수 있도록 한다
    • DLT에 메시지가 저장되자마자 수동으로 대처할 수 있게 알림을 설정한다
    • 알림을 받은 관리자는 로그에 쌓인 내용을 보고 장애 원인을 분석하고 , 그에 맞게 메시지를 수동으로 처리한다
      • 메시지를 원래 토픽으로 다시 보내기 (장애가 해결된 경우)
      • 메시지 폐기하기
      • 잘못된 메시지 내용이 kafka에 들어가지 않게 producer 검증 로직 보완하기

     

     

    @Service
    public class EmailSendDltConsumer {
        @KafkaListener(
                topics = "email.send.dlt",
                groupId = "email-send-dlt-group"
        )
        public void consume(String message) {
            // 로그 시스템 전송
            System.out.println("로그 시스템 전송 : "  + message);
            
            // 알림 발송
            System.out.println("Slack 알림 발송");
        }
    }

    728x90

    '이커머스 devops' 카테고리의 다른 글

    Jenkins - CI/CD (1)  (0) 2025.11.03
    kafka (3)  (0) 2025.10.17
    kafka (2)  (0) 2025.10.17
Designed by Tistory.