今天就跟大家聊聊有關如何進行flink中的kafka源碼分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
最近一直在弄flink sql相關的東西,第一階段的目標是解決kafka的消費和寫入的問題。不過也有些同學并不是很了解,今天我們來詳細分析一下包的繼承層次。
flink源碼如下:
public class KafkaTableSourceFactory implements StreamTableSourceFactory<Row>{ private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>(); @Override public Map<String, String> requiredContext() { Map<String, String> context = new HashMap<>(); context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE); context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION)); return context; } @Override public List<String> supportedProperties() { List<String> properties = new ArrayList<>(); properties.add(KafkaConnectorDescriptor.DATABASE_KEY); properties.add(KafkaConnectorDescriptor.TABLE_KEY); return properties; } @Override public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { //避免頻繁的觸發(fā) 是否需要加緩存 KafkaTableSource kafkaTableSource; String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY); String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY); if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder(); kafkaTableSource = builder .cluster(dataBase) .subject(table) .build(); kafkaTableSources.put(dataBase + table,kafkaTableSource); } else { kafkaTableSource = kafkaTableSources.get(dataBase + table); } return kafkaTableSource; } }
/**
 * Kafka table source whose records are decoded by a [[PBRowDeserializationSchema]]
 * built from the given type information, parameter map and entry class.
 *
 * The consumer it creates is a `FlinkKafkaConsumer08` configured to start from the
 * earliest offset.
 *
 * @param topic           Kafka topic to read from
 * @param properties      Kafka consumer properties
 * @param schema          table schema handed to the parent table source
 * @param typeInformation row type information passed to the deserialization schema
 * @param paramMap        parameters passed to the deserialization schema
 * @param entryClass      entry class name passed to the deserialization schema
 */
class Kafka08PBTableSource protected(
    topic: String,
    properties: Properties,
    schema: TableSchema,
    typeInformation: TypeInformation[Row],
    paramMap: util.LinkedHashMap[String, AnyRef],
    entryClass: String)
  extends KafkaTableSource(
    schema,
    topic,
    properties,
    new PBRowDeserializationSchema(typeInformation, paramMap, entryClass)) {

  /**
   * Creates the Kafka consumer for this source.
   *
   * As a side effect the startup mode of this table source is set to
   * `StartupMode.EARLIEST` before the consumer is built.
   *
   * @param topic                 Kafka topic to consume
   * @param properties            Kafka consumer properties
   * @param deserializationSchema schema used to turn records into rows
   * @return a consumer configured to start from the earliest offset
   */
  override def createKafkaConsumer(
      topic: String,
      properties: Properties,
      deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    setStartupMode(StartupMode.EARLIEST)
    val consumer = new FlinkKafkaConsumer08(topic, deserializationSchema, properties)
    consumer.setStartFromEarliest()
  }
}
下面是用戶自定義的kafka的sink類:
/**
 * User-defined Kafka table sink that writes rows through a `FlinkKafkaProducer08`.
 *
 * When no partitioner is supplied, a `FlinkFixedPartitioner` is used instead.
 * The serialization schema and the field names/types are given up front and are
 * reused as-is by the overrides below.
 *
 * @param topic               Kafka topic to write to
 * @param properties          Kafka producer properties
 * @param partitioner         optional partitioner; falls back to `FlinkFixedPartitioner`
 * @param paramMap            extra parameters; only forwarded to copies made by `createCopy`
 * @param serializationSchema schema used to serialize rows
 * @param fieldNames          names of the table fields
 * @param fieldTypes          types of the table fields
 */
class Kafka08UDMPBTableSink(
    topic: String,
    properties: Properties,
    partitioner: Optional[FlinkKafkaPartitioner[Row]],
    paramMap: util.LinkedHashMap[String, AnyRef],
    serializationSchema: SerializationSchema[Row],
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]])
  extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  /**
   * Builds the Kafka producer used by this sink.
   *
   * @return a `FlinkKafkaProducer08` using the given partitioner, or a
   *         `FlinkFixedPartitioner` when the optional is empty
   */
  override def createKafkaProducer(
      topic: String,
      properties: Properties,
      serializationSchema: SerializationSchema[Row],
      partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row] = {
    val effectivePartitioner: FlinkKafkaPartitioner[Row] =
      partitioner.orElse(new FlinkFixedPartitioner[Row])
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, effectivePartitioner)
  }

  /** Returns the serialization schema this sink was constructed with; `rowSchema` is not used. */
  override def createSerializationSchema(rowSchema: RowTypeInfo): SerializationSchema[Row] =
    serializationSchema

  /** Creates a new sink with the same constructor arguments as this one. */
  override def createCopy: Kafka08UDMPBTableSink =
    new Kafka08UDMPBTableSink(
      topic,
      properties,
      this.partitioner,
      paramMap,
      serializationSchema,
      fieldNames,
      fieldTypes)

  /**
   * Configures the sink.
   *
   * The `fieldNames` / `fieldTypes` arguments are ignored: the call is delegated to
   * the parent with `this.fieldNames` and `this.fieldTypes` instead.
   */
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  /** Returns the names of the table fields. */
  override def getFieldNames: Array[String] = this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]] = this.fieldTypes

  /**
   * Attaches a Kafka producer sink to `dataStream` and names the sink operator
   * with the runtime name generated from this class and the field names.
   */
  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val producer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    val attachedSink = dataStream.addSink(producer)
    attachedSink.name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }
}
/**
 * Schema class declared to act as both a {@code SerializationSchema<Row>} and a
 * {@code DeserializationSchema<Row>}.
 *
 * <p>Only the state of the class is shown here; the methods required by the two
 * interfaces are not part of this excerpt.
 */
public class TrackRowDeserializationSchema
        implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo;

    // NOTE(review): raw type; key/value types are not visible here. Other classes
    // in this file pass a LinkedHashMap<String, Object>-shaped parameter map - confirm.
    private LinkedHashMap paraMap;

    // NOTE(review): presumably the schema / class names for the inbound and
    // outbound directions - confirm against the omitted methods.
    private String inSchema;
    private String outSchema;
    private String inClass;
    private String outClass;
}
/**
 * Table format factory for the "track" row format, declared as both a
 * serialization-schema and a deserialization-schema factory.
 */
public class TrackRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    /**
     * Creates the factory with the default format type
     * ({@code TrackValidator.FORMAT_TYPE_VALUE}), version 1 and schema derivation disabled.
     */
    public TrackRowFormatFactory() {
        this(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    /**
     * Creates the factory with explicit settings, forwarded unchanged to the base class.
     *
     * @param type                     format type identifier
     * @param version                  format property version
     * @param supportsSchemaDerivation whether schema derivation is supported
     */
    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    /**
     * Lists the format-specific property keys accepted by this factory.
     *
     * @return a new mutable list of the supported keys, in declaration order
     */
    @Override
    protected List<String> supportedFormatProperties() {
        // NOTE(review): FORMAT_TYPE_VALUE is named like a value rather than a
        // property key - confirm it is meant to be listed here.
        final String[] supportedKeys = {
            TrackValidator.FORMAT_IN_SCHEMA,
            TrackValidator.FORMAT_IN_CLASS,
            TrackValidator.FORMAT_OUT_CLASS,
            TrackValidator.FORMAT_OUT_SCHEMA,
            TrackValidator.FORMAT_TYPE_INFORMATION,
            TrackValidator.FORMAT_TYPE_VALUE
        };
        final List<String> supported = new ArrayList<>();
        for (String key : supportedKeys) {
            supported.add(key);
        }
        return supported;
    }
}
看完上述內容,你們對如何進行flink中的kafka源碼分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。