您好,登錄后才能下訂單哦!
這篇文章主要講解了“Apache Pulsar啟動(dòng)了哪些服務(wù)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Apache Pulsar啟動(dòng)了哪些服務(wù)”吧!
PulsarStandaloneStarter
在standalone模式下,主要啟動(dòng)了以下幾個(gè)服務(wù)
PulsarService
PulsarAdmin
LocalBookeeperEnsemble
WorkerService
PulsarBrokerStarter.BrokerStarter
在普通模式下,啟動(dòng)了以下幾個(gè)服務(wù)
PulsarService
BookieServer
AutoRecoveryMain
StatsProvider
WorkerService
簡單說一些這幾個(gè)服務(wù)
WorkerService: Pulsar function 相關(guān),可以不啟動(dòng)
PulsarService: 主要的PulsarBroker相關(guān)
BookieServer: Bookeeper相關(guān)
AutoRecoveryMain: Bookeeper autorecovery相關(guān)
StatsProvider: Metric Exporter類似的功能
PulsarService.start
ProtocolHandlers
支持不同protocol處理(kafka協(xié)議等)
localZookeeperConnectionProvider
維護(hù)zk session 和zk連接
startZkCacheService
LocalZooKeeperCache => LocalZooKeeperCacheService
GlobalZooKeeperCache => ConfigurationCacheService
BookkeeperClientFactory
創(chuàng)建配置Bookkeeper 客戶端
managedLedgerClientFactory
維護(hù)一個(gè)ManagedLedger的客戶端,借用BookkeeperClient
BrokerService
這個(gè)是服務(wù)器的主要邏輯了,這個(gè)放在后面說
loadManager
收集集群機(jī)器負(fù)載,并根據(jù)負(fù)載情況均衡負(fù)載
startNamespaceService
NameSpaceService,管理放置的ResourceBundle,和LoadManager相關(guān)
schemaStorage
schemaRegistryService
上面2個(gè)都是和Schema相關(guān)的
defaultOffloader
LedgerOffloader,用來將Ledger(Bookkeeper)中的冷數(shù)據(jù)放到其他存儲當(dāng)中
WebService
webSocketService
http,websocket相關(guān)
LeaderElectionService
和LoadManager有關(guān),如果是集中方式的話需要選出一個(gè)Leader定期根據(jù)集群情況進(jìn)行均衡負(fù)載
transactionMetadataStoreService
事務(wù)相關(guān)
metricGenerator
metric相關(guān)
WorkerService
pulsar function 相關(guān)
public void start() throws Exception { // producer id 分布式生成器 this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName()); // 網(wǎng)絡(luò)層配置 ServerBootstrap bootstrap = defaultServerBootstrap.clone(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false)); ... // 綁定端口 listenChannel = bootstrap.bind(addr).sync().channel(); ... // metric this.startStatsUpdater( serviceConfig.getStatsUpdateInitialDelayInSecs(), serviceConfig.getStatsUpdateFrequencyInSecs()); // 啟動(dòng)了一堆需要定期執(zhí)行的任務(wù) this.startInactivityMonitor(); // 啟動(dòng)3個(gè)schedule任務(wù)分別檢測 // 1. 長時(shí)間無效的topic // 2. 長時(shí)間無效的producer(和message去重相關(guān)) // 3. 長時(shí)間無效的subscription this.startMessageExpiryMonitor(); this.startCompactionMonitor(); this.startMessagePublishBufferMonitor(); this.startConsumedLedgersMonitor(); this.startBacklogQuotaChecker(); this.updateBrokerPublisherThrottlingMaxRate(); this.startCheckReplicationPolicies(); // register listener to capture zk-latency ClientCnxnAspect.addListener(zkStatsListener); ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
順著netty的初始化方式我們直接看ChannelInitializer,這里應(yīng)該和Kafka類似進(jìn)行處理請求的操作。
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); ch.pipeline().addLast("flowController", new FlowControlHandler()); ServerCnx cnx = new ServerCnx(pulsar); ch.pipeline().addLast("handler", cnx); connections.put(ch.remoteAddress(), cnx); }
這個(gè)類的作用可以對標(biāo)KafkaApis,處理各種Api請求
這個(gè)類實(shí)際上是一個(gè)ChannelHandler
繼承了PulsarHandler(主要負(fù)責(zé)一些連接的keepalive邏輯)
PulsarHandler繼承了 PulsarDecoder ( 主要負(fù)責(zé)序列化,反序列化Api請求)
PulsarDecoder實(shí)際上是一個(gè) ChannelInboundHandlerAdapter
而PulsarAPi實(shí)際上是通過Pulsar.proto 生成的,這里編寫了各種Api的定義
感謝各位的閱讀,以上就是“Apache Pulsar啟動(dòng)了哪些服務(wù)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Apache Pulsar啟動(dòng)了哪些服務(wù)這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。