diff --git a/README.md b/README.md
index 99f2b864c..4056f2480 100644
--- a/README.md
+++ b/README.md
@@ -23,6 +23,7 @@ This repository houses user-friendly, cloud-ready benchmarking suites for the fo
* [Pravega](https://pravega.io/)
* [RabbitMQ](https://www.rabbitmq.com/)
* [Redis](https://redis.com/)
+* [MQTT](https://mqtt.org/)
> More details could be found at the [official documentation](http://openmessaging.cloud/docs/benchmarks/).
diff --git a/benchmark-framework/pom.xml b/benchmark-framework/pom.xml
index 0b1bddca7..698a79c9a 100644
--- a/benchmark-framework/pom.xml
+++ b/benchmark-framework/pom.xml
@@ -106,6 +106,11 @@
driver-rocketmq
${project.version}
+
+ ${project.groupId}
+ driver-rocketmq5
+ ${project.version}
+
com.beust
jcommander
diff --git a/driver-rocketmq5/pom.xml b/driver-rocketmq5/pom.xml
new file mode 100644
index 000000000..c3de982bf
--- /dev/null
+++ b/driver-rocketmq5/pom.xml
@@ -0,0 +1,64 @@
+
+
+
+ 4.0.0
+
+ io.openmessaging.benchmark
+ messaging-benchmark
+ 0.0.1-SNAPSHOT
+
+
+ driver-rocketmq5
+
+
+ 5.0.7
+ 5.1.0
+ 29.0-jre
+
+
+
+
+ ${project.groupId}
+ driver-api
+ ${project.version}
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ ${rocketmq-client-java-version}
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ ${rocketmq.version}
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
+
+
diff --git a/driver-rocketmq5/rocketmq.yaml b/driver-rocketmq5/rocketmq.yaml
new file mode 100644
index 000000000..cda85a17a
--- /dev/null
+++ b/driver-rocketmq5/rocketmq.yaml
@@ -0,0 +1,32 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: RocketMQ5
+driverClass: io.openmessaging.benchmark.driver.rocketmq.RocketMQ5BenchmarkDriver
+
+# The RocketMQ nameserver address
+namesrvAddr: x.x.x.x:9876
+# The RocketMQ broker cluster name
+clusterName: rocketmq-broker-xxxx
+# The proxy address to connect for grpc client.
+grpcEndpoint: 127.0.0.1:8081
+# 是否生产定时消息,否:生产普通消息,是:生产定时/延时消息且通过delayTimeInSec设置延迟时间
+sendDelayMsg: false
+delayTimeInSec: 60
+# (Optional) The admin credential
+adminAccessKey: xxxx
+adminSecretKey: xxxx
+# (Optional) The credential that clients connect to proxy.
+accessKey: xxxxxx
+secretKey: xxxxx
diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkConsumer.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkConsumer.java
new file mode 100644
index 000000000..23d96cb05
--- /dev/null
+++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkConsumer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.benchmark.driver.rocketmq;
+
+
+import io.openmessaging.benchmark.driver.BenchmarkConsumer;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+
+public class RocketMQ5BenchmarkConsumer implements BenchmarkConsumer {
+ private final PushConsumer rmqConsumer;
+
+ public RocketMQ5BenchmarkConsumer(final PushConsumer rmqConsumer) {
+ this.rmqConsumer = rmqConsumer;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.rmqConsumer.close();
+ }
+}
diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkDriver.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkDriver.java
new file mode 100644
index 000000000..38dc85f23
--- /dev/null
+++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkDriver.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.benchmark.driver.rocketmq;
+
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.io.BaseEncoding;
+import io.openmessaging.benchmark.driver.BenchmarkConsumer;
+import io.openmessaging.benchmark.driver.BenchmarkDriver;
+import io.openmessaging.benchmark.driver.BenchmarkProducer;
+import io.openmessaging.benchmark.driver.ConsumerCallback;
+import io.openmessaging.benchmark.driver.rocketmq.client.RocketMQClient5Config;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQ5BenchmarkDriver implements BenchmarkDriver {
+ private RocketMQClient5Config rmqClientConfig;
+ private DefaultMQAdminExt rmqAdmin;
+
+ @Override
+ public void initialize(final File configurationFile, final StatsLogger statsLogger)
+ throws IOException {
+ this.rmqClientConfig = readConfig(configurationFile);
+ if (isAdminAclEnabled()) {
+ AclClientRPCHook rpcHook =
+ new AclClientRPCHook(
+ new SessionCredentials(
+ this.rmqClientConfig.adminAccessKey, this.rmqClientConfig.adminSecretKey));
+ this.rmqAdmin = new DefaultMQAdminExt(rpcHook);
+ } else {
+ this.rmqAdmin = new DefaultMQAdminExt();
+ }
+ this.rmqAdmin.setNamesrvAddr(this.rmqClientConfig.namesrvAddr);
+ this.rmqAdmin.setInstanceName("AdminInstance-" + getRandomString());
+ try {
+ this.rmqAdmin.start();
+ } catch (MQClientException e) {
+ log.error("Start the RocketMQ admin tool failed.");
+ }
+ }
+
+ Map> cachedBrokerAddr = new ConcurrentHashMap<>();
+
+ private synchronized Set fetchMasterAndSlaveAddrByClusterName(
+ final MQAdminExt adminExt, final String clusterName)
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ MQBrokerException, InterruptedException {
+ Set brokerList = cachedBrokerAddr.get(clusterName);
+ if (brokerList == null) {
+ brokerList =
+ CommandUtil.fetchMasterAndSlaveAddrByClusterName(
+ adminExt, this.rmqClientConfig.clusterName);
+ cachedBrokerAddr.put(clusterName, brokerList);
+ if (brokerList.isEmpty()) {
+ throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName);
+ }
+ }
+ return brokerList;
+ }
+
+ @Override
+ public String getTopicNamePrefix() {
+ return "Benchmark";
+ }
+
+ @Override
+ public CompletableFuture createTopic(final String topic, final int partitions) {
+ return CompletableFuture.runAsync(
+ () -> {
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setOrder(false);
+ topicConfig.setPerm(6);
+ topicConfig.setReadQueueNums(partitions);
+ topicConfig.setWriteQueueNums(partitions);
+ topicConfig.setTopicName(topic);
+ Map properties = new HashMap<>();
+ String topicType =
+ rmqClientConfig.sendDelayMsg
+ ? TopicMessageType.DELAY.getValue()
+ : TopicMessageType.NORMAL.getValue();
+ properties.put("+message.type", topicType);
+ topicConfig.setAttributes(properties);
+
+ try {
+ Set brokerList =
+ fetchMasterAndSlaveAddrByClusterName(
+ this.rmqAdmin, this.rmqClientConfig.clusterName);
+ topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
+ topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
+
+ for (String brokerAddr : brokerList) {
+ this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to create topic [%s] to cluster [%s]",
+ topic, this.rmqClientConfig.clusterName),
+ e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture createProducer(final String topic) {
+
+ ClientServiceProvider provider = ClientServiceProvider.loadService();
+ ClientConfigurationBuilder builder =
+ ClientConfiguration.newBuilder().setEndpoints(this.rmqClientConfig.grpcEndpoint);
+ SessionCredentialsProvider sessionCredentialsProvider =
+ new StaticSessionCredentialsProvider(
+ this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey);
+ ClientConfiguration configuration;
+ if (isAclEnabled()) {
+ builder.setCredentialProvider(sessionCredentialsProvider);
+ configuration = builder.build();
+ } else {
+ configuration = builder.build();
+ }
+
+ Producer rmqProducer;
+ try {
+ rmqProducer =
+ provider
+ .newProducerBuilder()
+ .setTopics(topic)
+ .setClientConfiguration(configuration)
+ .build();
+ } catch (ClientException e) {
+ throw new RuntimeException(e);
+ }
+ if (this.rmqClientConfig.sendDelayMsg) {
+ return CompletableFuture.completedFuture(
+ new RocketMQ5BenchmarkProducer(
+ rmqProducer, topic, true, this.rmqClientConfig.delayTimeInSec));
+ } else {
+ return CompletableFuture.completedFuture(new RocketMQ5BenchmarkProducer(rmqProducer, topic));
+ }
+ }
+
+ public void createSubscriptionGroup(String fullSubName) {
+ try {
+ Set brokerList =
+ fetchMasterAndSlaveAddrByClusterName(this.rmqAdmin, this.rmqClientConfig.clusterName);
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(fullSubName);
+
+ for (String brokerAddr : brokerList) {
+ this.rmqAdmin.createAndUpdateSubscriptionGroupConfig(brokerAddr, subscriptionGroupConfig);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to create subscription [%s] to cluster [%s]",
+ fullSubName, this.rmqClientConfig.clusterName),
+ e);
+ }
+ }
+
+ @Override
+ public CompletableFuture createConsumer(
+ final String topic, final String subscriptionName, final ConsumerCallback consumerCallback) {
+ PushConsumer rmqConsumer;
+
+ // To avoid bench-tool encounter subscription relationship conflict when specifying multiple
+ // topics, let's add topic name as subscription name prefix.
+ String subPrefix;
+ if (topic.contains("%")) {
+ subPrefix = topic.split("%")[1];
+ } else {
+ subPrefix = topic;
+ }
+ String fullSubName = String.format("%s_%s", subPrefix, subscriptionName);
+ createSubscriptionGroup(fullSubName);
+
+ ClientServiceProvider provider = ClientServiceProvider.loadService();
+ ClientConfigurationBuilder builder =
+ ClientConfiguration.newBuilder().setEndpoints(this.rmqClientConfig.grpcEndpoint);
+ SessionCredentialsProvider sessionCredentialsProvider =
+ new StaticSessionCredentialsProvider(
+ this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey);
+ ClientConfiguration configuration;
+ if (isAclEnabled()) {
+ builder.setCredentialProvider(sessionCredentialsProvider);
+ configuration = builder.build();
+ } else {
+ configuration = builder.build();
+ }
+ FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
+
+ try {
+ rmqConsumer =
+ provider
+ .newPushConsumerBuilder()
+ .setClientConfiguration(configuration)
+ // Set the consumer group name.
+ .setConsumerGroup(fullSubName)
+ // Set the subscription for the consumer.
+ .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+ .setMessageListener(
+ messageView -> {
+ // Handle the received message and return consume result.
+ consumerCallback.messageReceived(
+ messageView.getBody(), messageView.getBornTimestamp());
+ return ConsumeResult.SUCCESS;
+ })
+ .build();
+ } catch (ClientException e) {
+ throw new RuntimeException(e);
+ }
+ return CompletableFuture.completedFuture(new RocketMQ5BenchmarkConsumer(rmqConsumer));
+ }
+
+ public boolean isAclEnabled() {
+ return !(StringUtils.isAnyBlank(this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey)
+ || StringUtils.isAnyEmpty(this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey));
+ }
+
+ public boolean isAdminAclEnabled() {
+ return !(StringUtils.isAnyBlank(
+ this.rmqClientConfig.adminAccessKey, this.rmqClientConfig.adminSecretKey)
+ || StringUtils.isAnyEmpty(
+ this.rmqClientConfig.adminAccessKey, this.rmqClientConfig.adminSecretKey));
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ private static final ObjectMapper mapper =
+ new ObjectMapper(new YAMLFactory())
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ private static RocketMQClient5Config readConfig(File configurationFile) throws IOException {
+ return mapper.readValue(configurationFile, RocketMQClient5Config.class);
+ }
+
+ private static final Random random = new Random();
+
+ private static String getRandomString() {
+ byte[] buffer = new byte[5];
+ random.nextBytes(buffer);
+ return BaseEncoding.base64Url().omitPadding().encode(buffer);
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(RocketMQ5BenchmarkDriver.class);
+}
diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkProducer.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkProducer.java
new file mode 100644
index 000000000..34bbd8a78
--- /dev/null
+++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkProducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.benchmark.driver.rocketmq;
+
+
+import io.openmessaging.benchmark.driver.BenchmarkProducer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
+
+public class RocketMQ5BenchmarkProducer implements BenchmarkProducer {
+ private final Producer rmqProducer;
+ private final String rmqTopic;
+ private final Boolean sendDelayMsg;
+ private final Long delayTimeInSec;
+
+ public RocketMQ5BenchmarkProducer(final Producer rmqProducer, final String rmqTopic) {
+ this.rmqProducer = rmqProducer;
+ this.rmqTopic = rmqTopic;
+ this.sendDelayMsg = false;
+ this.delayTimeInSec = 0L;
+ }
+
+ public RocketMQ5BenchmarkProducer(
+ final Producer rmqProducer,
+ final String rmqTopic,
+ Boolean sendDelayMsg,
+ Long delayTimeInSec) {
+ this.rmqProducer = rmqProducer;
+ this.rmqTopic = rmqTopic;
+ this.sendDelayMsg = sendDelayMsg;
+ this.delayTimeInSec = delayTimeInSec;
+ }
+
+ @Override
+ public CompletableFuture sendAsync(final Optional key, final byte[] payload) {
+ MessageBuilder messageBuilder = new MessageBuilderImpl();
+ messageBuilder.setBody(payload);
+ messageBuilder.setTopic(this.rmqTopic);
+
+ key.ifPresent(messageBuilder::setKeys);
+
+ if (this.sendDelayMsg) {
+ // 延时消息,单位秒(s),在指定延迟时间(当前时间之后)进行投递,例如消息在10秒后投递。
+ long delayTime = System.currentTimeMillis() + this.delayTimeInSec * 1000;
+ // 设置消息需要被投递的时间。
+ messageBuilder.setDeliveryTimestamp(delayTime);
+ }
+
+ CompletableFuture future = new CompletableFuture<>();
+ this.rmqProducer
+ .sendAsync(messageBuilder.build())
+ .whenComplete(
+ (sendReceipt, throwable) -> {
+ if (sendReceipt != null) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (rmqProducer != null) {
+ rmqProducer.close();
+ }
+ }
+}
diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClient5Config.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClient5Config.java
new file mode 100644
index 000000000..11c9ae5a4
--- /dev/null
+++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClient5Config.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.benchmark.driver.rocketmq.client;
+
+public class RocketMQClient5Config {
+ public String namesrvAddr;
+ public String grpcEndpoint;
+ public String clusterName;
+ public String adminAccessKey;
+ public String adminSecretKey;
+ public String accessKey;
+ public String secretKey;
+ public boolean sendDelayMsg;
+ public Long delayTimeInSec;
+}
diff --git a/pom.xml b/pom.xml
index 892179283..e624f7633 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
driver-artemis
driver-bookkeeper
driver-rocketmq
+ driver-rocketmq5
driver-nats
driver-nats-streaming
driver-nsq