溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Apache Pulsar啟動(dòng)了哪些服務(wù)

發(fā)布時(shí)間:2021-12-24 10:34:28 來源:億速云 閱讀:272 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Apache Pulsar啟動(dòng)了哪些服務(wù)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Apache Pulsar啟動(dòng)了哪些服務(wù)”吧!

1.啟動(dòng)入口

PulsarStandaloneStarter
在standalone模式下,主要啟動(dòng)了以下幾個(gè)服務(wù)

  1. PulsarService

  2. PulsarAdmin

  3. LocalBookeeperEnsemble

  4. WorkerService

PulsarBrokerStarter.BrokerStarter
在普通模式下,啟動(dòng)了以下幾個(gè)服務(wù)

  1. PulsarService

  2. BookieServer

  3. AutoRecoveryMain

  4. StatsProvider

  5. WorkerService

簡單說一些這幾個(gè)服務(wù)

  • WorkerService: Pulsar function 相關(guān),可以不啟動(dòng)

  • PulsarService: 主要的PulsarBroker相關(guān)

  • BookieServer: Bookeeper相關(guān)

  • AutoRecoveryMain: Bookeeper autorecovery相關(guān)

  • StatsProvider: Metric Exporter類似的功能

2. PulsarService

PulsarService.start

  1. ProtocolHandlers
    支持不同protocol處理(kafka協(xié)議等)

  2. localZookeeperConnectionProvider
    維護(hù)zk session 和zk連接

  3. startZkCacheService

    • LocalZooKeeperCache => LocalZooKeeperCacheService

    • GlobalZooKeeperCache => ConfigurationCacheService

  4. BookkeeperClientFactory
    創(chuàng)建配置Bookkeeper 客戶端

  5. managedLedgerClientFactory
    維護(hù)一個(gè)ManagedLedger的客戶端,借用BookkeeperClient

  6. BrokerService
    這個(gè)是服務(wù)器的主要邏輯了,這個(gè)放在后面說

  7. loadManager
    收集集群機(jī)器負(fù)載,并根據(jù)負(fù)載情況均衡負(fù)載

  8. startNamespaceService
    NameSpaceService,管理放置的ResourceBundle,和LoadManager相關(guān)

  9. schemaStorage

  10. schemaRegistryService
    上面2個(gè)都是和Schema相關(guān)的

  11. defaultOffloader
    LedgerOffloader,用來將Ledger(Bookkeeper)中的冷數(shù)據(jù)放到其他存儲當(dāng)中

  1. WebService

  2. webSocketService
    http,websocket相關(guān)

  3. LeaderElectionService
    和LoadManager有關(guān),如果是集中方式的話需要選出一個(gè)Leader定期根據(jù)集群情況進(jìn)行均衡負(fù)載

  4. transactionMetadataStoreService
    事務(wù)相關(guān)

  5. metricGenerator
    metric相關(guān)

  6. WorkerService
    pulsar function 相關(guān)

3. BrokerService

public void start() throws Exception {
        // producer id 分布式生成器
        this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                pulsar.getConfiguration().getClusterName());

        // 網(wǎng)絡(luò)層配置
        ServerBootstrap bootstrap = defaultServerBootstrap.clone();

        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
        ...
        // 綁定端口
        listenChannel = bootstrap.bind(addr).sync().channel();
        ...

       // metric
        this.startStatsUpdater(
                serviceConfig.getStatsUpdateInitialDelayInSecs(),
                serviceConfig.getStatsUpdateFrequencyInSecs());

       // 啟動(dòng)了一堆需要定期執(zhí)行的任務(wù)
        this.startInactivityMonitor();
       // 啟動(dòng)3個(gè)schedule任務(wù)分別檢測
       // 1. 長時(shí)間無效的topic
       // 2. 長時(shí)間無效的producer(和message去重相關(guān))
       // 3. 長時(shí)間無效的subscription
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startMessagePublishBufferMonitor();
        this.startConsumedLedgersMonitor();
        this.startBacklogQuotaChecker();
        this.updateBrokerPublisherThrottlingMaxRate();
        this.startCheckReplicationPolicies();

        // register listener to capture zk-latency
        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());

4. PulsarChannelInitializer

順著netty的初始化方式我們直接看ChannelInitializer,這里應(yīng)該和Kafka類似進(jìn)行處理請求的操作。

protected void initChannel(SocketChannel ch) throws Exception {
        
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
     
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
            brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));

        ch.pipeline().addLast("flowController", new FlowControlHandler());
        ServerCnx cnx = new ServerCnx(pulsar);
        ch.pipeline().addLast("handler", cnx);

        connections.put(ch.remoteAddress(), cnx);
    }

5. ServerCnx

這個(gè)類的作用可以對標(biāo)KafkaApis,處理各種Api請求
這個(gè)類實(shí)際上是一個(gè)ChannelHandler
繼承了PulsarHandler(主要負(fù)責(zé)一些連接的keepalive邏輯)
PulsarHandler繼承了 PulsarDecoder ( 主要負(fù)責(zé)序列化,反序列化Api請求)
PulsarDecoder實(shí)際上是一個(gè) ChannelInboundHandlerAdapter

而PulsarAPi實(shí)際上是通過Pulsar.proto 生成的,這里編寫了各種Api的定義

感謝各位的閱讀,以上就是“Apache Pulsar啟動(dòng)了哪些服務(wù)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Apache Pulsar啟動(dòng)了哪些服務(wù)這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI