Skip to content

Commit

Permalink
Feat : Kafka Producer, Consumer 형식으로 서비스 로직 수정
Browse files Browse the repository at this point in the history
  • Loading branch information
Kim-Dong-Jun99 committed Jan 7, 2024
1 parent a08faa5 commit fa850d5
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .http/Trip.http
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Content-Type: application/json
"tripName": "success",
"tripStatus": "BEFORE",
"area": "서울",
"subarea": "강남구",
"subarea": "송파구",
"budget": 10000
},
"tripMemberMessage": null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import java.util.List;

public record TripItemAddMsg(
String visitDate,
List<TripItemCreateRequest> newTripItems
) {
public record TripItemCreateRequest(
Long tourItemId,
Transportation transportation,
Long seqNum,
String visitDate,
Long price
) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package org.tenten.tentenstomp.domain.trip.dto.request;

public record TripItemVisitDateUpdateMsg() {
public record TripItemVisitDateUpdateMsg(
String visitDate
) {
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.tenten.tentenstomp.domain.trip.dto.response;

import java.time.LocalDate;
import java.util.List;

public record TripItemMsg(
Long tripId,
String visitDate,
List<TripItemInfoMsg> tripItems
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;

public record TripMemberMsg(
Long tripId,
List<TripMemberInfoMsg> connectedMembers
) {
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.tenten.tentenstomp.domain.trip.dto.response;

import java.time.LocalDate;
import java.util.List;

public record TripPathMsg(
Long tripId,
String visitDate,
List<TripPathInfoMsg> paths
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@
import org.springframework.transaction.annotation.Transactional;
import org.tenten.tentenstomp.domain.trip.dto.request.TripItemPriceUpdateMsg;
import org.tenten.tentenstomp.domain.trip.dto.request.TripItemVisitDateUpdateMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripItemMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripPathMsg;
import org.tenten.tentenstomp.domain.trip.entity.TripItem;
import org.tenten.tentenstomp.domain.trip.repository.TripItemRepository;
import org.tenten.tentenstomp.global.exception.GlobalException;
import org.tenten.tentenstomp.global.messaging.kafka.producer.KafkaProducer;
import org.tenten.tentenstomp.global.messaging.redis.publisher.RedisPublisher;
import org.tenten.tentenstomp.global.util.RedisChannelUtil;

import java.time.LocalDate;

import static org.springframework.http.HttpStatus.NOT_FOUND;
import static org.tenten.tentenstomp.global.common.constant.TopicConstant.PATH;
import static org.tenten.tentenstomp.global.common.constant.TopicConstant.TRIP_ITEM;

@Service
@RequiredArgsConstructor
public class TripItemService {
Expand All @@ -19,19 +29,61 @@ public class TripItemService {
private final KafkaProducer kafkaProducer;
@Transactional
public void updateTripItemPrice(String tripItemId, TripItemPriceUpdateMsg priceUpdateMsg) {
// TODO : /sub/{tripId}/tripItems/{visitDate}
TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND));
/*
비즈니스 로직
*/
TripItemMsg tripItemMsg = new TripItemMsg(
tripItem.getTourItem().getId(), tripItem.getVisitDate().toString(), null
);

kafkaProducer.send(TRIP_ITEM, tripItemMsg);

}
@Transactional
public void updateTripItemVisitDate(String tripItemId, TripItemVisitDateUpdateMsg visitDateUpdateMsg) {
// TODO : /sub/{tripId}/tripItems/{oldVisitDate}
// TODO : /sub/{tripId}/tripItems/{newVisitDate}
// TODO : /sub/{tripId}/path/{oldVisitDate}
// TODO : /sub/{tripId}/path/{newVisitDate}
TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND));
LocalDate pastDate = tripItem.getVisitDate();
LocalDate newDate = LocalDate.parse(visitDateUpdateMsg.visitDate());
/*
비즈니스 로직
*/

TripItemMsg tripItemMsgToPastDate = new TripItemMsg(
tripItem.getTourItem().getId(), pastDate.toString(), null
);
TripItemMsg tripItemMsgToNewDate = new TripItemMsg(
tripItem.getTourItem().getId(), newDate.toString(), null
);

TripPathMsg tripPathMsgToPastDate = new TripPathMsg(
tripItem.getTourItem().getId(), pastDate.toString(), null
);
TripPathMsg tripPathMsgToNewDate = new TripPathMsg(
tripItem.getTourItem().getId(), newDate.toString(), null
);

kafkaProducer.send(TRIP_ITEM, tripItemMsgToPastDate);
kafkaProducer.send(TRIP_ITEM, tripItemMsgToNewDate);
kafkaProducer.send(PATH, tripPathMsgToPastDate);
kafkaProducer.send(PATH, tripPathMsgToNewDate);
}
@Transactional
public void deleteTripItem(String tripItemId) {
// TODO : /sub/{tripId}/tripItems/{visitDate}
// TODO : /sub/{tripId}/path/{visitDate}
TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND));
LocalDate visitDate = tripItem.getVisitDate();
/*
비즈니스 로직
*/
TripItemMsg tripItemMsg = new TripItemMsg(
Long.parseLong(tripItemId), visitDate.toString(), null
);
TripPathMsg tripPathMsg = new TripPathMsg(
Long.parseLong(tripItemId), visitDate.toString(), null
);

kafkaProducer.send(TRIP_ITEM, tripItemMsg);
kafkaProducer.send(PATH, tripPathMsg);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import org.springframework.transaction.annotation.Transactional;
import org.tenten.tentenstomp.domain.trip.dto.request.*;
import org.tenten.tentenstomp.domain.trip.dto.response.TripInfoMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripItemMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripMemberMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripPathMsg;
import org.tenten.tentenstomp.domain.trip.entity.Trip;
import org.tenten.tentenstomp.domain.trip.repository.TripItemRepository;
import org.tenten.tentenstomp.global.messaging.kafka.producer.KafkaProducer;
Expand All @@ -14,11 +17,12 @@
import org.tenten.tentenstomp.global.response.GlobalStompResponse;
import org.tenten.tentenstomp.global.util.RedisChannelUtil;

import java.time.LocalDate;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import static org.tenten.tentenstomp.global.common.constant.EndPointConstant.*;
import static org.tenten.tentenstomp.global.common.constant.TopicConstant.*;

@Service
@RequiredArgsConstructor
Expand All @@ -36,40 +40,67 @@ public class TripService {
@Transactional
public void updateTrip(String tripId, TripUpdateMsg tripUpdateMsg) {
Trip trip = tripRepository.getReferenceById(Long.parseLong(tripId));
ChannelTopic topic = redisChannelUtil.getChannelTopic(tripId, TRIP_INFO);

TripInfoMsg tripResponseMsg = trip.changeTripInfo(tripUpdateMsg);
tripRepository.save(trip);
redisPublisher.publish(topic, GlobalStompResponse.ok(tripResponseMsg)); // 해당 여정의 토픽을 찾아야함,

kafkaProducer.send(TRIP_INFO, tripResponseMsg);
}
@Transactional
public void addTripItem(String tripId, TripItemAddMsg tripItemAddMsg) {
Trip trip = tripRepository.getReferenceById(Long.parseLong(tripId));
ChannelTopic tripItemTopic = redisChannelUtil.getChannelTopic(tripId, tripItemAddMsg.newTripItems().get(0).visitDate(), TRIP_ITEM);
ChannelTopic pathTopic = redisChannelUtil.getChannelTopic(tripId, tripItemAddMsg.newTripItems().get(0).visitDate(), PATH);
/*
비즈니스 로직
*/

// TODO : /sub/{tripId}/tripItems/{visitDate}
// TODO : /sub/{tripId}/path/{visitDate}
TripItemMsg tripItemMsg = new TripItemMsg(
Long.parseLong(tripId), LocalDate.parse(tripItemAddMsg.visitDate()).toString(), null
);
TripPathMsg tripPathMsg = new TripPathMsg(
Long.parseLong(tripId), LocalDate.parse(tripItemAddMsg.visitDate()).toString(), null
);

kafkaProducer.send(TRIP_ITEM, tripItemMsg);
kafkaProducer.send(PATH, tripPathMsg);
}
@Transactional
public void updateTripItemOrder(String tripId, TripItemOrderUpdateMsg orderUpdateMsg) {
// TODO : /sub/{tripId}/tripItems/{visitDate}
// TODO : /sub/{tripId}/path/{visitDate}
ChannelTopic tripItemTopic = redisChannelUtil.getChannelTopic(tripId, orderUpdateMsg.visitDate(), TRIP_ITEM);
ChannelTopic pathTopic = redisChannelUtil.getChannelTopic(tripId, orderUpdateMsg.visitDate(), PATH);
/*
비즈니스 로직
*/

TripItemMsg tripItemMsg = new TripItemMsg(
Long.parseLong(tripId), LocalDate.parse(orderUpdateMsg.visitDate()).toString(), null
);
TripPathMsg tripPathMsg = new TripPathMsg(
Long.parseLong(tripId), LocalDate.parse(orderUpdateMsg.visitDate()).toString(), null
);

kafkaProducer.send(TRIP_ITEM, tripItemMsg);
kafkaProducer.send(PATH, tripPathMsg);
}

@Transactional(readOnly = true)
public void connectMember(String tripId, MemberConnectMsg memberConnectMsg) {
// TODO: /sub/{tripId}/connectedMember
ChannelTopic memberTopic = redisChannelUtil.getChannelTopic(tripId, MEMBER);
/*
비즈니스 로직
*/
TripMemberMsg tripMemberMsg = new TripMemberMsg(
Long.parseLong(tripId), null
);

kafkaProducer.send(MEMBER, tripMemberMsg);
}
@Transactional(readOnly = true)
public void disconnectMember(String tripId, MemberDisconnectMsg memberDisconnectMsg) {
// TODO: /sub/{tripId}/connectedMember
ChannelTopic memberTopic = redisChannelUtil.getChannelTopic(tripId, MEMBER);
/*
비즈니스 로직
*/
TripMemberMsg tripMemberMsg = new TripMemberMsg(
Long.parseLong(tripId), null
);

kafkaProducer.send(MEMBER, tripMemberMsg);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.tenten.tentenstomp.global.common.constant;

public class EndPointConstant {
public class TopicConstant {
public static final String TRIP_INFO = "info";
public static final String TRIP_ITEM = "tripItems";
public static final String PATH = "path";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,53 @@

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.internals.Topic;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
import org.tenten.tentenstomp.domain.trip.dto.request.TripUpdateMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripInfoMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripItemMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripMemberMsg;
import org.tenten.tentenstomp.domain.trip.dto.response.TripPathMsg;
import org.tenten.tentenstomp.global.common.constant.TopicConstant;
import org.tenten.tentenstomp.global.response.GlobalStompResponse;
import org.tenten.tentenstomp.global.util.TopicUtil;

import java.time.LocalDate;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.tenten.tentenstomp.global.common.constant.TopicConstant.*;

@Service
@RequiredArgsConstructor
public class KafkaConsumer {

private final SimpMessageSendingOperations messagingTemplate;
private final TopicUtil topicUtil;

@KafkaListener(topics = "kafka", groupId = ConsumerConfig.GROUP_ID_CONFIG)
@KafkaListener(topics = "kafka", groupId = GROUP_ID_CONFIG)
public void consumeTest(TripUpdateMsg data) {
messagingTemplate.convertAndSend("/sub/kafka", data);
}

@KafkaListener(topics = TRIP_INFO, groupId = GROUP_ID_CONFIG)
public void updateTripInfo(TripInfoMsg tripInfoMsg) {
messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripInfoMsg.tripId(), TRIP_INFO), GlobalStompResponse.ok(tripInfoMsg));
}

@KafkaListener(topics = TRIP_ITEM, groupId = GROUP_ID_CONFIG)
public void updateTripItem(TripItemMsg tripItemMsg) {
messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripItemMsg.tripId(), TRIP_ITEM, LocalDate.parse(tripItemMsg.visitDate())), GlobalStompResponse.ok(tripItemMsg));
}

@KafkaListener(topics = PATH, groupId = GROUP_ID_CONFIG)
public void updateTripPath(TripPathMsg tripPathMsg) {
messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripPathMsg.tripId(), PATH, LocalDate.parse(tripPathMsg.visitDate())), GlobalStompResponse.ok(tripPathMsg));
}

@KafkaListener(topics = MEMBER, groupId = GROUP_ID_CONFIG)
public void updateConnectedTripMember(TripMemberMsg tripMemberMsg) {
messagingTemplate.convertAndSend(topicUtil.topicToReturnEndPoint(tripMemberMsg.tripId(), MEMBER), GlobalStompResponse.ok(tripMemberMsg));
}
}

This file was deleted.

18 changes: 18 additions & 0 deletions src/main/java/org/tenten/tentenstomp/global/util/TopicUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.tenten.tentenstomp.global.util;

import org.springframework.stereotype.Component;

import java.time.LocalDate;

@Component
public class TopicUtil {
private final String BASE_URL = "/sub";

public String topicToReturnEndPoint(Long tripId, String endPoint, LocalDate visitDate) {
return BASE_URL + "/" + tripId + "/" + endPoint + "/" + visitDate;
}

public String topicToReturnEndPoint(Long tripId, String endPoint) {
return BASE_URL + "/" + tripId + "/" + endPoint;
}
}

0 comments on commit fa850d5

Please sign in to comment.