From fa850d50132e6010936dab2e954448fe5583497f Mon Sep 17 00:00:00 2001 From: Kim-Dong-Jun99 Date: Sun, 7 Jan 2024 19:05:37 +0900 Subject: [PATCH] =?UTF-8?q?Feat=20:=20Kafka=20Producer,=20Consumer=20?= =?UTF-8?q?=ED=98=95=EC=8B=9D=EC=9C=BC=EB=A1=9C=20=EC=84=9C=EB=B9=84?= =?UTF-8?q?=EC=8A=A4=20=EB=A1=9C=EC=A7=81=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .http/Trip.http | 2 +- .../trip/dto/request/TripItemAddMsg.java | 2 +- .../request/TripItemVisitDateUpdateMsg.java | 4 +- .../domain/trip/dto/response/TripItemMsg.java | 3 + .../trip/dto/response/TripMemberMsg.java | 1 + .../domain/trip/dto/response/TripPathMsg.java | 3 + .../domain/trip/service/TripItemService.java | 66 +++++++++++++++++-- .../domain/trip/service/TripService.java | 61 ++++++++++++----- ...dPointConstant.java => TopicConstant.java} | 2 +- .../kafka/consumer/KafkaConsumer.java | 36 +++++++++- .../tentenstomp/global/util/TempUtil.java | 4 -- .../tentenstomp/global/util/TopicUtil.java | 18 +++++ 12 files changed, 171 insertions(+), 31 deletions(-) rename src/main/java/org/tenten/tentenstomp/global/common/constant/{EndPointConstant.java => TopicConstant.java} (89%) delete mode 100644 src/main/java/org/tenten/tentenstomp/global/util/TempUtil.java create mode 100644 src/main/java/org/tenten/tentenstomp/global/util/TopicUtil.java diff --git a/.http/Trip.http b/.http/Trip.http index 98841e0..644d2b7 100644 --- a/.http/Trip.http +++ b/.http/Trip.http @@ -14,7 +14,7 @@ Content-Type: application/json "tripName": "success", "tripStatus": "BEFORE", "area": "서울", - "subarea": "강남구", + "subarea": "송파구", "budget": 10000 }, "tripMemberMessage": null, diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemAddMsg.java b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemAddMsg.java index cd5f1cf..05369ad 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemAddMsg.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemAddMsg.java @@ -5,13 +5,13 @@ import java.util.List; public record TripItemAddMsg( + String visitDate, List newTripItems ) { public record TripItemCreateRequest( Long tourItemId, Transportation transportation, Long seqNum, - String visitDate, Long price ) { diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemVisitDateUpdateMsg.java b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemVisitDateUpdateMsg.java index 7803314..e2e297c 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemVisitDateUpdateMsg.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/request/TripItemVisitDateUpdateMsg.java @@ -1,4 +1,6 @@ package org.tenten.tentenstomp.domain.trip.dto.request; -public record TripItemVisitDateUpdateMsg() { +public record TripItemVisitDateUpdateMsg( + String visitDate +) { } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripItemMsg.java b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripItemMsg.java index ec150cf..e8e30ff 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripItemMsg.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripItemMsg.java @@ -1,8 +1,11 @@ package org.tenten.tentenstomp.domain.trip.dto.response; +import java.time.LocalDate; import java.util.List; public record TripItemMsg( + Long tripId, + String visitDate, List tripItems ) { } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripMemberMsg.java b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripMemberMsg.java index 6d559d3..352f539 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripMemberMsg.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripMemberMsg.java @@ -3,6 +3,7 @@ import java.util.List; public record TripMemberMsg( + Long tripId, List connectedMembers ) { } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripPathMsg.java b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripPathMsg.java index 5aabf2e..2edc043 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripPathMsg.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/dto/response/TripPathMsg.java @@ -1,8 +1,11 @@ package org.tenten.tentenstomp.domain.trip.dto.response; +import java.time.LocalDate; import java.util.List; public record TripPathMsg( + Long tripId, + String visitDate, List paths ) { } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java index bd89565..ccbe97e 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java @@ -5,11 +5,21 @@ import org.springframework.transaction.annotation.Transactional; import org.tenten.tentenstomp.domain.trip.dto.request.TripItemPriceUpdateMsg; import org.tenten.tentenstomp.domain.trip.dto.request.TripItemVisitDateUpdateMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripItemMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripPathMsg; +import org.tenten.tentenstomp.domain.trip.entity.TripItem; import org.tenten.tentenstomp.domain.trip.repository.TripItemRepository; +import org.tenten.tentenstomp.global.exception.GlobalException; import org.tenten.tentenstomp.global.messaging.kafka.producer.KafkaProducer; import org.tenten.tentenstomp.global.messaging.redis.publisher.RedisPublisher; import org.tenten.tentenstomp.global.util.RedisChannelUtil; +import java.time.LocalDate; + +import static org.springframework.http.HttpStatus.NOT_FOUND; +import static org.tenten.tentenstomp.global.common.constant.TopicConstant.PATH; +import static org.tenten.tentenstomp.global.common.constant.TopicConstant.TRIP_ITEM; + @Service @RequiredArgsConstructor public class TripItemService { @@ -19,19 +29,61 @@ public class TripItemService { private final KafkaProducer kafkaProducer; @Transactional public void updateTripItemPrice(String tripItemId, TripItemPriceUpdateMsg priceUpdateMsg) { - // TODO : /sub/{tripId}/tripItems/{visitDate} + TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND)); + /* + 비즈니스 로직 + */ + TripItemMsg tripItemMsg = new TripItemMsg( + tripItem.getTourItem().getId(), tripItem.getVisitDate().toString(), null + ); + + kafkaProducer.send(TRIP_ITEM, tripItemMsg); } @Transactional public void updateTripItemVisitDate(String tripItemId, TripItemVisitDateUpdateMsg visitDateUpdateMsg) { - // TODO : /sub/{tripId}/tripItems/{oldVisitDate} - // TODO : /sub/{tripId}/tripItems/{newVisitDate} - // TODO : /sub/{tripId}/path/{oldVisitDate} - // TODO : /sub/{tripId}/path/{newVisitDate} + TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND)); + LocalDate pastDate = tripItem.getVisitDate(); + LocalDate newDate = LocalDate.parse(visitDateUpdateMsg.visitDate()); + /* + 비즈니스 로직 + */ + + TripItemMsg tripItemMsgToPastDate = new TripItemMsg( + tripItem.getTourItem().getId(), pastDate.toString(), null + ); + TripItemMsg tripItemMsgToNewDate = new TripItemMsg( + tripItem.getTourItem().getId(), newDate.toString(), null + ); + + TripPathMsg tripPathMsgToPastDate = new TripPathMsg( + tripItem.getTourItem().getId(), pastDate.toString(), null + ); + TripPathMsg tripPathMsgToNewDate = new TripPathMsg( + tripItem.getTourItem().getId(), newDate.toString(), null + ); + + kafkaProducer.send(TRIP_ITEM, tripItemMsgToPastDate); + kafkaProducer.send(TRIP_ITEM, tripItemMsgToNewDate); + kafkaProducer.send(PATH, tripPathMsgToPastDate); + kafkaProducer.send(PATH, tripPathMsgToNewDate); } @Transactional public void deleteTripItem(String tripItemId) { - // TODO : /sub/{tripId}/tripItems/{visitDate} - // TODO : /sub/{tripId}/path/{visitDate} + TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND)); + LocalDate visitDate = tripItem.getVisitDate(); + /* + 비즈니스 로직 + */ + TripItemMsg tripItemMsg = new TripItemMsg( + Long.parseLong(tripItemId), visitDate.toString(), null + ); + TripPathMsg tripPathMsg = new TripPathMsg( + Long.parseLong(tripItemId), visitDate.toString(), null + ); + + kafkaProducer.send(TRIP_ITEM, tripItemMsg); + kafkaProducer.send(PATH, tripPathMsg); + } } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java index ffadb58..1cfaf1c 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java @@ -6,6 +6,9 @@ import org.springframework.transaction.annotation.Transactional; import org.tenten.tentenstomp.domain.trip.dto.request.*; import org.tenten.tentenstomp.domain.trip.dto.response.TripInfoMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripItemMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripMemberMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripPathMsg; import org.tenten.tentenstomp.domain.trip.entity.Trip; import org.tenten.tentenstomp.domain.trip.repository.TripItemRepository; import org.tenten.tentenstomp.global.messaging.kafka.producer.KafkaProducer; @@ -14,11 +17,12 @@ import org.tenten.tentenstomp.global.response.GlobalStompResponse; import org.tenten.tentenstomp.global.util.RedisChannelUtil; +import java.time.LocalDate; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import static org.tenten.tentenstomp.global.common.constant.EndPointConstant.*; +import static org.tenten.tentenstomp.global.common.constant.TopicConstant.*; @Service @RequiredArgsConstructor @@ -36,40 +40,67 @@ public class TripService { @Transactional public void updateTrip(String tripId, TripUpdateMsg tripUpdateMsg) { Trip trip = tripRepository.getReferenceById(Long.parseLong(tripId)); - ChannelTopic topic = redisChannelUtil.getChannelTopic(tripId, TRIP_INFO); TripInfoMsg tripResponseMsg = trip.changeTripInfo(tripUpdateMsg); tripRepository.save(trip); - redisPublisher.publish(topic, GlobalStompResponse.ok(tripResponseMsg)); // 해당 여정의 토픽을 찾아야함, + + kafkaProducer.send(TRIP_INFO, tripResponseMsg); } @Transactional public void addTripItem(String tripId, TripItemAddMsg tripItemAddMsg) { Trip trip = tripRepository.getReferenceById(Long.parseLong(tripId)); - ChannelTopic tripItemTopic = redisChannelUtil.getChannelTopic(tripId, tripItemAddMsg.newTripItems().get(0).visitDate(), TRIP_ITEM); - ChannelTopic pathTopic = redisChannelUtil.getChannelTopic(tripId, tripItemAddMsg.newTripItems().get(0).visitDate(), PATH); + /* + 비즈니스 로직 + */ - // TODO : /sub/{tripId}/tripItems/{visitDate} - // TODO : /sub/{tripId}/path/{visitDate} + TripItemMsg tripItemMsg = new TripItemMsg( + Long.parseLong(tripId), LocalDate.parse(tripItemAddMsg.visitDate()).toString(), null + ); + TripPathMsg tripPathMsg = new TripPathMsg( + Long.parseLong(tripId), LocalDate.parse(tripItemAddMsg.visitDate()).toString(), null + ); + kafkaProducer.send(TRIP_ITEM, tripItemMsg); + kafkaProducer.send(PATH, tripPathMsg); } @Transactional public void updateTripItemOrder(String tripId, TripItemOrderUpdateMsg orderUpdateMsg) { - // TODO : /sub/{tripId}/tripItems/{visitDate} - // TODO : /sub/{tripId}/path/{visitDate} - ChannelTopic tripItemTopic = redisChannelUtil.getChannelTopic(tripId, orderUpdateMsg.visitDate(), TRIP_ITEM); - ChannelTopic pathTopic = redisChannelUtil.getChannelTopic(tripId, orderUpdateMsg.visitDate(), PATH); + /* + 비즈니스 로직 + */ + TripItemMsg tripItemMsg = new TripItemMsg( + Long.parseLong(tripId), LocalDate.parse(orderUpdateMsg.visitDate()).toString(), null + ); + TripPathMsg tripPathMsg = new TripPathMsg( + Long.parseLong(tripId), LocalDate.parse(orderUpdateMsg.visitDate()).toString(), null + ); + kafkaProducer.send(TRIP_ITEM, tripItemMsg); + kafkaProducer.send(PATH, tripPathMsg); } + @Transactional(readOnly = true) public void connectMember(String tripId, MemberConnectMsg memberConnectMsg) { - // TODO: /sub/{tripId}/connectedMember - ChannelTopic memberTopic = redisChannelUtil.getChannelTopic(tripId, MEMBER); + /* + 비즈니스 로직 + */ + TripMemberMsg tripMemberMsg = new TripMemberMsg( + Long.parseLong(tripId), null + ); + + kafkaProducer.send(MEMBER, tripMemberMsg); } @Transactional(readOnly = true) public void disconnectMember(String tripId, MemberDisconnectMsg memberDisconnectMsg) { - // TODO: /sub/{tripId}/connectedMember - ChannelTopic memberTopic = redisChannelUtil.getChannelTopic(tripId, MEMBER); + /* + 비즈니스 로직 + */ + TripMemberMsg tripMemberMsg = new TripMemberMsg( + Long.parseLong(tripId), null + ); + + kafkaProducer.send(MEMBER, tripMemberMsg); } diff --git a/src/main/java/org/tenten/tentenstomp/global/common/constant/EndPointConstant.java b/src/main/java/org/tenten/tentenstomp/global/common/constant/TopicConstant.java similarity index 89% rename from src/main/java/org/tenten/tentenstomp/global/common/constant/EndPointConstant.java rename to src/main/java/org/tenten/tentenstomp/global/common/constant/TopicConstant.java index c32cf2b..289df3c 100644 --- a/src/main/java/org/tenten/tentenstomp/global/common/constant/EndPointConstant.java +++ b/src/main/java/org/tenten/tentenstomp/global/common/constant/TopicConstant.java @@ -1,6 +1,6 @@ package org.tenten.tentenstomp.global.common.constant; -public class EndPointConstant { +public class TopicConstant { public static final String TRIP_INFO = "info"; public static final String TRIP_ITEM = "tripItems"; public static final String PATH = "path"; diff --git a/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java b/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java index f8abf64..427965e 100644 --- a/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java +++ b/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java @@ -2,19 +2,53 @@ import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.internals.Topic; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.springframework.stereotype.Service; import org.tenten.tentenstomp.domain.trip.dto.request.TripUpdateMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripInfoMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripItemMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripMemberMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripPathMsg; +import org.tenten.tentenstomp.global.common.constant.TopicConstant; +import org.tenten.tentenstomp.global.response.GlobalStompResponse; +import org.tenten.tentenstomp.global.util.TopicUtil; + +import java.time.LocalDate; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.tenten.tentenstomp.global.common.constant.TopicConstant.*; @Service @RequiredArgsConstructor public class KafkaConsumer { private final SimpMessageSendingOperations messagingTemplate; + private final TopicUtil topicUtil; - @KafkaListener(topics = "kafka", groupId = ConsumerConfig.GROUP_ID_CONFIG) + @KafkaListener(topics = "kafka", groupId = GROUP_ID_CONFIG) public void consumeTest(TripUpdateMsg data) { messagingTemplate.convertAndSend("/sub/kafka", data); } + + @KafkaListener(topics = TRIP_INFO, groupId = GROUP_ID_CONFIG) + public void updateTripInfo(TripInfoMsg tripInfoMsg) { + messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripInfoMsg.tripId(), TRIP_INFO), GlobalStompResponse.ok(tripInfoMsg)); + } + + @KafkaListener(topics = TRIP_ITEM, groupId = GROUP_ID_CONFIG) + public void updateTripItem(TripItemMsg tripItemMsg) { + messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripItemMsg.tripId(), TRIP_ITEM, LocalDate.parse(tripItemMsg.visitDate())), GlobalStompResponse.ok(tripItemMsg)); + } + + @KafkaListener(topics = PATH, groupId = GROUP_ID_CONFIG) + public void updateTripPath(TripPathMsg tripPathMsg) { + messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripPathMsg.tripId(), PATH, LocalDate.parse(tripPathMsg.visitDate())), GlobalStompResponse.ok(tripPathMsg)); + } + + @KafkaListener(topics = MEMBER, groupId = GROUP_ID_CONFIG) + public void updateConnectedTripMember(TripMemberMsg tripMemberMsg) { + messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripMemberMsg.tripId(), MEMBER), GlobalStompResponse.ok(tripMemberMsg)); + } } diff --git a/src/main/java/org/tenten/tentenstomp/global/util/TempUtil.java b/src/main/java/org/tenten/tentenstomp/global/util/TempUtil.java deleted file mode 100644 index 8f04284..0000000 --- a/src/main/java/org/tenten/tentenstomp/global/util/TempUtil.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.tenten.tentenstomp.global.util; - -public class TempUtil { -} diff --git a/src/main/java/org/tenten/tentenstomp/global/util/TopicUtil.java b/src/main/java/org/tenten/tentenstomp/global/util/TopicUtil.java new file mode 100644 index 0000000..f2867ae --- /dev/null +++ b/src/main/java/org/tenten/tentenstomp/global/util/TopicUtil.java @@ -0,0 +1,18 @@ +package org.tenten.tentenstomp.global.util; + +import org.springframework.stereotype.Component; + +import java.time.LocalDate; + +@Component +public class TopicUtil { + private final String BASE_URL = "/sub"; + + public String topicToReturnEndPoint(Long tripId, String endPoint, LocalDate visitDate) { + return BASE_URL + "/" + tripId + "/" + endPoint + "/" + visitDate; + } + + public String topicToReturnEndPoint(Long tripId, String endPoint) { + return BASE_URL + "/" + tripId + "/" + endPoint; + } +}