Skip to content

Commit 178641e

Browse files
authored
Periodic Job and Serialization Changes (#50)
Periodic Job and Serialization Changes
1 parent dc1fb3b commit 178641e

File tree

39 files changed

+1452
-257
lines changed

39 files changed

+1452
-257
lines changed

README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
<h1 style="float:left">Rqueue: Redis Queue,Task Queue, Delayed Queue for Spring and Spring Boot</h1>
44
</div>
55

6-
[![Build Status](https://travis-ci.org/sonus21/rqueue.svg?branch=master)](https://travis-ci.org/sonus21/rqueue)
6+
[![Build Status](https://circleci.com/gh/sonus21/rqueue/tree/master.svg?style=shield)](https://circleci.com/gh/sonus21/rqueue/tree/master)
77
[![Coverage Status](https://coveralls.io/repos/github/sonus21/rqueue/badge.svg?branch=master)](https://coveralls.io/github/sonus21/rqueue?branch=master)
88
[![Maven Central](https://img.shields.io/maven-central/v/com.github.sonus21/rqueue-core)](https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core)
9-
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
109
[![Javadoc](https://javadoc.io/badge2/com.github.sonus21/rqueue-core/javadoc.svg)](https://javadoc.io/doc/com.github.sonus21/rqueue-core)
10+
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
1111

1212
**Rqueue** is an asynchronous task executor(worker) built for spring framework based on the spring framework's messaging library backed by Redis. It can be used as message broker as well, where all services code is in Spring.
1313

@@ -18,7 +18,7 @@
1818
* **Message delivery**: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
1919
* **Redis cluster** : Redis cluster can be used with driver.
2020
* **Metrics** : In flight messages, waiting for consumption and delayed messages
21-
* **Web interface**: a web interface to manage a queue and queue insights including latency
21+
* **Web Dashboard**: a web dashboard to manage a queue and queue insights including latency
2222
* **Automatic message serialization and deserialization**
2323
* **Concurrency**: Concurrency of any queue can be configured
2424
* **Queue Priority** :
@@ -28,6 +28,7 @@
2828
* **Callbacks** : Callbacks for dead letter queue, discard etc
2929
* **Events** 1. Bootstrap event 2. Task execution event.
3030
* **Unique message** : Unique message processing for a queue based on the message id
31+
* **Periodic message** : Process same message at certain interval
3132
* **Redis connection**: A different redis setup can be used for Rqueue
3233

3334
## Getting Started
@@ -117,6 +118,12 @@ public class MessageService {
117118
public void sendSms(Sms sms, SmsPriority priority){
118119
rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms);
119120
}
121+
122+
// enqueue periodic job, email should be sent every 30 seconds
123+
public void sendPeriodicEmail(Email email){
124+
rqueueMessageEnqueuer.enqueuePeriodic("email-queue", invoice, 30_000);
125+
}
126+
120127
}
121128
```
122129

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.github.sonus21.rqueue.test.dto.FeedGeneration;
2626
import com.github.sonus21.rqueue.test.dto.Job;
2727
import com.github.sonus21.rqueue.test.dto.Notification;
28+
import com.github.sonus21.rqueue.test.dto.PeriodicJob;
2829
import com.github.sonus21.rqueue.test.dto.Reservation;
2930
import com.github.sonus21.rqueue.test.dto.ReservationRequest;
3031
import com.github.sonus21.rqueue.test.dto.Sms;
@@ -36,6 +37,7 @@
3637
import lombok.RequiredArgsConstructor;
3738
import lombok.extern.slf4j.Slf4j;
3839
import org.springframework.beans.factory.annotation.Autowired;
40+
import org.springframework.beans.factory.annotation.Value;
3941
import org.springframework.messaging.handler.annotation.Header;
4042
import org.springframework.messaging.handler.annotation.Payload;
4143
import org.springframework.stereotype.Component;
@@ -49,13 +51,46 @@ public class MessageListener {
4951
@NonNull private ConsumedMessageService consumedMessageService;
5052
@NonNull private FailureManager failureManager;
5153

54+
@Value("${job.queue.name}")
55+
private String jobQueue;
56+
57+
@Value("${notification.queue.name}")
58+
private String notificationQueueName;
59+
60+
@Value("${email.queue.name}")
61+
private String emailQueue;
62+
63+
@Value("${sms.queue}")
64+
private String smsQueue;
65+
66+
@Value("${chat.indexing.queue}")
67+
private String chatIndexingQueue;
68+
69+
@Value("${feed.generation.queue}")
70+
private String feedGenerationQueue;
71+
72+
@Value("${reservation.queue}")
73+
private String reservationQueue;
74+
75+
@Value("${reservation.request.queue.name}")
76+
private String reservationRequestQueue;
77+
78+
@Value("${reservation.request.dead.letter.queue.name}")
79+
private String reservationRequestDeadLetterQueue;
80+
81+
@Value("${list.email.queue.name}")
82+
private String listEmailQueue;
83+
84+
@Value("${periodic.job.queue.name}")
85+
private String periodicJobQueue;
86+
5287
@RqueueListener(value = "${job.queue.name}", active = "${job.queue.active}")
5388
public void onMessage(Job job) throws Exception {
5489
log.info("Job: {}", job);
5590
if (failureManager.shouldFail(job.getId())) {
5691
throw new Exception("Failing job task to be retried" + job);
5792
}
58-
consumedMessageService.save(job, null);
93+
consumedMessageService.save(job, null, jobQueue);
5994
}
6095

6196
@RqueueListener(
@@ -69,7 +104,7 @@ public void onMessage(
69104
if (failureManager.shouldFail(notification.getId())) {
70105
throw new Exception("Failing notification task to be retried" + notification);
71106
}
72-
consumedMessageService.save(notification, null);
107+
consumedMessageService.save(notification, null, notificationQueueName);
73108
}
74109

75110
@RqueueListener(
@@ -84,7 +119,7 @@ public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueM
84119
if (failureManager.shouldFail(email.getId())) {
85120
throw new Exception("Failing email task to be retried" + email);
86121
}
87-
consumedMessageService.save(email, null);
122+
consumedMessageService.save(email, null, emailQueue);
88123
}
89124

90125
@RqueueListener(
@@ -98,7 +133,7 @@ public void onMessage(Sms sms) throws Exception {
98133
if (failureManager.shouldFail(sms.getId())) {
99134
throw new Exception("Failing sms task to be retried" + sms);
100135
}
101-
consumedMessageService.save(sms, null);
136+
consumedMessageService.save(sms, null, smsQueue);
102137
}
103138

104139
@RqueueListener(
@@ -112,7 +147,7 @@ public void onMessage(ChatIndexing chatIndexing) throws Exception {
112147
if (failureManager.shouldFail(chatIndexing.getId())) {
113148
throw new Exception("Failing chat indexing task to be retried" + chatIndexing);
114149
}
115-
consumedMessageService.save(chatIndexing, null);
150+
consumedMessageService.save(chatIndexing, null, chatIndexingQueue);
116151
}
117152

118153
@RqueueListener(
@@ -126,7 +161,7 @@ public void onMessage(FeedGeneration feedGeneration) throws Exception {
126161
if (failureManager.shouldFail(feedGeneration.getId())) {
127162
throw new Exception("Failing feedGeneration task to be retried" + feedGeneration);
128163
}
129-
consumedMessageService.save(feedGeneration, null);
164+
consumedMessageService.save(feedGeneration, null, feedGenerationQueue);
130165
}
131166

132167
@RqueueListener(
@@ -140,7 +175,7 @@ public void onMessage(Reservation reservation) throws Exception {
140175
if (failureManager.shouldFail(reservation.getId())) {
141176
throw new Exception("Failing reservation task to be retried" + reservation);
142177
}
143-
consumedMessageService.save(reservation, null);
178+
consumedMessageService.save(reservation, null, reservationQueue);
144179
}
145180

146181
@RqueueListener(
@@ -154,7 +189,7 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
154189
if (failureManager.shouldFail(request.getId())) {
155190
throw new Exception("Failing reservation request task to be retried" + request);
156191
}
157-
consumedMessageService.save(request, null);
192+
consumedMessageService.save(request, null, reservationRequestQueue);
158193
}
159194

160195
@RqueueListener(
@@ -164,15 +199,29 @@ public void onMessageReservationRequest(ReservationRequest request) throws Excep
164199
public void onMessageReservationRequestDeadLetterQueue(ReservationRequest request)
165200
throws Exception {
166201
log.info("ReservationRequest Dead Letter Queue{}", request);
167-
consumedMessageService.save(request, "reservation-request-dlq");
202+
consumedMessageService.save(
203+
request, "reservation-request-dlq", reservationRequestDeadLetterQueue);
168204
}
169205

170206
@RqueueListener(value = "${list.email.queue.name}", active = "${list.email.queue.enabled}")
171207
public void onMessageEmailList(List<Email> emailList) throws JsonProcessingException {
172208
log.info("onMessageEmailList {}", emailList);
173209
String consumedId = UUID.randomUUID().toString();
174210
for (Email email : emailList) {
175-
consumedMessageService.save(email, consumedId);
211+
consumedMessageService.save(email, consumedId, listEmailQueue);
212+
}
213+
}
214+
215+
@RqueueListener(
216+
value = "${periodic.job.queue.name}",
217+
active = "${periodic.job.queue.active}",
218+
deadLetterQueue = "${periodic.job.dead.letter.queue.name}",
219+
numRetries = "${periodic.job.queue.retry.count}")
220+
public void onPeriodicJob(PeriodicJob periodicJob) throws Exception {
221+
log.info("onPeriodicJob: {}", periodicJob);
222+
if (failureManager.shouldFail(periodicJob.getId())) {
223+
throw new Exception("Failing PeriodicJob task to be retried" + periodicJob);
176224
}
225+
consumedMessageService.save(periodicJob, UUID.randomUUID().toString(), periodicJobQueue);
177226
}
178227
}

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/application/ApplicationBasicConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.Executors;
2727
import javax.sql.DataSource;
2828
import lombok.AllArgsConstructor;
29+
import lombok.extern.slf4j.Slf4j;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
3132
import org.springframework.beans.factory.annotation.Value;
@@ -37,6 +38,7 @@
3738
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
3839
import redis.embedded.RedisServer;
3940

41+
@Slf4j
4042
public abstract class ApplicationBasicConfiguration {
4143
private static final Logger monitorLogger = LoggerFactory.getLogger("monitor");
4244
protected RedisServer redisServer;
@@ -58,11 +60,20 @@ public abstract class ApplicationBasicConfiguration {
5860
@Value("${monitor.thread.count:0}")
5961
protected int monitorThreads;
6062

63+
@Value("${monitor.enabled:false}")
64+
protected boolean monitoringEnabled;
65+
6166
protected void init() {
67+
if (monitoringEnabled && monitorThreads == 0) {
68+
monitorThreads = 1;
69+
}
6270
if (monitorThreads > 0) {
6371
executorService = Executors.newFixedThreadPool(monitorThreads);
6472
processes = new ArrayList<>();
6573
}
74+
if (monitoringEnabled) {
75+
monitor(redisHost, redisPort);
76+
}
6677
if (useSystemRedis) {
6778
return;
6879
}
@@ -92,6 +103,7 @@ protected void destroy() {
92103
}
93104

94105
protected void monitor(String host, int port) {
106+
log.info("Monitor {}:{}", host, port);
95107
executorService.submit(
96108
() -> {
97109
try {

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/common/SpringTestBase.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
3030
import com.github.sonus21.rqueue.listener.QueueDetail;
3131
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
32+
import com.github.sonus21.rqueue.test.entity.ConsumedMessage;
3233
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
3334
import com.github.sonus21.rqueue.test.service.FailureManager;
3435
import com.github.sonus21.rqueue.utils.StringUtils;
@@ -101,10 +102,13 @@ public abstract class SpringTestBase extends TestBase {
101102
@Value("${list.email.queue.name}")
102103
protected String listEmailQueue;
103104

105+
@Value("${periodic.job.queue.name}")
106+
protected String periodicJobQueue;
107+
104108
protected void enqueue(Object message, String queueName) {
105109
RqueueMessage rqueueMessage =
106110
RqueueMessageUtils.buildMessage(
107-
rqueueMessageManager.getMessageConverter(), message, queueName, null, null, null);
111+
rqueueMessageManager.getMessageConverter(), queueName, message, null, null, null);
108112
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
109113
}
110114

@@ -113,7 +117,7 @@ protected void enqueue(String queueName, Factory factory, int n) {
113117
Object object = factory.next(i);
114118
RqueueMessage rqueueMessage =
115119
RqueueMessageUtils.buildMessage(
116-
rqueueMessageManager.getMessageConverter(), object, queueName, null, null, null);
120+
rqueueMessageManager.getMessageConverter(), queueName, object, null, null, null);
117121
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
118122
}
119123
}
@@ -124,15 +128,15 @@ protected void enqueueIn(String zsetName, Factory factory, Delay delay, int n) {
124128
long score = delay.getDelay(i);
125129
RqueueMessage rqueueMessage =
126130
RqueueMessageUtils.buildMessage(
127-
rqueueMessageManager.getMessageConverter(), object, zsetName, null, score, null);
131+
rqueueMessageManager.getMessageConverter(), zsetName, object, null, score, null);
128132
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
129133
}
130134
}
131135

132136
protected void enqueueIn(Object message, String zsetName, long delay) {
133137
RqueueMessage rqueueMessage =
134138
RqueueMessageUtils.buildMessage(
135-
rqueueMessageManager.getMessageConverter(), message, zsetName, null, delay, null);
139+
rqueueMessageManager.getMessageConverter(), zsetName, message, null, delay, null);
136140
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
137141
}
138142

@@ -185,6 +189,13 @@ protected void printQueueStats(String queueName) {
185189
printQueueStats(Collections.singletonList(queueName));
186190
}
187191

192+
protected void printConsumedMessage(String queueName) {
193+
for (ConsumedMessage consumedMessage :
194+
consumedMessageService.getConsumedMessagesForQueue(queueName)) {
195+
log.info("Queue {} Msg: {}", queueName, consumedMessage);
196+
}
197+
}
198+
188199
protected void cleanQueue(String queue) {
189200
QueueDetail queueDetail = EndpointRegistry.get(queue);
190201
stringRqueueRedisTemplate.delete(queueDetail.getQueueName());
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.github.sonus21.rqueue.test.dto;
18+
19+
import java.util.UUID;
20+
import lombok.Data;
21+
import lombok.EqualsAndHashCode;
22+
import lombok.NoArgsConstructor;
23+
import org.apache.commons.lang3.RandomStringUtils;
24+
25+
@Data
26+
@NoArgsConstructor
27+
@EqualsAndHashCode(callSuper = true)
28+
public class PeriodicJob extends BaseQueueMessage {
29+
private String jobName;
30+
31+
public static PeriodicJob newInstance() {
32+
PeriodicJob job = new PeriodicJob();
33+
job.setId(UUID.randomUUID().toString());
34+
job.setJobName(RandomStringUtils.randomAlphabetic(10));
35+
return job;
36+
}
37+
}

0 commit comments

Comments
 (0)