This article walks through how the Apache Pulsar binary protocol is implemented.
Pulsar uses Protocol Buffers (protobuf) to define its binary protocol.
message BaseCommand {
    enum Type {
        CONNECT = 2;
        CONNECTED = 3;
        // consumer registration
        SUBSCRIBE = 4;
        // producer registration
        PRODUCER = 5;
        // write messages to a topic
        SEND = 6;
        // response to a write
        SEND_RECEIPT = 7;
        // error response to a write
        SEND_ERROR = 8;
        // push a message to a consumer
        MESSAGE = 9;
        // acknowledge that a message was consumed successfully
        ACK = 10;
        // consumer asks for messages
        FLOW = 11;
        UNSUBSCRIBE = 12;
        // generic success response
        SUCCESS = 13;
        // generic error response
        ERROR = 14;
        CLOSE_PRODUCER = 15;
        CLOSE_CONSUMER = 16;
        // response to PRODUCER
        PRODUCER_SUCCESS = 17;
        // application-level keepAlive on the network connection
        PING = 18;
        PONG = 19;
        REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;
        PARTITIONED_METADATA = 21;
        PARTITIONED_METADATA_RESPONSE = 22;
        LOOKUP = 23;
        LOOKUP_RESPONSE = 24;
        CONSUMER_STATS = 25;
        CONSUMER_STATS_RESPONSE = 26;
        REACHED_END_OF_TOPIC = 27;
        SEEK = 28;
        GET_LAST_MESSAGE_ID = 29;
        GET_LAST_MESSAGE_ID_RESPONSE = 30;
        ACTIVE_CONSUMER_CHANGE = 31;
        GET_TOPICS_OF_NAMESPACE = 32;
        GET_TOPICS_OF_NAMESPACE_RESPONSE = 33;
        GET_SCHEMA = 34;
        GET_SCHEMA_RESPONSE = 35;
        AUTH_CHALLENGE = 36;
        AUTH_RESPONSE = 37;
        ACK_RESPONSE = 38;
        GET_OR_CREATE_SCHEMA = 39;
        GET_OR_CREATE_SCHEMA_RESPONSE = 40;
        // transaction related
        // The transaction commands (50 - 61) are fairly easy to understand and are skipped here.
    }
    // .....
}
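Every frame carries one BaseCommand, and its `type` code tells the receiver which nested command to read. A minimal sketch of that lookup, assuming a hand-written Java enum that mirrors only a subset of the codes listed above; `CommandType` and `CommandDispatcher` are hypothetical names (the real client works with protobuf-generated classes, not this enum).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of a subset of BaseCommand.Type; codes copied from the proto above.
enum CommandType {
    CONNECT(2), CONNECTED(3), SUBSCRIBE(4), PRODUCER(5), SEND(6),
    SEND_RECEIPT(7), SEND_ERROR(8), MESSAGE(9), ACK(10), FLOW(11),
    SUCCESS(13), ERROR(14), PRODUCER_SUCCESS(17), PING(18), PONG(19);

    private static final Map<Integer, CommandType> BY_CODE = new HashMap<>();

    static {
        for (CommandType t : values()) {
            BY_CODE.put(t.code, t);
        }
    }

    final int code;

    CommandType(int code) {
        this.code = code;
    }

    // Maps a wire code back to a type; throws for codes this sketch does not model.
    static CommandType fromCode(int code) {
        CommandType t = BY_CODE.get(code);
        if (t == null) {
            throw new IllegalArgumentException("unknown command type: " + code);
        }
        return t;
    }
}

// Hypothetical router: a real handler would parse the nested message for each type.
final class CommandDispatcher {
    static String route(int code) {
        switch (CommandType.fromCode(code)) {
            case CONNECTED:
                return "handshake complete";
            case SEND_RECEIPT:
            case SEND_ERROR:
                return "complete pending send";
            case MESSAGE:
                return "deliver to consumer queue";
            case PING:
                return "reply with PONG";
            default:
                return "other";
        }
    }
}
```

Because the type travels as a plain integer, an older peer can at least recognize and reject a command it does not know instead of misparsing it.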
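CONNECT: as soon as the client's channel to the server becomes active, the client sends a CONNECT request.
The request carries authentication information and reports the client's protocol version.
Once the client version is known, the server knows which features the client supports and can apply compatibility handling accordingly.
It is roughly the counterpart of Kafka's ApiVersionsRequest.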
// org.apache.pulsar.client.impl.ClientCnx
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> checkRequestTimeout(),
            operationTimeoutMs, operationTimeoutMs, TimeUnit.MILLISECONDS);

    if (proxyToTargetBrokerAddress == null) {
        if (log.isDebugEnabled()) {
            log.debug("{} Connected to broker", ctx.channel());
        }
    } else {
        log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
    }
    // Send CONNECT command
    ctx.writeAndFlush(newConnectCommand())
            .addListener(future -> {
                if (future.isSuccess()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Complete: {}", future.isSuccess());
                    }
                    state = State.SentConnectFrame;
                } else {
                    log.warn("Error during handshake", future.cause());
                    ctx.close();
                }
            });
}
CONNECTED: this is actually the response to CommandConnect, just under a different name (which makes the pairing easy to miss).
// org.apache.pulsar.broker.service.ServerCnx
protected void handleConnect(CommandConnect connect) {
    checkArgument(state == State.Start);

    if (log.isDebugEnabled()) {
        log.debug("Received CONNECT from {}, auth enabled: {}:"
                + " has original principal = {}, original principal = {}",
                remoteAddress, service.isAuthenticationEnabled(),
                connect.hasOriginalPrincipal(), connect.getOriginalPrincipal());
    }

    String clientVersion = connect.getClientVersion();
    int clientProtocolVersion = connect.getProtocolVersion();
    features = new FeatureFlags();
    if (connect.hasFeatureFlags()) {
        features.copyFrom(connect.getFeatureFlags());
    }

    if (!service.isAuthenticationEnabled()) {
        completeConnect(clientProtocolVersion, clientVersion);
        return;
    }
    // ......
}
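Taken together, the client and server snippets above describe a small handshake state machine: after the CONNECT write succeeds the client moves to State.SentConnectFrame, and only once CONNECTED comes back should registration commands such as SUBSCRIBE or PRODUCER go out. A minimal sketch of that sequencing; `HandshakeTracker` and its state names are hypothetical, loosely modeled on ClientCnx rather than copied from it.

```java
// Hypothetical client-side handshake tracker, loosely modeled on the states used in ClientCnx.
final class HandshakeTracker {
    enum State { NONE, SENT_CONNECT_FRAME, READY, FAILED }

    private State state = State.NONE;

    State state() {
        return state;
    }

    // Called when the CONNECT frame has been flushed (cf. the writeAndFlush listener).
    void onConnectSent() {
        require(State.NONE);
        state = State.SENT_CONNECT_FRAME;
    }

    // Called when CONNECTED arrives; only after this may SUBSCRIBE/PRODUCER be sent.
    void onConnected() {
        require(State.SENT_CONNECT_FRAME);
        state = State.READY;
    }

    void onError() {
        state = State.FAILED;
    }

    boolean canSendCommands() {
        return state == State.READY;
    }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```

Keeping the states explicit makes out-of-order frames easy to reject, in the same spirit as the `checkArgument(state == State.Start)` guard on the server side.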
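SUBSCRIBE: a consumer uses this RPC to register itself with the server.
It is called from ConsumerImpl.
The last line of the ConsumerImpl constructor asks for a connection to the server; once a connection is obtained, the connection-success callback connectionOpened is invoked.
For a consumer, that callback sends this request to register the consumer's information.
Tying this back to the CommandConnect request above, this request is sent after CommandConnect.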
// org.apache.pulsar.client.impl.ConsumerImpl
@Override
public void connectionOpened(final ClientCnx cnx) {
    // ... a lot of parameter preparation above is omitted
    // build the SUBSCRIBE command
    ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
            priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
            conf.isReplicateSubscriptionState(),
            InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
            startMessageRollbackDuration, schemaInfo, createTopicIfDoesNotExist, conf.getKeySharedPolicy());
}
The proto definition, explained in the comments:
message CommandSubscribe {
    // the four subscription types
    enum SubType {
        Exclusive = 0;
        Shared = 1;
        Failover = 2;
        Key_Shared = 3;
    }
    // topic name
    required string topic = 1;
    // subscription name
    required string subscription = 2;
    // subscription type
    required SubType subType = 3;

    // identifies the consumer on this network connection
    required uint64 consumer_id = 4;
    // network-level request id
    required uint64 request_id = 5;
    // consumer name
    optional string consumer_name = 6;
    // consumer priority; higher-priority consumers tend to receive messages first
    optional int32 priority_level = 7;

    // whether this subscription is durable
    // Signal whether the subscription should be backed by a
    // durable cursor or not
    optional bool durable = 8 [default = true];

    // If specified, the subscription will position the cursor
    // mark-delete position on the particular message id and
    // will send messages from that point
    optional MessageIdData start_message_id = 9;

    // custom consumer tags, effectively a Map<String,String>
    /// Add optional metadata key=value to this consumer
    repeated KeyValue metadata = 10;

    optional bool read_compacted = 11;

    optional Schema schema = 12;

    // where a new subscription starts: latest or earliest
    enum InitialPosition {
        Latest = 0;
        Earliest = 1;
    }

    // Signal whether the subscription will initialize on latest
    // or not -- earliest
    optional InitialPosition initialPosition = 13 [default = Latest];

    // geo-replication related, skipped for now
    // Mark the subscription as "replicated". Pulsar will make sure
    // to periodically sync the state of replicated subscriptions
    // across different clusters (when using geo-replication).
    optional bool replicate_subscription_state = 14;

    // If true, the subscribe operation will cause a topic to be
    // created if it does not exist already (and if topic auto-creation
    // is allowed by broker).
    // If false, the subscribe operation will fail if the topic
    // does not exist.
    optional bool force_topic_creation = 15 [default = true];

    // used when resetting the consumption position by time
    // If specified, the subscription will reset cursor's position back
    // to specified seconds and will send messages from that point
    optional uint64 start_message_rollback_duration_sec = 16 [default = 0];

    // used by Key_Shared mode, skipped for now
    optional KeySharedMeta keySharedMeta = 17;
}
PRODUCER: the producer-side counterpart of SUBSCRIBE; a producer uses this RPC to register itself with the server. It is sent from the analogous callback, org.apache.pulsar.client.impl.ProducerImpl.connectionOpened.
/// Create a new Producer on a topic, assigning the given producer_id,
/// all messages sent with this producer_id will be persisted on the topic
message CommandProducer {
    // topic
    required string topic = 1;
    required uint64 producer_id = 2;
    // network-level request id
    required uint64 request_id = 3;

    /// If a producer name is specified, the name will be used,
    /// otherwise the broker will generate a unique name
    optional string producer_name = 4;

    // whether the writes are encrypted
    optional bool encrypted = 5 [default = false];

    // metadata, effectively a Map<String,String>
    /// Add optional metadata key=value to this producer
    repeated KeyValue metadata = 6;

    optional Schema schema = 7;

    // arguably this should be called producer_epoch
    // If producer reconnect to broker, the epoch of this producer will +1
    optional uint64 epoch = 8 [default = 0];

    // Indicate the name of the producer is generated or user provided
    // Use default true here is in order to be forward compatible with the client
    optional bool user_provided_producer_name = 9 [default = true];

    // the three producer access modes
    // Require that this producers will be the only producer allowed on the topic
    optional ProducerAccessMode producer_access_mode = 10 [default = Shared];

    // Topic epoch is used to fence off producers that reconnects after a new
    // exclusive producer has already taken over. This id is assigned by the
    // broker on the CommandProducerSuccess. The first time, the client will
    // leave it empty and then it will always carry the same epoch number on
    // the subsequent reconnections.
    optional uint64 topic_epoch = 11;
}

enum ProducerAccessMode {
    Shared           = 0; // By default multiple producers can publish on a topic
    Exclusive        = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected.
    WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access
}
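To make the three access modes concrete, here is a sketch of the decision a broker might make when a new producer registers, derived only from the ProducerAccessMode comments above. It assumes a simplified model that knows just how many producers are attached and whether one of them holds exclusive access; `AccessModeCheck` and `AdmitResult` are hypothetical names, and topic_epoch fencing is deliberately left out.

```java
// Mirrors ProducerAccessMode from the proto above.
enum ProducerAccessMode { Shared, Exclusive, WaitForExclusive }

// Hypothetical outcome of a registration attempt.
enum AdmitResult { ADMITTED, REJECTED, PENDING }

final class AccessModeCheck {
    // existingProducers: producers already on the topic; exclusiveHeld: one of them is exclusive.
    static AdmitResult admit(ProducerAccessMode mode, int existingProducers, boolean exclusiveHeld) {
        switch (mode) {
            case Shared:
                // Shared producers can coexist with each other, but not with an exclusive one.
                return exclusiveHeld ? AdmitResult.REJECTED : AdmitResult.ADMITTED;
            case Exclusive:
                // Fail immediately if any producer is already connected.
                return existingProducers == 0 ? AdmitResult.ADMITTED : AdmitResult.REJECTED;
            case WaitForExclusive:
                // Stay pending until the topic has no other producer.
                return existingProducers == 0 ? AdmitResult.ADMITTED : AdmitResult.PENDING;
            default:
                throw new AssertionError(mode);
        }
    }
}
```

The PENDING outcome is what distinguishes WaitForExclusive from Exclusive: the request is parked rather than failed.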
PRODUCER_SUCCESS: the success response to a CommandProducer request.
/// Response from CommandProducer
message CommandProducerSuccess {
    // network-level request id
    required uint64 request_id = 1;
    // producer name
    required string producer_name = 2;

    // The last sequence id that was stored by this producer in the previous session
    // This will only be meaningful if deduplication has been enabled.
    optional int64 last_sequence_id = 3 [default = -1];
    optional bytes schema_version = 4;

    // The topic epoch assigned by the broker. This field will only be set if we
    // were requiring exclusive access when creating the producer.
    optional uint64 topic_epoch = 5;

    // presumably related to ProducerAccessMode above; may be covered another time
    // If producer is not "ready", the client will avoid to timeout the request
    // for creating the producer. Instead it will wait indefinitely until it gets
    // a subsequent `CommandProducerSuccess` with `producer_ready==true`.
    optional bool producer_ready = 6 [default = true];
}
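SEND: the RPC a producer uses to send messages to the server.
You can follow org.apache.pulsar.client.impl.ProducerImpl.sendAsync all the way down to where this command is built: normally, after a message goes through batching, encryption, chunking and similar processing, it is serialized into this request.
The serialization format is shown below; here the BaseCommand is the one carrying a CommandSend.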
// org.apache.pulsar.common.protocol.Commands
private static ByteBufPair serializeCommandSendWithSize(BaseCommand cmd, ChecksumType checksumType,
        MessageMetadata msgMetadata, ByteBuf payload) {
    // Wire format
    // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
    // ...
}
The protobuf message below describes only the [CMD] part of this format.
message CommandSend {
    required uint64 producer_id = 1;
    required uint64 sequence_id = 2;
    optional int32 num_messages = 3 [default = 1];
    optional uint64 txnid_least_bits = 4 [default = 0];
    optional uint64 txnid_most_bits = 5 [default = 0];

    /// Add highest sequence id to support batch message with external sequence id
    optional uint64 highest_sequence_id = 6 [default = 0];
    optional bool is_chunk = 7 [default = false];
}
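The full frame layout from serializeCommandSendWithSize can be assembled with a plain `java.nio.ByteBuffer`. A minimal sketch, assuming the command and metadata are already protobuf-serialized bytes, every size field is a 4-byte big-endian int, TOTAL_SIZE counts every byte after itself, the magic number is the 2-byte value 0x0e01, and the CRC32C checksum covers everything from [METADATA_SIZE] to the end of the payload. These details should be checked against Commands.java; `FrameBuilder` is a hypothetical name, and `java.util.zip.CRC32C` requires Java 9+.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Hypothetical builder for the SEND frame layout:
// [TOTAL_SIZE][CMD_SIZE][CMD][MAGIC_NUMBER][CHECKSUM][METADATA_SIZE][METADATA][PAYLOAD]
final class FrameBuilder {
    static final short MAGIC_CRC32C = 0x0e01; // assumed 2-byte magic number

    static ByteBuffer build(byte[] cmd, byte[] metadata, byte[] payload) {
        // Checksummed region: [METADATA_SIZE][METADATA][PAYLOAD]
        ByteBuffer checked = ByteBuffer.allocate(4 + metadata.length + payload.length);
        checked.putInt(metadata.length).put(metadata).put(payload);
        checked.flip();

        CRC32C crc = new CRC32C();
        crc.update(checked.duplicate()); // duplicate() keeps 'checked' readable afterwards
        int checksum = (int) crc.getValue();

        // TOTAL_SIZE counts everything after the TOTAL_SIZE field itself.
        int totalSize = 4 + cmd.length + 2 + 4 + checked.remaining();
        ByteBuffer frame = ByteBuffer.allocate(4 + totalSize);
        frame.putInt(totalSize)
             .putInt(cmd.length).put(cmd)
             .putShort(MAGIC_CRC32C).putInt(checksum)
             .put(checked);
        frame.flip();
        return frame;
    }
}
```

Putting the checksum right after the command means a broker can read and route the [CMD] header without touching the payload, then verify the rest only when it needs to.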
SEND_RECEIPT: the success response the server sends once it has persisted the message.
message CommandSendReceipt {
    required uint64 producer_id = 1;
    // used to guarantee ordering
    required uint64 sequence_id = 2;
    optional MessageIdData message_id = 3;
    // presumably used for deduplication
    optional uint64 highest_sequence_id = 4 [default = 0];
}

// the id of the successfully written message; this structure is reused elsewhere
message MessageIdData {
    required uint64 ledgerId = 1;
    required uint64 entryId = 2;
    optional int32 partition = 3 [default = -1];
    optional int32 batch_index = 4 [default = -1];
    repeated int64 ack_set = 5;
    optional int32 batch_size = 6;
}
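Because sequence_id grows monotonically per producer, the client can keep its in-flight messages in a FIFO queue and match each SEND_RECEIPT against the head. A sketch under the assumption that receipts arrive in send order on one connection; `PendingSends` and its methods are hypothetical and not the ProducerImpl API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical client-side bookkeeping for in-flight SEND commands.
final class PendingSends {
    private final Deque<Long> inFlight = new ArrayDeque<>();
    private long nextSequenceId = 0;

    // Assigns the next sequence_id for a CommandSend and remembers it until a receipt arrives.
    long send() {
        long id = nextSequenceId++;
        inFlight.addLast(id);
        return id;
    }

    // Handles a SEND_RECEIPT; returns true if it completed the oldest pending message.
    boolean onReceipt(long sequenceId) {
        Long head = inFlight.peekFirst();
        if (head == null || sequenceId < head) {
            return false; // already completed (e.g. a duplicate receipt): ignore it
        }
        if (sequenceId > head) {
            // A gap means an earlier message got no receipt; a real client would resend or fail.
            throw new IllegalStateException("expected receipt for " + head + ", got " + sequenceId);
        }
        inFlight.removeFirst();
        return true;
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```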
SEND_ERROR: the error response to CommandSend.
message CommandSendError {
    required uint64 producer_id = 1;
    required uint64 sequence_id = 2;
    required ServerError error = 3;
    required string message = 4;
}
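FLOW: a consumer uses this to tell the server how many messages it can currently accept.
The server records, for every consumer in a subscription, how many messages that consumer can currently accept, and uses this number when dispatching to decide whether a given consumer can take more messages.
As far as I can tell, connectionOpened sends a CommandFlow request once the subscription has been registered successfully, so that the server starts pushing messages.
Presumably it is also sent whenever the consumer's queue becomes idle.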
message CommandFlow {
    required uint64 consumer_id = 1;

    // Max number of messages to prefetch, in addition
    // of any number previously specified
    required uint32 messagePermits = 2;
}
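The permit bookkeeping described above can be pictured as a per-consumer counter that FLOW increases (permits are added to any previously granted, as the proto comment says) and each dispatched message decreases. This is a simplified, hypothetical model (round-robin over consumers, one permit per message); `PermitDispatcher` is an illustrative name, and the broker's real dispatchers also deal with priority levels, batches and redelivery.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical permit accounting for one subscription.
final class PermitDispatcher {
    private final Map<Long, Integer> permits = new LinkedHashMap<>();
    private int cursor = 0;

    // CommandFlow: messagePermits are added to whatever the consumer already had.
    void onFlow(long consumerId, int messagePermits) {
        permits.merge(consumerId, messagePermits, Integer::sum);
    }

    // Picks the next consumer (round-robin) that still has permits, or null if none can accept.
    Long dispatchOne() {
        List<Long> ids = new ArrayList<>(permits.keySet());
        for (int i = 0; i < ids.size(); i++) {
            long id = ids.get((cursor + i) % ids.size());
            int available = permits.get(id);
            if (available > 0) {
                permits.put(id, available - 1);
                cursor = (cursor + i + 1) % ids.size();
                return id;
            }
        }
        return null;
    }
}
```

When `dispatchOne()` returns null, the subscription simply holds its messages until the next FLOW arrives, which is the back-pressure the permits provide.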
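MESSAGE: this is the server pushing messages to a consumer; the server sends this command to the consumer on its own initiative (the logic lives in the dispatcher of the server-side subscription).
The concrete call site is org.apache.pulsar.broker.service.Consumer#sendMessages; one level up, this method is always called from org.apache.pulsar.broker.service.Dispatcher (that is, its implementations).
As with the write path above, the command here is effectively an RPC header, and the message payload is appended after it.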
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// metadataAndPayload contains from magic-number to the payload included
message CommandMessage {
    required uint64 consumer_id = 1;
    // the message id
    required MessageIdData message_id = 2;
    // how many times this message has been redelivered
    optional uint32 redelivery_count = 3 [default = 0];
    // records which entries inside this (batch) message have already been acked
    repeated int64 ack_set = 4;
}
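Since ack_set is a repeated int64, it reads naturally as a bitset over the indexes of a batch. A sketch of decoding it with `java.util.BitSet`, assuming the convention the Java client appears to use: a set bit marks a batch index that still needs to be delivered, and a cleared bit marks one already acknowledged. Verify this against ConsumerImpl before relying on it; `AckSets` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper for interpreting CommandMessage.ack_set.
final class AckSets {
    // Returns the batch indexes still to deliver, assuming a set bit means "not yet acknowledged".
    // An empty ack_set is treated as "no batch-level ack information": deliver every index.
    static List<Integer> pendingIndexes(long[] ackSet, int batchSize) {
        List<Integer> pending = new ArrayList<>();
        BitSet bits = BitSet.valueOf(ackSet); // bit n lives in ackSet[n / 64]
        for (int i = 0; i < batchSize; i++) {
            if (ackSet.length == 0 || bits.get(i)) {
                pending.add(i);
            }
        }
        return pending;
    }
}
```

Carrying this bitset lets the broker redeliver a partially acknowledged batch without the client handing already-acked entries to the application a second time.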
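ACK: acknowledges successfully consumed messages. A single message can be acknowledged on its own, or messages can be acknowledged cumulatively (similar to Kafka).
To reduce the number of RPCs, the client batches acks.
On the server side, handling an ack usually updates the data held in ManagedCursor, which persists the result of the ack.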
message CommandAck {
    // ack type: individual or cumulative
    enum AckType {
        Individual = 0;
        Cumulative = 1;
    }

    required uint64 consumer_id = 1;
    required AckType ack_type = 2;

    // repeated, so several acks can be batched into one command
    // In case of individual acks, the client can pass a list of message ids
    repeated MessageIdData message_id = 3;

    // Acks can contain a flag to indicate the consumer
    // received an invalid message that got discarded
    // before being passed on to the application.
    enum ValidationError {
        UncompressedSizeCorruption = 0;
        DecompressionError = 1;
        ChecksumMismatch = 2;
        BatchDeSerializeError = 3;
        DecryptionError = 4;
    }

    // some error cases also ack the message; details are recorded here
    optional ValidationError validation_error = 4;
    repeated KeyLongValue properties = 5;

    optional uint64 txnid_least_bits = 6 [default = 0];
    optional uint64 txnid_most_bits = 7 [default = 0];
    // network-level request id
    optional uint64 request_id = 8;
}
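The client-side ack batching mentioned above fits the repeated message_id field: individual acks can be collected in a small buffer and flushed as one CommandAck once a size threshold is hit (a real client would presumably also flush on a timer). A hypothetical sketch; `AckGrouper`, `MsgId` and the flush callback are illustrative names, not the Pulsar client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal stand-in for MessageIdData (ledgerId + entryId).
final class MsgId {
    final long ledgerId;
    final long entryId;

    MsgId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// Hypothetical ack buffer: many individual acks become one CommandAck with repeated message_id.
final class AckGrouper {
    private final int maxBatch;
    private final Consumer<List<MsgId>> sendAck; // would build and write an Individual CommandAck
    private final List<MsgId> buffer = new ArrayList<>();

    AckGrouper(int maxBatch, Consumer<List<MsgId>> sendAck) {
        this.maxBatch = maxBatch;
        this.sendAck = sendAck;
    }

    void ackIndividual(MsgId id) {
        buffer.add(id);
        if (buffer.size() >= maxBatch) {
            flush();
        }
    }

    // Also meant to be called periodically, so that acks are not held back forever.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sendAck.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

The trade-off is latency: a buffered ack is not yet persisted in ManagedCursor, so a crash before flushing can lead to redelivery of those messages.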
REDELIVER_UNACKNOWLEDGED_MESSAGES: the RPC a consumer uses to tell the server which messages should be redelivered.
message CommandRedeliverUnacknowledgedMessages {
    required uint64 consumer_id = 1;
    repeated MessageIdData message_ids = 2;
}
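SUCCESS / ERROR: generic responses that almost any request can use when it has no special fields to return.
Unlike Kafka, where each API (identified by its ApiKey) has its own dedicated response type, requests and responses here do not correspond strictly one-to-one.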
message CommandSuccess {
    required uint64 request_id = 1;
    optional Schema schema = 2;
}

message CommandError {
    required uint64 request_id = 1;
    required ServerError error = 2;
    required string message = 3;
}
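Since SUCCESS and ERROR carry only a request_id, the sender has to remember which outstanding request each id belongs to. A sketch of that bookkeeping with `CompletableFuture`, assuming ids come from a per-connection counter; `PendingRequests` is a hypothetical class, and the real client's equivalent structure may differ in its details.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-connection table matching generic responses to their requests.
final class PendingRequests {
    private final AtomicLong nextRequestId = new AtomicLong();
    private final Map<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    // Reserves a request_id for an outgoing command (e.g. SUBSCRIBE) and tracks its future.
    long register(CompletableFuture<Void> future) {
        long id = nextRequestId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    // CommandSuccess { request_id }
    void onSuccess(long requestId) {
        CompletableFuture<Void> f = pending.remove(requestId);
        if (f != null) {
            f.complete(null);
        }
    }

    // CommandError { request_id, error, message }
    void onError(long requestId, String error, String message) {
        CompletableFuture<Void> f = pending.remove(requestId);
        if (f != null) {
            f.completeExceptionally(new RuntimeException(error + ": " + message));
        }
    }
}
```

Matching on request_id alone is what lets one generic response type serve many different requests.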
PING / PONG: both messages are empty; their main job is application-level keepAlive on the TCP connection, handled in org.apache.pulsar.common.protocol.PulsarHandler#handleKeepAliveTimeout.
// Commands to probe the state of connection.
// When either client or broker doesn't receive commands for certain
// amount of time, they will send a Ping probe.
message CommandPing {
}
message CommandPong {
}
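The keepalive check can be pictured as a timer that fires every interval: if a PING is still unanswered and nothing else has arrived since, the connection is treated as dead; otherwise a new PING goes out. A hypothetical sketch of that decision, assuming any inbound command counts as proof of life; `KeepAlive` and its methods are illustrative names, and PulsarHandler#handleKeepAliveTimeout remains the reference for the real behavior.

```java
// Hypothetical keepalive state for one connection; onTimer() runs once per keepalive interval.
final class KeepAlive {
    enum Action { SEND_PING, CLOSE }

    private boolean waitingForPong = false;

    // Any inbound command (including PONG) proves the peer is alive.
    void onCommandReceived() {
        waitingForPong = false;
    }

    Action onTimer() {
        if (waitingForPong) {
            return Action.CLOSE; // the previous PING went unanswered for a whole interval
        }
        waitingForPong = true;
        return Action.SEND_PING;
    }
}
```

On the receiving side the rule is simply to answer every PING with a PONG, which resets the sender's flag through onCommandReceived().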