-
kafka (3)이커머스 devops 2025. 10. 17. 18:09
회원가입 하면 회원가입 축하 이메일 발송
- 회원 가입 기능 - 회원 가입한 사용자 정보 DB에 저장
- 이메일 발송 기능 - 이메일 발송 기록 DB에 저장


- userService 프로젝트 생성
server: port: 8080 spring: kafka: bootstrap-servers: - 43.201.26.204:9092 - 43.201.26.204:19092 - 43.201.26.204:29092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner h2: console: enabled: true datasource: url: jdbc:h2:mem:userDB driver-class-name: org.h2.Driver username: sa password:- application.yml 수정


- localhost:8080/h2-console
- 연동 확인
@Entity @Table(name="users") public class User { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String email; private String name; private String password; public User() { } public User(String email, String name, String password) { this.email = email; this.name = name; this.password = password; } public Long getId() { return id; } public String getEmail() { return email; } public String getName() { return name; } public String getPassword() { return password; } }public interface UserRepository extends JpaRepository<User, Long> { }public class SignUpRequestDto { private String email; private String name; private String password; public String getEmail() { return email; } public String getName() { return name; } public String getPassword() { return password; } }@RestController @RequestMapping("/api/users") public class UserController { private final UserService userService; public UserController(UserService userService) { this.userService = userService; } @GetMapping private ResponseEntity<String> singUp(@RequestBody SignUpRequestDto signUpRequestDto) { userService.signUp(signUpRequestDto); return ResponseEntity.ok("Sign Up Successful"); } }public class UserSignedUpEvent { private Long userId; private String email; private String name; public UserSignedUpEvent(Long userId, String email, String name) { this.userId = userId; this.email = email; this.name = name; } public Long getUserId() { return userId; } public String getEmail() { return email; } public String getName() { return name; } }@Service public class UserService { private final UserRepository userRepository; private final KafkaTemplate<String, String> kafkaTemplate; public UserService(UserRepository userRepository, KafkaTemplate<String, String> kafkaTemplate) { this.userRepository = userRepository; this.kafkaTemplate = kafkaTemplate; } public void signUp(SignUpRequestDto signUpRequestDto) { User user = new User( signUpRequestDto.getEmail(), signUpRequestDto.getName(), signUpRequestDto.getPassword() ); User savedUser = userRepository.save(user); UserSignedUpEvent userSignedUpEvent = new UserSignedUpEvent( savedUser.getId(), savedUser.getEmail(), savedUser.getName() ); this.kafkaTemplate.send("user.signed-up", toJsonString(userSignedUpEvent)); } private String toJsonString(Object object) { ObjectMapper objectMapper = new ObjectMapper(); try { String message = objectMapper.writeValueAsString(object); return message; } catch (JsonProcessingException e) { throw new RuntimeException("JSON 직렬화 실패"); } } }
- emailService 프로젝트 생성
server: port: 8081 spring: kafka: bootstrap-servers: - 43.201.26.204:9092 - 43.201.26.204:19092 - 43.201.26.204:29092 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest h2: console: enabled: true datasource: url: jdbc:h2:mem:emailDB driver-class-name: org.h2.Driver username: sa password:- application.yml 수정


- localhost:8081/h2-console
- 연동 확인
public class UserSignedUpEvent { private Long userId; private String email; private String name; public UserSignedUpEvent() {} public UserSignedUpEvent(Long userId, String email, String name) { this.userId = userId; this.email = email; this.name = name; } public static UserSignedUpEvent fromJson(String json) { try { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(json, UserSignedUpEvent.class); } catch (JsonProcessingException e) { throw new RuntimeException("JSON 파싱 실패"); } } public Long getUserId() { return userId; } public String getEmail() { return email; } public String getName() { return name; } }@Entity @Table(name = "email_logs") public class EmailLog { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private Long receiverUserId; private String receiverEmail; private String subject; public EmailLog() {} public EmailLog(Long receiverUserId, String receiverEmail, String subject) { this.receiverUserId = receiverUserId; this.receiverEmail = receiverEmail; this.subject = subject; } public Long getId() { return id; } public Long getReceiverUserId() { return receiverUserId; } public String getReceiverEmail() { return receiverEmail; } public String getSubject() { return subject; } }public interface EmailLogRepository extends JpaRepository<EmailLog, Long> { }@Service public class UserSignedUpEventConsumer { private EmailLogRepository emailLogRepository; public UserSignedUpEventConsumer(EmailLogRepository emailLogRepository) { this.emailLogRepository = emailLogRepository; } @KafkaListener( topics = "user.signed-up", groupId = "email-service", concurrency = "3" ) @RetryableTopic( attempts = "5", backoff = @Backoff(delay = 1000, multiplier = 2), dltTopicSuffix = ".dlt" ) public void consume(String message) throws InterruptedException { UserSignedUpEvent userSignedUpEvent = UserSignedUpEvent.fromJson(message); // 실제 이메일 발송 로직 생략 String receiverEmail = userSignedUpEvent.getEmail(); String subject = userSignedUpEvent.getName() + "님, 회원 가입을 축하합니다."; Thread.sleep(3000); System.out.println("이메일 발송 완료"); // 이메일 발송 로그 저장 EmailLog emailLog = new EmailLog( userSignedUpEvent.getUserId(), receiverEmail, subject ); emailLogRepository.save(emailLog); } }@Service public class UserSignedUpEventDltConsumer { @KafkaListener( topics = "user.signed-up.dlt", groupId = "email-service" ) public void consume(String message) { // 실제 로직 생략 System.out.println("로그 시스템에 전송 : " + message); System.out.println("SLack 알림 발송"); } }# 서버 3대 백그라운드 실행 $ bin/kafka-server-start.sh -daemon config/server.properties $ bin/kafka-server-start.sh -daemon config/server2.properties $ bin/kafka-server-start.sh -daemon config/server3.properties # 실행 확인 $ lsof -i:9092 $ lsof -i:19092 $ lsof -i:29092

- 기존 토픽 삭제
- 토픽, dlt 토픽 생성
- 생성 후 확인



확인 완료 ! 728x90'이커머스 devops' 카테고리의 다른 글
Jenkins - CI/CD (1) (0) 2025.11.03 kafka (2) (0) 2025.10.17 Kafka (1) (0) 2025.10.16