Skip to content

Commit e1a5c10

Browse files
committed
feat&bug-fix: implemented message buffer fixed bugs, change id type to String and improve the logic for history handling
1 parent 3694301 commit e1a5c10

File tree

10 files changed

+60
-48
lines changed

10 files changed

+60
-48
lines changed

.github/workflows/java-ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ jobs:
3030
REDIS_HOST: ${{ secrets.REDIS_HOST }}
3131
REDIS_PORT: ${{ secrets.REDIS_PORT }}
3232
REDIS_PASSWORD: ${{ secrets.REDIS_PASSWORD }}
33+
KAFKA_SERVER: ${{ secrets.KAFKA_SERVER }}
3334

3435
steps:
3536
- name: Checkout Repository

src/main/java/com/connect/controller/ChatUserController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.connect.dto.MessageDTO;
44
import com.connect.dto.UserDTO;
5-
import com.connect.kafka.KafkaPublisherService;
65
import com.connect.model.Message;
76
import com.connect.model.User;
87
import com.connect.service.ChatUserService;
@@ -56,6 +55,7 @@ public void handleHistory(SimpMessageHeaderAccessor headerAccessor) {
5655
log.info("Handling the History of chats for the Room: {}", roomId);
5756
Optional<List<Message>> messages = chatUserService.chatHistoryHandler(roomId);
5857
messages.ifPresent(msgs -> {
58+
System.out.println(messages.get().size());
5959
List<MessageDTO> dtoList = msgs.stream()
6060
.map(MessageDTO::new)
6161
.collect(Collectors.toList());

src/main/java/com/connect/dto/RoomDTO.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import lombok.NoArgsConstructor;
77
import lombok.Setter;
88

9+
import java.time.LocalDateTime;
10+
911
@Getter
1012
@Setter
1113
@NoArgsConstructor
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
11
package com.connect.kafka;
22

3+
import com.connect.buffer.MessageBuffer;
34
import com.connect.model.Message;
45
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.beans.factory.annotation.Autowired;
57
import org.springframework.kafka.annotation.KafkaListener;
68
import org.springframework.stereotype.Service;
79

810
@Service
911
@Slf4j
1012
public class KafkaListenerService {
1113

14+
@Autowired
15+
private MessageBuffer messageBuffer;
16+
1217
@KafkaListener(topics = "chat", groupId = "chat-group")
1318
public void consumeEvent(Message message) {
1419
// logic message is being consumed.
1520
log.info("Consumed message: {}", message.toString());
21+
// Here we have to save the message to a shared buffer ok, and we need to save the data to the database.
22+
messageBuffer.addMessage(message);
1623
}
1724

1825
}

src/main/java/com/connect/kafka/KafkaPublisherService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class KafkaPublisherService {
1717
private KafkaTemplate<String, Object> kafkaTemplate;
1818

1919
public void sendEvent(Message message) {
20+
System.out.println(message.toString());
2021
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send("chat", message);
2122
future.whenComplete((result, exception) -> {
2223
if (exception == null) {

src/main/java/com/connect/model/Message.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ public class Message {
1818
@Id
1919
private ObjectId id;
2020

21-
private ObjectId roomId;
21+
private String roomId;
2222
private String sender;
2323
private String message;
2424
private LocalDateTime timeStamp;
2525

26-
public Message(ObjectId roomId, String sender, String message) {
26+
public Message(String roomId, String sender, String message) {
2727
this.roomId = roomId;
2828
this.sender = sender;
2929
this.message = message;

src/main/java/com/connect/model/Room.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import lombok.Getter;
55
import lombok.NoArgsConstructor;
66
import lombok.Setter;
7-
import org.bson.types.ObjectId;
87
import org.springframework.data.annotation.Id;
98
import org.springframework.data.mongodb.core.mapping.DBRef;
109
import org.springframework.data.mongodb.core.mapping.Document;
@@ -19,7 +18,7 @@
1918
@Document(collection = "rooms")
2019
public class Room {
2120
@Id
22-
private ObjectId roomId;
21+
private String roomId;
2322
private String roomName;
2423
private String roomDescription;
2524
private LocalDateTime timeStamp;
@@ -32,16 +31,16 @@ public class Room {
3231
@DBRef
3332
private List<User> allUsers; // The key is the username.
3433

35-
public Room(String roomName, String roomDescription) {
34+
public Room(String roomName, String roomDescription, LocalDateTime timeStamp) {
3635
this.roomName = roomName;
3736
this.roomDescription = roomDescription;
38-
this.timeStamp = LocalDateTime.now();
37+
this.timeStamp = timeStamp;
3938
}
4039

41-
public Room(String roomName, String roomDescription, User admin) {
40+
public Room(String roomName, String roomDescription, User admin, LocalDateTime timeStamp) {
4241
this.roomName = roomName;
4342
this.roomDescription = roomDescription;
44-
this.timeStamp = LocalDateTime.now();
43+
this.timeStamp = timeStamp;
4544
this.admin = admin;
4645
}
4746
}

src/main/java/com/connect/repository/RoomRepository.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public Optional<List<Room>> getRooms() {
3333
return Optional.of(mongoTemplate.find(query, Room.class));
3434
}
3535

36-
public Optional<Room> findRoomByID(ObjectId roomId) {
36+
public Optional<Room> findRoomByID(String roomId) {
3737
Query query = new Query();
3838
query.addCriteria(Criteria.where("roomId").is(roomId));
3939
return Optional.ofNullable(mongoTemplate.findOne(query, Room.class));
@@ -49,10 +49,16 @@ public Optional<Message> addMessageToRoom(Message message) {
4949
return Optional.ofNullable(mongoTemplate.insert(message));
5050
}
5151

52-
public Optional<List<Message>> getRoomSpecificMessages(ObjectId roomId) {
52+
public Optional<List<Message>> addMessages(List<Message> messageList) {
53+
return Optional.of((List<Message>) mongoTemplate.insert(messageList, Message.class));
54+
}
55+
56+
public Optional<List<Message>> getRoomSpecificMessages(String roomId) {
5357
Query query = new Query();
5458
query.addCriteria(Criteria.where("roomId").is(roomId));
55-
return Optional.of(mongoTemplate.find(query, Message.class));
59+
List<Message> messageList = mongoTemplate.find(query, Message.class);
60+
System.out.println(messageList.size());
61+
return Optional.of(messageList);
5662
}
5763

5864
}

src/main/java/com/connect/service/ChatUserService.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.connect.service;
22

3+
import com.connect.buffer.MessageBuffer;
34
import com.connect.dto.MessageDTO;
45
import com.connect.enums.UserStatus;
56
import com.connect.kafka.KafkaPublisherService;
@@ -37,6 +38,9 @@ public class ChatUserService {
3738
@Autowired
3839
private KafkaPublisherService kafkaPublisherService;
3940

41+
@Autowired
42+
private MessageBuffer messageBuffer;
43+
4044
private Map<String, User> users = new ConcurrentHashMap<>();
4145

4246
public void greetingHandler(String token) {
@@ -61,16 +65,14 @@ public void joiningRequestHandler(String username, String roomId) {
6165
return;
6266
}
6367

64-
ObjectId roomID = new ObjectId(roomId);
65-
6668
// Checking the Room exists from the RoomId
67-
Optional<Room> requestedRoom = roomRepository.findRoomByID(roomID);
69+
Optional<Room> requestedRoom = roomRepository.findRoomByID(roomId);
6870
if (requestedRoom.isEmpty()) {
6971
log.error("No room exists");
7072
return;
7173
}
7274
// If Room exists and the User is valid we have to update the room data.
73-
roomService.addUserToRoom(requiredUser, roomID);
75+
roomService.addUserToRoom(requiredUser, roomId);
7476
}
7577

7678
public void publishMessageToKafka(MessageDTO messageDTO, String roomId) {
@@ -80,39 +82,41 @@ public void publishMessageToKafka(MessageDTO messageDTO, String roomId) {
8082
} else if (messageDTO.getMessage().isEmpty() || roomId.isEmpty()) {
8183
log.error("Empty Data occurred");
8284
return;
85+
} else if (!ObjectId.isValid(roomId)) {
86+
log.error("Room ID is not valid");
87+
return;
8388
}
84-
ObjectId roomID = new ObjectId(roomId);
85-
// We are assuming that the roomID is not null so.
89+
// Room ID is valid and not null.
8690
Message message = Message.builder()
87-
.roomId(roomID)
91+
.roomId(roomId)
8892
.sender(messageDTO.getSender())
8993
.message(messageDTO.getMessage())
94+
.timeStamp(messageDTO.getTimeStamp())
9095
.build();
9196

92-
kafkaPublisherService.sendEvent(message);
93-
}
94-
95-
public Optional<Message> addMessagetoRoom(MessageDTO messageDTO, String roomId) {
96-
if (messageDTO == null) {
97-
log.error("Message DTO is null");
98-
return Optional.empty();
99-
} else if (messageDTO.getMessage().isEmpty() || roomId.isEmpty()) {
100-
log.error("Empty Data occurred");
101-
return Optional.empty();
102-
}
103-
104-
ObjectId roomID = new ObjectId(roomId);
97+
System.out.println(message.toString());
10598

106-
return roomService.addMessage(messageDTO, roomID);
99+
kafkaPublisherService.sendEvent(message);
107100
}
108101

109102
public Optional<List<Message>> chatHistoryHandler(String roomId) {
110103
if (roomId.isEmpty()) {
111104
log.error("Room ID is null in the chat history handler");
112105
return Optional.empty();
113106
}
114-
ObjectId roomID = new ObjectId(roomId);
115-
return roomRepository.getRoomSpecificMessages(roomID);
107+
108+
// Here we have to check the roomSpecific message is in the buffer or not.
109+
if (messageBuffer.size() != 0) {
110+
// Here we need to filter out the room specific messages if present in the buffer.
111+
List<Message> messageList = messageBuffer.getMessages().stream()
112+
.filter(message -> message.getRoomId().equals(roomId))
113+
.toList();
114+
if (!messageList.isEmpty()) {
115+
return Optional.of(messageList);
116+
}
117+
}
118+
// If the buffer is empty we have to fetch the messages from the db.
119+
return roomRepository.getRoomSpecificMessages(roomId);
116120
}
117121

118122
public Optional<List<User>> fetchUser() {

src/main/java/com/connect/service/RoomService.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
package com.connect.service;
22

3-
import com.connect.dto.MessageDTO;
4-
import com.connect.enums.UserRole;
5-
import com.connect.model.Message;
63
import com.connect.model.Room;
74
import com.connect.model.User;
85
import com.connect.repository.RoomRepository;
96
import com.connect.repository.UserRepository;
107
import jakarta.annotation.PostConstruct;
118
import lombok.extern.slf4j.Slf4j;
12-
import org.bson.types.ObjectId;
139
import org.springframework.beans.factory.annotation.Autowired;
1410
import org.springframework.stereotype.Service;
1511

16-
import java.security.Principal;
12+
import java.time.LocalDateTime;
1713
import java.util.List;
1814
import java.util.Optional;
1915

@@ -36,7 +32,8 @@ public void init() {
3632
// Have to create the first room.
3733
Room generalRoom = new Room(
3834
ROOM_NAME,
39-
"A public space for all users to chat, ask questions, and share updates."
35+
"A public space for all users to chat, ask questions, and share updates.",
36+
LocalDateTime.now()
4037
);
4138
Optional<Room> createdRoom = roomRepository.addRoom(generalRoom);
4239
if (createdRoom.isEmpty()) {
@@ -48,21 +45,16 @@ public void init() {
4845
}
4946
}
5047

51-
public void addUserToRoom(User user, ObjectId roomId) {
48+
public void addUserToRoom(User user, String roomId) {
5249
Optional<Room> room = roomRepository.findRoomByID(roomId);
5350
if (room.isEmpty()) {
5451
log.error("No room exists failed to add user to the room");
5552
return;
5653
}
5754

5855
}
59-
60-
public Optional<Message> addMessage(MessageDTO messageDTO, ObjectId roomId) {
61-
Message message = new Message(roomId, messageDTO.getSender(), messageDTO.getMessage());
62-
return roomRepository.addMessageToRoom(message);
63-
}
6456

65-
public boolean isRoomExists(ObjectId roomId) {
57+
public boolean isRoomExists(String roomId) {
6658
return roomRepository.findRoomByID(roomId).isPresent();
6759
}
6860

0 commit comments

Comments
 (0)