溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么接入Apache Kafka服務(wù)器

發(fā)布時(shí)間:2021-12-24 17:12:15 來源:億速云 閱讀:184 作者:iii 欄目:互聯(lián)網(wǎng)科技

這篇文章主要講解了“怎么接入Apache Kafka服務(wù)器”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么接入Apache Kafka服務(wù)器”吧!

交易撮合引擎(Matching/Trading Engine),顧名思義是用來撮合交易的軟件,廣泛地應(yīng)用在金融、證券、加密貨幣交易等領(lǐng)域。交易引擎負(fù)責(zé)管理加密資產(chǎn)市場中所有的開口訂單(Open Orders),并在發(fā)現(xiàn)匹配的訂單對(Trading Pair)時(shí)自動執(zhí)行交易。本文將首先介紹有關(guān)加密資產(chǎn)交易撮合引擎的基本概念,例如委托單、交易委托賬本等,然后使用Golang實(shí)現(xiàn)一個(gè)原理性的撮合引擎。

1、基本概念與術(shù)語

在開始打造交易撮合引擎之前,讓我們首先熟悉相關(guān)的基本概念與術(shù)語。

撮合/交易引擎

正如前面所述,交易撮合引擎是用來撮合交易的軟件,可以先把交易撮合引擎看作一個(gè)黑盒子,它有一些輸入和輸出。

例如,可能的輸入包括:

  • 創(chuàng)建新的委托單(NewOrder):一個(gè)新的委托單可以作為交易撮合引擎的輸入,引擎會嘗試將其與已有的 委托單進(jìn)行撮合。

  • 取消已有的委托單(CancelOrder):用戶也可以取消一個(gè)之前輸入的委托單,如果它還沒有執(zhí)行的話,即開口訂單。

當(dāng)然你可以定義其他的輸入,出于簡化考慮,我們現(xiàn)在只定義上述兩個(gè)輸入。

交易撮合引擎的輸出是一些事件,以便及時(shí)通知其他應(yīng)用處理。例如,當(dāng)引擎撮合了一筆交易后,就會觸發(fā)一個(gè)TradesGenerated事件;而當(dāng)取消了一個(gè)已有的委托單后,引擎就會觸發(fā)rderCancelled。同樣,你可以根據(jù)自己的需求來定義引擎的輸出,這里我們還是簡單點(diǎn),只定義這兩個(gè)輸出事件。

交易委托賬本

交易委托賬本(Order Book)就是一個(gè)買方委托單或買方委托單的列表,通常按照價(jià)格和時(shí)間排序。

當(dāng)一個(gè)新的買方(買方)委托單進(jìn)入引擎后,引擎就會將嘗試其與現(xiàn)有的賣方(買方)委托賬本 進(jìn)行匹配,看是否存在執(zhí)行交易的可能。如果找到了匹配的對手單,引擎就可以執(zhí)行這兩個(gè)委托單了,也就是撮合成功了。

委托單

在任何交易引擎中,都可能有多種類型的委托單供用戶選擇。其中常見的類型包括:

  • 限價(jià)委托單

限價(jià)委托單是在當(dāng)前的加密貨幣交易環(huán)境中最常用的委托類型。這種委托單允許用戶指定一個(gè)價(jià)格,只有當(dāng)撮合引擎找到同樣價(jià)格甚至更好價(jià)格的對手單時(shí)才執(zhí)行交易。

對于一個(gè)買方委托單而言,這意味著如果你的委托價(jià)格是¥100,那么該委托單將會在任何不高于¥100的價(jià)格成交 —— 買到指定的價(jià)格或者更便宜的價(jià)格;而對于一個(gè)賣方委托單而言,同樣的委托價(jià)格意味著該委托單將在任何不低于¥100的價(jià)格成交—— 賣出指定的價(jià)格或者更高的價(jià)格。

  • 市價(jià)委托單

市價(jià)委托單的撮合會完全忽略價(jià)格因素,而致力于有限完成指定數(shù)量的成交。市價(jià)委托單在交易委托賬本中有較高的優(yōu)先級,在流動性充足的市場中市價(jià)單可以保證成交。

例如,當(dāng)用戶委托購買2個(gè)以太幣時(shí),該委托單可以在¥900、¥1000、¥2000或任何其他價(jià)位成交,這依賴于市場中當(dāng)前的敞口委托單的情況。

  • 止損委托單

止損委托單盡在市場價(jià)格到達(dá)指定價(jià)位時(shí)才被激活,因此它的執(zhí)行方式與市價(jià)委托單相反。一旦止損委托單激活,它們可以自動轉(zhuǎn)化為市價(jià)委托單或限價(jià)委托單。

如果你希望打造一個(gè)高級的交易所,那么還有其他一些需要了解的概念,例如流動性、多/空交易、FIX/FAST協(xié)議等等,但是同樣出于簡化考慮,我們將這些內(nèi)容留給你自己去發(fā)現(xiàn)。

2、系統(tǒng)架構(gòu)

現(xiàn)在,對于交易撮合引擎的構(gòu)成我們已經(jīng)有了一些了解,那么讓我們看一下整個(gè)系統(tǒng)的架構(gòu),以及我們將要使用的技術(shù):

怎么接入Apache Kafka服務(wù)器 正如你上面看到的,我們的系統(tǒng)將包含引擎的多個(gè)客戶端,這些客戶端可以是交易所系統(tǒng)中的其他組件,例如接收終端用戶委托請求的App等等。

在客戶端和引擎之間的通信是使用Apache Kafka作為消息總線來實(shí)現(xiàn)的,每個(gè)交易對都對應(yīng)Kafka的一個(gè)主題,這樣我們可以確保當(dāng)消息隊(duì)列接收到用戶委托單時(shí),引擎將以同樣的先后順序處理委托單。這保證了即使引擎崩潰重啟我們也可以重建交易委托賬本。

引擎將監(jiān)聽Kafka主題,執(zhí)行委托賬本命令并將引擎的輸出事件發(fā)布到消息隊(duì)列中。當(dāng)然如果能夠監(jiān)測委托單的處理速度以及交易的執(zhí)行情況會更酷。我們可以使用Prometheus來采集性能指標(biāo),使用grafana來實(shí)現(xiàn)一個(gè)監(jiān)視儀表盤。

3、開發(fā)語言選擇

可以選擇你熟悉的開發(fā)語言,不過由于交易撮合引擎計(jì)算量巨大,通常我們應(yīng)當(dāng)選擇底層系列的語言,例如:C/C++、GoLang、Rust、Java等等。在這個(gè)教程中,我們使用Golang,因?yàn)樗芸?、容易理解、并發(fā)實(shí)現(xiàn)簡單,而且我也有好久沒有用C++了。

4、開發(fā)交易撮合引擎

我們將按照以下的步驟來開發(fā)交易撮合引擎:

  • 基礎(chǔ)類型定義

  • Consumer實(shí)現(xiàn)

  • Order Book實(shí)現(xiàn)

  • Producer實(shí)現(xiàn)

  • Monitoring實(shí)現(xiàn)

4.1 基礎(chǔ)類型定義

我們需要首先定義一些基礎(chǔ)類型,這包括Order、OrderBook和Trade,分別表示委托單、交易委托賬本和交易:

下面是engine/order.go文件的內(nèi)容:

package engine

import "encoding/json"

type Order struct {
	Amount uint64 `json:"amount"`
	Price  uint64 `json:"price"`
	ID     string `json:"id"`
	Side   int8   `json:"side"`
}

func (order *Order) FromJSON(msg []byte) error {
	return json.Unmarshal(msg, order)
}

func (order *Order) ToJSON() []byte {
	str, _ := json.Marshal(order)
	return str
}

這里我們就是簡單地創(chuàng)建了一個(gè)結(jié)構(gòu)用來記錄訂單的主要信息,然后添加了一個(gè)方法用于快速的JSON轉(zhuǎn)換。

類似地engine/trade.go文件的內(nèi)容:

package engine

import "encoding/json"

type Trade struct {
	TakerOrderID string `json:"taker_order_id"`
	MakerOrderID string `json:"maker_order_id"`
	Amount       uint64 `json:"amount"`
	Price        uint64 `json:"price"`
}

func (trade *Trade) FromJSON(msg []byte) error {
	return json.Unmarshal(msg, trade)
}

func (trade *Trade) ToJSON() []byte {
	str, _ := json.Marshal(trade)
	return str
}

現(xiàn)在我們已經(jīng)定義了基本的輸入和輸出類型,現(xiàn)在看看交易委托賬本engine/order_book.go文件的內(nèi)容:

package engine

// OrderBook type
type OrderBook struct {
	BuyOrders  []Order
	SellOrders []Order
}

// Add a buy order to the order book
func (book *OrderBook) addBuyOrder(order Order) {
	n := len(book.BuyOrders)
	var i int
	for i := n - 1; i >= 0; i-- {
		buyOrder := book.BuyOrders[i]
		if buyOrder.Price < order.Price {
			break
		}
	}
	if i == n-1 {
		book.BuyOrders = append(book.BuyOrders, order)
	} else {
		copy(book.BuyOrders[i+1:], book.BuyOrders[i:])
		book.BuyOrders[i] = order
	}
}

// Add a sell order to the order book
func (book *OrderBook) addSellOrder(order Order) {
	n := len(book.SellOrders)
	var i int
	for i := n - 1; i >= 0; i-- {
		sellOrder := book.SellOrders[i]
		if sellOrder.Price > order.Price {
			break
		}
	}
	if i == n-1 {
		book.SellOrders = append(book.SellOrders, order)
	} else {
		copy(book.SellOrders[i+1:], book.SellOrders[i:])
		book.SellOrders[i] = order
	}
}

// Remove a buy order from the order book at a given index
func (book *OrderBook) removeBuyOrder(index int) {
	book.BuyOrders = append(book.BuyOrders[:index], book.BuyOrders[index+1:]...)
}

// Remove a sell order from the order book at a given index
func (book *OrderBook) removeSellOrder(index int) {
	book.SellOrders = append(book.SellOrders[:index], book.SellOrders[index+1:]...)
}

在交易委托賬本中,除了創(chuàng)建保存買/賣方委托單的列表外,我們還需要定義添加新委托單的方法。

委托單列表應(yīng)當(dāng)根據(jù)其類型按升序或降序排列:賣方委托單是按降序排列的,這樣在列表中序號最大的委托單價(jià)格最低;買方委托單是按升序排列的,因此在其列表中最后的委托單價(jià)格最高。

由于絕大多數(shù)交易會在市場價(jià)格附近成交,我們可以輕松地從這些數(shù)組中插入或移除成員。

4.2 委托單處理

現(xiàn)在讓我們來處理委托單。

在下面的代碼中我們添加了一個(gè)命令來實(shí)現(xiàn)對限價(jià)委托單的處理。

文件engine/order_book_limit_order.go的內(nèi)容:

package engine

// Process an order and return the trades generated before adding the remaining amount to the market
func (book *OrderBook) Process(order Order) []Trade {
	if order.Side == 1 {
		return book.processLimitBuy(order)
	}
	return book.processLimitSell(order)
}

// Process a limit buy order
func (book *OrderBook) processLimitBuy(order Order) []Trade {
	trades := make([]Trade, 0, 1)
	n := len(book.SellOrders)
	// check if we have at least one matching order
	if n != 0 || book.SellOrders[n-1].Price <= order.Price {
		// traverse all orders that match
		for i := n - 1; i >= 0; i-- {
			sellOrder := book.SellOrders[i]
			if sellOrder.Price > order.Price {
				break
			}
			// fill the entire order
			if sellOrder.Amount >= order.Amount {
				trades = append(trades, Trade{order.ID, sellOrder.ID, order.Amount, sellOrder.Price})
				sellOrder.Amount -= order.Amount
				if sellOrder.Amount == 0 {
					book.removeSellOrder(i)
				}
				return trades
			}
			// fill a partial order and continue
			if sellOrder.Amount < order.Amount {
				trades = append(trades, Trade{order.ID, sellOrder.ID, sellOrder.Amount, sellOrder.Price})
				order.Amount -= sellOrder.Amount
				book.removeSellOrder(i)
				continue
			}
		}
	}
	// finally add the remaining order to the list
	book.addBuyOrder(order)
	return trades
}

// Process a limit sell order
func (book *OrderBook) processLimitSell(order Order) []Trade {
	trades := make([]Trade, 0, 1)
	n := len(book.BuyOrders)
	// check if we have at least one matching order
	if n != 0 || book.BuyOrders[n-1].Price >= order.Price {
		// traverse all orders that match
		for i := n - 1; i >= 0; i-- {
			buyOrder := book.BuyOrders[i]
			if buyOrder.Price < order.Price {
				break
			}
			// fill the entire order
			if buyOrder.Amount >= order.Amount {
				trades = append(trades, Trade{order.ID, buyOrder.ID, order.Amount, buyOrder.Price})
				buyOrder.Amount -= order.Amount
				if buyOrder.Amount == 0 {
					book.removeBuyOrder(i)
				}
				return trades
			}
			// fill a partial order and continue
			if buyOrder.Amount < order.Amount {
				trades = append(trades, Trade{order.ID, buyOrder.ID, buyOrder.Amount, buyOrder.Price})
				order.Amount -= buyOrder.Amount
				book.removeBuyOrder(i)
				continue
			}
		}
	}
	// finally add the remaining order to the list
	book.addSellOrder(order)
	return trades
}

看起來我們將一個(gè)方法變成了兩個(gè),分別處理買方委托單和賣方委托單。這兩個(gè)方法在每個(gè)方面 都很相似,除了處理的市場側(cè)不同。

算法非常簡單。我們將一個(gè)買方委托單與所有的賣方委托單進(jìn)行匹配,找出任何與買方委托價(jià)格 一致甚至更低的賣方委托單。當(dāng)這一條件不能滿足時(shí),或者該買方委托單完成后,我們返會撮合 的交易。

4.3 接入Kafka

現(xiàn)在就快完成我們的交易引擎了,還需要接入Apache Kafka服務(wù)器,然后開始監(jiān)聽委托單。

main.go文件的內(nèi)容:

package main

import (
	"engine/engine"
	"log"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// create the consumer and listen for new order messages
	consumer := createConsumer()

	// create the producer of trade messages
	producer := createProducer()

	// create the order book
	book := engine.OrderBook{
		BuyOrders:  make([]engine.Order, 0, 100),
		SellOrders: make([]engine.Order, 0, 100),
	}

	// create a signal channel to know when we are done
	done := make(chan bool)

	// start processing orders
	go func() {
		for msg := range consumer.Messages() {
			var order engine.Order
			// decode the message
			order.FromJSON(msg.Value)
			// process the order
			trades := book.Process(order)
			// send trades to message queue
			for _, trade := range trades {
				rawTrade := trade.ToJSON()
				producer.Input() <- &sarama.ProducerMessage{
					Topic: "trades",
					Value: sarama.ByteEncoder(rawTrade),
				}
			}
			// mark the message as processed
			consumer.MarkOffset(msg, "")
		}
		done <- true
	}()

	// wait until we are done
	<-done
}

//
// Create the consumer
//

func createConsumer() *cluster.Consumer {
	// define our configuration to the cluster
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = false
	config.Group.Return.Notifications = false
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// create the consumer
	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "myconsumer", []string{"orders"}, config)
	if err != nil {
		log.Fatal("Unable to connect consumer to kafka cluster")
	}
	go handleErrors(consumer)
	go handleNotifications(consumer)
	return consumer
}

func handleErrors(consumer *cluster.Consumer) {
	for err := range consumer.Errors() {
		log.Printf("Error: %s\n", err.Error())
	}
}

func handleNotifications(consumer *cluster.Consumer) {
	for ntf := range consumer.Notifications() {
		log.Printf("Rebalanced: %+v\n", ntf)
	}
}

//
// Create the producer
//

func createProducer() sarama.AsyncProducer {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = false
	config.Producer.Return.Errors = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatal("Unable to connect producer to kafka server")
	}
	return producer
}

利用Golang的Sarama Kafka客戶端開發(fā)庫,我們可以分別創(chuàng)建一個(gè)接入Kafka的消費(fèi)者和生產(chǎn)者。

消費(fèi)者將在指定的Kafka主題上等待新的委托單,然后進(jìn)行撮合處理。生成的交易接下來使用生產(chǎn)者發(fā)送到指定的交易主題。

Kafka消息采用字節(jié)數(shù)組編碼,因此我們需要將其解碼。反之,將交易傳入消息隊(duì)列時(shí),我們還需要進(jìn)行必要的編碼。

感謝各位的閱讀,以上就是“怎么接入Apache Kafka服務(wù)器”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對怎么接入Apache Kafka服務(wù)器這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI