Copyright notice: this is the blogger's original article and may not be reproduced without the blogger's permission.
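The Transaction interface is the basis of Flume's reliability. All the major components (sources, sinks, channels) must use a Flume Transaction. You can think of the Transaction interface as Flume's transaction: sources and sinks send and receive data inside a Transaction.
As the figure above shows, a Transaction is implemented within a Channel implementation. Every source and sink connected to a channel must obtain a Transaction object. Sources do not handle the Transaction directly: they hand events to a ChannelProcessor, which uses a ChannelSelector to pick the target channels and wraps the puts in a Transaction. Sinks manage the Transaction explicitly through their configured channel. Putting events into a channel and taking events out of it both happen inside an active Transaction.
The example from the official documentation: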
Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do
  Event eventToStage = EventBuilder.withBody("Hello Flume!",
      Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();
  // Log exception, handle individual exceptions as needed
  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error) t;
  }
} finally {
  txn.close();
}
The code above is a very simple Transaction example; custom Sources and custom Sinks both rely on this pattern.
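To make the begin/commit/rollback semantics concrete, here is a minimal sketch that does not depend on Flume at all. `ToyChannel`, `ToyTransaction`, and `ToyTransactionDemo` are hypothetical names invented for this illustration, and the staging logic is a simplification, not how Flume's MemoryChannel is actually implemented. It only shows the idea that events put inside a transaction become visible on commit and are discarded on rollback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only: NOT Flume's MemoryChannel, just the commit/rollback idea.
class ToyChannel {
    private final Deque<String> committed = new ArrayDeque<>();

    ToyTransaction getTransaction() {
        return new ToyTransaction(this);
    }

    int size() {
        return committed.size();
    }

    // Called by a committing transaction to publish its staged events.
    void publish(List<String> staged) {
        committed.addAll(staged);
    }
}

class ToyTransaction {
    private final ToyChannel channel;
    private final List<String> staged = new ArrayList<>();
    private boolean open;

    ToyTransaction(ToyChannel channel) {
        this.channel = channel;
    }

    void begin() {
        open = true;
    }

    void put(String event) {
        if (!open) {
            throw new IllegalStateException("put() outside an active transaction");
        }
        staged.add(event); // staged only, not yet visible in the channel
    }

    void commit() {
        channel.publish(staged);
        staged.clear();
    }

    void rollback() {
        staged.clear(); // drop everything staged in this transaction
    }

    void close() {
        open = false;
    }
}

class ToyTransactionDemo {
    // Same shape as the official example: begin, try/commit, catch/rollback, finally/close.
    static boolean putAll(ToyChannel ch, List<String> events, boolean failMidway) {
        ToyTransaction txn = ch.getTransaction();
        txn.begin();
        try {
            for (String e : events) {
                txn.put(e);
            }
            if (failMidway) {
                throw new RuntimeException("simulated failure before commit");
            }
            txn.commit();
            return true;
        } catch (RuntimeException ex) {
            txn.rollback();
            return false;
        } finally {
            txn.close();
        }
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        putAll(ch, Arrays.asList("a", "b"), false); // committed
        putAll(ch, Arrays.asList("c"), true);       // rolled back
        System.out.println(ch.size());              // prints 2
    }
}
```

The second call fails before commit, so "c" never reaches the channel; that is the property the real Transaction gives to sources and sinks.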
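A Sink takes events from a channel and then either forwards them to the next Flume agent or stores them in an external repository.
The association between a Sink and a channel is set in the configuration file. One SinkRunner instance is associated with each configured Sink; when the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink.
This thread manages the Sink's lifecycle. A Sink needs to implement the start() and stop() methods of the LifecycleAware interface. start() initializes the Sink; stop() releases resources; process() is the core method that takes events from the channel and forwards them.
The Sink also needs to implement the Configurable interface so that it can read its settings from the configuration file.
The example from the official documentation: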
public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do
      Event event = ch.take();
      // Send the Event to the external repository.
      // storeSomeData(event);
      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}
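The runner thread that drives a Sink can be pictured with a small Flume-free sketch. Everything here (`RunnerSketch`, `ToySink`, the fixed sleep) is a hypothetical simplification for illustration; the real SinkRunner has its own backoff policy and stop handling. The sketch only shows the contract: the runner keeps calling process(), and a BACKOFF result tells it to pause before trying again.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class RunnerSketch {
    // Mirrors the two outcomes a sink's process() reports.
    enum Status { READY, BACKOFF }

    // Hypothetical stand-in for a sink: one call handles at most one event.
    interface ToySink {
        Status process();
    }

    // Calls process() maxCalls times and returns how many calls reported BACKOFF.
    static int drive(ToySink sink, int maxCalls, long backoffMillis) {
        int backoffs = 0;
        for (int i = 0; i < maxCalls; i++) {
            if (sink.process() == Status.BACKOFF) {
                backoffs++;
                try {
                    Thread.sleep(backoffMillis); // give the channel time to fill up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return backoffs;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("e1");
        channel.add("e2");
        ToySink sink = () -> {
            String event = channel.poll();
            if (event == null) {
                return Status.BACKOFF; // nothing to deliver right now
            }
            System.out.println("delivered " + event);
            return Status.READY;
        };
        // Two events are delivered, then three empty polls back off.
        System.out.println("backoffs: " + drive(sink, 5, 10)); // prints backoffs: 3
    }
}
```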
Below is a test example:
package flumedev;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class Custom_Sink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      Event event = ch.take();
      if (event != null) {
        // Decode the body explicitly; the bytes are assumed to be UTF-8 text
        String out = new String(event.getBody(), StandardCharsets.UTF_8);
        // Stand-in for sending the Event to an external repository
        System.out.println(out);
        status = Status.READY;
      } else {
        // Channel is empty: nothing to deliver, ask the runner to back off
        status = Status.BACKOFF;
      }
      txn.commit();
    } catch (Throwable t) {
      txn.rollback();
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}
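The test example above only prints the event body. Note that printing event.getBody().toString() directly does not give readable text: getBody() returns a byte[], and toString() on an array yields its type and hash code rather than its contents, so the body has to be decoded with new String(...). Because every sink does its work inside a Transaction, a custom sink must include the Transaction handling shown here.
Next is the test configuration. The custom sink class is flumedev.Custom_Sink. Note: after packaging it into a jar, place the jar in $FLUME_HOME/lib.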
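The difference between calling toString() on the body and decoding it can be checked with plain Java, no Flume required. `BodyDecodeDemo` is a name made up for this sketch, and it assumes the body holds UTF-8 text.

```java
import java.nio.charset.StandardCharsets;

class BodyDecodeDemo {
    // Decodes an event body, assuming the bytes are UTF-8 text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "testcustom_sink".getBytes(StandardCharsets.UTF_8);
        // Array toString(): type tag plus hash code, e.g. something like [B@1b6d3586
        System.out.println(body.toString());
        // Explicit decoding recovers the text
        System.out.println(decode(body)); // prints testcustom_sink
    }
}
```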
# Config file: custom_sink_case23.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = flumedev.Custom_Sink
#a1.sinks.k1.type =logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
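# Start the agent
flume-ng agent -c conf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console
Once the agent has started, open another terminal and send data to the listening port:
echo "testcustom_sink" | nc 192.168.233.128 50000
# Check the console output in the terminal where the agent is running
The data is printed as expected.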
A Source receives data from outside and stores it in a Channel. Custom sources are rarely written in practice.
The example from the official documentation:
public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation, convert to another type, ...)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop() {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  // Depending on the Flume version, PollableSource may also require
  // getBackOffSleepIncrement() and getMaxBackOffSleepInterval().

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    try {
      // Receive new data; getSomeData() is a placeholder for your own input logic
      Event e = getSomeData();
      // Store the Event into this Source's associated Channel(s).
      // AbstractSource has no getChannel(); the ChannelProcessor begins,
      // commits, and rolls back the channel Transaction on the source's behalf.
      getChannelProcessor().processEvent(e);
      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    }
    return status;
  }
}
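Testing a source mainly comes down to how Event e gets its data, so it is not tested here.
For custom Channels, the official documentation only says "TBD".
Below is the custom Channel developed at Meituan, quoted from the following link: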
http://tech.meituan.com/mt-log-system-optimization.html
……
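Flume itself provides MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer is limited in size and is not persisted; FileChannel is the exact opposite. We wanted the advantages of both: while the Sink keeps up and the Channel is not buffering too many logs, use MemoryChannel; when the Sink falls behind and the Channel has to buffer the logs sent by the applications, use FileChannel. For this we developed DualChannel, which switches between the two Channels automatically.
Its logic is as follows: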
/***
 * putToMemChannel indicates whether events are put to memChannel or fileChannel
 * takeFromMemChannel indicates whether events are taken from memChannel or fileChannel
 * */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
    if (switchon && putToMemChannel.get()) {
        // Write to memChannel
        memTransaction.put(event);
        if (memChannel.isFull() || fileChannel.getQueueSize() > 100) {
            putToMemChannel.set(false);
        }
    } else {
        // Write to fileChannel
        fileTransaction.put(event);
    }
}

Event doTake() {
    Event event = null;
    if (takeFromMemChannel.get()) {
        // Take from memChannel
        event = memTransaction.take();
        if (event == null) {
            takeFromMemChannel.set(false);
        }
    } else {
        // Take from fileChannel
        event = fileTransaction.take();
        if (event == null) {
            takeFromMemChannel.set(true);
            putToMemChannel.set(true);
        }
    }
    return event;
}
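The switching behaviour above can be traced with a self-contained simulation. This is only a sketch under stated assumptions: `DualChannelSketch` is a hypothetical class in which two in-memory queues stand in for memChannel and fileChannel, the `switchon` flag and the transactions are left out, and the capacity and threshold are constructor parameters chosen for the demo. It is not Meituan's DualChannel implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

class DualChannelSketch {
    private final Queue<String> mem = new ArrayDeque<>();  // stands in for memChannel
    private final Queue<String> file = new ArrayDeque<>(); // stands in for fileChannel
    private final int memCapacity;
    private final int fileThreshold;
    private final AtomicBoolean putToMem = new AtomicBoolean(true);
    private final AtomicBoolean takeFromMem = new AtomicBoolean(true);

    DualChannelSketch(int memCapacity, int fileThreshold) {
        this.memCapacity = memCapacity;
        this.fileThreshold = fileThreshold;
    }

    void put(String event) {
        if (putToMem.get()) {
            mem.add(event);
            // Memory is full (or the file backlog is large): divert later puts to the file queue
            if (mem.size() >= memCapacity || file.size() > fileThreshold) {
                putToMem.set(false);
            }
        } else {
            file.add(event);
        }
    }

    String take() {
        String event;
        if (takeFromMem.get()) {
            event = mem.poll();
            if (event == null) {
                takeFromMem.set(false); // memory drained: read the file backlog next
            }
        } else {
            event = file.poll();
            if (event == null) {
                takeFromMem.set(true);  // backlog drained: return to memory
                putToMem.set(true);
            }
        }
        return event;
    }

    boolean writingToMemory() {
        return putToMem.get();
    }

    public static void main(String[] args) {
        DualChannelSketch ch = new DualChannelSketch(2, 100);
        ch.put("a");
        ch.put("b"); // memory queue is now full
        ch.put("c"); // diverted to the file queue
        List<String> taken = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            taken.add(ch.take());
        }
        System.out.println(taken);                // prints [a, b, null, c, null]
        System.out.println(ch.writingToMemory()); // prints true
    }
}
```

As in the original logic, a take that returns null is what flips the flags, so each switch costs one empty take.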
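One note: the official documentation recommends the file channel. It is slower, but it preserves data integrity. The memory channel is faster, but it should only be used for workloads that can tolerate some data loss or duplication.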