您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“ksqlDB的基本使用方法”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
ksqlDB是事件流數(shù)據(jù)庫,是一種特殊的數(shù)據(jù)庫,基于Kafka的實(shí)時數(shù)據(jù)流處理引擎,提供了強(qiáng)大且易用的SQL交互方式來對Kafka數(shù)據(jù)流進(jìn)行處理,而無需編寫代碼。KSQL具備高擴(kuò)展、高彈性、容錯式等優(yōu)良特性,并且它提供了大范圍的流式處理操作,比如數(shù)據(jù)過濾、轉(zhuǎn)化、聚合、連接join、窗口化和 Sessionization (即捕獲單一會話期間的所有的流事件)等。
KSQL命令行界面(CLI)以交互方式編寫KSQL查詢。 KSQL CLI充當(dāng)KSQL Server的客戶端。
ksqlDB旨在通過使用較低級別的流處理器來提高抽象度。通常,一個事件稱為“行”,就像它是關(guān)系數(shù)據(jù)庫中的一行一樣。
流代表是一系列歷史數(shù)據(jù)的分區(qū)的,不可變的,僅可以追加的集合。 一旦將一行插入流中,就無法更改??梢栽诹鞯哪┪蔡砑有滦校怯肋h(yuǎn)不能更新或者刪除現(xiàn)有的行。 每一行數(shù)據(jù)存儲在特定的分區(qū)中,每行隱式或顯式地?fù)碛幸粋€代表其身份的鍵,具有相同鍵的所有行都位于同一分區(qū)中。
表是可變的、分區(qū)的集合,它的內(nèi)容會隨時間而變化。 流表示事件的歷史序列,與之相反,表表示目前的真實(shí)情況。表通過利用每一行的鍵來工作。如果一個行序列共享一個鍵,那么給定鍵的最后一行表示該鍵標(biāo)識的最新信息,后臺進(jìn)程定期運(yùn)行并刪除除最新行以外的所有行。
假設(shè)用戶Alice和Bob剛開始分別有200美元和100美元,經(jīng)過了以下一系列交易:
Alice轉(zhuǎn)給Bob 100美元。
Bob轉(zhuǎn)給Alice 50美元。
Bob轉(zhuǎn)給Alice 100美元。
在例子中Stream表示資金從一個賬號轉(zhuǎn)移到另一個賬號的歷史記錄,Table反映了每個用戶賬號的最新狀態(tài)。因此我們得出結(jié)論:Table將具有賬戶的當(dāng)前狀態(tài),而Stream將捕獲交易記錄。
Stream可以看作是Table的變更日志,因?yàn)殡S著時間的推移更新Stream的聚合會產(chǎn)生一個表。 可以將某個Table在某個時間點(diǎn)視為Stream中每個鍵的最新值的快照(流的數(shù)據(jù)記錄是鍵值對),觀察Table隨時間的變化會產(chǎn)生一個Stream。
創(chuàng)建docker-compose.yaml文件,包含ksqlDB Server和ksqlDB Cli:
--- version: '2' services: ksqldb-server: image: confluentinc/ksqldb-server:0.15.0 hostname: ksqldb-server container_name: ksqldb-server ports: - "8088:8088" environment: KSQL_LISTENERS: http://0.0.0.0:8088 KSQL_BOOTSTRAP_SERVERS: 192.168.1.87:9092 #要連接的kafka集群的地址 KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" ksqldb-cli: image: confluentinc/ksqldb-cli:0.15.0 container_name: ksqldb-cli depends_on: - ksqldb-server entrypoint: /bin/sh tty: true
通過docker-compose up -d
命令啟動,然后用下面命令連接ksql:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. =========================================== = _ _ ____ ____ = = | | _____ __ _| | _ \| __ ) = = | |/ / __|/ _` | | | | | _ \ = = | <\__ \ (_| | | |_| | |_) | = = |_|\_\___/\__, |_|____/|____/ = = |_| = = Event Streaming Database purpose-built = = for stream processing apps = =========================================== Copyright 2017-2020 Confluent Inc. CLI v0.15.0, Server v0.15.0 located at http://ksqldb-server:8088 Server Status: RUNNING Having trouble? Type 'help' (case-insensitive) for a rundown of how things work! ksql>
package tuling.kafkaDemo; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class MsgProducer { private final static String TOPIC_NAME = "cr7-topic"; public static void main(String[] args) throws InterruptedException, ExecutionException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092"); // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092"); // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.8.36.125:9092,11.8.38.116:9092,11.8.38.120:9092"); /* 發(fā)出消息持久化機(jī)制參數(shù) (1)acks=0: 表示producer不需要等待任何broker確認(rèn)收到消息的回復(fù),就可以繼續(xù)發(fā)送下一條消息。性能最高,但是最容易丟消息。 (2)acks=1: 至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續(xù)發(fā)送下一 條消息。這種情況下,如果follower沒有成功備份數(shù)據(jù),而此時leader又掛掉,則消息會丟失。 (3)acks=-1或all: 需要等待 min.insync.replicas(默認(rèn)為1,推薦配置大于等于2) 這個參數(shù)配置的副本個數(shù)都成功寫入日志,這種策略 會保證只要有一個備份存活就不會丟失數(shù)據(jù)。這是最強(qiáng)的數(shù)據(jù)保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種配置。 */ props.put(ProducerConfig.ACKS_CONFIG, "1"); /* 發(fā)送失敗會重試,默認(rèn)重試間隔100ms,重試能保證消息發(fā)送的可靠性,但是也可能造成消息重復(fù)發(fā)送,比如網(wǎng)絡(luò)抖動,所以需要在 接收者那邊做好消息接收的冪等性處理 */ props.put(ProducerConfig.RETRIES_CONFIG, 3); //重試間隔設(shè)置 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); //設(shè)置發(fā)送消息的本地緩沖區(qū),如果設(shè)置了該緩沖區(qū),消息會先發(fā)送到本地緩沖區(qū),可以提高消息發(fā)送性能,默認(rèn)值是33554432,即32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); /* kafka本地線程會從緩沖區(qū)取數(shù)據(jù),批量發(fā)送到broker, 設(shè)置批量發(fā)送消息的大小,默認(rèn)值是16384,即16kb,就是說一個batch滿了16kb就發(fā)送出去 */ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); /* 默認(rèn)值是0,意思就是消息必須立即被發(fā)送,但這樣會影響性能 一般設(shè)置10毫秒左右,就是說這個消息發(fā)送完后會進(jìn)入本地的一個batch,如果10毫秒內(nèi),這個batch滿了16kb就會隨batch一起被發(fā)送出去 如果10毫秒內(nèi),batch沒滿,那么也必須把消息發(fā)送出去,不能讓消息的發(fā)送延遲時間太長 */ props.put(ProducerConfig.LINGER_MS_CONFIG, 10); //把發(fā)送消息的key從字符串序列化為字節(jié)數(shù)組 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把發(fā)送消息value從字符串序列化為字節(jié)數(shù)組 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //創(chuàng)建Kafka消費(fèi)者實(shí)例 Producer<String, String> producer = new KafkaProducer<String, String>(props); int msgNum = 50; final CountDownLatch countDownLatch = new CountDownLatch(msgNum); for (int i = 1; i <= msgNum; i++) { Order order = new Order(i, 100 + i, 1, 1000.00); //指定發(fā)送分區(qū) /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/ //未指定發(fā)送分區(qū),具體發(fā)送的分區(qū)計算公式:hash(key)%partitionNum ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , order.getOrderId().toString(), JSON.toJSONString(order)); //等待消息發(fā)送成功的同步阻塞方法 // RecordMetadata metadata = producer.send(producerRecord).get(); // System.out.println("同步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-" // + metadata.partition() + "|offset-" + metadata.offset()); //異步回調(diào)方式發(fā)送消息 producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("發(fā)送消息失?。?quot; + exception.getStackTrace()); } if (metadata != null) { System.out.println("異步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } // CountDownLatch能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續(xù)執(zhí)行。使用一個計數(shù)器進(jìn)行實(shí)現(xiàn)。計數(shù)器初始值為線程的數(shù)量。 // 當(dāng)每一個線程完成自己任務(wù)后,計數(shù)器的值就會減一。 countDownLatch.countDown(); } }); //異步應(yīng)用場景送積分TODO } countDownLatch.await(5, TimeUnit.SECONDS); //當(dāng)計數(shù)器的值為0時,表示所有的線程都已經(jīng)完成一些任務(wù),然后在CountDownLatch上等待的線程就可以恢復(fù)執(zhí)行接下來的任務(wù)。 producer.close(); //所有生產(chǎn)者線程完成任務(wù)后,主線程關(guān)閉和kafka broker的連接 } }
Producer會以如下Json格式向Kafka Broker發(fā)送數(shù)據(jù):
生產(chǎn)者會以如下Json格式 {"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}
ksql> PRINT 'cr7-topic' FROM BEGINNING limit 5; Key format: JSON or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2021/02/27 16:11:46.239 Z, key: 2, value: {"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.239 Z, key: 3, value: {"orderAmount":1000,"orderId":3,"productId":103,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.240 Z, key: 9, value: {"orderAmount":1000,"orderId":9,"productId":109,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.241 Z, key: 16, value: {"orderAmount":1000,"orderId":16,"productId":116,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.241 Z, key: 29, value: {"orderAmount":1000,"orderId":29,"productId":129,"productNum":1}, partition: 2
基于名為cr7-topic的topic創(chuàng)建一個Stream,注意Stream的名字不能有-
:
ksql> CREATE STREAM cr7_topic_stream ( orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER) WITH (kafka_topic='cr7-topic',value_format='json'); Message ---------------- Stream created ----------------
ksql> list streams; Stream Name | Kafka Topic | Key Format | Value Format | Windowed ------------------------------------------------------------------------------------------ CR7_TOPIC_STREAM | cr7-topic | KAFKA | JSON | false
運(yùn)行Producer程序,可以看到會持續(xù)輸出數(shù)據(jù):
ksql> select * from CR7_TOPIC_STREAM EMIT CHANGES; +---------------------------+---------------------------+---------------------------+---------------------------+ |ORDERAMOUNT |ORDERID |PRODUCTID |PRODUCTNUM | +---------------------------+---------------------------+---------------------------+---------------------------+ |1000 |4 |104 |1 | |1000 |6 |106 |1 | |1000 |10 |110 |1 | |1000 |12 |112 |1 | |1000 |13 |113 |1 | |1000 |14 |114 |1 | |1000 |18 |118 |1 | |1000 |19 |119 |1 | |1000 |20 |120 |1 | |1000 |24 |124 |1 | |1000 |26 |126 |1 | |1000 |31 |131 |1 | |1000 |35 |135 |1 | |1000 |38 |138 |1 | |1000 |39 |139 |1 | |1000 |42 |142 |1 | |1000 |46 |146 |1 | |1000 |1 |101 |1 | |1000 |5 |105 |1 | |1000 |7 |107 |1 | |1000 |8 |108 |1 | |1000 |11 |111 |1 | |1000 |15 |115 |1 | |1000 |17 |117 |1 | |1000 |21 |121 |1 | |1000 |22 |122 |1 | |1000 |23 |123 |1 | |1000 |25 |125 |1 | |1000 |2 |102 |1 | |1000 |3 |103 |1 |
將Stream cr7_topic_stream中orderid為單數(shù)的數(shù)據(jù)寫入新的Stream s3中:
ksql> CREATE STREAM s3 AS SELECT * FROM cr7_topic_stream WHERE (orderid%2) != 0 EMIT CHANGES;
查看Stream s3,可以看到只有orderid為單數(shù)的數(shù)據(jù):
ksql> select * from s3 emit changes; +---------------------------+---------------------------+---------------------------+---------------------------+ |ORDERAMOUNT |ORDERID |PRODUCTID |PRODUCTNUM | +---------------------------+---------------------------+---------------------------+---------------------------+ |1000 |1 |101 |1 | |1000 |5 |105 |1 | |1000 |7 |107 |1 | |1000 |11 |111 |1 | |1000 |15 |115 |1 | |1000 |17 |117 |1 | |1000 |21 |121 |1 | |1000 |23 |123 |1 | |1000 |25 |125 |1 | |1000 |27 |127 |1 | |1000 |33 |133 |1 | |1000 |37 |137 |1 | |1000 |43 |143 |1 | |1000 |45 |145 |1 | |1000 |47 |147 |1 | |1000 |13 |113 |1 | |1000 |19 |119 |1 | |1000 |31 |131 |1 | |1000 |35 |135 |1 | |1000 |39 |139 |1 | |1000 |3 |103 |1 |
查詢Stream cr7_topic_stream中的條目總數(shù)和orderamount的總和,并以productnum作為分組:
ksql> SELECT COUNT(*),SUM(orderamount) from cr7_topic_stream GROUP BY productnum EMIT CHANGES; +---------------------------------------------------------+---------------------------------------------------------+ |KSQL_COL_0 |KSQL_COL_1 | +---------------------------------------------------------+---------------------------------------------------------+ |50 |50000
ksql> INSERT INTO cr7_topic_stream (orderId,productNum) values (777,7777);
ksql> describe cr7_topic_stream; Name : CR7_TOPIC_STREAM Field | Type ----------------------- ORDERAMOUNT | INTEGER ORDERID | INTEGER PRODUCTID | INTEGER PRODUCTNUM | INTEGER ----------------------- For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
將上EXTENDED
參數(shù)查看詳細(xì)信息:
ksql> describe extended cr7_topic_stream; Name : CR7_TOPIC_STREAM Type : STREAM Timestamp field : Not set - using <ROWTIME> Key format : KAFKA Value format : JSON Kafka topic : cr7-topic (partitions: 3, replication: 3) Statement : CREATE STREAM CR7_TOPIC_STREAM (ORDERAMOUNT INTEGER, ORDERID INTEGER, PRODUCTID INTEGER, PRODUCTNUM INTEGER) WITH (KAFKA_TOPIC='cr7-topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON'); Field | Type ----------------------- ORDERAMOUNT | INTEGER ORDERID | INTEGER PRODUCTID | INTEGER PRODUCTNUM | INTEGER ----------------------- Sources that have a DROP constraint on this source -------------------------------------------------- S3 Local runtime statistics ------------------------ (Statistics of the local KSQL server interaction with the Kafka topic cr7-topic)
DROP STREAM cr7_topic_stream;
必須要含有主鍵,主鍵是Kafka生產(chǎn)者生產(chǎn)消息時指定的key。
ksql> CREATE TABLE cr7_topic_table ( orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER, kafkaProducerKey VARCHAR PRIMARY KEY ) WITH (kafka_topic='cr7-topic',value_format='json');
kafka腳本生產(chǎn)消息指定key的方法:
#以逗號作為key和value的分隔符。 kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=, >mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
ksql> describe cr7_topic_table; Name : CR7_TOPIC_TABLE Field | Type ---------------------------------------------- ORDERAMOUNT | INTEGER ORDERID | INTEGER (primary key) PRODUCTID | INTEGER PRODUCTNUM | INTEGER ---------------------------------------------- For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>; ksql>
ksql> select * from cr7_topic_table emit changes; +---------------------+---------------------+---------------------+---------------------+---------------------+ |KAFKAPRODUCERKEY |ORDERAMOUNT |ORDERID |PRODUCTID |PRODUCTNUM | +---------------------+---------------------+---------------------+---------------------+---------------------+ |1 |1000 |1 |101 |1 | |2 |1000 |2 |102 |2 | |3 |1000 |3 |103 |3 | ...... #當(dāng)生產(chǎn)者重新生產(chǎn)數(shù)據(jù),把Java代碼中 #Order order = new Order(i, 100 + i, 1, 1000.00); 修改為 #Order order = new Order(i, 100 + i, 1, 2000.00); #在key值一樣的情況下,查cr7_topic_table會是最新的值 |2 |2000 |2 |102 |2 | |3 |2000 |3 |103 |3 | |1 |2000 |1 |101 |1 | ......
“ksqlDB的基本使用方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。