一、环境说明
ip地址 | 主机名 | 操作系统版本 | RocketMQ版本 | JDK版本 | maven版本 | 备注 |
---|---|---|---|---|---|---|
172.16.7.91 | nameserver01 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Name Server集群 |
172.16.7.92 | nameserver03 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Name Server集群 |
172.16.7.93 | master01 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群1 |
172.16.7.94 | slave01 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群1 |
172.16.7.95 | master02 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群2 |
172.16.7.96 | slave02 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群2 |
二、部署概况
三、创建Maven Project
1.新建Maven project
选择Maven Project
配置目录
选择原型
自定义group id和artifact id,完成maven project的创建。
2.导入依赖库
修改pom.xml,加入如下代码
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
会发现多了很多依赖包
四、生产者测试
1.测试前集群查看
启动各节点服务,查看集群状态
测试前无消息生产和消费
2.新建topic
2.1新增主题topic_test_123
主题配置如下:
集群名为MyRocketmq,BROKER_NAME两个broker都选择
2.2查看新增的主题
4.新建订阅组
4.1新建订阅组group_test_123
配置如下:
4.2查看新建的订阅组
5.新建类Producer
新建类Producer
生产者消息发送代码:
package com.my.maven.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("group_test_123");
// Specify name server addresses.
producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
producer.setRetryTimesWhenSendAsyncFailed(2);
//Launch the instance.
producer.start();
for (int i = ; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("topic_test_123" /* Topic */,
"TagA" /* Tag */,
("Message Test" +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
生产者配置项 retryTimesWhenSendAsyncFailed 表示异步重试的次数,默认为 2 次,加上正常发送的1次,总共有3次发送机会。
发送消息Message Test0--Message Test99,共100条消息。
6.运行报错
运行Produce发送消息时报错,如图:
解决:
由于测试是在本地电脑虚机上进行的,同时开多个虚机和eclipse应用会占用很多内存,解决办法是进入eclipse的安装目录,修改文件eclipse.ini,将参数-Xms和-Xmx改小点即可。
7.运行Produce
8.发送消息状态查看
8.1集群查看
可以看到broker-a和broker-b各产生了50条消息
8.2消息查看
消息详情:
8.3消费者查看
此时还未消费
五、消费者测试
1.新建类Consumer
消费代码:
package com.my.maven.rocketmq;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException,
MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"group_test_123");
consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
consumer.subscribe("topic_test_123", "TagA || TagB");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("topic_test_123")) {
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 获取消息体
String message = new String(msg.getBody());
System.out.println("receive TagA message:" + message);
} else if (msg.getTags() != null
&& msg.getTags().equals("TagB")) {
// 获取消息体
String message = new String(msg.getBody());
System.out.println("receive TagB message:" + message);
}
}
// 成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
2.运行Consumer
3.消费消息状态查看
3.1消费者查看
3.2查看消费详情
3.3集群查看
3.4消息详情查看
发现消息已被消费
4消费者console日志
一共100条消息被消费
来自:https://mp.weixin.qq.com/s/UInB84ileUDxcdPmtrFMiA