您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Linux下怎么部署分布式消息系統(tǒng)RocketMQ”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“Linux下怎么部署分布式消息系統(tǒng)RocketMQ”吧!
鏈接:https://pan.baidu.com/s/17iUB1lBOjv4CBAEQFvn65A 提取碼:v0sn
RocketMQ java編寫,需要jdk環(huán)境
下載jdk 1.7.0_80 上傳到linux ,必須64位,32位RocketMQ不支持
tar -zxvf jdk-7u80-linux-x64.tar.gz //解壓
修改環(huán)境變量 vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.7.0_80export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport PATH=$JAVA_HOME/bin:$PATH
刷新配置
source /etc/profile
或jdk1.8下載安裝教程:https://blog.csdn.net/qq_41463655/article/details/99173682
2.1、上傳alibaba-rocketmq-3.2.6.tar.gz 上傳到linux解壓安裝
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local //解壓到 /usr/localmv /usr/local/alibaba-rocketmq /usr/local/alibaba-rocketmq-3.2.6 //重命名 ln -s /usr/local/alibaba-rocketmq-3.2.6 rocketmq //安裝
安裝好了
2.2、創(chuàng)建存儲路徑
cd /usr/local/rocketmq mkdir store mkdir store/commitlog mkdir store/consumequeue mkdir store/index
2.3、日志配置
cd /usr/local/rocketmq mkdir logs cd conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
2.4、配置 broker-a.properties / broker-b.properties /usr/local/rocketmq/conf/2m-noslave/ 目錄下
2.4.1、broker-a.properties
#所屬集群名字brokerClusterName=rocketmq-cluster#broker名字,注意此處不同的配置文件填寫的不一樣brokerName=broker-a|broker-b#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分號分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在發(fā)送消息時,自動創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊列數(shù)defaultTopicQueueNums=4#是否允許 Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉autoCreateTopicEnable=true#是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉autoCreateSubscriptionGroup=true#Broker 對外服務(wù)的監(jiān)聽端口listenPort=10911#刪除文件時間點,默認(rèn)凌晨 4點deleteWhen=04#文件保留時間,默認(rèn) 48 小時fileReservedTime=120#commitLog每個文件的大小默認(rèn)1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每個文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#檢測物理文件磁盤空間diskMaxUsedSpaceRatio=88#存儲路徑storePathRootDir=/usr/local/rocketmq/store#commitLog 存儲路徑storePathCommitLog=/usr/local/rocketmq/store/commitlog#消費隊列存儲路徑存儲路徑storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存儲路徑storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存儲路徑storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存儲路徑abortFile=/usr/local/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 異步復(fù)制Master#- SYNC_MASTER 同步雙寫Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盤方式#- ASYNC_FLUSH 異步刷盤#- SYNC_FLUSH 同步刷盤flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#發(fā)消息線程池數(shù)量#sendMessageThreadPoolNums=128#拉消息線程池數(shù)量#pullMessageThreadPoolNums=128
2.4.2、broker-b.properties
#所屬集群名字brokerClusterName=rocketmq-cluster#broker名字,注意此處不同的配置文件填寫的不一樣brokerName=broker-a|broker-b#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分號分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在發(fā)送消息時,自動創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊列數(shù)defaultTopicQueueNums=4#是否允許 Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉autoCreateTopicEnable=true#是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉autoCreateSubscriptionGroup=true#Broker 對外服務(wù)的監(jiān)聽端口listenPort=10911#刪除文件時間點,默認(rèn)凌晨 4點deleteWhen=04#文件保留時間,默認(rèn) 48 小時fileReservedTime=120#commitLog每個文件的大小默認(rèn)1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每個文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#檢測物理文件磁盤空間diskMaxUsedSpaceRatio=88#存儲路徑storePathRootDir=/usr/local/rocketmq/store#commitLog 存儲路徑storePathCommitLog=/usr/local/rocketmq/store/commitlog#消費隊列存儲路徑存儲路徑storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存儲路徑storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存儲路徑storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存儲路徑abortFile=/usr/local/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 異步復(fù)制Master#- SYNC_MASTER 同步雙寫Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盤方式#- ASYNC_FLUSH 異步刷盤#- SYNC_FLUSH 同步刷盤flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#發(fā)消息線程池數(shù)量#sendMessageThreadPoolNums=128#拉消息線程池數(shù)量#pullMessageThreadPoolNums=128
兩個配置文件需修改處
brokerName=broker-a|broker-b 集群a服務(wù)器配置修改為 brokerName=broker-abrokerName=broker-a|broker-b 集群b服務(wù)器配置修改為 brokerName=broker-b
2.5、修改啟動參數(shù) /rocketm/bin下 (jvm)
runbroker.sh 的JAVA_OPT runserver.sh 的JAVA_OPT
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"修改為 JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
2.6、啟動 NameServer 安裝目錄 /usr/local/ /rocketmq/bin 目錄下
nohup sh mqnamesrv &
2.7、啟動 BrokerServer /rocketmq/bin 目錄下
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 & netstat -ntlp
查看啟動狀態(tài)
jps
結(jié)果如下啟動成功
本機ip,配置域名
192.168.177.128 rocketmq-nameserver1 192.168.177.128 rocketmq-master1 192.168.111.129 rocketmq-nameserver2 192.168.111.129 rocketmq-master2
圖片
解壓安裝 tomcat 7.0到 /usr/local/
tar -zxvf apache-tomcat-7.0.65.tar.gz -C /usr/local
把rocketmq-web-console.war 復(fù)制到apache-tomcat-7.0.65 的webapps 目錄下 啟動tomcat 自動解壓,然后修改config /rocketmq-web-console/WEB-INF/classes 的 config.properties 配置 修改ip
單服務(wù)器 rocketmq.namesrv.addr=192.168.177.128:9876 多服務(wù)器 rocketmq.namesrv.addr=192.168.177.128:9876;192.168.177.129:9876
關(guān)閉tomcat / 重啟tomcat
關(guān)閉防火墻
systemctl disable firewalld 或 chkconfig iptables off
訪問 —-》 ip:8080/rocketmq-web-console 出現(xiàn)下方界面就ok了
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i "test-topic", "TagA", ("test-topic-"+i).getBytes() ); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("test-topic", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getMsgId()+"---"+new String(msg.getBody())); } //返回成功消費狀態(tài) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
會出現(xiàn)冪等問題,使用全局id,或者時間戳,業(yè)務(wù)的唯一id 進(jìn)行判斷,使用redis等日志記錄判斷是否存在,存在表示已經(jīng)成功消費
到此,相信大家對“Linux下怎么部署分布式消息系統(tǒng)RocketMQ”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。