溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flume NG 學(xué)習(xí)筆記(十) Transaction、Sink、Source和Channel開發(fā)

發(fā)布時間:2020-07-27 22:42:46 來源:網(wǎng)絡(luò) 閱讀:2184 作者:jackwxh 欄目:開發(fā)技術(shù)

目錄(?)[+]

一、Transaction interface

Transaction接口是基于flume的穩(wěn)定性考慮的。所有主要的組件(sources、sinks、channels)都必須使用Flume Transaction。我們也可以理解Transaction接口就是flume的事務(wù),sources和sinks的發(fā)送數(shù)據(jù)與接受數(shù)據(jù)都是在一個Transaction里完成的。


從上圖中可以看出,一個Transaction在Channel實(shí)現(xiàn)內(nèi)實(shí)現(xiàn)。每一個連接到channel的source和sink都要獲取一個Transaction對象。這Sources實(shí)際上使用了一個ChannelSelector接口來封裝Transaction。存放事件到channel和從channel中提取事件的操作是在一個活躍的Transaction內(nèi)執(zhí)行的。

下面是官網(wǎng)例子


[java] view plain copy

  1. Channel ch = new MemoryChannel();  

  2. Transaction txn = ch.getTransaction();  

  3. txn.begin();  

  4. try {  

  5.   // This try clause includes whatever Channel operations you want to do  

  6.   

  7.   Event eventToStage = EventBuilder.withBody("Hello Flume!",  

  8.                        Charset.forName("UTF-8"));  

  9.   ch.put(eventToStage);  

  10.   // Event takenEvent = ch.take();  

  11.   // ...  

  12.   txn.commit();  

  13. catch (Throwable t) {  

  14.   txn.rollback();  

  15.   

  16.   // Log exception, handle individual exceptions as needed  

  17.   

  18.   // re-throw all Errors  

  19.   if (t instanceof Error) {  

  20.     throw (Error)t;  

  21.   }  

  22. finally {  

  23.   txn.close();  

  24. }  


上面的代碼是一個很簡單的Transaction示例,在自定義Source與自定義Sink中都要使用。

二、自定義Sink開發(fā)

Sink提取event數(shù)據(jù)從channel中,然后直接將數(shù)據(jù)發(fā)送到下一個flume agent中或者存儲到外部庫中。

Sink和channel的關(guān)聯(lián)關(guān)系可以在配置文件中配置。有一個SinkRunner實(shí)例與每一個已配置的Sink關(guān)聯(lián),當(dāng)Flume框架調(diào)用SinkRunner.start()方法時候,將創(chuàng)建一個新的線程來驅(qū)動這Sink。

這個線程將管理這個Sink的生命周期。Sink需要實(shí)現(xiàn)LifecycleAware接口的start()和stop()方法。start()方法用于初始化數(shù)據(jù);stop()用于釋放資源;process()是從channel中提取event數(shù)據(jù)和轉(zhuǎn)發(fā)數(shù)據(jù)的核心方法。

這Sink需要實(shí)現(xiàn)Configurable接口以便操作配置文件。

下面是官網(wǎng)例子:

[java] view plain copy

  1. public class MySink extends AbstractSink implements Configurable {  

  2.   private String myProp;  

  3.   

  4.   @Override  

  5.   public void configure(Context context) {  

  6.     String myProp = context.getString("myProp""defaultValue");  

  7.   

  8.     // Process the myProp value (e.g. validation)  

  9.   

  10.     // Store myProp for later retrieval by process() method  

  11.     this.myProp = myProp;  

  12.   }  

  13.   

  14.   @Override  

  15.   public void start() {  

  16.     // Initialize the connection to the external repository (e.g. HDFS) that  

  17.     // this Sink will forward Events to ..  

  18.   }  

  19.   

  20.   @Override  

  21.   public void stop () {  

  22.     // Disconnect from the external respository and do any  

  23.     // additional cleanup (e.g. releasing resources or nulling-out  

  24.     // field values) ..  

  25.   }  

  26.   

  27.   @Override  

  28.   public Status process() throws EventDeliveryException {  

  29.     Status status = null;  

  30.   

  31.     // Start transaction  

  32.     Channel ch = getChannel();  

  33.     Transaction txn = ch.getTransaction();  

  34.     txn.begin();  

  35.     try {  

  36.       // This try clause includes whatever Channel operations you want to do  

  37.   

  38.       Event event = ch.take();  

  39.   

  40.       // Send the Event to the external repository.  

  41.       // storeSomeData(e);  

  42.   

  43.       txn.commit();  

  44.       status = Status.READY;  

  45.     } catch (Throwable t) {  

  46.       txn.rollback();  

  47.   

  48.       // Log exception, handle individual exceptions as needed  

  49.   

  50.       status = Status.BACKOFF;  

  51.   

  52.       // re-throw all Errors  

  53.       if (t instanceof Error) {  

  54.         throw (Error)t;  

  55.       }  

  56.     } finally {  

  57.       txn.close();  

  58.     }  

  59.     return status;  

  60.   }  

  61. }  

下面是測試?yán)樱?br />

[java] view plain copy

  1. import org.apache.flume.Channel;  

  2. import org.apache.flume.Context;  

  3. import org.apache.flume.Event;  

  4. import org.apache.flume.EventDeliveryException;  

  5. import org.apache.flume.Transaction;  

  6. import org.apache.flume.conf.Configurable;  

  7.   

  8. import org.apache.flume.sink.AbstractSink;  

  9.   

  10.   

  11. public class Custom_Sink extends AbstractSink implements Configurable {  

  12.       private String myProp;  

  13.      @Override  

  14.       public void configure(Context context) {  

  15.         String myProp = context.getString("myProp""defaultValue");  

  16.   

  17.         // Process the myProp value (e.g. validation)  

  18.   

  19.         // Store myProp for later retrieval by process() method  

  20.         this.myProp = myProp;  

  21.       }  

  22.   

  23.       @Override  

  24.       public void start() {  

  25.         // Initialize the connection to the external repository (e.g. HDFS) that  

  26.         // this Sink will forward Events to ..  

  27.       }  

  28.   

  29.       @Override  

  30.       public void stop () {  

  31.         // Disconnect from the external respository and do any  

  32.         // additional cleanup (e.g. releasing resources or nulling-out  

  33.         // field values) ..  

  34.       }  

  35.   

  36.       @Override  

  37.       public Status process() throws EventDeliveryException {  

  38.         Status status = null;  

  39.   

  40.         // Start transaction  

  41.         Channel ch = getChannel();  

  42.         Transaction txn = ch.getTransaction();  

  43.         txn.begin();  

  44.         try {  

  45.           // This try clause includes whatever Channel operations you want to do  

  46.             

  47.           Event event = ch.take();  

  48.           String out = new String(event.getBody());   

  49.           // Send the Event to the external repository.  

  50.           // storeSomeData(e);  

  51.           System.out.println(out);  

  52.             

  53.           txn.commit();  

  54.           status = Status.READY;  

  55.         } catch (Throwable t) {  

  56.           txn.rollback();  

  57.   

  58.           // Log exception, handle individual exceptions as needed  

  59.   

  60.           status = Status.BACKOFF;  

  61.   

  62.           // re-throw all Errors  

  63.           if (t instanceof Error) {  

  64.             throw (Error)t;  

  65.           }  

  66.         } finally {  

  67.           txn.close();  

  68.         }  

  69.         return status;  

  70.       }  

  71.   

  72. }  

上面的測試?yán)又惠敵鍪录腂ODY信息,這里說明下直接用代碼event.getBody().tostring() 輸出是亂碼。因?yàn)樗衧ink都是在Transaction里完成的,因此自定義開發(fā)sink是需要加上Transaction相關(guān)設(shè)置。

 

然后是測試配置,這里是自定義的jar 包是flumedev.Custom_Sink。注意,打包之后請放在目錄$FLUME_HOME/lib下

[html] view plain copy

  1. #配置文件:custom_sink_case23.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.   

  7. # Describe/configure the source  

  8. a1.sources.r1.type = syslogtcp  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.bind = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.   

  13. # Describe the sink  

  14. a1.sinks.k1.channel = c1  

  15. a1.sinks.k1.type = flumedev.Custom_Sink  

  16. #a1.sinks.k1.type =logger  

  17.   

  18. # Use a channel which buffers events in memory  

  19. a1.channels.c1.type = memory  

  20. a1.channels.c1.capacity = 1000  

  21. a1.channels.c1.transactionCapacity = 100  

#敲命令

flume-ng agent -cconf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console

啟動成功后

打開另一個終端輸入,往偵聽端口送數(shù)據(jù)

echo "testcustom_sink" | nc 192.168.233.128 50000

#在啟動的終端查看console輸出

Flume NG 學(xué)習(xí)筆記(十) Transaction、Sink、Source和Channel開發(fā)

可以看到數(shù)據(jù)正常輸出。


三、自定義Source開發(fā)

Source從外面接收數(shù)據(jù)并把數(shù)據(jù)存入Channel中。很少有人用。

下面是官網(wǎng)的例子

[java] view plain copy

  1. public class MySource extends AbstractSource implements Configurable, PollableSource {  

  2.   private String myProp;  

  3.   

  4.   @Override  

  5.   public void configure(Context context) {  

  6.     String myProp = context.getString("myProp""defaultValue");  

  7.   

  8.     // Process the myProp value (e.g. validation, convert to another type, ...)  

  9.   

  10.     // Store myProp for later retrieval by process() method  

  11.     this.myProp = myProp;  

  12.   }  

  13.   

  14.   @Override  

  15.   public void start() {  

  16.     // Initialize the connection to the external client  

  17.   }  

  18.   

  19.   @Override  

  20.   public void stop () {  

  21.     // Disconnect from external client and do any additional cleanup  

  22.     // (e.g. releasing resources or nulling-out field values) ..  

  23.   }  

  24.   

  25.   @Override  

  26.   public Status process() throws EventDeliveryException {  

  27.     Status status = null;  

  28.   

  29.     // Start transaction  

  30.     Channel ch = getChannel();  

  31.     Transaction txn = ch.getTransaction();  

  32.     txn.begin();  

  33.     try {  

  34.       // This try clause includes whatever Channel operations you want to do  

  35.   

  36.       // Receive new data  

  37.       Event e = getSomeData();  

  38.   

  39.       // Store the Event into this Source's associated Channel(s)  

  40.       getChannelProcessor().processEvent(e)  

  41.   

  42.       txn.commit();  

  43.       status = Status.READY;  

  44.     } catch (Throwable t) {  

  45.       txn.rollback();  

  46.   

  47.       // Log exception, handle individual exceptions as needed  

  48.   

  49.       status = Status.BACKOFF;  

  50.   

  51.       // re-throw all Errors  

  52.       if (t instanceof Error) {  

  53.         throw (Error)t;  

  54.       }  

  55.     } finally {  

  56.       txn.close();  

  57.     }  

  58.     return status;  

  59.   }  

  60. }  


測試的話,主要針對Event e 這里進(jìn)行傳輸數(shù)據(jù),這里就不測試了。

 

四、自定義Channel開發(fā)

官網(wǎng)說待定。

下面是美團(tuán)網(wǎng)的自定義Channel 開發(fā),下面是鏈接

http://tech.meituan.com/mt-log-system-optimization.html

……

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久化;FileChannel則剛好相反。我們希望利用兩者的優(yōu)勢,在Sink處理速度夠快,Channel沒有緩存過多日志的時候,就使用MemoryChannel,當(dāng)Sink處理速度跟不上,又需要Channel能夠緩存下應(yīng)用端發(fā)送過來的日志時,就使用FileChannel,由此我們開發(fā)了DualChannel,能夠智能的在兩個Channel之間切換。

其具體的邏輯如下:

[java] view plain copy

  1. /*** 

  2.  * putToMemChannel indicate put event to memChannel or fileChannel 

  3.  * takeFromMemChannel indicate take event from memChannel or fileChannel 

  4.  * */  

  5. private AtomicBoolean putToMemChannel = new AtomicBoolean(true);  

  6. private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);  

  7.   

  8. void doPut(Event event) {  

  9.         if (switchon && putToMemChannel.get()) {  

  10.               //往memChannel中寫數(shù)據(jù)  

  11.               memTransaction.put(event);  

  12.   

  13.               if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {  

  14.                 putToMemChannel.set(false);  

  15.               }  

  16.         } else {  

  17.               //往fileChannel中寫數(shù)據(jù)  

  18.               fileTransaction.put(event);  

  19.         }  

  20.   }  

  21.   

  22. Event doTake() {  

  23.     Event event = null;  

  24.     if ( takeFromMemChannel.get() ) {  

  25.         //從memChannel中取數(shù)據(jù)  

  26.         event = memTransaction.take();  

  27.         if (event == null) {  

  28.             takeFromMemChannel.set(false);  

  29.         }   

  30.     } else {  

  31.         //從fileChannel中取數(shù)據(jù)  

  32.         event = fileTransaction.take();  

  33.         if (event == null) {  

  34.             takeFromMemChannel.set(true);  

  35.   

  36.             putToMemChannel.set(true);  

  37.         }   

  38.     }  

  39.     return event;  

  40. }  

這里要說明下,官網(wǎng)是建議使用file channel,雖然它的效率比較低,但是它能保證數(shù)據(jù)完整性,而memory channel效率高,但是只能對數(shù)據(jù)丟失和重復(fù)不太敏感的業(yè)務(wù)使用


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI