-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathClientHandler.java
More file actions
59 lines (54 loc) · 2.47 KB
/
ClientHandler.java
File metadata and controls
59 lines (54 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.List;
public class ClientHandler extends Thread {
private final Socket clientSocket;
private final Broker broker;
public ClientHandler(Socket socket, Broker broker) {
this.clientSocket = socket;
this.broker = broker;
}
public void run() {
try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
String[] request = inputLine.split(" ", 4);
String action = request[0];
String topicName = request[1];
String response = null;
switch (action.toLowerCase()) {
case "create":
int partitions = Integer.parseInt(request[2]);
response = broker.createTopic(topicName, partitions);
break;
case "publish":
int partitionIndex = Integer.parseInt(request[2]);
String message = request[3];
response = broker.publishMessage(topicName, partitionIndex, message);
break;
case "consume":
partitionIndex = Integer.parseInt(request[2]);
long offset = Long.parseLong(request[3]);
List<Message> messages = broker.consumeMessages(topicName, partitionIndex, offset);
StringBuilder builder = new StringBuilder();
for (Message msg : messages) {
builder.append("Offset: ").append(msg.getOffset())
.append(", Message: ").append(msg.getContent()).append("\n");
}
response = builder.toString();
break;
default:
response = "Unknown command.";
}
out.println(response);
}
} catch (IOException e) {
System.out.println("Exception caught when trying to listen on port or listening for a connection.");
System.out.println(e.getMessage());
}
}
}