ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka (3)
    이커머스 devops 2025. 10. 17. 18:09

    회원가입 하면 회원가입 축하 이메일 발송

    • 회원 가입 기능 - 회원 가입한 사용자 정보 DB에 저장
    • 이메일 발송 기능 - 이메일 발송 기록 DB에 저장

     

     

    - userService 프로젝트 생성

     

     

    server:
      port: 8080
    
    spring:
      kafka:
        bootstrap-servers:
          - 43.201.26.204:9092
          - 43.201.26.204:19092
          - 43.201.26.204:29092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner
    
      h2:
        console:
          enabled: true
      datasource:
        url: jdbc:h2:mem:userDB
        driver-class-name: org.h2.Driver
        username: sa
        password:

    - application.yml 수정

     

     

    - localhost:8080/h2-console

    - 연동 확인

     

     

    @Entity
    @Table(name="users")
    public class User {
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;
    
        private String email;
    
        private String name;
    
        private String password;
    
        public User() {
        }
    
        public User(String email, String name, String password) {
            this.email = email;
            this.name = name;
            this.password = password;
        }
    
        public Long getId() {
            return id;
        }
    
        public String getEmail() {
            return email;
        }
    
        public String getName() {
            return name;
        }
    
        public String getPassword() {
            return password;
        }
    }
    public interface UserRepository extends JpaRepository<User, Long> {
    }
    public class SignUpRequestDto {
        private String email;
    
        private String name;
    
        private String password;
    
        public String getEmail() {
            return email;
        }
    
        public String getName() {
            return name;
        }
    
        public String getPassword() {
            return password;
        }
    }
    @RestController
    @RequestMapping("/api/users")
    public class UserController {
        private final UserService userService;
        public UserController(UserService userService) {
            this.userService = userService;
        }
    
        @GetMapping
        private ResponseEntity<String> singUp(@RequestBody SignUpRequestDto signUpRequestDto) {
            userService.signUp(signUpRequestDto);
            return ResponseEntity.ok("Sign Up Successful");
        }
    }
    public class UserSignedUpEvent {
        private Long userId;
        private String email;
        private String name;
    
        public UserSignedUpEvent(Long userId, String email, String name) {
            this.userId = userId;
            this.email = email;
            this.name = name;
        }
    
        public Long getUserId() {
            return userId;
        }
    
        public String getEmail() {
            return email;
        }
    
        public String getName() {
            return name;
        }
    }
    @Service
    public class UserService {
        private final UserRepository userRepository;
        private final KafkaTemplate<String, String> kafkaTemplate;
        public UserService(UserRepository userRepository, KafkaTemplate<String, String> kafkaTemplate) {
            this.userRepository = userRepository;
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public void signUp(SignUpRequestDto signUpRequestDto) {
            User user = new User(
                    signUpRequestDto.getEmail(),
                    signUpRequestDto.getName(),
                    signUpRequestDto.getPassword()
            );
    
            User savedUser = userRepository.save(user);
    
            UserSignedUpEvent userSignedUpEvent = new UserSignedUpEvent(
                    savedUser.getId(),
                    savedUser.getEmail(),
                    savedUser.getName()
            );
            this.kafkaTemplate.send("user.signed-up", toJsonString(userSignedUpEvent));
        }
    
        private String toJsonString(Object object) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                String message = objectMapper.writeValueAsString(object);
                return message;
            } catch (JsonProcessingException e) {
                throw new RuntimeException("JSON 직렬화 실패");
            }
        }
    }

     

     

    - emailService 프로젝트 생성

     

     

    server:
      port: 8081
    
    spring:
      kafka:
        bootstrap-servers:
          - 43.201.26.204:9092
          - 43.201.26.204:19092
          - 43.201.26.204:29092
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          auto-offset-reset: earliest
    
      h2:
        console:
          enabled: true
      datasource:
        url: jdbc:h2:mem:emailDB
        driver-class-name: org.h2.Driver
        username: sa
        password:

    - application.yml 수정

     

     

    - localhost:8081/h2-console

    - 연동 확인

     

     

    public class UserSignedUpEvent {
        private Long userId;
        private String email;
        private String name;
    
        public UserSignedUpEvent() {}
    
        public UserSignedUpEvent(Long userId, String email, String name) {
            this.userId = userId;
            this.email = email;
            this.name = name;
        }
    
        public static UserSignedUpEvent fromJson(String json) {
            try {
                ObjectMapper mapper = new ObjectMapper();
                return mapper.readValue(json, UserSignedUpEvent.class);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("JSON 파싱 실패");
            }
        }
    
        public Long getUserId() {
            return userId;
        }
    
        public String getEmail() {
            return email;
        }
    
        public String getName() {
            return name;
        }
    }
    @Entity
    @Table(name = "email_logs")
    public class EmailLog {
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;
        private Long receiverUserId;
        private String receiverEmail;
        private String subject;
    
        public EmailLog() {}
    
        public EmailLog(Long receiverUserId, String receiverEmail, String subject) {
            this.receiverUserId = receiverUserId;
            this.receiverEmail = receiverEmail;
            this.subject = subject;
        }
    
        public Long getId() {
            return id;
        }
    
        public Long getReceiverUserId() {
            return receiverUserId;
        }
    
        public String getReceiverEmail() {
            return receiverEmail;
        }
    
        public String getSubject() {
            return subject;
        }
    }
    public interface EmailLogRepository extends JpaRepository<EmailLog, Long> {
    }
    @Service
    public class UserSignedUpEventConsumer {
        private EmailLogRepository emailLogRepository;
        
        public UserSignedUpEventConsumer(EmailLogRepository emailLogRepository) {
            this.emailLogRepository = emailLogRepository;
        }
    
        @KafkaListener(
                topics = "user.signed-up",
                groupId = "email-service",
                concurrency = "3"
        )
        @RetryableTopic(
                attempts = "5",
                backoff = @Backoff(delay = 1000, multiplier = 2),
                dltTopicSuffix = ".dlt"
        )
        public void consume(String message) throws InterruptedException {
            UserSignedUpEvent userSignedUpEvent = UserSignedUpEvent.fromJson(message);
    
            // 실제 이메일 발송 로직 생략
            String receiverEmail = userSignedUpEvent.getEmail();
            String subject = userSignedUpEvent.getName() + "님, 회원 가입을 축하합니다.";
            Thread.sleep(3000);
            System.out.println("이메일 발송 완료");
    
            // 이메일 발송 로그 저장
            EmailLog emailLog = new EmailLog(
                    userSignedUpEvent.getUserId(),
                    receiverEmail,
                    subject
            );
            emailLogRepository.save(emailLog);
        }
    }
    @Service
    public class UserSignedUpEventDltConsumer {
        @KafkaListener(
                topics = "user.signed-up.dlt",
                groupId = "email-service"
        )
        public void consume(String message) {
            // 실제 로직 생략
            System.out.println("로그 시스템에 전송 : " + message);
            System.out.println("SLack 알림 발송");
        }
    }

     

     

     

    # 서버 3대 백그라운드 실행
    $ bin/kafka-server-start.sh -daemon config/server.properties
    $ bin/kafka-server-start.sh -daemon config/server2.properties
    $ bin/kafka-server-start.sh -daemon config/server3.properties
    
    # 실행 확인
    $ lsof -i:9092
    $ lsof -i:19092
    $ lsof -i:29092

     

     

    - 기존 토픽 삭제

    - 토픽, dlt 토픽 생성

    - 생성 후 확인

     

     

    확인 완료 !

    728x90

    '이커머스 devops' 카테고리의 다른 글

    Jenkins - CI/CD (1)  (0) 2025.11.03
    kafka (2)  (0) 2025.10.17
    Kafka (1)  (0) 2025.10.16
Designed by Tistory.