溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java分布式流式處理組件Producer分區(qū)的作用是什么

發(fā)布時間:2023-03-07 11:31:17 來源:億速云 閱讀:120 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要講解了“java分布式流式處理組件Producer分區(qū)的作用是什么”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“java分布式流式處理組件Producer分區(qū)的作用是什么”吧!

為什么需要分區(qū)

分區(qū)的作用

  • 合理的使用存儲資源:把海量的數(shù)據(jù)按照分區(qū)切割成一小塊的數(shù)據(jù)存儲在多臺Broker上。此時能夠保證每臺服務器存儲資源能夠被充分利用到。而且小塊數(shù)據(jù)在尋址時間上更有優(yōu)勢~

如果將全部的數(shù)據(jù)存儲在一臺機器上,那么要對當前數(shù)據(jù)做副本的時候,由于服務器資源配置不同,就有可能會出現(xiàn)副本數(shù)據(jù)存放失敗,從而增加數(shù)據(jù)丟失的可能性。

同時,如果單個文件過大,副本放置時間、內(nèi)容檢索時間都會極大的延長,從而導致Kafka性能降低。

  • 負載均衡: 數(shù)據(jù)生產(chǎn)或消費期間,生產(chǎn)者已分區(qū)的單位發(fā)送數(shù)據(jù),消費者分區(qū)的單位進行消費。 期間,各分區(qū)生產(chǎn)和消費數(shù)據(jù)互不影響,這樣能夠達到合理控制分區(qū)任務的程度,提高任務的并行度。從而達到負載均衡的效果。

剛才我們提到:生產(chǎn)者已分區(qū)為單位向Broker發(fā)送數(shù)據(jù)。那么問題來了:

  • 生產(chǎn)者是怎么知道該向哪個分區(qū)發(fā)送數(shù)據(jù)呢?

這就是我們接下來要研究的分區(qū)策略。

分區(qū)策略

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // 如果在消息中指定了分區(qū)
    if (record.partition() != null)
        return record.partition();
    if (partitioner != null) {
        // 分區(qū)器通過計算得到分區(qū)
        int customPartition = partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (customPartition < 0) {
            throw new IllegalArgumentException(String.format(
                "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
        }
        return customPartition;
    }
    // 通過序列化key計算分區(qū)
    if (serializedKey != null && !partitionerIgnoreKeys) {
        // hash the keyBytes to choose a partition
        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
    } else {
        // 返回-1
        return RecordMetadata.UNKNOWN_PARTITION;
    }
}

下面的代碼可以說是整個分區(qū)器的核心部分,可以通過以下的步驟進行說明:

  • 如果在生產(chǎn)消息的時候,已經(jīng)指定了需要發(fā)送的分區(qū)位置,那么就會直接使用已經(jīng)指定的份具體的位置,這樣子還節(jié)省了也不計算的時間

  • 如果在生產(chǎn)者配置Properties中指定了分區(qū)策略類,那么消息生產(chǎn)就會通過已經(jīng)指定的分區(qū)策略類進行分區(qū)計算

  • 否則就會以serializedKey作為參數(shù),通過hash取模的方式計算。如果serializedKey == null,那么就會采用粘性分區(qū)的邏輯。 這在Kafka中屬于默認分區(qū)器。

  • 如果以上情況都沒有包含,那么他就會直接返回-1。相當于ack=0的情況。

在Kafka中分區(qū)策略我們是可以自定義的。當然Kafka也為我們內(nèi)置了三種分區(qū)策略類。 接下來我們挑個重點來介紹,來給我們自定義分區(qū)器做一個鋪墊~

java分布式流式處理組件Producer分區(qū)的作用是什么

我們已經(jīng)看到,DefaultPartitionerUniformStickyPartitioner已經(jīng)被標注為過期類,當然也并不妨礙我們來了解一下。

DefaultPartitioner

在當前版本中,如果沒有對partitioner.class進行配置,此時的分區(qū)策略就會采用當前類作為默認分區(qū)策略類。

而以下是DefaultPartitioner策略類的核心實現(xiàn)方式,并且標記部分的代碼實現(xiàn)其實就是UniformStickyPartitioner的計算邏輯

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    if (keyBytes == null) {
        // 就是這段屬于UniformStickyPartitioner的實現(xiàn)邏輯
        return stickyPartitionCache.partition(topic, cluster);
    }
    return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}

還有一段代碼讓我們來一起看看

public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

這段代碼不管有多復雜,調(diào)用方法有多少,但最終我們是能夠發(fā)現(xiàn):

  • 它的本質(zhì)其實是在對序列化Key做哈希計算,然后通過hash值和分區(qū)數(shù)做取模運算,然后得到結(jié)果分區(qū)位置

這是一種比較重要的計算方式,但卻不是唯一的方式

java分布式流式處理組件Producer分區(qū)的作用是什么

---這是分割線---

接下來繼續(xù),我們看看如果無法對序列化Key計算,會是怎么樣的計算邏輯?

我們先開始來看一下,是在哪個地方得到的serializedKey,并且什么情況下serializedKey會是NULL

看看下面的這個代碼眼熟不?

// 生產(chǎn)者生產(chǎn)消息對象
ProducerRecord<String, String> record = new ProducerRecord<>(
        "newTopic001",
        "data from " + KafkaQuickProducer.class.getName()
);

java分布式流式處理組件Producer分區(qū)的作用是什么

// KafkaProducer#doSend()
// line994
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
public class StringSerializer implements Serializer<String> {
    // 省略。。。
    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null) {
            return null;
        } else {
            return data.getBytes(encoding);
        }
    }
}

從上面的代碼來看,基本上能夠?qū)嶅N了:

  • 當在生成ProducerRecord對象的時候,如果沒有對消息設置key參數(shù),此時序列化之后的key就是個null

  • 那么當序列化之后的Key為NULL之后,此時分區(qū)計算邏輯就會改變。

此時相當于我們已經(jīng)進入到UniformStickyPartitioner的計算邏輯, 當然了在我們使用的3.3版本中當前類也已經(jīng)被標注為過期

根據(jù)前面的說法,粘性分區(qū)主要解決了消息無Key的分區(qū)計算邏輯,那么粘性分區(qū)并不是說每次都使用同一個分區(qū)

它是通過一個大的Batch為單位,盡量將batch內(nèi)的消息固定在同一個分區(qū)內(nèi),這樣在很大程度上能夠保證:

  • 防止消息無規(guī)律的分散在不同的分區(qū)內(nèi),降低分區(qū)傾斜

  • 同時不需要每次進行分區(qū)計算,也降低了Producer的延遲

而實現(xiàn)方式是采用ConcurrentMap來進行緩存,感興趣的大家可以看看StickyPartitionCache的源碼

而當Batch內(nèi)消息滿足發(fā)送條件被發(fā)送出去之后,才會開始再次計算下一個分區(qū),為此在KafkaProducer中還專門調(diào)用了新的方法

partitioner.onNewBatch(topic, cluster, prevPartition);
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}

java分布式流式處理組件Producer分區(qū)的作用是什么

RoundRobinPartitioner

這是在當前版本中唯一沒有被標注的類,未來說不定會成為默認分區(qū)策略類,我們不看,就瞄一眼

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
    return counter.getAndIncrement();
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

這個類的解釋,嗯。。你們看那個合適吧~

java分布式流式處理組件Producer分區(qū)的作用是什么

其實這個邏輯非常簡單:

  • 通過AtomicInteger.getAndIncrement()的方式將每次寫入平均分配到不同的分區(qū)中

  • 不同與其他分區(qū)策略類,它不關(guān)心Key是否為NULL

我們先來做個小實驗吧: 將分區(qū)策略類修改為RoundRobinPartitioner,也方便后續(xù)自定義分區(qū)器的配置操作

config.setProperty(
        ProducerConfig.PARTITIONER_CLASS_CONFIG, 
        "org.apache.kafka.clients.producer.RoundRobinPartitioner"
);

就這樣就能實現(xiàn),看結(jié)果驗證~

java分布式流式處理組件Producer分區(qū)的作用是什么

中間穿插了一點小知識,那么接下來就會進入到我們最后一個環(huán)節(jié):嘗試自定義分區(qū)器

自定義分區(qū)器

前面我們也提到過,相信大家沒有忘記partitioner.class這個配置

那么接下來就進入到重頭戲:自定義分區(qū)器實戰(zhàn)編碼環(huán)節(jié)。

public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // nothing
    }
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 如果keyBytes == null
        // 直接去0號位置
        if (null == keyBytes) {
            return 0;
        }
        // 已默認分區(qū)策略實現(xiàn)
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
    }
    @Override
    public void close() {
        // nothing
    }
}

我們就先做的簡單一點,主要是想讓大家明白自定義分區(qū)器的實現(xiàn):

  • 如果沒有給定指定key,那么就默認全部去0號分區(qū)

  • 否則就通過key做取模計算

當自定義分區(qū)器實現(xiàn)完成之后,接下來我們就需要通過發(fā)送者進行驗證。當然了,主要還是通過partitioner.class進行修改

// 給出關(guān)鍵代碼,其他的都是一樣的。就不贅述了~~~
config.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "top.zopx.kafka.partitioner.CustomPartitioner");

通過執(zhí)行之后,我們來看看它的運行效果是否滿足我們的預期

java分布式流式處理組件Producer分區(qū)的作用是什么

另一種運行結(jié)果與默認分區(qū)器有Key的情況類似,這里就不再重復貼圖

感謝各位的閱讀,以上就是“java分布式流式處理組件Producer分區(qū)的作用是什么”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對java分布式流式處理組件Producer分區(qū)的作用是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI