天气与日历 切换到窄版

 找回密码
 立即注册

QQ登录

只需一步,快速开始

限时开通VIP永久会员,可免费下载所有附件
查看: 244|回复: 0

[小程序源码] RocketMq——windows环境搭建与入门

[复制链接]

3188

主题

4

回帖

3290

积分

管理员

积分
3290
发表于 2024-4-7 20:33:03 | 显示全部楼层 |阅读模式
站长测试环境通过:
windows10 64位 企业版;rocketmq(v4.4.0);jdk-8u152-windows-x64;

RocketMq——windows环境搭建与入门


一、准备工作

运行当前最新版本的RockitMQ(v4.4.0),必须先安装64bit的JDK1.8或以上版本。

从RockitMQ官网 http://rocketmq.apache.org/release_notes/ 下载最新的release包。

将rocketmq复制到D:\\usr\\rocketmq下面。

二、RocketMQ基本结构

在动手开发之前,我们需要了解一下RocketMQ的基本结构。


RocketMQ基本结构

如上图所示,一个正常工作的RocketMQ包括四个部分。

NameServer :基于高可用设计产生的,用于服务发现和路由。正式应用时通常采用集群部署。

Broker:实现队列机制,负责消息存储和转发。正式应用时也采用集群部署。

Producer:消息生产者,生成消息并发送到RocketMQ中,生产者通常是我们自己实现的应用程序。

Consumer:消息消费者,从RocketMQ中接收消息并进行业务处理。这部分也通常是我们自己实现的。

三、Windows环境下启动最小应用

从上面的图可以了解到,RocketMQ自身分为 NameServer 和 Broker 两个部分,因此,用作本机开发调试用的最小应用,应该分别启动一个NameServer和一个Broker节点。

RocketMQ默认提供了 windows环境 和 linux环境 下的启动脚本。脚本位于bin目录下,windows的脚本以.cmd为文件名后缀,linux环境的脚本以.sh为文件名后缀。

不过,通常情况下,windows下的脚本双击启动时,都是窗口一闪而过,启动失败。下面的内容就帮大家解决这些问题。

第一步,配置 JAVA_HOME 和 ROCKETMQ_HOME 环境变量

JAVA_HOME 的配置如下:

JAVA_HOME    D:\\usr\\java\\jdk1.8.152

CLASSPATH     .;%JAVA_HOME%\\lib;%JAVA_HOME%\\lib\\tools.jar

path               %JAVA_HOME%/bin;%JAVA_HOME%/jre/bin;


ROCKETMQ_HOME 应指向解压后的Readme.md文件所在目录。

如上面的第二张图,我的 ROCKETMQ_HOME 应配置为D:\\usr\\rocketmq

NAMESRV_ADDR   localhost:9876


第二步,启动 NameServer

NameServer的启动脚本是bin目录下的mqnamesrv.cmd。

上文讲过,即使配置好了ROCKETMQ_HOME环境变量,mqnamesrv.cmd的启动通常也以失败告终。

阅读mqnamesrv.cmd脚本,发现其实际上是调用了runserver.cmd脚本来实现启动的动作。

而在runserver.cmd脚本,java的默认启动参数中,启动时堆内存的大小为2g,老旧一点的机器上根本没有这么多空闲内存。

因此,用编辑器修改一下runserver.cmd脚本。将原来的内存参数注释掉(cmd脚本使用rem关键字),修改为:

  1. rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  2. set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
复制代码

直接双击mqnamesrv.cmd脚本启动NameServer。


NameServer启动显示

看到 The Name Server boot success 字样,表示NameServer己启动成功。

windows环境下,可以在目录%USERPROFILE%\\logs\\rocketmqlogs下找到NameServer的启动日志。文件名为namesrv.log。


第三步,启动 Broker

Broker的启动脚本是mqbroker.cmd。

与mqnamesrv.cmd脚本类似,mqbroker.cmd是调用runbroker.cmd脚本启动Broker的。

同样的,优化一下runbroker.cmd的启动内存

  1. rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g"
  2. set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
复制代码

双击mqbroker.cmd脚本启动Broker。

Broker启动成功

看到 The broker ... boot success 字样,表示Broker己启动成功。

与NameServer类似,可以在目录%USERPROFILE%\\logs\\rocketmqlogs下找到Broker的启动日志。文件名为broker.log。


NameServer与Broker修改日志保存路径:

修改上述三个文件,在<configuration>下面添加<property name=\"LOG_HOME\" value=\"D:/usr/rocketmq\" />,然后将${user.home}替换为${LOG_HOME},即可更改日志路径

四、验证RocketMQ功能

RocketMQ自带了恬送与接收消息的脚本tools.cmd,用来验证RocketMQ的功能是否正常。

tool.cmd脚本需要带参数执行,无法用简单的双击方式启动。因此,我们打开一个cmd窗口,并跳转到bin目录下。

打开cmd窗口并跳转到bin目录下

启动消费者

与mqbroker.cmd脚本类似,启动tool.cmd命令之前我们要指定NameServer地址。

这里我们采用命令方式指定,并启动消费者。依次执行如下命令:

  1. tools.cmd org.apache.rocketmq.example.quickstart.Consumer
复制代码

启动消费者成功

启动生产者

再打开一个cmd窗口,依次执行如下命令:

  1. tools.cmd org.apache.rocketmq.example.quickstart.Producer
复制代码

生产者启动命令

启动成功后,生产者会发送1000个消息,然后自动退出。

生产者发送消息并退出

此时,在消费者界面就会收到刚刚生产者发出的消息。

消费者接收消息

至此,RocketMQ最小应用己经可以正常工作,能满足我们开发环境下调试代码的需求。
五、java代码实现生产者:

  1. package com.suirui.mq.rocket;

  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.apache.rocketmq.client.producer.SendResult;
  5. import org.apache.rocketmq.common.message.Message;
  6. import org.apache.rocketmq.remoting.common.RemotingHelper;

  7. /**
  8. * @Author zongx
  9. * @Date 2020/5/7 17:08
  10. * @Version 1.0
  11. */
  12. public class Producer {
  13.     public static void main(String[] args) throws MQClientException, InterruptedException {

  14.         //以group名字创建一个producer
  15.         DefaultMQProducer producer = new DefaultMQProducer("producer1");

  16.         //设置NameServer地址
  17.         producer.setNamesrvAddr("localhost:9876");
  18.         //防止connect to <ip:xxx> failed  rocketmq默认开启了vip通道
  19.         producer.setVipChannelEnabled(false);

  20.         //为避免程序启动的时候报错,添加此代码,可以让rocketMq自动创建topickey
  21.         producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

  22.         //启动生产者
  23.         producer.start();

  24.         for(int i = 0; i < 10; i++){
  25.             try {
  26.                 /*
  27.                 * 1、topic
  28.                 * Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的消息。
  29.                 * 通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。
  30.                 * topic是单独使用命令进行创建的。此处使用默认的orderTopic
  31.                 * 2、flag
  32.                 * 网络通信层标记。(没确定用处)
  33.                 * 3、body
  34.                 * Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制
  35.                 * 4、transactionId
  36.                 * RocketMQ 4.3.0引入的事务消息相关的事务编号
  37.                 * 5、properties
  38.                 * 该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。
  39.                 * RocketMQ预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。
  40.                 * 当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。
  41.                 * 6、tag
  42.                 * 不同的消费组,订阅同一 topic 不同的 tag,拉取不同的消息并消费。在 topic 内部对消息进行隔离。
  43.                 * */
  44.                 Message message = new Message("TopicTest", "Tag1",
  45.                         ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  46.                 SendResult send = producer.send(message);

  47.                 System.out.println("发送的消息Id: " + send.getMsgId() +"-------发送消息的状态:" + send.getSendStatus());

  48.             } catch (Exception e) {
  49.                 e.printStackTrace();
  50.                 Thread.sleep(1000);
  51.             }
  52.         }


  53.     }

  54. }
复制代码

消费者:
DefaultMQPushConsumer
使用 DefaultMQPushConsumer 主要是设置好各种参数和传人处理消息的函数 。 系统收到消息后自动调用处理函数来处理消息,自动保存 Offset,而且加入新的 DefaultMQPushConsumer后会自动做负载均衡。

DefaultMQPushConsumer需要设置三 个 参数 : 一 是这个 Consumer 的 GroupName,二是 NameServer 的地址和端 口号,三是 Topic 的名称 ,下面将分 别进行详细介绍 。

(1) Consumer的 GroupName用于把多个 Consumer组织到一起, 提高并发 处理能力, GroupName需要和消息模式 (MessageModel)配合使用。

      RocketMQ支持两种消息模式: Clustering和Broadcasting。

      在 Clustering模式下,同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消 息的一部分 内容,       同一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到负载均衡的目的 。
      在 Broadcasting模式下,同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,也就是一个消          息会被多次分发,被 多个 Consumer消费。
(2)NameServer 的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port” 。

(3)Topic名称用来标识消息类型, 需要提前创建。如果不需要消费某 个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤,比如: Consumer.subscribe (”TopicTest”,’tag1 || tag2 || tag3”), 表示这个 Consumer要 消费“ TopicTest”下带有 tag1 或 tag2 或 tag3 的消息。在填写 Tag 参数的位置,用 null 或者“ *“ 表示要消费这个 Topic 的所有消息 。

  1. package com.suirui.mq.rocket;


  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;

  9. import java.util.List;

  10. /**
  11. * @Author zongx
  12. * @Date 2020/5/7 17:59
  13. * @Version 1.0
  14. */
  15. public class Consumer {
  16.     public static void main(String[] args) throws MQClientException {
  17.         /*
  18.         * DefaultMQPushConsumer需要设置三 个 参数 :
  19.         * 一 是这个 Consumer 的 GroupName.二是 NameServer 的地址和端 口号,三是 Topic 的名称,下面将分 别进行详细介绍 。
  20.         * (1) Consumer的 GroupName用于把多个 Consumer组织到一起, 提高并发 处理能力,
  21.         * GroupName需要和消息模式 (MessageModel)配合使用。
  22.         * RocketMQ支持两种消息模式: Clustering和Broadcasting。
  23.         * 1、在 Clustering模式下,同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消 息的一部分 内容,
  24.          *   同一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到负载均衡的目的 。
  25.         * 2、在 Broadcasting模式下,同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,
  26.         *    也就是一个消息会被多次分发,被 多个 Consumer消费。
  27.         * (2)NameServer 的地址和端口 号,可以填写多个 ,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port;ip3 :port” 。
  28.         * (3)Topic名称用来标识消息类型, 需要提前创建。如果不需要消费某 个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤,比如: Consumer.subscribe (”TopicTest”,’tag1 || tag2 || tag3”),
  29.         * 表示这个 Consumer要 消费“ TopicTest”下带有 tag1 或 tag2 或 tag3 的消息。在填写 Tag 参数的位置,用 null 或者“ *“ 表示要消费这个 Topic 的所有消息 。
  30.         */
  31.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_LRW_DEV_SUBS");

  32.         consumer.setVipChannelEnabled(false);
  33.         consumer.setNamesrvAddr("localhost:9876");

  34.         //设置消费者端消息拉取策略,表示从哪里开始消费
  35.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  36.         //设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤
  37.         consumer.subscribe("TopicTest","*");

  38.         //消费者端启动消息监听,一旦生产者发消息被监听到,就打印消息。和rabbitmq的handlerDelivery类似
  39.         consumer.registerMessageListener(new MessageListenerConcurrently() {
  40.             @Override
  41.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  42.                 for (MessageExt messageExt : list) {
  43.                     String topic = messageExt.getTopic();
  44.                     String tags = messageExt.getTags();
  45.                     byte[] body = messageExt.getBody();
  46.                     String msg = new String(body);
  47.                     String msgId = messageExt.getMsgId();
  48.                     System.out.println("*********************************");
  49.                     System.out.println("消费响应:msgId : " + msgId + ",  msgBody : " + msg + ", tag:" + tags + ", topic:" + topic);
  50.                     System.out.println("*********************************");
  51.                 }
  52.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  53.             }
  54.         });

  55.         //调用start()方法启动consumer
  56.         consumer.start();
  57.         System.out.println("Consumer Started....");
  58.     }
  59. }
复制代码

DefaultMQPullConsumer
处理 逻辑是逐个 读 取某 Topic 下所有 Message Queue 的内容, 读完一遍后退出, 主要处理额外的三件事情:

( 1 )获取 Message Queue 并遍历

一 个 Topic 包括多个 Message Queue,如果这个 Consumer 需要获取 Topic 下所有的消息,就 要遍历多有的 Message Queue。 如果有特殊情况,也可以选 择某些特定的 Message Queue 来读取消息 。

( 2 )维护 Offsetstore

从一个 Message Queue 里拉取消息的时候,要传人 Offset参数( long类型 的值),随着不断读取消息 , Offset会不断增长 。 这个时候由用户负责把 Offset 存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等 。

( 3 )根据不同的消息状态做不同的处理

拉取消息的请求发出后,会返回: FOUND、 NO_MATCHED_MSG、 NO_NEW_MSG、 OFFSET_ILLEGAL 四种状态,需要根据每个状态做不同的处理 。比较重要的两个状态是 FOUNT 和 NO NEW MSG ,分别表示获取到消息和没 有新的消息 。

  1. package com.suirui.mq.rocket;


  2. import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import org.apache.rocketmq.client.consumer.PullResult;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  6. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  7. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  8. import org.apache.rocketmq.client.exception.MQClientException;
  9. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  10. import org.apache.rocketmq.common.message.MessageExt;
  11. import org.apache.rocketmq.common.message.MessageQueue;

  12. import java.util.HashMap;
  13. import java.util.List;
  14. import java.util.Map;
  15. import java.util.Set;

  16. /**
  17. * @Author zongx
  18. * @Date 2020/5/7 17:59
  19. * @Version 1.0
  20. */
  21. public class PullConsumer {
  22.     public static void main(String[] args) throws MQClientException {

  23.         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("CID_LRW_DEV_SUBS");

  24.         consumer.setVipChannelEnabled(false);
  25.         consumer.setNamesrvAddr("localhost:9876");
  26.         consumer.start();
  27.         Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
  28.         for (MessageQueue mq : mqs) {
  29.             System.out.printf("Consume from the queue: %s%n", mq);
  30.             SINGLE_MQ:
  31.             while (true) {
  32.                 try {
  33.                     PullResult pullResult =
  34.                             consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
  35.                     System.out.printf("%s%n", pullResult);
  36.                     putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
  37.                     switch (pullResult.getPullStatus()) {
  38.                         case FOUND:
  39.                             break;
  40.                         case NO_MATCHED_MSG:
  41.                             break;
  42.                         case NO_NEW_MSG:
  43.                             break SINGLE_MQ;
  44.                         case OFFSET_ILLEGAL:
  45.                             break;
  46.                         default:
  47.                             break;
  48.                     }
  49.                 } catch (Exception e) {
  50.                     e.printStackTrace();
  51.                 }
  52.             }
  53.         }
  54.     }

  55.     private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

  56.     private static long getMessageQueueOffset(MessageQueue mq) {
  57.         Long offset = OFFSE_TABLE.get(mq);
  58.         if (offset != null)
  59.             return offset;

  60.         return 0;
  61.     }

  62.     private static void putMessageQueueOffset(MessageQueue mq, long offset) {
  63.         OFFSE_TABLE.put(mq, offset);
  64.     }
  65. }
复制代码



相关帖子

扫码关注微信公众号,及时获取最新资源信息!下载附件优惠VIP会员5折;永久VIP免费
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

免责声明:
1、本站提供的所有资源仅供参考学习使用,版权归原著所有,禁止下载本站资源参与商业和非法行为,请在24小时之内自行删除!
2、本站所有内容均由互联网收集整理、网友上传,并且以计算机技术研究交流为目的,仅供大家参考、学习,请勿任何商业目的与商业用途。
3、若您需要商业运营或用于其他商业活动,请您购买正版授权并合法使用。
4、论坛的所有内容都不保证其准确性,完整性,有效性,由于源码具有复制性,一经售出,概不退换。阅读本站内容因误导等因素而造成的损失本站不承担连带责任。
5、用户使用本网站必须遵守适用的法律法规,对于用户违法使用本站非法运营而引起的一切责任,由用户自行承担
6、本站所有资源来自互联网转载,版权归原著所有,用户访问和使用本站的条件是必须接受本站“免责声明”,如果不遵守,请勿访问或使用本网站
7、本站使用者因为违反本声明的规定而触犯中华人民共和国法律的,一切后果自己负责,本站不承担任何责任。
8、凡以任何方式登陆本网站或直接、间接使用本网站资料者,视为自愿接受本网站声明的约束。
9、本站以《2013 中华人民共和国计算机软件保护条例》第二章 “软件著作权” 第十七条为原则:为了学习和研究软件内含的设计思想和原理,通过安装、显示、传输或者存储软件等方式使用软件的,可以不经软件著作权人许可,不向其支付报酬。若有学员需要商用本站资源,请务必联系版权方购买正版授权!
10、本网站如无意中侵犯了某个企业或个人的知识产权,请来信【站长信箱312337667@qq.com】告之,本站将立即删除。
郑重声明:
本站所有资源仅供用户本地电脑学习源代码的内含设计思想和原理,禁止任何其他用途!
本站所有资源、教程来自互联网转载,仅供学习交流,不得商业运营资源,不确保资源完整性,图片和资源仅供参考,不提供任何技术服务。
本站资源仅供本地编辑研究学习参考,禁止未经资源商正版授权参与任何商业行为,违法行为!如需商业请购买各资源商正版授权
本站仅收集资源,提供用户自学研究使用,本站不存在私自接受协助用户架设游戏或资源,非法运营资源行为。
快速回复 返回顶部 返回列表