服务器设置
1. 安装64位JDK;
2. 设置Linux文件系统为Ext4
3.开启9876,10911防火墙端口
源码编码
1. 安装Maven 2. 下载RocketMQ源码,下载地址:http://github.com/alibaba/RocketMQ.git/trunk,进入到源码解压目录下运行install.bat或DOS命令行切换到解压目录运行: mvn -Dmaven.test.skip=true clean package install assembly:assembly -U,编译成功后,在target目录下会有alibaba-rocketmq-3.1.1.tar.gz,该压缩包就是安装包。
3. 安装
将alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:tar -zxvf
alibaba-rocketmq-3.1.1.tar.gz设置执行权限chmod +x ./alibaba-rocketmq/bin/*
4. 运行
配置采用双Master,双Slave,异步复制的配置方式,共需要4台服务器做硬件支持。 a. 修改配置
(1)创建目录
mkdir /home/rocket/alibaba-rocketmq/logs #创建日志目录
mkdir -p /home/rocket/alibaba-rocketmq/data/store/commitlog #创建数据存储目录
更改日志目录
cd /home/rocket/alibaba-rocketmq/conf
(2)修改A主配置
vi ./conf/2m-2s-async/broker-a.properties:
1. # brokerClusterName=DefaultCluster
2. brokerName=broker-a
3. brokerId=0
4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
5. defaultTopicQueueNums=4
6. autoCreateTopicEnable=true
7. autoCreateSubscriptionGroup=true
8. listenPort=10911
9. deleteWhen=04
10. fileReservedTime=120
11. mapedFileSizeCommitLog=1073741824
12. mapedFileSizeConsumeQueue=50000000
13. destroyMapedFileIntervalForcibly=120000
14. redeleteHangedFileInterval=120000
15. diskMaxUsedSpaceRatio=88
16. storePathRootDir=/usr/framework/rocketmq/datas
17. storePathCommitLog=/usr/framework/rocketmq/logs
18. maxMessageSize=65536
19. flushCommitLogLeastPages=4
20. flushConsumeQueueLeastPages=2
21. flushCommitLogThoroughInterval=10000
22. flushConsumeQueueThoroughInterval=60000
23. checkTransactionMessageEnable=false
24. sendMessageThreadPoolNums=128
25. pullMessageThreadPoolNums=128
26. brokerRole=SYNC_MASTER
27. flushDiskType=ASYNC_FLUSH
(3)修改A从配置
vi ./conf/2m-2s-async/broker-a-s.properties:
1. # brokerClusterName=DefaultCluster
2. brokerName=broker-a
3. brokerId=1
4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
5. defaultTopicQueueNums=4
6. autoCreateTopicEnable=true
7. autoCreateSubscriptionGroup=true
8. listenPort=10911
9. deleteWhen=04
10. fileReservedTime=120
11. mapedFileSizeCommitLog=1073741824
12. mapedFileSizeConsumeQueue=50000000
13. destroyMapedFileIntervalForcibly=120000
14. redeleteHangedFileInterval=120000
15. diskMaxUsedSpaceRatio=88
16. storePathRootDir=/usr/framework/rocketmq/datas
17. storePathCommitLog=/usr/framework/rocketmq/logs
18. maxMessageSize=65536
19. flushCommitLogLeastPages=4
20. flushConsumeQueueLeastPages=2
21. flushCommitLogThoroughInterval=10000
22. flushConsumeQueueThoroughInterval=60000
23. checkTransactionMessageEnable=false
24. sendMessageThreadPoolNums=128
25. pullMessageThreadPoolNums=128
26. brokerRole=SLAVE
27. flushDiskType=ASYNC_FLUSH
(4)修改B主配置
vi ./conf/2m-2s-async/broker-b.properties:
28. # brokerClusterName=DefaultCluster
29. brokerName=broker-b
30. brokerId=0
31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
32. defaultTopicQueueNums=4
33. autoCreateTopicEnable=true
34. autoCreateSubscriptionGroup=true
35. listenPort=10911
36. deleteWhen=04
37. fileReservedTime=120
38. mapedFileSizeCommitLog=1073741824
39. mapedFileSizeConsumeQueue=50000000
40. destroyMapedFileIntervalForcibly=120000
41. redeleteHangedFileInterval=120000
42. diskMaxUsedSpaceRatio=88
43. storePathRootDir=/usr/framework/rocketmq/datas
44. storePathCommitLog=/usr/framework/rocketmq/logs
45. maxMessageSize=65536
46. flushCommitLogLeastPages=4
47. flushConsumeQueueLeastPages=2
48. flushCommitLogThoroughInterval=10000
49. flushConsumeQueueThoroughInterval=60000
50. checkTransactionMessageEnable=false
51. sendMessageThreadPoolNums=128
52. pullMessageThreadPoolNums=128
53. brokerRole=SYNC_MASTER
54. flushDiskType=ASYNC_FLUSH (5)修改B从配置
vi ./conf/2m-2s-async/broker-b-s.properties:
28. # brokerClusterName=DefaultCluster
29. brokerName=broker-b
30. brokerId=1
31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
32. defaultTopicQueueNums=4
33. autoCreateTopicEnable=true
34. autoCreateSubscriptionGroup=true
35. listenPort=10911
36. deleteWhen=04
37. fileReservedTime=120
38. mapedFileSizeCommitLog=1073741824
39. mapedFileSizeConsumeQueue=50000000
40. destroyMapedFileIntervalForcibly=120000
41. redeleteHangedFileInterval=120000
42. diskMaxUsedSpaceRatio=88
43. storePathRootDir=/usr/framework/rocketmq/datas
44. storePathCommitLog=/usr/framework/rocketmq/logs
45. maxMessageSize=65536
46. flushCommitLogLeastPages=4
47. flushConsumeQueueLeastPages=2
demo:
Producer类
package com.lvxc.study.tech.rmq;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
//nameserver服务,多个以;分开
producer.setNamesrvAddr("192.168.133.128:9876");
try{
producer.start();
Message msg = new Message("PushTopic","push","1","Just for test.".getBytes());
SendResult result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
msg = new Message("PushTopic","push","2","Just for test.".getBytes());
result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
msg = new Message("PullTopic","pull","1","Just for test.".getBytes());
result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
}catch(Exception e){
e.printStackTrace();
}finally {
producer.shutdown();
}
}
}
Consumer类
package com.lvxc.study.tech.rmq;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("192.168.133.128:9876");
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
相关推荐
RocketMQ消息队列demo,输入IP端口,就可以对消息队列进行操作,发布消息,订阅消息。
初学者可以来学习一下,rocketMQ的简单的小demo 简单易懂
java编写的RocketMQ入门demo,maven 更新依赖,可直接运行Producer和Consumer 简单进行测试
RocketMQ原生API使用 SpringBoot整合RocketMQ SpringCloudStream整合RocketMQ
Apache RocketMQ入门demo,用来理解RocketMQ的基本原理,附带必要代码注释。基于maven的用Java api编写的Producer和Consumer;
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
很全的rocket包及安装详细说明附加demo示例。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。...
springboot 整合 rocketmq实例代码
windows环境下配置安装rocketmq,文档分上下两篇,上篇讲配置,下篇讲和springMVC集成,很好的demo
spring-boot操作rocketmq的demo,亲测可用,代码整理的好
rocketmq-demo
为了帮助大家更好的学习和使用RocketMQ,因此提供相关学习文档,一起学习,资源包括 RocketMQ初步认知、RocketMQ单机环境搭建、RocketMQ集群部署实践、基于myeclipse的RocketMQ--Demo实践、基于RocketMQ--Demo项目的...
rocketmq-demo:最终一致性分布式事务-rocketmq使用
springboot1.5.10.RELEASE集成rocketmq4.3.1消息服务demo,多个消费者多监听
基于java springboot的rocketmq生产和消费demo,开发工具为idea
近段时间对RocketMQ队列的一个整理,收集绝大部分网上相关的资料,以及一个demo的实现
java 使用 rocketmq的一个生产者和消费者的实现,其中要先启动rocket的nameserver 和borker
rocketmq集成至springmvc,rocketmq快速上手,快速集成至原有项目进行开发
rocketmq监控 查看rocketmq.namesrv对应下的broker、topic、consuemr/producer等