您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)python如何實現(xiàn)消費kafka數(shù)據(jù)批量插入到es,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
1、es的批量插入
這是為了方便后期配置的更改,把配置信息放在logging.conf中
用elasticsearch來實現(xiàn)批量操作,先安裝依賴包,sudo pip install Elasticsearch3
from elasticsearch import Elasticsearch class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") def __init__(self,hosts,index,type): self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000) self.index = index self.type = type def set_date(self,data): # 批量處理 # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()}) self.es.index(index=self.index,doc_type=self.index,body=data)
2、使用pykafka消費kafka
1.因為kafka是0.8,pykafka不支持zk,只能用get_simple_consumer來實現(xiàn)
2.為了實現(xiàn)多個應(yīng)用同時消費而且不重消費,所以一個應(yīng)用消費一個partition
3. 為是確保消費數(shù)據(jù)量在不滿足10000這個批量值,能在一個時間范圍內(nèi)插入到es中,這里設(shè)置consumer_timeout_ms一個超時等待時間,退出等待消費阻塞。
4.退出等待消費阻塞后導(dǎo)致無法再消費數(shù)據(jù),因此在獲取self.consumer 的外層加入了while True 一個死循環(huán)
#!/usr/bin/python # -*- coding: UTF-8 -*- from pykafka import KafkaClient import logging import logging.config from ConfigUtil import ConfigUtil import datetime class KafkaPython: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") logger_data = logging.getLogger("data") def __init__(self): self.server = ConfigUtil().get("kafka","kafka_server") self.topic = ConfigUtil().get("kafka","topic") self.group = ConfigUtil().get("kafka","group") self.partition_id = int(ConfigUtil().get("kafka","partition")) self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms")) self.consumer = None self.hosts = ConfigUtil().get("es","hosts") self.index_name = ConfigUtil().get("es","index_name") self.type_name = ConfigUtil().get("es","type_name") def getConnect(self): client = KafkaClient(self.server) topic = client.topics[self.topic] p = topic.partitions ps={p.get(self.partition_id)} self.consumer = topic.get_simple_consumer( consumer_group=self.group, auto_commit_enable=True, consumer_timeout_ms=self.consumer_timeout_ms, # num_consumer_fetchers=1, # consumer_id='test1', partitions=ps ) self.starttime = datetime.datetime.now() def beginConsumer(self): print("beginConsumer kafka-python") imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name) #創(chuàng)建ACTIONS count = 0 ACTIONS = [] while True: endtime = datetime.datetime.now() print (endtime - self.starttime).seconds for message in self.consumer: if message is not None: try: count = count + 1 # print(str(message.partition.id)+","+str(message.offset)+","+str(count)) # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count)) action = { "_index": self.index_name, "_type": self.type_name, "_source": message.value } ACTIONS.append(action) if len(ACTIONS) >= 10000: imprtEsData.set_date(ACTIONS) ACTIONS = [] self.consumer.commit_offsets() endtime = datetime.datetime.now() print (endtime - self.starttime).seconds #break except (Exception) as e: # self.consumer.commit_offsets() print(e) self.logger.error(e) self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n") # self.logger_data.error(message.value+"\n") # self.consumer.commit_offsets() if len(ACTIONS) > 0: self.logger.info("等待時間超過,consumer_timeout_ms,把集合數(shù)據(jù)插入es") imprtEsData.set_date(ACTIONS) ACTIONS = [] self.consumer.commit_offsets() def disConnect(self): self.consumer.close() from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") def __init__(self,hosts,index,type): self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000) self.index = index self.type = type def set_date(self,data): # 批量處理 success = bulk(self.es, data, index=self.index, raise_on_error=True) self.logger.info(success)
3、運行
if __name__ == '__main__': kp = KafkaPython() kp.getConnect() kp.beginConsumer() # kp.disConnect()
注:簡單的寫了一個從kafka中讀取數(shù)據(jù)到一個list里,當(dāng)數(shù)據(jù)達(dá)到一個閾值時,在批量插入到 es的插件
現(xiàn)在還在批量的壓測中。。。
關(guān)于“python如何實現(xiàn)消費kafka數(shù)據(jù)批量插入到es”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。