溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用Kafka保存紐約時報并進(jìn)行推送

發(fā)布時間:2021-12-15 09:49:12 來源:億速云 閱讀:123 作者:柒染 欄目:大數(shù)據(jù)

如何使用Kafka保存紐約時報并進(jìn)行推送,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

紐約時報有很多內(nèi)容生成系統(tǒng),我們使用第三方數(shù)據(jù)來編寫故事。另外,我們有161年的新聞行業(yè)積累和21年的在線內(nèi)容發(fā)布經(jīng)驗,所以大量的在線內(nèi)容需要被搜索到,并提供給不同的服務(wù)和應(yīng)用使用。

另一方面,有很多服務(wù)和應(yīng)用需要訪問到這些內(nèi)容——搜索引擎、個性化定制服務(wù)、新聞種子生成器,以及其他各種前端應(yīng)用,如網(wǎng)站和移動應(yīng)用。一旦有新內(nèi)容發(fā)布,就要在很短的時間內(nèi)讓這些服務(wù)訪問到,而且不能有數(shù)據(jù)丟失——畢竟這些內(nèi)容都是有價值的新聞。

我們將詳細(xì)介紹我們是如何基于Apache Kafka解決上述問題的。我們把這個系統(tǒng)叫做發(fā)布管道(Publishing Pipeline)。主要關(guān)注后端的系統(tǒng),我們會介紹如何使用Kafka保存紐約時報的文章,以及如何使用Kafka和Steams API將發(fā)布的內(nèi)容實時推送給各種應(yīng)用。下面是總體的架構(gòu)圖,具體細(xì)節(jié)稍后詳述。

如何使用Kafka保存紐約時報并進(jìn)行推送

基于API解決方案的不足之處

訪問已發(fā)布內(nèi)容的后端系統(tǒng)有著各種不同的需求。

  • 我們有一個服務(wù)專門為網(wǎng)站和移動應(yīng)用提供實時內(nèi)容,所以在內(nèi)容發(fā)布之后,它需要立即訪問到這些內(nèi)容。

  • 我們還有一些服務(wù)用于提供內(nèi)容清單。有些清單是手動編輯的,有些則是通過查詢獲得的。對于通過查詢獲得的清單來說,一旦有符合查詢條件的內(nèi)容發(fā)布,就需要被包含在清單里。而如果已發(fā)布的內(nèi)容經(jīng)過修改后不再符合查詢條件,就要從清單里移除。我們還要支持對查詢條件本身進(jìn)行修改,比如創(chuàng)建新的清單,而要新建清單就需要訪問之前發(fā)布的內(nèi)容。

  • 我們的Elasticsearch集群為網(wǎng)站提供了搜索服務(wù)。我們對延遲沒有很高的要求,比如在內(nèi)容發(fā)布后的一兩分鐘內(nèi)搜索不到新內(nèi)容不算是個大問題。不過,搜索引擎仍然要訪問之前發(fā)布的內(nèi)容,因為一旦Elasticsearch的schema定義發(fā)生變更,或者修改了搜索攝取管道,就需要對所有內(nèi)容進(jìn)行重新索引。

  • 我們還有一個個性化定制系統(tǒng),它只對最新的內(nèi)容感興趣。在個性化定制算法發(fā)生變化后,需要重新處理這些內(nèi)容。

在一開始,我們?yōu)檫@些應(yīng)用提供了API,讓它們直接訪問已發(fā)布的內(nèi)容,或者讓它們訂閱種子,一旦有新內(nèi)容發(fā)布,它們就會收到通知。

如何使用Kafka保存紐約時報并進(jìn)行推送

這種典型的基于API的解決方案存在很多問題。

不同的API是由不同的團(tuán)隊在不同的時期以不同的方式開發(fā)出來的。端點(diǎn)存在差異,語義存在差異,甚至連參數(shù)也存在差異。雖然我們可以試著去解決這些問題,但那需要協(xié)調(diào)各個團(tuán)隊,耗時耗力。

這些系統(tǒng)都定義了自己的schema,同一字段在不同的系統(tǒng)里叫法不一樣,而同一名字的字段在不同的系統(tǒng)里表示的卻是不同的意思。

另一個問題是,要訪問到之前發(fā)布的內(nèi)容是很困難的。大部分系統(tǒng)并沒有提供內(nèi)容流,因為它們使用的數(shù)據(jù)庫不支持這一特性。雖然內(nèi)容都保存在數(shù)據(jù)庫里,但大量的API調(diào)用相當(dāng)耗時,而且會給API服務(wù)帶來不可預(yù)料的負(fù)載。

基于日志的架構(gòu)

我們即將介紹的是一種基于日志的架構(gòu)。Martin Kleppmann在“Turning the database inside-out with Apache Samza”中提到了這一架構(gòu)方案,后來在“Designing Data-Intensive Applications”中有了更為詳細(xì)的描述?!癓og: What every software engineer should know about real-time data's unifying abstraction”則提到了將日志作為一種通用的數(shù)據(jù)結(jié)構(gòu)的說法。對于我們來說,我們的日志就是Kafka,所有發(fā)布的內(nèi)容按照時間順序添加到Kafka主題上,其他服務(wù)通過消費(fèi)日志來訪問這些內(nèi)容。

傳統(tǒng)的應(yīng)用使用數(shù)據(jù)庫保存數(shù)據(jù),盡管數(shù)據(jù)庫也有很多優(yōu)點(diǎn),但從長遠(yuǎn)來看,管理數(shù)據(jù)庫會成為一種負(fù)擔(dān)。首先,變更數(shù)據(jù)庫schema就很棘手。增加或移除字段并不難,但這些變更需要以暫停服務(wù)為代價。我們也無法自由地更換數(shù)據(jù)庫。大部分?jǐn)?shù)據(jù)庫不支持流式變更,盡管我們可以獲得數(shù)據(jù)庫快照,但這些快照很快就會過時。也就是說,我們難以創(chuàng)建衍生存儲,比如搜索引擎使用的索引,因為索引里必須包含所有文章內(nèi)容,而且一旦有新內(nèi)容發(fā)布就要重建索引。雖說我們可以讓客戶端同時將內(nèi)容發(fā)送給多個存儲系統(tǒng),但這樣仍然無法解決一致性問題,因為有的寫入會失敗。

從長遠(yuǎn)來看,數(shù)據(jù)庫最終會變成一個復(fù)雜的單體。

基于日志的架構(gòu)可以解決這些問題。一般來說,數(shù)據(jù)庫保存的是事件的結(jié)果或狀態(tài),而日志保存的是事件本身。我們可以基于日志創(chuàng)建任何我們想要的數(shù)據(jù)存儲,這些數(shù)據(jù)存儲就是日志的物化視圖,它們包含的是派生的內(nèi)容,而非原始內(nèi)容。如果要更改數(shù)據(jù)存儲的schema,只要創(chuàng)建一個新的數(shù)據(jù)存儲,然后從頭到尾再消費(fèi)一遍所有的日志就可以了,然后把舊的數(shù)據(jù)存儲扔掉。

一旦使用日志作為事實的來源,就沒有必要再使用中心數(shù)據(jù)庫了。每一個系統(tǒng)都可以創(chuàng)建屬于自己的數(shù)據(jù)存儲,或者說物化視圖,它只包含該系統(tǒng)所必需的數(shù)據(jù),而且為該系統(tǒng)提供了特定的格式。這就簡化了數(shù)據(jù)庫在架構(gòu)中的角色,更貼合每一個應(yīng)用的需求。

另外,基于日志的架構(gòu)也簡化了訪問內(nèi)容流的方式。對于傳統(tǒng)的數(shù)據(jù)庫來說,訪問整個數(shù)據(jù)轉(zhuǎn)儲(比如快照)和訪問“實時”數(shù)據(jù)(比如種子)是兩種不一樣的操作。而對于日志來說,它們并不存在差別。你可以從任意的偏移量處開始消費(fèi)日志,從起始位置也好,從中間開始也好,甚至從末尾也可以。也就是說,如果你要重新創(chuàng)建數(shù)據(jù)存儲,只要根據(jù)需要重新消費(fèi)日志即可。

基于日志架構(gòu)的系統(tǒng)在部署方面也有很多優(yōu)勢。在虛擬機(jī)里進(jìn)行無狀態(tài)服務(wù)的不變模式部署已經(jīng)成為一種常見的方式。重新部署整個實例可以避免很多問題。因為有了日志,我們現(xiàn)在可以進(jìn)行有狀態(tài)系統(tǒng)的不變模式部署。因為我們可以從日志中重新創(chuàng)建數(shù)據(jù)存儲,所以每次在部署變更的時候都可以獲得新的數(shù)據(jù)存儲。

為什么Google的PubSub或AWS SNS/SQS/Kinesis無法解決這些問題

Kafka一般有兩種應(yīng)用場景。

Kafka常被用作消息代理,用于數(shù)據(jù)分析和數(shù)據(jù)集成。Kafka在這方面有很多優(yōu)勢,不過Google PubSub、AWS SNS/SQS/Kinesis也能解決這些的問題。這些服務(wù)都支持多個消費(fèi)者和多個生產(chǎn)者,可以跟蹤消費(fèi)者的消費(fèi)狀態(tài),消費(fèi)者在宕機(jī)的時候不會出現(xiàn)數(shù)據(jù)丟失。在這些場景里,日志只是消息代理的一種具體實現(xiàn)而已。

但在基于日志的架構(gòu)里,情況就不一樣了。這個時候日志就不只是單純的實現(xiàn)細(xì)節(jié)那么簡單了,而是變成了核心功能。我們有以下兩點(diǎn)需求:

  1. 我們需要通過日志永久地保留所有事件,否則就無法隨意創(chuàng)建我們需要的數(shù)據(jù)存儲。

  2. 我們需要按照一定順序消費(fèi)日志,因為亂序處理關(guān)聯(lián)性事件會得到錯誤的結(jié)果。

目前也只有Kafka能夠滿足這兩個需求。

Monolog

Monolog是我們的新內(nèi)容發(fā)布源,其他系統(tǒng)把創(chuàng)建的內(nèi)容以追加的方式寫到Monolog。創(chuàng)建的內(nèi)容在進(jìn)入Monolog前會經(jīng)過一個網(wǎng)關(guān),網(wǎng)關(guān)會檢查流經(jīng)的內(nèi)容是否符合我們定義的schema。

如何使用Kafka保存紐約時報并進(jìn)行推送

Monolog里包含了自1851年以來發(fā)行的所有內(nèi)容,它們按照發(fā)行時間進(jìn)行排序。也就是說,消費(fèi)者可以從任意時間點(diǎn)開始消費(fèi)這些內(nèi)容。如果需要消費(fèi)所有的內(nèi)容,就從頭開始(也就是從1851年開始),或者根據(jù)需要只消費(fèi)那些更新過的部分。

舉個例子,我們有一個服務(wù)負(fù)責(zé)提供內(nèi)容清單,比如某個作者發(fā)布過的內(nèi)容、與某個科學(xué)主題相關(guān)的內(nèi)容,等等。這個服務(wù)會從起始位置開始消費(fèi)Monolog,然后構(gòu)建內(nèi)容清單。我們還有另外一個服務(wù),它只提供最新發(fā)布的內(nèi)容清單,所以它不需要永久的數(shù)據(jù)存儲,它只需要過去幾個小時的日志數(shù)據(jù)。它會在啟動的時候消費(fèi)最近幾個小時的日志,并在內(nèi)存里維護(hù)一個最新內(nèi)容的清單。

我們按照規(guī)范化形式將內(nèi)容發(fā)送給Monolog,每一部分內(nèi)容都被當(dāng)成一個單獨(dú)的消息寫入Kafka。例如,圖片和文章是分開發(fā)送的,因為多篇不同的文章可能包含同一張圖片。

如何使用Kafka保存紐約時報并進(jìn)行推送

這與關(guān)系型數(shù)據(jù)庫里的規(guī)范化模型很相似,圖片與文章之間是多對多關(guān)系。

在上一例子中,我們有兩篇文章引用了其他內(nèi)容。例如,標(biāo)題行是單獨(dú)發(fā)布的,然后又被其他兩篇文章引用。所有的內(nèi)容都使用nyt://article/577d0341-9a0a-46df-b454-ea0718026d30這種格式的URI來標(biāo)識。我們有一個原生瀏覽器可以查看這些URI,只要單擊這些URI就可以看到它們的JSON表示,而內(nèi)容本身則以protobuf格式保存在Monolog上。

Monolog實際上是Kafka上的一個主題,它只包含一個分區(qū),因為我們想要保持消息的全局順序。這樣可以保證頂層內(nèi)容的內(nèi)部一致性——如果我們在一篇文章里添加了一張圖片,同時又添加了一些文字,這些文字引用了這張圖片,那么我們就要確保圖片的位置應(yīng)該在新增文字之前。

實際上,內(nèi)容是按照拓?fù)涞姆绞竭M(jìn)行排序的,如下圖所示。

如何使用Kafka保存紐約時報并進(jìn)行推送

因為主題只包含了一個分區(qū),所以所有內(nèi)容都保存在同一個磁盤上(Kafka的存儲機(jī)制就是這樣的)。不過這對于我們來說不是問題,因為我們所有的內(nèi)容都是文字,到現(xiàn)在總量都沒超過100GB。

規(guī)范化日志和Kafka Streams API

Monolog滿足了部分應(yīng)用程序的需求,這些應(yīng)用需要規(guī)范化的數(shù)據(jù)視圖,但對于其他一些應(yīng)用程序來說就不是這么回事了。比如,為了將數(shù)據(jù)索引至Elasticsearch,就需要非規(guī)范化的數(shù)據(jù),因為Elasticsearch不支持多對多的關(guān)系映射。如果要通過圖片說明來搜索文章,這些圖片的說明性文字就必須被包含在文章對象里。

為了支持這種數(shù)據(jù)視圖,我們也準(zhǔn)備了一套非規(guī)范化的日志。在這些日志里,頂層的內(nèi)容及其所有依賴項都被打包發(fā)布。例如,在發(fā)布Article 1的時候,日志消息里不僅包含了這篇文章,也包含了相關(guān)的圖片和標(biāo)簽。

如何使用Kafka保存紐約時報并進(jìn)行推送

Kafka消費(fèi)者客戶端從日志里消費(fèi)消息,再添加到Elasticsearch索引里。在發(fā)布Ariticle 2的時候,這篇文章的所有相關(guān)內(nèi)容也會被打包在一起,即使有些圖片可能已經(jīng)在Ariticle 1里出現(xiàn)過。

如何使用Kafka保存紐約時報并進(jìn)行推送

如果文章的依賴項發(fā)生變化,整篇內(nèi)容就會被重新發(fā)布。比如,如果更新了Image 2,那么Article 1就會再次被添加到日志里。

如何使用Kafka保存紐約時報并進(jìn)行推送

我們使用一個叫作Denormalizer的組件來創(chuàng)建非規(guī)范化日志。

Denormalizer是一個使用了Kafka Streams API的Java應(yīng)用程序。它消費(fèi)Monolog的消息,并在本地為每一篇文章保留了一份最新的版本,包括對文章的引用。隨著內(nèi)容不斷地發(fā)布和更新,本地存儲也會持續(xù)更新。一旦有頂層內(nèi)容發(fā)布,Denormalizer就會從本地存儲中收集所有的依賴項,把它們打包寫到非規(guī)范化日志中。如果某個頂層內(nèi)容的依賴項發(fā)生了變化,Denormalizer就會重新發(fā)布整個包。

非規(guī)范化日志不需要全局的順序保證,我們只要確保同一篇文章的不同版本是按照一定順序?qū)懭肴罩揪涂梢粤恕K晕覀兛梢允褂梅謪^(qū),讓多個消費(fèi)者同時消費(fèi)這些分區(qū)。

Elasticsearch示例

下圖展示了我們構(gòu)建的后端搜索服務(wù),我們使用了Elasticsearch。

如何使用Kafka保存紐約時報并進(jìn)行推送

整個數(shù)據(jù)流程是這樣的。

  • CMS發(fā)布或更新內(nèi)容。

  • 內(nèi)容以protobuf二進(jìn)制的方式發(fā)送到網(wǎng)關(guān)。

  • 網(wǎng)關(guān)驗證內(nèi)容,并把它寫入Monolog。

  • Denormalizer從Monolog消費(fèi)日志,如果是頂層內(nèi)容,就從本地存儲中收集所有依賴項,再打包寫入非規(guī)范化日志中。如果是被引用的內(nèi)容,那么所有與之相關(guān)的頂層內(nèi)容也會被寫入非規(guī)范化日志。

  • Kafka分區(qū)器根據(jù)頂層內(nèi)容的URI來分區(qū)。

  • 所有的搜索節(jié)點(diǎn)通過調(diào)用Kafka Streams API來訪問非規(guī)范化日志,每個節(jié)點(diǎn)讀取一個分區(qū),把消息包裝成JSON對象,再添加到Elasticsearch索引里,最后再寫到指定的Elasticsearch節(jié)點(diǎn)上。在進(jìn)行索引重建的時候,我們把復(fù)制功能關(guān)閉,這樣可以加快索引速度,在構(gòu)建好索引后再把復(fù)制功能打開。

實現(xiàn)

我們的發(fā)布管道部署在Google Cloud Platform上。我不打算在這里描述具體的細(xì)節(jié),不過下圖給出了它的整體架構(gòu)情況。我們在GCP Compute上運(yùn)行Kafka和ZooKeeper,其他的組件——網(wǎng)關(guān)、Kafka副本節(jié)點(diǎn)和Denormalizer則運(yùn)行在容器里。我們使用了基于gRPC和Cloud Endpoint的API,并使用SSL認(rèn)證和授權(quán)確保Kafka的安全。

如何使用Kafka保存紐約時報并進(jìn)行推送

我們花了將近一年時間在我們的新架構(gòu)上,現(xiàn)在它已經(jīng)運(yùn)行在生產(chǎn)環(huán)境中。不過現(xiàn)在還只是個開始,我們還有很多其他系統(tǒng)也需要遷移到這個架構(gòu)里。新架構(gòu)有很多優(yōu)勢,但這對于開發(fā)者來說也是一次重大的思維轉(zhuǎn)變,他們需要從傳統(tǒng)數(shù)據(jù)庫和發(fā)布訂閱模型轉(zhuǎn)向新的數(shù)據(jù)流模型。為了讓這些優(yōu)勢發(fā)揚(yáng)光大,我們需要改變我們的開發(fā)方式,還要花很多精力構(gòu)建工具和基礎(chǔ)設(shè)施,讓開發(fā)變得更簡單。

看完上述內(nèi)容,你們掌握如何使用Kafka保存紐約時報并進(jìn)行推送的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI