您好,登錄後才能下訂單哦!
本篇內容介紹了“怎麼用C語言與java實現kafka avro生產者和消費者”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
原始數據格式
請求IP 應(yīng)答IP 域名 類型
3183375114 3729673322 "mx.hc.spdrb.com" A
以上數據是test文件的內容
schema定義如下
{
"type":"record",
"name":"data",
"fields":
[
{"name":"qip","type":"long"},
{"name":"aip","type":"long"},
{"name":"domain","type":"string"},
{"name":"type","type":"string"}
]
}
C語言生產者代碼如下
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include "avro.h" #include "producer.h" const char PERSON_SCHEMA[] = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}"; const char *file = "avro_file.dat"; const char *brokers = "xxxxx:9092"; const char *topic = "topic1"; void print_avro_value(avro_value_t *value) { char *json; if (!avro_value_to_json(value, 1, &json)) { printf("%s\n", json); free(json); } } if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA), &test_schema, &error)) { fprintf(stderr, "schema error\n"); exit(EXIT_FAILURE); } return test_schema; avro_schema_t init_schema() { avro_schema_t test_schema; avro_schema_error_t error; } void add_data(avro_writer_t writer, avro_schema_t schema, int64_t qip, uint64_t aip, const char* domain, const char* type) { avro_datum_t data = avro_record(schema); avro_datum_t dqip = avro_int64(qip); avro_datum_t daip = avro_int64(aip); avro_datum_t ddomain = avro_string(domain); avro_datum_t dtype = avro_string(type); avro_record_set(data, "qip", dqip); avro_record_set(data, "aip", daip); avro_record_set(data, "domain", ddomain); avro_record_set(data, "type", dtype); avro_write_data(writer, NULL, f2c); avro_datum_decref(dqip); avro_datum_decref(daip); avro_datum_decref(ddomain); avro_datum_decref(dtype); avro_datum_decref(data); } int main(int argc, char* argv[]) { int len = 0; avro_schema_t schema; avro_writer_t mem_writer; char buf[1024]; char tmp[4][500]={{0x00}}; FILE *fp = fopen("test","r"); if(!fp) { printf("open test file error!\n"); return -1; } schema = init_schema(); mem_writer = avro_writer_memory(buf, 1024); while(fgets(buf, 1024,fp)!=NULL) { if(buf[strlen(buf)] == '\n') buf[strlen(buf)] = '\0'; if(sscanf(buf, "%s%s%s%s", tmp[0],tmp[1],tmp[2],tmp[3])!=4) continue; add_data(mem_writer,schema,atol(tmp[0]),atol(tmp[1]),tmp[2],tmp[3]); printf("data len = 
%ld\n", avro_writer_tell(mem_writer)); len = avro_writer_tell(mem_writer); kafka_putdata(buf, len,brokers,topic);//librdkafka實現(xiàn)的生產(chǎn)者代碼 未列出 memset(tmp, 0x00, sizeof(tmp)); memset(buf, 0x00, sizeof(buf)); avro_writer_reset(mem_writer); } fclose(fp); avro_writer_free(mem_writer); return 0; }
C語言實現的消費者如下
/*
 * Kafka Avro consumer example.
 *
 * Fetches messages through get_consumer_msg() (declared in consumer.h),
 * decodes each one as an Avro record and prints its four fields.
 */
#include "consumer.h"
#include "avro.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

const char *brokers = "xxxx:9092";
const char *topic = "topic1";
const char *group = "avrotest";

/* Avro schema of one record; inner quotes must be escaped in a C literal. */
const char PERSON_SCHEMA[] =
    "{\"type\":\"record\",\"name\":\"data\",\"fields\":["
    "{\"name\":\"qip\",\"type\":\"long\"},"
    "{\"name\":\"aip\",\"type\":\"long\"},"
    "{\"name\":\"domain\",\"type\":\"string\"},"
    "{\"name\":\"type\",\"type\":\"string\"}]}";

/*
 * Parse PERSON_SCHEMA into an Avro schema object.
 * Terminates the process with EXIT_FAILURE if the schema cannot be parsed.
 */
avro_schema_t init_schema()
{
    avro_schema_t test_schema;
    avro_schema_error_t error;

    if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA), &test_schema, &error)) {
        fprintf(stderr, "schema error\n");
        exit(EXIT_FAILURE);
    }
    return test_schema;
}

/*
 * Decode one record from `reader` using `schema` as both writer and reader
 * schema, and print its qip/aip/domain/type fields on stdout.
 * Decode or field-access failures are reported on stderr.
 */
void print_data(avro_reader_t reader, avro_schema_t schema)
{
    avro_datum_t data;
    avro_datum_t q_datum, a_datum, d_datum, t_datum;
    int64_t qip = 0;
    int64_t aip = 0;
    char *domain = NULL;
    char *type = NULL;

    if (avro_read_data(reader, schema, schema, &data) != 0) {
        fprintf(stderr, "failed to decode record\n");
        return;
    }

    if (avro_record_get(data, "qip", &q_datum) == 0
        && avro_int64_get(q_datum, &qip) == 0
        && avro_record_get(data, "aip", &a_datum) == 0
        && avro_int64_get(a_datum, &aip) == 0
        && avro_record_get(data, "domain", &d_datum) == 0
        && avro_string_get(d_datum, &domain) == 0
        && avro_record_get(data, "type", &t_datum) == 0
        && avro_string_get(t_datum, &type) == 0) {
        /* int64_t is not necessarily long long; cast to match %lld. */
        printf("qip: %lld, aip: %lld,domain: %s,type:%s\n",
               (long long)qip, (long long)aip, domain, type);
    } else {
        fprintf(stderr, "record is missing an expected field\n");
    }

    avro_datum_decref(data);
}

int main(int argc, char* argv[])
{
    rd_kafka_t *rk;
    rd_kafka_topic_partition_list_t *topics;
    char buf[1024] = {0x00};
    int len = 0;
    avro_schema_t schema;
    avro_reader_t mem_reader;

    if (initKafka(&rk, brokers, group, topic, &topics) < 0) {
        return -1;
    }

    schema = init_schema();
    mem_reader = avro_reader_memory(buf, sizeof(buf));

    /* Runs until the process is terminated externally. */
    while (1) {
        /*
         * Clear len first so a stale value from the previous message is never
         * reused. NOTE(review): assumes get_consumer_msg() may leave len
         * untouched when no message arrives - confirm against consumer.h.
         */
        len = 0;
        get_consumer_msg(rk, buf, &len); /* consumer implemented with librdkafka; code not listed */
        if (len == 0)
            continue;

        printf("len=%d\n", len);
        print_data(mem_reader, schema);
        avro_reader_reset(mem_reader);
        memset(buf, 0x00, sizeof(buf));
    }
    return 0;
}
C編譯的Makefile如下,兩個C程序通用
# Shared Makefile for both C programs: compiles every *.c file in the current
# directory and links the objects with the static Avro library and the
# shared libraries listed in DLIB.
TARGET=avro-test
INCLUDE=./avrolib/include/
SLIB=./avrolib/lib/libavro.a
DLIB=-lz -llzma -lrdkafka
INC = -I. -I./avrolib/include
SOURCES =$(wildcard *.c)
OBJECTS =$(SOURCES:.c=.o)
RM=rm -rf
CC=gcc -g
CFLAGS= -Wall $(INC)

.PHONY: all clean

all:$(TARGET)

# $^ (all prerequisites) rather than $? (only the newer ones), so that an
# incremental rebuild still links every object file.
$(TARGET): $(OBJECTS)
	$(CC) -o $@ $^ $(SLIB) $(DLIB) $(CFLAGS)

# Compile each source file into its own object file.
%.o: %.c
	$(CC) $(CFLAGS) -c $< -o $@

clean:
	$(RM) $(TARGET) $(OBJECTS) *~
java消費者 gradle配置
// Libraries used by the Java consumer: JUnit for tests, Avro for decoding,
// and the Kafka client for consuming messages.
dependencies {
    testCompile 'junit:junit:4.12'
    compile 'org.apache.avro:avro:1.9.1'
    compile 'org.apache.kafka:kafka-clients:0.11.0.0'
}
avro解析(借鑒自他人,原作者未知,請作者見諒)
package zc;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

import java.io.IOException;

/**
 * Decodes Avro-encoded binary data into {@link GenericRecord}s using the
 * fixed record schema {@link #USER_SCHEMA}.
 *
 * <p>A shared instance is available through {@link #getInstance()}.
 */
public class MyRecordDecoder {
    public static GenericRecord genericRecord;

    /** Reader bound to the schema parsed in the constructor. */
    final DatumReader<GenericRecord> datumReader;

    static MyRecordDecoder myRecordDecoder = new MyRecordDecoder();

    /** Record schema as JSON; inner quotes must be escaped in a Java literal. */
    final String USER_SCHEMA = "{\"type\":\"record\",\"name\":\"data\",\"fields\":["
            + "{\"name\":\"qip\",\"type\":\"long\"},"
            + "{\"name\":\"aip\",\"type\":\"long\"},"
            + "{\"name\":\"domain\",\"type\":\"string\"},"
            + "{\"name\":\"type\",\"type\":\"string\"}]}";

    /** Parses {@link #USER_SCHEMA} and creates the datum reader for it. */
    public MyRecordDecoder() {
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        datumReader = new SpecificDatumReader<GenericRecord>(schema);
    }

    /**
     * Reads the next record from {@code decoder}.
     *
     * @param decoder decoder positioned at the start of a record
     * @param value   raw message bytes; not used by this method
     * @return the decoded record
     * @throws IOException if the datum reader fails to read a record
     */
    public GenericRecord getGenericRecord(BinaryDecoder decoder, byte[] value) throws IOException {
        return datumReader.read(null, decoder);
    }

    /**
     * Returns the shared decoder, creating it if the static field is null.
     *
     * @return the shared {@code MyRecordDecoder}
     */
    public static MyRecordDecoder getInstance() {
        if (myRecordDecoder == null) {
            myRecordDecoder = new MyRecordDecoder();
        }
        return myRecordDecoder;
    }
}
java消費者 package zc;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class KafkaMessageAvro{ public static void main(String[] args) throws Exception { String inTopic = args[0]; Properties props = new Properties(); props.setProperty("bootstrap.servers", "xxxxx:9092"); props.setProperty("group.id", "flink-topn-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(inTopic)); try { while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(1000); for (ConsumerRecord<String, byte[]> record : records) { byte[] ss = record.value(); if (ss==null) { continue; } System.out.println(ss.toString()); GenericRecord genericRecord = null; BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(ss, null); while (!decoder.isEnd()) { genericRecord = MyRecordDecoder.getInstance().getGenericRecord(decoder, ss); System.out.println(genericRecord.get("qip").toString()+" "+genericRecord.get("aip").toString()+" "+genericRecord.get("domain").toString()+" "+genericRecord.get("type").toString()); } } } } finally { consumer.close(); } }
“怎麼用C語言與java實現kafka avro生產者和消費者”的內容就介紹到這裡了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速雲網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。