This article shows how to use pulsar-flink-connector from Java to read Pulsar catalog metadata.
The goal is to use pulsar-flink-connector to read the metadata of the namespaces and topics in Apache Pulsar.
The pulsar-flink-connector GitHub repository: https://github.com/streamnative/pulsar-flink
<dependency>
    <groupId>io.streamnative.connectors</groupId>
    <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
    <version>2.7.3</version>
</dependency>

<!-- JAR repositories -->
<repositories>
    <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
</repositories>
Using PulsarMetadataReader to fetch the metadata
package com.levi.demo;

import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Test.
 *
 * @author levi
 * @version 1.0
 **/
public class Test {

    public static void main(String[] args) {
        // Client configuration: the broker service URL
        final ClientConfigurationData configurationData = new ClientConfigurationData();
        configurationData.setServiceUrl("pulsar://127.0.0.1:6650");

        // Your Pulsar token
        final AuthenticationToken token = new AuthenticationToken(
                "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
        configurationData.setAuthentication(token);

        // The first argument is the Pulsar admin URL; the reader is closed by try-with-resources
        try (final PulsarMetadataReader reader = new PulsarMetadataReader(
                "http://127.0.0.1:8443", configurationData, "", new HashMap<>(), -1, -1)) {

            // List the namespaces
            final List<String> namespaces = reader.listNamespaces();
            System.out.println("namespaces: " + namespaces.toString());

            for (final String namespace : namespaces) {
                // List the topics in this namespace
                final List<String> topics = reader.getTopics(namespace);
                System.out.println("topic: " + topics.toString());

                for (String topic : topics) {
                    // Fetch the SchemaInfo that describes the topic's fields
                    final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);

                    final String name = schemaInfo.getName();
                    System.out.println("SchemaName:" + name); // the topic name

                    final SchemaType type = schemaInfo.getType();
                    System.out.println("SchemaType:" + type.toString()); // "JSON"...

                    final Map<String, String> properties = schemaInfo.getProperties();
                    System.out.println(properties);

                    final String schemaDefinition = schemaInfo.getSchemaDefinition();
                    System.out.println(schemaDefinition); // field info
                }
            }
        } catch (IOException | PulsarAdminException e) {
            e.printStackTrace();
        }
    }
}
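The listing above prints namespace and topic strings as they come back from the reader. When post-processing them, it can help to split a topic name into its parts. The following is a minimal sketch, assuming the topic strings are fully-qualified names of the form `persistent://tenant/namespace/topic` (the current Pulsar naming scheme; older names that also carry a cluster segment are not handled). The class `TopicNameSketch`, the holder `TopicParts`, and the sample topic names are hypothetical and are not part of pulsar-flink-connector.

```java
import java.util.Arrays;
import java.util.List;

public class TopicNameSketch {

    /** Hypothetical holder for the parts of a fully-qualified topic name. */
    public static final class TopicParts {
        public final String domain;     // "persistent" or "non-persistent"
        public final String tenant;     // first path segment
        public final String namespace;  // "tenant/namespace"
        public final String localName;  // topic name without the prefix

        TopicParts(String domain, String tenant, String namespace, String localName) {
            this.domain = domain;
            this.tenant = tenant;
            this.namespace = namespace;
            this.localName = localName;
        }
    }

    /** Splits "domain://tenant/namespace/topic" into its parts (assumed format). */
    public static TopicParts parse(String topic) {
        int schemeEnd = topic.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Not a fully-qualified topic name: " + topic);
        }
        String domain = topic.substring(0, schemeEnd);
        // Limit of 3 keeps any further '/' characters inside the local topic name.
        String[] parts = topic.substring(schemeEnd + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in: " + topic);
        }
        return new TopicParts(domain, parts[0], parts[0] + "/" + parts[1], parts[2]);
    }

    public static void main(String[] args) {
        // Sample names for illustration only; real ones would come from the reader.
        List<String> topics = Arrays.asList(
                "persistent://public/default/orders",
                "non-persistent://public/default/metrics");
        for (String topic : topics) {
            TopicParts p = parse(topic);
            System.out.println(p.namespace + " -> " + p.localName + " (" + p.domain + ")");
        }
    }
}
```

Grouping on `namespace` from the parsed result is one way to build a per-namespace view of the schemas without calling the reader again.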