溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )

發(fā)布時(shí)間:2020-07-21 09:35:45 來源:網(wǎng)絡(luò) 閱讀:522 作者:Java_老男孩 欄目:編程語言

KafkaProducer在調(diào)用send方法發(fā)送消息至broker的過程中,首先是經(jīng)過攔截器Inteceptors處理,然后是經(jīng)過序列化Serializer處理,之后就到了Partitions階段,即分區(qū)分配計(jì)算階段。在某些應(yīng)用場(chǎng)景下,業(yè)務(wù)邏輯需要控制每條消息落到合適的分區(qū)中,有些情形下則只要根據(jù)默認(rèn)的分配規(guī)則即可。在KafkaProducer計(jì)算分配時(shí),首先根據(jù)的是ProducerRecord中的partition字段指定的序號(hào)計(jì)算分區(qū)。讀者有可能剛睡醒,看到這個(gè)ProducerRecord似曾相識(shí),沒有關(guān)系,先看段Kafka生產(chǎn)者的示例片段:

Producer<String,String> producer = new KafkaProducer<String,String>(properties);
String message = "kafka producer demo";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
try {
    producer.send(producerRecord).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

沒錯(cuò),ProducerRecord只是一個(gè)封裝了消息的對(duì)象而已,ProducerRecord一共有5個(gè)成員變量,即:

private final String topic;//所要發(fā)送的topic
private final Integer partition;//指定的partition序號(hào)
private final Headers headers;//一組鍵值對(duì),與RabbitMQ中的headers類似,kafka0.11.x版本才引入的一個(gè)屬性
private final K key;//消息的key
private final V value;//消息的value,即消息體
private final Long timestamp;//消息的時(shí)間戳,可以分為Create_Time和LogAppend_Time之分,這個(gè)以后的文章中再表。123456

在KafkaProducer的源碼(1.0.0)中,計(jì)算分區(qū)時(shí)調(diào)用的是下面的partition()方法:

/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

可以看出的確是先判斷有無指明ProducerRecord的partition字段,如果沒有指明,則再進(jìn)一步計(jì)算分區(qū)。上面這段代碼中的partitioner在默認(rèn)情況下是指Kafka默認(rèn)實(shí)現(xiàn)的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法實(shí)現(xiàn)如下:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

由上源碼可以看出partition的計(jì)算方式:

  • 如果key為null,則按照一種輪詢的方式來計(jì)算分區(qū)分配
  • 如果key不為null則使用稱之為murmur的Hash算法(非加密型Hash函數(shù),具備高運(yùn)算性能及低碰撞率)來計(jì)算分區(qū)分配。

KafkaProducer中還支持自定義分區(qū)分配方式,與org.apache.kafka.clients.producer.internals.DefaultPartitioner一樣首先實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class為對(duì)應(yīng)的自定義分區(qū)器(Partitioners)即可,即:

properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");

自定義DemoPartitioner主要是實(shí)現(xiàn)Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的計(jì)算方式,詳細(xì)參考如下:

public class DemoPartitioner implements Partitioner {

    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes || keyBytes.length<1) {
            return atomicInteger.getAndIncrement() % numPartitions;
        }
        //借用String的hashCode的計(jì)算方式
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        return hash % numPartitions;
    }

    @Override
    public void close() {}
}

這個(gè)自定義分區(qū)器的實(shí)現(xiàn)比較簡(jiǎn)單,讀者可以根據(jù)自身業(yè)務(wù)的需求來靈活實(shí)現(xiàn)分配分區(qū)的計(jì)算方式,比如:一般大型電商都有多個(gè)倉庫,可以將倉庫的名稱或者ID作為Key來靈活的記錄商品信息。


本文的重點(diǎn)是你有沒有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )
Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )
Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )
Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI