Skip to content

Commit 26fbb5a

Browse files
committed
merged main
2 parents 3843912 + 9cb7a5b commit 26fbb5a

File tree

35 files changed

+455
-461
lines changed

35 files changed

+455
-461
lines changed

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class RqueueConfig {
4545

4646
@Getter
4747
private static final String brokerId = UUID.randomUUID().toString();
48+
4849
private static final AtomicLong counter = new AtomicLong(1);
4950
private final RedisConnectionFactory connectionFactory;
5051
private final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@
3535
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
3636
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3737
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
38-
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
39-
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistryImpl;
4038
import com.github.sonus21.rqueue.utils.RedisUtils;
4139
import com.github.sonus21.rqueue.utils.condition.MissingRqueueMessageIdGenerator;
4240
import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled;
4341
import com.github.sonus21.rqueue.utils.pebble.ResourceLoader;
4442
import com.github.sonus21.rqueue.utils.pebble.RqueuePebbleExtension;
43+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
44+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistryImpl;
4545
import io.pebbletemplates.pebble.PebbleEngine;
4646
import io.pebbletemplates.spring.extension.SpringExtension;
4747
import io.pebbletemplates.spring.reactive.PebbleReactiveViewResolver;

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
2525
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
2626
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
27-
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
2827
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
28+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
2929
import lombok.Getter;
3030
import lombok.Setter;
3131
import org.springframework.beans.factory.annotation.Autowired;

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,15 @@ abstract class BaseMessageSender {
5353
protected final MessageConverter messageConverter;
5454
protected final RqueueMessageTemplate messageTemplate;
5555
protected final RqueueMessageIdGenerator messageIdGenerator;
56-
@Autowired protected RqueueStringDao rqueueStringDao;
57-
@Autowired protected RqueueConfig rqueueConfig;
58-
@Autowired protected RqueueMessageMetadataService rqueueMessageMetadataService;
56+
57+
@Autowired
58+
protected RqueueStringDao rqueueStringDao;
59+
60+
@Autowired
61+
protected RqueueConfig rqueueConfig;
62+
63+
@Autowired
64+
protected RqueueMessageMetadataService rqueueMessageMetadataService;
5965

6066
BaseMessageSender(
6167
RqueueMessageTemplate messageTemplate,
@@ -118,16 +124,15 @@ protected String pushMessage(
118124
Long delayInMilliSecs,
119125
boolean isUnique) {
120126
QueueDetail queueDetail = EndpointRegistry.get(queueName);
121-
RqueueMessage rqueueMessage =
122-
buildMessage(
123-
messageIdGenerator,
124-
messageConverter,
125-
queueName,
126-
messageId,
127-
message,
128-
retryCount,
129-
delayInMilliSecs,
130-
messageHeaders);
127+
RqueueMessage rqueueMessage = buildMessage(
128+
messageIdGenerator,
129+
messageConverter,
130+
queueName,
131+
messageId,
132+
message,
133+
retryCount,
134+
delayInMilliSecs,
135+
messageHeaders);
131136
try {
132137
storeMessageMetadata(rqueueMessage, delayInMilliSecs, false, isUnique);
133138
enqueue(queueDetail, rqueueMessage, delayInMilliSecs, false);
@@ -147,16 +152,15 @@ protected String pushMessage(
147152
protected String pushPeriodicMessage(
148153
String queueName, String messageId, Object message, long periodInMilliSeconds) {
149154
QueueDetail queueDetail = EndpointRegistry.get(queueName);
150-
RqueueMessage rqueueMessage =
151-
buildPeriodicMessage(
152-
messageIdGenerator,
153-
messageConverter,
154-
queueName,
155-
messageId,
156-
message,
157-
null,
158-
periodInMilliSeconds,
159-
messageHeaders);
155+
RqueueMessage rqueueMessage = buildPeriodicMessage(
156+
messageIdGenerator,
157+
messageConverter,
158+
queueName,
159+
messageId,
160+
message,
161+
null,
162+
periodInMilliSeconds,
163+
messageHeaders);
160164
try {
161165
storeMessageMetadata(rqueueMessage, periodInMilliSeconds, false, false);
162166
enqueue(queueDetail, rqueueMessage, periodInMilliSeconds, false);

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,7 @@ public ReactiveRqueueMessageEnqueuerImpl(
4242
RqueueMessageTemplate messageTemplate,
4343
MessageConverter messageConverter,
4444
MessageHeaders messageHeaders) {
45-
this(
46-
messageTemplate,
47-
messageConverter,
48-
messageHeaders,
49-
new UuidV4RqueueMessageIdGenerator());
45+
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
5046
}
5147

5248
public ReactiveRqueueMessageEnqueuerImpl(
@@ -68,16 +64,15 @@ private <T> Mono<T> pushReactiveMessage(
6864
boolean isUnique,
6965
Function<RqueueMessage, Mono<T>> monoConverter) {
7066
QueueDetail queueDetail = EndpointRegistry.get(queueName);
71-
RqueueMessage rqueueMessage =
72-
builder.build(
73-
messageIdGenerator,
74-
messageConverter,
75-
queueName,
76-
messageId,
77-
message,
78-
retryCount,
79-
delayInMilliSecs,
80-
messageHeaders);
67+
RqueueMessage rqueueMessage = builder.build(
68+
messageIdGenerator,
69+
messageConverter,
70+
queueName,
71+
messageId,
72+
message,
73+
retryCount,
74+
delayInMilliSecs,
75+
messageHeaders);
8176
try {
8277
Mono<Boolean> storeResult =
8378
(Mono<Boolean>) storeMessageMetadata(rqueueMessage, delayInMilliSecs, true, isUnique);

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueEndpointManagerImpl.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,7 @@ public RqueueEndpointManagerImpl(
5252
RqueueMessageTemplate messageTemplate,
5353
MessageConverter messageConverter,
5454
MessageHeaders messageHeaders) {
55-
this(
56-
messageTemplate,
57-
messageConverter,
58-
messageHeaders,
59-
new UuidV4RqueueMessageIdGenerator());
55+
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
6056
}
6157

6258
public RqueueEndpointManagerImpl(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageEnqueuerImpl.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,7 @@ public RqueueMessageEnqueuerImpl(
4040
RqueueMessageTemplate messageTemplate,
4141
MessageConverter messageConverter,
4242
MessageHeaders messageHeaders) {
43-
this(
44-
messageTemplate,
45-
messageConverter,
46-
messageHeaders,
47-
new UuidV4RqueueMessageIdGenerator());
43+
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
4844
}
4945

5046
public RqueueMessageEnqueuerImpl(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageManagerImpl.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,7 @@ public RqueueMessageManagerImpl(
5151
RqueueMessageTemplate messageTemplate,
5252
MessageConverter messageConverter,
5353
MessageHeaders messageHeaders) {
54-
this(
55-
messageTemplate,
56-
messageConverter,
57-
messageHeaders,
58-
new UuidV4RqueueMessageIdGenerator());
54+
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
5955
}
6056

6157
public RqueueMessageManagerImpl(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtils.java

Lines changed: 21 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,14 @@ public static RqueueMessage buildPeriodicMessage(
7272
} else {
7373
throw new MessageConversionException("Message payload is neither String nor byte[]");
7474
}
75-
RqueueMessage rqueueMessage =
76-
RqueueMessage.builder()
77-
.id(messageIdGenerator.generate())
78-
.queueName(queueName)
79-
.message(strMessage)
80-
.processAt(processAt)
81-
.retryCount(retryCount)
82-
.period(period)
83-
.build();
75+
RqueueMessage rqueueMessage = RqueueMessage.builder()
76+
.id(messageIdGenerator.generate())
77+
.queueName(queueName)
78+
.message(strMessage)
79+
.processAt(processAt)
80+
.retryCount(retryCount)
81+
.period(period)
82+
.build();
8483
if (messageId != null) {
8584
rqueueMessage.setId(messageId);
8685
}
@@ -114,15 +113,14 @@ public static RqueueMessage buildMessage(
114113
} else {
115114
throw new MessageConversionException("Message payload is neither String nor byte[]");
116115
}
117-
RqueueMessage rqueueMessage =
118-
RqueueMessage.builder()
119-
.retryCount(retryCount)
120-
.queuedTime(queuedTime)
121-
.id(messageIdGenerator.generate())
122-
.queueName(queueName)
123-
.message(strMessage)
124-
.processAt(processAt)
125-
.build();
116+
RqueueMessage rqueueMessage = RqueueMessage.builder()
117+
.retryCount(retryCount)
118+
.queuedTime(queuedTime)
119+
.id(messageIdGenerator.generate())
120+
.queueName(queueName)
121+
.message(strMessage)
122+
.processAt(processAt)
123+
.build();
126124
if (messageId != null) {
127125
rqueueMessage.setId(messageId);
128126
}
@@ -135,27 +133,13 @@ public static List<RqueueMessage> generateMessages(
135133
String queueName,
136134
int count) {
137135
return generateMessages(
138-
messageIdGenerator,
139-
converter,
140-
messageIdGenerator.generate(),
141-
queueName,
142-
null,
143-
null,
144-
count);
136+
messageIdGenerator, converter, messageIdGenerator.generate(), queueName, null, null, count);
145137
}
146138

147139
public static RqueueMessage generateMessage(
148-
RqueueMessageIdGenerator messageIdGenerator,
149-
MessageConverter converter,
150-
String queueName) {
140+
RqueueMessageIdGenerator messageIdGenerator, MessageConverter converter, String queueName) {
151141
return generateMessages(
152-
messageIdGenerator,
153-
converter,
154-
messageIdGenerator.generate(),
155-
queueName,
156-
null,
157-
null,
158-
1)
142+
messageIdGenerator, converter, messageIdGenerator.generate(), queueName, null, null, 1)
159143
.get(0);
160144
}
161145

@@ -185,16 +169,8 @@ public static List<RqueueMessage> generateMessages(
185169
int count) {
186170
List<RqueueMessage> messages = new ArrayList<>();
187171
for (int i = 0; i < count; i++) {
188-
messages.add(
189-
buildMessage(
190-
messageIdGenerator,
191-
converter,
192-
queueName,
193-
null,
194-
object,
195-
retryCount,
196-
delay,
197-
null));
172+
messages.add(buildMessage(
173+
messageIdGenerator, converter, queueName, null, object, retryCount, delay, null));
198174
}
199175
return messages;
200176
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,9 @@ protected boolean hasAvailableThreads(QueueDetail queueDetail, QueueThreadPool q
105105

106106
protected void recordCapacityExhausted(QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
107107
if (rqueueBeanProvider.getRqueueWorkerRegistry() != null) {
108-
rqueueBeanProvider.getRqueueWorkerRegistry().recordQueueCapacityExhausted(
109-
queueDetail, queueThreadPool);
108+
rqueueBeanProvider
109+
.getRqueueWorkerRegistry()
110+
.recordQueueCapacityExhausted(queueDetail, queueThreadPool);
110111
}
111112
}
112113

0 commit comments

Comments
 (0)