|
1.引入依赖
- org.apache.rocketmq
- rocketmq-client
- 4.3.2
-
-
- org.apache.rocketmq
- rocketmq-spring-boot-starter
- 2.0.3
-
复制代码 2.Producerpublic class MyProducer {
public static void main(String[] args) throws Exception{
// 构造Producer时,必须指定groupId
DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
//指定NameServer的地址 只用namesrv的地址就行,它会从namesrv上拿到broker的地址和topic信息
producer.setNamesrvAddr("localhost:9876");
//启动生产者
producer.start();
int num = 0;
while (num < 20) {
num++;
/**
* rocketmq封装了Message
* String topic,
* String tags, 标签(分类)---> 筛选
* byte[] body
*/
Message message = new Message("my_test_topic", "", ("hello rocketmq:" + num).getBytes());
//同步发送 发送消息,拿到返回SendResult
SendResult result = producer.send(message);
System.out.println(result);
}
//关闭生产者
producer.shutdown();
}
}
- [indent]public class MyProducer {
复制代码 启动并发送消成功后,返回的SendResult如下:
【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)
SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态
FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成 SYNC_FLUSH 才会报这个错误)
FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK
【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)
3.Consumer
- public class MyConsumer {
- public static void main(String[] args) throws MQClientException {
- // 构造Consumer时,必须指定groupId
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
- consumer.setNamesrvAddr("localhost:9876"); // nameServer地址,用于获取broker、topic信息
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- // 指定订阅的主题与tag,通过tag可以定制性消费(*表示全部tag)
- consumer.subscribe("my_test_topic", "*");
-
- // 异步消费
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List msgs,
- ConsumeConcurrentlyContext context) {
- // System.out.println("Receive Message:" + msgs.toString());
- // 1 try catch(throwable)确保不会因为业务逻辑的异常,导致消息出现重复消费的现象
- // 2 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中会对Throwable进行捕获,
- // 并且返回ConsumeConcurrentlyStatus.RECONSUME_LATER
- try {
- for(MessageExt msg:msgs){
- String msgbody = new String(msg.getBody(), "utf-8");
- System.out.println(" MessageBody: "+ msgbody);//输出消息内容
- }
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 签收
- }
- });
- //启动消费者
- consumer.start();
- System.out.println("消费者启动成功。。。");
- }
- }
复制代码 收到消息的内容:
【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)
consumer Group:位于同一个consumer Group中的consumer实例
和producer Group中的各个produer实例承担的角色类似
同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾
和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。
同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。
【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)
|
|