Kafka Notes, Part 3: Verifying Consumption Behavior and Performance Testing

Published: 2020-06-30 12:46:27    Source: Web    Views: 23391    Author: xpleaf    Category: Big Data

[TOC]


Verifying Kafka Consumption Behavior

The earlier 《Kafka筆記整理(一)》 (Kafka Notes, Part 1) described how consumers consume messages; to recap:

1. Each consumer belongs to a consumer group; the group id can be specified via group.id.

2. Consumption behavior:
   Within a group: the consumers of one group share a single copy of the data; at any given time a partition of a topic
   can be consumed by only one consumer in the group, while one consumer may consume messages from several partitions.
   Therefore, for a given topic, a group should not contain more consumers than the topic has partitions, otherwise
   some consumers will never receive any messages.
   Across groups: each consumer group consumes the same data, independently of the other groups.

3. When a single consumer process runs multiple threads, each thread effectively acts as one consumer.
   For example: with 3 partitions, if one consumer process starts 3 threads, a consumer that joins the same group later cannot consume anything.

Let us now verify Kafka's consumption behavior. Note that in the consumer program code the group.id can be specified explicitly (below we will set it in a configuration file).

When using the Kafka shell tools, a configuration file can likewise be passed to set the consumer's group.id; if none is given, Kafka generates a random group.id (in kafka-console-consumer.sh, the kafka.tools.ConsoleConsumer class generates one at random when group.id is not specified), as sketched below.
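A minimal sketch of pointing the shell consumer at an explicit consumer configuration so that it joins a chosen group (the --consumer.config option and the config file path are assumptions here; the file would contain the desired group.id):

$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --consumer.config $KAFKA_HOME/config/consumer.properties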

In the program code below, four consumer threads are started with the same group.id (the topic we created has 3 partitions), and another consumer is then started from a terminal via the Kafka shell; together these verify Kafka's consumption behavior.

In addition, the topic used in the test is shown below:

$ kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
        Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
        Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101

That is, the topic has 3 partitions and a replication factor of 3.
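For reference, a topic with this layout would typically have been created with a command along these lines (a sketch, using the same ZooKeeper addresses; not part of the original test steps):

$ kafka-topics.sh --create --topic hadoop --partitions 3 --replication-factor 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181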

Program Code

KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * KafkaProducerOps produces records into a Kafka topic
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /**
         * Load the configuration file.
         * File format:
         * key=value
         *
         * Keep hard-coding to a minimum:
         * values should be configurable rather than baked into the code
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
        properties.load(in);
        /**
         * Two generic type parameters:
         * the first is the type of a record's key in Kafka,
         * the second is the type of a record's value in Kafka
         */
        String[] girls = new String[]{"姚慧瑩", "劉向前", "周  新", "楊柳"};
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        Random random = new Random();
        int start = 1;
        for (int i = start; i <= start + 20; i++) {
            String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
            String key = i + "";
            String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
        }
        producer.close();
    }
}
KafkaConsumerOps.java
package com.uplooking.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * Consume data from a Kafka topic
 */
public class KafkaConsumerOps {
    public static void main(String[] args) throws IOException {
        // thread pool with 4 workers, one per consumer thread
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        System.out.println("Outer start time: " + System.currentTimeMillis());
        for (int i = 0; i < 4; i++) {
            ScheduledFuture<?> schedule = service.schedule(
                    new ConsumerThread(),
                    5L,
                    TimeUnit.SECONDS);
        }
    }
}

class ConsumerThread implements Runnable {

    public void run() {
        System.out.println("Thread ID: " + Thread.currentThread().getId() + ", thread start time: " + System.currentTimeMillis());
        /**
         * Two generic type parameters:
         * the first is the type of a record's key in Kafka,
         * the second is the type of a record's value in Kafka
         */
        Properties properties = new Properties();
        try {
            properties.load(KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // KafkaConsumer is not thread-safe, so each thread creates its own instance
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        Collection<String> topics = Arrays.asList("hadoop");
        // the consumer subscribes to the topic(s)
        consumer.subscribe(topics);
        ConsumerRecords<String, String> consumerRecords = null;
        while (true) {
            // pull data from the topic
            consumerRecords = consumer.poll(1000);
            // iterate over each returned record
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                int partition = consumerRecord.partition();
                System.out.println("CurrentThreadID: " + Thread.currentThread().getId() + "\toffset: " + offset + "\tpartition: " + partition + "\tkey: " + key + "\tvalue: " + value);
            }
        }
    }
}
MyKafkaPartitioner.java
package com.uplooking.bigdata.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * A custom partitioner that chooses the partition based on the record key
 * <p>
 * The partition could be derived from the hashCode of the key or of the value,
 * or from any business-specific rule for spreading data across partitions.
 * Requirement here:
 * take the hashCode of the user-supplied key modulo the number of partitions
 */
public class MyKafkaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {

    }

    /**
     * Choose the partition for the given record
     *
     * @param topic      topic name
     * @param key        the key
     * @param keyBytes   the serialized key
     * @param value      the value
     * @param valueBytes the serialized value
     * @param cluster    metadata of the current cluster
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition = -1;
        if (key == null || keyBytes == null) {
            targetPartition = new Random().nextInt(10000) % partitionNums;
        } else {
            int hashCode = key.hashCode();
            // mask off the sign bit so a negative hashCode cannot yield a negative partition
            targetPartition = (hashCode & Integer.MAX_VALUE) % partitionNums;
            System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }

    public void close() {
    }
}
Constants.java
package com.uplooking.bigdata.kafka.constants;

public interface Constants {
    /**
     * Property key under which the producer topic name is configured
     */
    String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
producer.properties
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

##### the topic that the producer code writes to
producer.topic=hadoop

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
consumer.properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181

bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
pom.xml

The key dependency is kafka-clients:

<dependencies>
  <!--kafka-->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
  </dependency>
</dependencies>

Testing

First start a consumer in a terminal; note that because no configuration file is specified, its group.id is generated randomly:

$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

Next run the consumer program and then the producer program, and observe the output in each terminal.

Terminal output of the producer program:

key: 1, value: 今天的<--劉向前-->很美很美哦~, hashCode: 49, partition: 1
key: 2, value: 今天的<--劉向前-->很美很美哦~, hashCode: 50, partition: 2
key: 3, value: 今天的<--劉向前-->很美很美哦~, hashCode: 51, partition: 0
key: 4, value: 今天的<--楊柳-->很美很美哦~, hashCode: 52, partition: 1
key: 5, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 53, partition: 2
key: 6, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 54, partition: 0
key: 7, value: 今天的<--楊柳-->很美很美哦~, hashCode: 55, partition: 1
key: 8, value: 今天的<--劉向前-->很美很美哦~, hashCode: 56, partition: 2
key: 9, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 57, partition: 0
key: 10, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1567, partition: 1
key: 11, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1568, partition: 2
key: 12, value: 今天的<--周  新-->很美很美哦~, hashCode: 1569, partition: 0
key: 13, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1570, partition: 1
key: 14, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1571, partition: 2
key: 15, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1572, partition: 0
key: 16, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1573, partition: 1
key: 17, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1574, partition: 2
key: 18, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1575, partition: 0
key: 19, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1576, partition: 1
key: 20, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1598, partition: 2
key: 21, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1599, partition: 0

Terminal output of the consumer program:

Outer start time: 1521991118178
Thread ID: 20, thread start time: 1521991123182
Thread ID: 21, thread start time: 1521991123182
Thread ID: 23, thread start time: 1521991123182
Thread ID: 22, thread start time: 1521991123182
CurrentThreadID: 22 offset: 78  partition: 1    key: 1  value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22 offset: 79  partition: 1    key: 4  value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22 offset: 80  partition: 1    key: 7  value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22 offset: 81  partition: 1    key: 10 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22 offset: 82  partition: 1    key: 13 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23 offset: 81  partition: 0    key: 3  value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23 offset: 82  partition: 0    key: 6  value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23 offset: 83  partition: 0    key: 9  value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23 offset: 84  partition: 0    key: 12 value: 今天的<--周  新-->很美很美哦~
CurrentThreadID: 23 offset: 85  partition: 0    key: 15 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23 offset: 86  partition: 0    key: 18 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22 offset: 83  partition: 1    key: 16 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23 offset: 87  partition: 0    key: 21 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21 offset: 78  partition: 2    key: 2  value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22 offset: 84  partition: 1    key: 19 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21 offset: 79  partition: 2    key: 5  value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21 offset: 80  partition: 2    key: 8  value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 21 offset: 81  partition: 2    key: 11 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21 offset: 82  partition: 2    key: 14 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21 offset: 83  partition: 2    key: 17 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21 offset: 84  partition: 2    key: 20 value: 今天的<--姚慧瑩-->很美很美哦~

Terminal output of the shell consumer:

$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--周  新-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~

Analysis

Because the group.id of the consumer started from the Kafka shell is randomly generated, it is guaranteed to receive the messages of every partition of the topic; this is consumption across groups.

In the consumer program, however, all 4 threads use the same group.id (they all load consumer.properties). Since the topic hadoop has only 3 partitions, in theory only 3 of the threads, i.e. 3 consumers, can consume messages. Inspecting the output by thread ID shows that indeed only three threads consumed messages from the topic, which verifies Kafka's in-group consumption behavior.
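To double-check which consumer instance owns which partition, the group can also be inspected from the shell; a sketch, assuming this Kafka version's kafka-consumer-groups.sh supports describing new-consumer groups:

$ kafka-consumer-groups.sh --new-consumer --bootstrap-server uplooking01:9092,uplooking02:9092,uplooking03:9092 --describe --group test-consumer-group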

Kafka Performance Testing

Reference: https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing

Producer Throughput Test

Kafka's installation directory ships a performance tool, bin/kafka-producer-perf-test.sh. It reports four main metrics: total volume of messages sent (in MB), volume sent per second (MB/second), total number of messages sent, and messages sent per second (records/second).

A test run:

[uplooking@uplooking01 ~]$ kafka-producer-perf-test.sh --topic flume-kafka --num-records 1000000 --producer-props bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092 --throughput 10000 --record-size 100
49972 records sent, 9994.4 records/sec (0.95 MB/sec), 3.1 ms avg latency, 258.0 max latency.
50200 records sent, 10040.0 records/sec (0.96 MB/sec), 2.4 ms avg latency, 141.0 max latency.
50020 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
50010 records sent, 10000.0 records/sec (0.95 MB/sec), 2.3 ms avg latency, 127.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 24.0 max latency.
50020 records sent, 10004.0 records/sec (0.95 MB/sec), 2.4 ms avg latency, 186.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 15.1 ms avg latency, 466.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 11.1 ms avg latency, 405.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
50030 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 20.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 30.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
49990 records sent, 9998.0 records/sec (0.95 MB/sec), 1.4 ms avg latency, 49.0 max latency.
50033 records sent, 10006.6 records/sec (0.95 MB/sec), 37.9 ms avg latency, 617.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.5 ms avg latency, 74.0 max latency.
50007 records sent, 10001.4 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.8 ms avg latency, 132.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 15.0 max latency.
50020 records sent, 10000.0 records/sec (0.95 MB/sec), 1.9 ms avg latency, 121.0 max latency.
1000000 records sent, 9999.200064 records/sec (0.95 MB/sec), 4.96 ms avg latency, 617.00 ms max latency, 1 ms 50th, 3 ms 95th, 105 ms 99th, 541 ms 99.9th.

The parameters mean the following:

--num-records 1000000   total number of records to produce
--throughput 10000      target number of records to produce per second
--record-size 100       size of each record, in bytes
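As a sanity check, the target rate works out to 10,000 records/sec × 100 bytes = 1,000,000 bytes/sec ≈ 0.95 MB/sec (1,000,000 ÷ 1,048,576), which matches the MB/sec column in the output above.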

Consumer Throughput Test

[uplooking@uplooking01 ~]$ kafka-consumer-perf-test.sh --topic flume-kafka --messages 1000000 --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092 --threads 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760

The test above consumes one million messages; the output columns are explained below:

start time            end time              total data consumed (MB)   MB consumed per sec   total messages consumed   messages consumed per sec
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760
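As a sanity check, the elapsed time is 05:17:22:458 − 05:17:21:185 ≈ 1.273 s, so 97.3055 MB ÷ 1.273 s ≈ 76.44 MB/sec and 1,020,325 ÷ 1.273 ≈ 801,512 messages/sec, which is how the last two columns are derived. The tool consumed slightly more than the requested 1,000,000 messages, presumably because it fetches in batches and counts whole batches.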