ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka (2)
    이커머스 devops 2025. 10. 17. 16:28

    파티션

    • 파티션(Partition)은 큐(메시지를 임시로 저장할 수 있는 공간)를 여러개로 늘려서 병렬 처리 가능하게 하는 기본 단위
      • 각 토픽은 하나 이상의 파티션으로 구성할 수 있다
      • Producer가 특정 토픽에 메시지를 넣으면 파티션에 적절하게 분배된다
      • 하나의 파티션은 동일한 Consumer Group 내에서 하나의 컨슈머에만 할당된다
      • 하나의 컨슈머가 여러 파티션을 처리할 수 있다
      • 하나의 파티션에 할당된 하나의 컨슈머는 메시지를 순서대로 처리한다

     

     

    - springboot consumer 서버 2대 실행

     

     

    - api 연속으로 3번 호출했을 때 consumer 서버는 2대 실행했지만 한 쪽의 consumer 서버에만 메시지 처리 로그가 찍혔다

     

     

    특정 토픽의 파티션 수 조회하기 / 설정하기 / 변경하기

    특정 토픽 파티션 수(세부정보) 조회

    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic email.send

     

     

    토픽 생성 (+ 파티션 수 설정)

    $ bin/kafka-topics.sh --bootstrap-server <kafka 주소> --create --topic <토픽명> --partitions <파티션 수>
    
    # 예제
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test.topic --partitions 3

     

     

    기존 토픽 파티션 수 증가

    # 문법
    $ bin/kafka-topics.sh --bootstrap-server <kafka 주소> --alter --topic <토픽명> --partitions <변경할 최종 파티션 수>
    # 예제
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test.topic --partitions 5

    - Kafka에서는 파티션을 줄이는 과정에서 내부적으로 문제(데이터손실, 성능저하 등)가 많이 발생하기 때문에 파티션 수를 늘릴 수 있지만 줄일 수 없다

    - 파티션을 줄이고 싶다면 새로운 토픽을 생성해서 파티션 수를 재설정해야 한다

     

     

    메시지 분배 방법

    • key가 포함되지 않은 메시지를 넣을 경우 : 스티키 파티셔닝(배치단위로 처리하기 위해 하나의 파티션에 메시지가 일정량 채워지면 그다음 파티션에 메시지를 저장) 방식으로 메시지 분배
    • key가 포함된 메시지를 넣을 경우 : key의 해시 값을 기반으로 파티션을 결정해서 메시지를 분배, 같은 key값을 가진 경우 같은 파티션에 들어감

     

     

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic email.send --from-beginning --property print.partition=true

    - 1개의 파티션에만 들어가는 것 확인 : 스티키 파티셔닝 방식

     

     

    spring:
      kafka:
        bootstrap-servers: 43.201.26.204:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner

    - producer > application.yml 수정

    - RoundRobinPartitioner 방식

     

     

    여러 개의 컨슈머로 메시지 병렬 적으로 처리

    - 2개의 컨슈머 서버가 토픽의 메시지를 병렬적으로 처리

     

     

    @Service
    public class EmailSendConsumer {
        @KafkaListener(
                topics = "email.send",
                groupId = "email-send-group",
                concurrency = "3"
        )
        @RetryableTopic(
                attempts = "5",
                backoff = @Backoff(delay = 5000, multiplier = 2),
                dltTopicSuffix = ".dlt"
        )
        public void consume(String message) {
            ...
        }
    }

    -  API 요청을 연속으로 5 번 보낸 뒤에 컨슈머 서버의 로그를 확인

    - 하나의 컨슈머 서버에서 하나의 쓰레드가 하나의 파티션에 있는 메시지를 처리

     

     

    적정 파티션 개수 계산

    - 처리가 지연되는 메시지가 생기지 않는 선에서 파티션을 최소로 설정

    - 프로듀서가 보내는 메세지량 <= 하나의 쓰레드가 처리하는 메시지량 * 파티션 수

     

     

    컨슈머가 메시지를 지연 없이 처리하고 있는지 확인하는 방법(Consumer Lag)

    • Lag : 지연된 메시지 수(컨슈머가 아직 처리하지 못한 메시지 수), consumer lag
      • 갑작스럽게 요청이 증가할 때나 컨슈머에 장애가 생겼을 때 컨슈머 랙 발생

     

     

    - consumer server 끄고 메시지 4개 보냈을 때, 4개 lag 생성

    - 현업 모니터링 툴 : Datadog, burrow 

     

     

    노드, 브로커, 컨트롤러, 클러스터, 레플리케이션

    • 노드(node) : 카프카가 설치되어 있는 서버 단위
      • 서비스 장애를 방지하기 위해 실무에서는 최소 3대의 노드를 구성한다
    • 클러스터(cluster) : 여러 대의 서버가 연결되어 하나의 시스템처럼 동작하는 서버들의 집합
    • 브로커(broker) : 메시지를 저장하고 클라이언트의 요청을 처리하는 역할
    • 컨트롤러(controller) : 브로커들간의 연동과 전반적인 클러스트의 상태 총괄하는 역할
      • 카프카 서버는 크게 컨트롤러와 브로커로 구성되어 있다
      • 기본적으로 브로커는 9092번 포트에서 실행, 컨트롤러는 9093번 포트에서 실행
    • 레플리케이션(replication) : 데이터의 안정성과 가용성을 높이기 위해 토픽의 파티션을 여러 노드에 복제하는 것
      • 복제된 파티션은 리더 파티션(원본), 팔로워 파티션(복제본)으로 구분
      • 리더 파티션은 프로듀서나 컨슈머가 직접적으로 메시지를 쓰고 읽는 파티션
      • 팔로워 파티션은 리더 파티션의 메시지를 실시간으로 복제하며 유지
      • 레플리케이션 개수는 카프카 서버 수만큼 설정할 수 있지만 실무에서는 보통 2-3개로 설정해서 활용

     

     

    Kafka 서버 3대  셋팅

    $ cd kafka_2.134.0.0/config
    $ vi server.properties
    # kafka 노드를 식별하는 ID
    node.id = 1
    
    # 클러스터 구성할 컨트롤러의 노드 주소 목록 설정
    # 추가적인 노드 컨트롤러는 19093, 29093 포트에 실행 예정
    controller.quorum.bootstrap.servers={EC2 public IP}:9093, {EC2 public IP}:19093, {EC2 public IP}:29093
    
    # 브로커, 컨트롤러 프로세스 실행시킬 포트 지정
    # 브로커=plaintext, 컨트롤러=controller 지정
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    
    # 외부에서 접근할 수 있는 주소
    advertised.listener=PLAINTEXT://{EC2 public IP}:9092,CONTROLLER://{EC2 public IP}:9093
    
    # kafka가 데이터를 저장할 디렉터리 경로 설정
    log.dirs=/tmp/kafka-logs-1

     

     

    # 설정 파일 복사
    $ cp server.properties server2.properties
    $ cp server.properties server3.properties

    - 복사한 설정 파일 똑같이 수정

     

     

    # kafka 디렉터리로 이동
    cd ..
    
    # kafka 종료
    $ bin/kafka-server-stop.sh
    # 처음 실행하는 kafka 노드 실행
    $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    $ KAFKA_CONTROLLER_ID="$(bin/kafka-storage.sh random-uuid)"
    
    $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/server.properties --initial-controllers "1@localhost:9093:$KAFKA_CONTROLLER_ID"
    $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/server2.properties --no-initial-controllers
    $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/server3.properties --no-initial-controllers
    
    # kafka 노드 3대 모두 실행 (각 창 띄워서 진행)
    $ bin/kafka-server-start.sh config/server.properties
    $ bin/kafka-server-start.sh config/server2.properties
    $ bin/kafka-server-start.sh config/server3.properties

     

     

     

    # 클러스터에 컨트롤러 등록
    $ bin/kafka-metadata-quorum.sh --command-config config/server2.properties --bootstrap-server localhost:9092 add-controller
    $ bin/kafka-metadata-quorum.sh --command-config config/server3.properties --bootstrap-server localhost:9092 add-controller
    
    # 컨트롤러끼리 연동 확인
    $ bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

     

     

    Kafka 서버 3대 연동 확인

    # 토픽/레플리케이션 생성
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic email.send --partitions 1 --replication-factor 3
    
    # 토픽 세부 정보 조회
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic email.send
    $ bin/kafka-topics.sh --bootstrap-server localhost:19092 --describe --topic email.send
    $ bin/kafka-topics.sh --bootstrap-server localhost:29092 --describe --topic email.send

     

     

    # 토픽 세부 정보 조회
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic email.send

    • PartitionCount : 해당 토픽의 파티션 수
    • ReplicationFactor : 해당 토픽의 레플리케이션 수
    • Partition : 파티션 번호
    • Leader : 해당 토픽의 리더 파티션을 가지고 있는 노드 id
    • Replicas : 해당 토픽의 파티션을 복제하기로 설정된 노트 id
    • Isf(In-Sync Replicas) : 리더 파티션과 똑같은 상태로 복제(동기화) 완료된 노드 id

     

     

    팔로워 파티션에 메시지 넣기

    - 팔로워 파티션이 있는 노드에 메시지를 넣더라도 잘 들어가는 것을 확인

    - kafka 프로듀서는 메시지를 보내기 전에 해당 파티션의 리더가 누구인지 확인하고 자동으로 리터 파티션에 메시지를 전송한다

    - kafka 노드들끼리 서로 연동되어 있어 리더 파티션이 누군지에 대한 정보를 주고받을 수 있기 때문이다

     

     

    리더 파티션에 장애가 발생할 때

    - 리더 파티션 가지고 있는 노드 조회 후 종료

    - 리더 파티션 노드 2로 자동 변경

     

     

    - 서버 1대 장애 가정 상황에서 메시지 넣고 읽어 들일 수 있음

     

     

    - 장애 가정 서버 복구 후 확인, 1-3번 노드 데이터가 전부 동일하게 동기화 됐음

    - 특정 kafka 서버에 장애가 발생하더라도 시스템 전체가 중단되지 않고 지속적으로 서비스 제공할 수 있다

     

     

    Spring Boot에 Kafka 서버 3대 연결

    spring:
      kafka:
        bootstrap-servers:
          - 43.201.26.204:9092
          - 43.201.26.204:19092
          - 43.201.26.204:29092
        ...

    - spring boot - application.yml 수정

    728x90

    '이커머스 devops' 카테고리의 다른 글

    Jenkins - CI/CD (1)  (0) 2025.11.03
    kafka (3)  (0) 2025.10.17
    Kafka (1)  (0) 2025.10.16
Designed by Tistory.