溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Kafka2.7是如何重設(shè)消費者組位移

發(fā)布時間:2021-10-12 09:10:38 來源:億速云 閱讀:199 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“Kafka2.7是如何重設(shè)消費者組位移”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

重設(shè)消費

Earliest

首先看看重置位移前的消費進(jìn)度

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --describe

Kafka2.7是如何重設(shè)消費者組位移
根據(jù)進(jìn)度截圖,能看到所有分區(qū)的Lag均為0,說明消息已經(jīng)被消費完,現(xiàn)在根據(jù)Earliest策略重置消費進(jìn)度,要求重置后所有的消息均可重新消費。

腳本命令方式

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-earliest --execute

此時再度查看消費進(jìn)度,可以看到 Kafka2.7是如何重設(shè)消費者組位移 此時消費者可以重新消費這些消息。

腳本命令方式(指定分區(qū))

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-earliest --execute 

Java API方式

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

final String topic = "mytopic";
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    Collection<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
            .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
            .collect(Collectors.toList());
    consumer.seekToBeginning(partitions);
    consumer.partitionsFor(topic).forEach(i -> consumer.position(new TopicPartition(topic, i.partition())));
}

需要特殊說明的是,seekToBeginning、seekToEnd等方法執(zhí)行完需要執(zhí)行position才會立刻生效
Kafka2.7是如何重設(shè)消費者組位移

Java API方式(指定分區(qū))

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

final String topic = "mytopic";
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    List<TopicPartition> partitions = new ArrayList<TopicPartition>();
    partitions.add(new TopicPartition(topic, 1));
    partitions.add(new TopicPartition(topic, 2));
    consumer.seekToBeginning(partitions);
    consumer.position(new TopicPartition(topic, 1));
    consumer.position(new TopicPartition(topic, 2));
}
 

Latest

首先看看重置位移前的消費進(jìn)度。
Kafka2.7是如何重設(shè)消費者組位移
根據(jù)上圖可以看到,kafka當(dāng)前沒有任何消息被消費,現(xiàn)在根據(jù)Latest策略重置消費進(jìn)度,要求重置后原消息不再消費。

腳本命令

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-latest --execute

重置后
Kafka2.7是如何重設(shè)消費者組位移

腳本命令(指定分區(qū))

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-latest --execute 

Java API

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    consumer.seekToEnd(consumer.partitionsFor(topic).stream()
            .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
            .collect(Collectors.toList()));
    consumer.partitionsFor(topic).forEach(i -> consumer.position(new TopicPartition(topic, i.partition())));
}
 

Java API(指定分區(qū))

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    List<TopicPartition> partitions = new ArrayList<TopicPartition>();
    partitions.add(new TopicPartition(topic, 1));
    partitions.add(new TopicPartition(topic, 2));
    consumer.seekToEnd(partitions); 
    consumer.position(new TopicPartition(topic, 1));
    consumer.position(new TopicPartition(topic, 2));
}
 

Current

此方法暫時聯(lián)想不到相應(yīng)的應(yīng)用場景,粗略跳過,待以后了解后再補充。

腳本命令

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-current --execute

腳本命令(指定分區(qū))

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-current --execute 

Java API

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).forEach(tp -> {
        long committedOffset = consumer.committed(tp).offset();
        consumer.seek(tp, committedOffset);
    });
}
 

Java API(指定分區(qū))

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    TopicPartition tp1 = new TopicPartition(topic, 1);
    TopicPartition tp2 = new TopicPartition(topic, 2);
    consumer.seek(tp1, consumer.committed(tp1).offset());
    consumer.seek(tp2, consumer.committed(tp2).offset());
}

Specified-Offset

重置前
Kafka2.7是如何重設(shè)消費者組位移

腳本命令

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-offset 5 --execute
Kafka2.7是如何重設(shè)消費者組位移

腳本命令(指定分區(qū))

通常來說,各個分區(qū)的提交位移往往是不同的,所以將所有分區(qū)的位移設(shè)置成同一個值并不顯示,需要指定分區(qū)。

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --to-offset 11 --execute 

Java API

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    consumer.partitionsFor(topic).stream().forEach(pi -> {
        TopicPartition tp = new TopicPartition(topic, pi.partition());
        consumer.seek(tp, 5L);
    });
}
 

Java API(指定分區(qū))

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    consumer.seek(new TopicPartition(topic, 2), 10L);
}
 

Shift-By-N

重置前
Kafka2.7是如何重設(shè)消費者組位移

腳本命令

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --shift-by -1 --execute
Kafka2.7是如何重設(shè)消費者組位移

腳本命令(指定分區(qū))

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --shift-by -2 --execute 

Java API

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    for (PartitionInfo info : consumer.partitionsFor(topic)) {
        TopicPartition tp = new TopicPartition(topic, info.partition());
        consumer.seek(tp, consumer.committed(tp).offset() - 1L);
    }
}
 

Java API(指定分區(qū))

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    TopicPartition tp = new TopicPartition(topic, 2);
    consumer.seek(tp, consumer.committed(tp).offset() + 2L);
}
 

DateTime

有時按照時間點來重置位移是個不錯的方式,重置前:
Kafka2.7是如何重設(shè)消費者組位移

腳本命令

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-datetime 2021-05-09T00:00:00.000 --execute
Kafka2.7是如何重設(shè)消費者組位移

腳本命令(指定分區(qū))

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --to-datetime 2020-05-09T00:00:00.000 --execute

Java API

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    long ts = new Date().getTime() - 24 * 60 * 60 * 1000;
    Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
            .map(pi -> new TopicPartition(topic, pi.partition()))
            .collect(Collectors.toMap(Function.identity(), tp -> ts));
    for (Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
        consumer.seek(entry.getKey(), entry.getValue() == null ? consumer.committed(entry.getKey()).offset() : entry.getValue().offset());
    }
}
 

Java API(指定分區(qū))

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");

final String topic = "mytopic";
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(0);
    long ts = new Date().getTime() -  365 * 24 * 60 * 60 * 1000;
    Map<TopicPartition, Long> timeToSearch = new HashMap<TopicPartition, Long>(){{
        put(new TopicPartition(topic, 2), ts);
    }};
    for (Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
        consumer.seek(entry.getKey(), entry.getValue() == null ? consumer.committed(entry.getKey()).offset() : entry.getValue().offset());
    }
}
 

Duration

重置前
Kafka2.7是如何重設(shè)消費者組位移

腳本命令

首先需要了解Java Duration的格式PnDTnHnMnS,這里不做詳細(xì)展開。

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --by-duration P1DT0H0M0S --execute
Kafka2.7是如何重設(shè)消費者組位移

腳本命令(指定分區(qū))

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --by-duration P1DT0H0M0S --execute

Java API方式

DateTime

“Kafka2.7是如何重設(shè)消費者組位移”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI