To fetch the data for a given period of time, you can use Kafka's Consumer API. The steps and sample code below can serve as a reference:
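import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// The class name is illustrative only.
public class TimestampConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-consumer-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Assign the partition manually: seek() only works on partitions that are
        // already assigned, which subscribe() does not guarantee before the first poll.
        TopicPartition partition = new TopicPartition("topic-name", 0);
        consumer.assign(Collections.singletonList(partition));
        // Set the starting point for the fetch (epoch milliseconds, 24 hours ago)
        long timestamp = System.currentTimeMillis() - 24L * 60 * 60 * 1000;
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(partition, timestamp);
        // Look up the earliest offset at or after the timestamp and start from there
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            // A null value means the partition has no record at or after the timestamp
            if (entry.getValue() != null) {
                consumer.seek(entry.getKey(), entry.getValue().offset());
            }
        }
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
            }
        }
    }
}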
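The loop above keeps polling indefinitely, while a time range also has an end. The following is a minimal sketch of one way to represent such a range, using only the Java standard library. `TimeWindow` and its methods are hypothetical names introduced for illustration; they are not part of the Kafka API.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: a half-open time window [startMs, endMs) in epoch milliseconds. */
public final class TimeWindow {
    private final long startMs;
    private final long endMs;

    public TimeWindow(long startMs, long endMs) {
        if (endMs < startMs) {
            throw new IllegalArgumentException("endMs must not be before startMs");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    /** Builds the window covering the given number of hours before `now`. */
    public static TimeWindow lastHours(Instant now, long hours) {
        long end = now.toEpochMilli();
        long start = now.minus(Duration.ofHours(hours)).toEpochMilli();
        return new TimeWindow(start, end);
    }

    public long startMs() {
        return startMs;
    }

    public long endMs() {
        return endMs;
    }

    /** True when the timestamp falls inside the window (start inclusive, end exclusive). */
    public boolean contains(long timestampMs) {
        return timestampMs >= startMs && timestampMs < endMs;
    }

    /** True when the timestamp is at or beyond the end of the window. */
    public boolean isPast(long timestampMs) {
        return timestampMs >= endMs;
    }
}
```

One possible use: pass `startMs()` as the timestamp for `offsetsForTimes`, process a record only when `contains(record.timestamp())` is true, and stop reading a partition once `isPast(record.timestamp())` is true. Record timestamps within a partition may not be strictly increasing (for example when producers assign them), so stopping at the first record past the end is an approximation. Looking up the end offset with a second `offsetsForTimes` call for `endMs()` and stopping at that offset is another option.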
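With these steps, you can use the Kafka Consumer API to pull and process data starting from a specified point in time. Note that offsets must be set per partition, and you may need to handle some exceptional cases, such as a partition that does not exist or a partition that has no data at or after the specified timestamp.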
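As a rough sketch of one way to handle the "no data" case: when the timestamp lookup returns null for a partition, you could fall back to that partition's end offset so the consumer only sees records that arrive later. The helper below is hypothetical and deliberately generic over the partition key type, so it runs without a Kafka dependency. In real code the first map would be built from the result of `offsetsForTimes` (taking `offset()` from each non-null value) and the second from `consumer.endOffsets(...)`. Whether the end offset is the right fallback depends on your use case; it is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that decides which offset to seek to for each partition. */
public final class SeekTargets {
    private SeekTargets() {
    }

    /**
     * Uses the offset found by the timestamp lookup when there is one.
     * When the lookup value is null (no record at or after the timestamp),
     * falls back to the partition's end offset. Partitions that have neither
     * value are left out of the result.
     */
    public static <P> Map<P, Long> resolve(Map<P, Long> offsetsByTime, Map<P, Long> endOffsets) {
        Map<P, Long> targets = new HashMap<>();
        for (Map.Entry<P, Long> entry : offsetsByTime.entrySet()) {
            Long target = entry.getValue() != null
                    ? entry.getValue()
                    : endOffsets.get(entry.getKey());
            if (target != null) {
                targets.put(entry.getKey(), target);
            }
        }
        return targets;
    }
}
```

Each entry of the returned map can then be passed to `consumer.seek(partition, offset)`.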