diff --git a/README.md b/README.md index 99f2b864c..4056f2480 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ This repository houses user-friendly, cloud-ready benchmarking suites for the fo * [Pravega](https://pravega.io/) * [RabbitMQ](https://www.rabbitmq.com/) * [Redis](https://redis.com/) +* [MQTT](https://mqtt.org/) > More details could be found at the [official documentation](http://openmessaging.cloud/docs/benchmarks/). diff --git a/benchmark-framework/pom.xml b/benchmark-framework/pom.xml index 0b1bddca7..698a79c9a 100644 --- a/benchmark-framework/pom.xml +++ b/benchmark-framework/pom.xml @@ -106,6 +106,11 @@ driver-rocketmq ${project.version} + + ${project.groupId} + driver-rocketmq5 + ${project.version} + com.beust jcommander diff --git a/driver-rocketmq5/pom.xml b/driver-rocketmq5/pom.xml new file mode 100644 index 000000000..c3de982bf --- /dev/null +++ b/driver-rocketmq5/pom.xml @@ -0,0 +1,64 @@ + + + + 4.0.0 + + io.openmessaging.benchmark + messaging-benchmark + 0.0.1-SNAPSHOT + + + driver-rocketmq5 + + + 5.0.7 + 5.1.0 + 29.0-jre + + + + + ${project.groupId} + driver-api + ${project.version} + + + com.google.guava + guava + ${guava.version} + + + + org.apache.rocketmq + rocketmq-client-java + ${rocketmq-client-java-version} + + + org.apache.rocketmq + rocketmq-tools + ${rocketmq.version} + + + ch.qos.logback + logback-classic + + + + + + diff --git a/driver-rocketmq5/rocketmq.yaml b/driver-rocketmq5/rocketmq.yaml new file mode 100644 index 000000000..cda85a17a --- /dev/null +++ b/driver-rocketmq5/rocketmq.yaml @@ -0,0 +1,32 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: RocketMQ5 +driverClass: io.openmessaging.benchmark.driver.rocketmq.RocketMQ5BenchmarkDriver + +# The RocketMQ nameserver address +namesrvAddr: x.x.x.x:9876 +# The RocketMQ broker cluster name +clusterName: rocketmq-broker-xxxx +# The proxy address to connect for grpc client. +grpcEndpoint: 127.0.0.1:8081 +# 是否生产定时消息,否:生产普通消息,是:生产定时/延时消息且通过delayTimeInSec设置延迟时间 +sendDelayMsg: false +delayTimeInSec: 60 +# (Optional) The admin credential +adminAccessKey: xxxx +adminSecretKey: xxxx +# (Optional) The credential that clients connect to proxy. +accessKey: xxxxxx +secretKey: xxxxx diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkConsumer.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkConsumer.java new file mode 100644 index 000000000..23d96cb05 --- /dev/null +++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkConsumer.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.driver.rocketmq; + + +import io.openmessaging.benchmark.driver.BenchmarkConsumer; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; + +public class RocketMQ5BenchmarkConsumer implements BenchmarkConsumer { + private final PushConsumer rmqConsumer; + + public RocketMQ5BenchmarkConsumer(final PushConsumer rmqConsumer) { + this.rmqConsumer = rmqConsumer; + } + + @Override + public void close() throws Exception { + this.rmqConsumer.close(); + } +} diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkDriver.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkDriver.java new file mode 100644 index 000000000..38dc85f23 --- /dev/null +++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkDriver.java @@ -0,0 +1,294 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.driver.rocketmq; + + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.io.BaseEncoding; +import io.openmessaging.benchmark.driver.BenchmarkConsumer; +import io.openmessaging.benchmark.driver.BenchmarkDriver; +import io.openmessaging.benchmark.driver.BenchmarkProducer; +import io.openmessaging.benchmark.driver.ConsumerCallback; +import io.openmessaging.benchmark.driver.rocketmq.client.RocketMQClient5Config; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientConfigurationBuilder; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.shaded.commons.lang3.StringUtils; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RocketMQ5BenchmarkDriver implements BenchmarkDriver { + private RocketMQClient5Config rmqClientConfig; + private DefaultMQAdminExt rmqAdmin; + + @Override + public void initialize(final File configurationFile, final StatsLogger statsLogger) + throws IOException { + this.rmqClientConfig = readConfig(configurationFile); + if (isAdminAclEnabled()) { + AclClientRPCHook rpcHook = + new AclClientRPCHook( + new SessionCredentials( + this.rmqClientConfig.adminAccessKey, this.rmqClientConfig.adminSecretKey)); + this.rmqAdmin = new DefaultMQAdminExt(rpcHook); + } else { + this.rmqAdmin = new DefaultMQAdminExt(); + } + this.rmqAdmin.setNamesrvAddr(this.rmqClientConfig.namesrvAddr); + this.rmqAdmin.setInstanceName("AdminInstance-" + getRandomString()); + try { + this.rmqAdmin.start(); + } catch (MQClientException e) { + log.error("Start the RocketMQ admin tool failed."); + } + } + + Map> cachedBrokerAddr = new ConcurrentHashMap<>(); + + private synchronized Set fetchMasterAndSlaveAddrByClusterName( + final MQAdminExt adminExt, final String clusterName) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { + Set brokerList = cachedBrokerAddr.get(clusterName); + if (brokerList == null) { + brokerList = + CommandUtil.fetchMasterAndSlaveAddrByClusterName( + adminExt, this.rmqClientConfig.clusterName); + cachedBrokerAddr.put(clusterName, brokerList); + if (brokerList.isEmpty()) { + throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName); + } + } + return brokerList; + } + + @Override + public String getTopicNamePrefix() { + return "Benchmark"; + } + + @Override + public CompletableFuture createTopic(final String topic, final int partitions) { + return CompletableFuture.runAsync( + () -> { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setOrder(false); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(partitions); + topicConfig.setWriteQueueNums(partitions); + topicConfig.setTopicName(topic); + Map properties = new HashMap<>(); + String topicType = + rmqClientConfig.sendDelayMsg + ? TopicMessageType.DELAY.getValue() + : TopicMessageType.NORMAL.getValue(); + properties.put("+message.type", topicType); + topicConfig.setAttributes(properties); + + try { + Set brokerList = + fetchMasterAndSlaveAddrByClusterName( + this.rmqAdmin, this.rmqClientConfig.clusterName); + topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); + topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); + + for (String brokerAddr : brokerList) { + this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); + } + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to create topic [%s] to cluster [%s]", + topic, this.rmqClientConfig.clusterName), + e); + } + }); + } + + @Override + public CompletableFuture createProducer(final String topic) { + + ClientServiceProvider provider = ClientServiceProvider.loadService(); + ClientConfigurationBuilder builder = + ClientConfiguration.newBuilder().setEndpoints(this.rmqClientConfig.grpcEndpoint); + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider( + this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey); + ClientConfiguration configuration; + if (isAclEnabled()) { + builder.setCredentialProvider(sessionCredentialsProvider); + configuration = builder.build(); + } else { + configuration = builder.build(); + } + + Producer rmqProducer; + try { + rmqProducer = + provider + .newProducerBuilder() + .setTopics(topic) + .setClientConfiguration(configuration) + .build(); + } catch (ClientException e) { + throw new RuntimeException(e); + } + if (this.rmqClientConfig.sendDelayMsg) { + return CompletableFuture.completedFuture( + new RocketMQ5BenchmarkProducer( + rmqProducer, topic, true, this.rmqClientConfig.delayTimeInSec)); + } else { + return CompletableFuture.completedFuture(new RocketMQ5BenchmarkProducer(rmqProducer, topic)); + } + } + + public void createSubscriptionGroup(String fullSubName) { + try { + Set brokerList = + fetchMasterAndSlaveAddrByClusterName(this.rmqAdmin, this.rmqClientConfig.clusterName); + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(fullSubName); + + for (String brokerAddr : brokerList) { + this.rmqAdmin.createAndUpdateSubscriptionGroupConfig(brokerAddr, subscriptionGroupConfig); + } + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to create subscription [%s] to cluster [%s]", + fullSubName, this.rmqClientConfig.clusterName), + e); + } + } + + @Override + public CompletableFuture createConsumer( + final String topic, final String subscriptionName, final ConsumerCallback consumerCallback) { + PushConsumer rmqConsumer; + + // To avoid bench-tool encounter subscription relationship conflict when specifying multiple + // topics, let's add topic name as subscription name prefix. + String subPrefix; + if (topic.contains("%")) { + subPrefix = topic.split("%")[1]; + } else { + subPrefix = topic; + } + String fullSubName = String.format("%s_%s", subPrefix, subscriptionName); + createSubscriptionGroup(fullSubName); + + ClientServiceProvider provider = ClientServiceProvider.loadService(); + ClientConfigurationBuilder builder = + ClientConfiguration.newBuilder().setEndpoints(this.rmqClientConfig.grpcEndpoint); + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider( + this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey); + ClientConfiguration configuration; + if (isAclEnabled()) { + builder.setCredentialProvider(sessionCredentialsProvider); + configuration = builder.build(); + } else { + configuration = builder.build(); + } + FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG); + + try { + rmqConsumer = + provider + .newPushConsumerBuilder() + .setClientConfiguration(configuration) + // Set the consumer group name. + .setConsumerGroup(fullSubName) + // Set the subscription for the consumer. + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener( + messageView -> { + // Handle the received message and return consume result. + consumerCallback.messageReceived( + messageView.getBody(), messageView.getBornTimestamp()); + return ConsumeResult.SUCCESS; + }) + .build(); + } catch (ClientException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(new RocketMQ5BenchmarkConsumer(rmqConsumer)); + } + + public boolean isAclEnabled() { + return !(StringUtils.isAnyBlank(this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey) + || StringUtils.isAnyEmpty(this.rmqClientConfig.accessKey, this.rmqClientConfig.secretKey)); + } + + public boolean isAdminAclEnabled() { + return !(StringUtils.isAnyBlank( + this.rmqClientConfig.adminAccessKey, this.rmqClientConfig.adminSecretKey) + || StringUtils.isAnyEmpty( + this.rmqClientConfig.adminAccessKey, this.rmqClientConfig.adminSecretKey)); + } + + @Override + public void close() throws Exception {} + + private static final ObjectMapper mapper = + new ObjectMapper(new YAMLFactory()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private static RocketMQClient5Config readConfig(File configurationFile) throws IOException { + return mapper.readValue(configurationFile, RocketMQClient5Config.class); + } + + private static final Random random = new Random(); + + private static String getRandomString() { + byte[] buffer = new byte[5]; + random.nextBytes(buffer); + return BaseEncoding.base64Url().omitPadding().encode(buffer); + } + + private static final Logger log = LoggerFactory.getLogger(RocketMQ5BenchmarkDriver.class); +} diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkProducer.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkProducer.java new file mode 100644 index 000000000..34bbd8a78 --- /dev/null +++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQ5BenchmarkProducer.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.driver.rocketmq; + + +import io.openmessaging.benchmark.driver.BenchmarkProducer; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.client.apis.message.MessageBuilder; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.java.message.MessageBuilderImpl; + +public class RocketMQ5BenchmarkProducer implements BenchmarkProducer { + private final Producer rmqProducer; + private final String rmqTopic; + private final Boolean sendDelayMsg; + private final Long delayTimeInSec; + + public RocketMQ5BenchmarkProducer(final Producer rmqProducer, final String rmqTopic) { + this.rmqProducer = rmqProducer; + this.rmqTopic = rmqTopic; + this.sendDelayMsg = false; + this.delayTimeInSec = 0L; + } + + public RocketMQ5BenchmarkProducer( + final Producer rmqProducer, + final String rmqTopic, + Boolean sendDelayMsg, + Long delayTimeInSec) { + this.rmqProducer = rmqProducer; + this.rmqTopic = rmqTopic; + this.sendDelayMsg = sendDelayMsg; + this.delayTimeInSec = delayTimeInSec; + } + + @Override + public CompletableFuture sendAsync(final Optional key, final byte[] payload) { + MessageBuilder messageBuilder = new MessageBuilderImpl(); + messageBuilder.setBody(payload); + messageBuilder.setTopic(this.rmqTopic); + + key.ifPresent(messageBuilder::setKeys); + + if (this.sendDelayMsg) { + // 延时消息,单位秒(s),在指定延迟时间(当前时间之后)进行投递,例如消息在10秒后投递。 + long delayTime = System.currentTimeMillis() + this.delayTimeInSec * 1000; + // 设置消息需要被投递的时间。 + messageBuilder.setDeliveryTimestamp(delayTime); + } + + CompletableFuture future = new CompletableFuture<>(); + this.rmqProducer + .sendAsync(messageBuilder.build()) + .whenComplete( + (sendReceipt, throwable) -> { + if (sendReceipt != null) { + future.complete(null); + } else { + future.completeExceptionally(throwable); + } + }); + return future; + } + + @Override + public void close() throws Exception { + if (rmqProducer != null) { + rmqProducer.close(); + } + } +} diff --git a/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClient5Config.java b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClient5Config.java new file mode 100644 index 000000000..11c9ae5a4 --- /dev/null +++ b/driver-rocketmq5/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClient5Config.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.driver.rocketmq.client; + +public class RocketMQClient5Config { + public String namesrvAddr; + public String grpcEndpoint; + public String clusterName; + public String adminAccessKey; + public String adminSecretKey; + public String accessKey; + public String secretKey; + public boolean sendDelayMsg; + public Long delayTimeInSec; +} diff --git a/pom.xml b/pom.xml index 892179283..e624f7633 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ driver-artemis driver-bookkeeper driver-rocketmq + driver-rocketmq5 driver-nats driver-nats-streaming driver-nsq