溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

發(fā)布時(shí)間:2022-01-10 11:47:26 來(lái)源:億速云 閱讀:118 作者:柒染 欄目:開(kāi)發(fā)技術(shù)

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā),很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

由于在實(shí)際工作中,數(shù)據(jù)的生產(chǎn)方式極具多樣性,F(xiàn)lume 雖然包含了一些內(nèi)置的機(jī)制來(lái)采集數(shù)據(jù),但是更多的時(shí)候用戶更希望能將應(yīng)用程序和flume直接相通。所以這邊運(yùn)行用戶開(kāi)發(fā)應(yīng)用程序,通過(guò)IPC或者RPC連接flume并往flume發(fā)送數(shù)據(jù)。

一、RPC client interface

Flume的RpcClient實(shí)現(xiàn)了Flume的RPC機(jī)制。用戶的應(yīng)用程序可以很簡(jiǎn)單的調(diào)用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法發(fā)送數(shù)據(jù),不用擔(dān)心底層信息交換的細(xì)節(jié)。用戶可以提供所需的event通過(guò)直接實(shí)現(xiàn)Event接口,例如可以使用簡(jiǎn)單的方便的實(shí)現(xiàn)SimpleEvent類或者使用EventBuilder的writeBody()靜態(tài)輔助方法。

自Flume 1.4.0起,Avro是默認(rèn)的RPC協(xié)議。NettyAvroRpcClient和ThriftRpcClient實(shí)現(xiàn)了RpcClient接口。實(shí)現(xiàn)中我們需要知道我們將要連接的目標(biāo)flume agent的host和port用于創(chuàng)建client實(shí)例,然后使用RpcClient發(fā)送數(shù)據(jù)到flume agent。

官網(wǎng)給了一個(gè)Avro RPCclients的例子,這邊直接拿來(lái)做實(shí)際測(cè)試?yán)印?/p>

這里我們把client.init("host.example.org",41414);

改成 client.init("192.168.233.128",50000);  與我們的主機(jī)對(duì)接

[java] view plain copy

  1. import org.apache.flume.Event;  

  2. import org.apache.flume.EventDeliveryException;  

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6. import java.nio.charset.Charset;  

  7.    

  8. public class MyApp {  

  9.   public static voidmain(String[] args) {  

  10.    MyRpcClientFacade client = new MyRpcClientFacade();  

  11.    // Initializeclient with the remote Flume agent's host and port  

  12. //client.init("host.example.org",41414);  

  13. client.init("192.168.233.128",50000);  

  14.    

  15.    // Send 10events to the remote Flume agent. That agent should be  

  16.    // configured tolisten with an AvroSource.  

  17.    String sampleData = "Hello Flume!";  

  18.    for (int i =0; i < 10; i++) {  

  19.      client.sendDataToFlume(sampleData);  

  20.    }  

  21.    

  22.    client.cleanUp();  

  23.   }  

  24. }  

  25.    

  26. class MyRpcClientFacade {  

  27.   private RpcClient client;  

  28.   private String hostname;  

  29.   private int port;  

  30.    

  31.   public void init(String hostname, int port) {  

  32.    // Setup the RPCconnection  

  33.    this.hostname = hostname;  

  34.    this.port = port;  

  35.    this.client = RpcClientFactory.getDefaultInstance(hostname, port);  

  36.    // Use thefollowing method to create a thrift client (instead of the above line):  

  37.     // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  38.   }  

  39.    

  40.   public void sendDataToFlume(String data) {  

  41.    // Create aFlume Event object that encapsulates the sample data  

  42.    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  43.    

  44.    // Send theevent  

  45.    try {  

  46.      client.append(event);  

  47.    } catch (EventDeliveryException e) {  

  48.      // clean up andrecreate the client  

  49.      client.close();  

  50.      client = null;  

  51.      client = RpcClientFactory.getDefaultInstance(hostname, port);  

  52.      // Use thefollowing method to create a thrift client (instead of the above line):  

  53.      // this.client =RpcClientFactory.getThriftInstance(hostname, port);  

  54.    }  

  55.   }  

  56.    

  57.   public void cleanUp() {  

  58.    // Close the RPCconnection  

  59.    client.close();  

  60.   }  

  61.    

  62. }  

這邊代碼不解釋了,主要是將HelloFlume 發(fā)送10遍給flume,同時(shí)記得將flume 安裝主目錄下的lib 文件都添加進(jìn)項(xiàng)目,才能正常運(yùn)行程序。

下面是代理配置:

[html] view plain copy

  1. #配置文件:avro_client_case20.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type = avro  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.channel = c1  

  15. a1.sinks.k1.type = logger  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  

這里要注意下,之前說(shuō)了,在接收端需要AvroSource或者Thrift Source來(lái)監(jiān)聽(tīng)接口。所以配置代理的時(shí)候要把a(bǔ)1.sources.r1.type 寫(xiě)成avro或者thrift

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

在eclipse 里運(yùn)行Java程序,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。

#在啟動(dòng)源發(fā)送的代理終端查看console輸出

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

可以看到10條數(shù)據(jù)正常發(fā)送。

這里要說(shuō)明下,開(kāi)發(fā)代碼中client.append(event)不僅僅可以發(fā)送一條數(shù)據(jù),也可以發(fā)送一個(gè)List(string) 的數(shù)據(jù)信息,也就是批量發(fā)送。這邊就不做演示了。

二、Failover Client

這個(gè)類包封裝了Avro RPCclient的類默認(rèn)提供故障處理能力。hosts采用空格分開(kāi)host:port所代表的flume agent,構(gòu)成一個(gè)故障處理組。這Failover RPC Client目前不支持thrift。如果當(dāng)前選擇的host agent有問(wèn)題,這個(gè)failover client會(huì)自動(dòng)負(fù)載到組中下一個(gè)host中。

下面是官網(wǎng)開(kāi)發(fā)例子:

[java] view plain copy

  1. // Setup properties for the failover  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_failover");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h2 h3 h4");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h2", host1);  

  13. props.put("hosts.h3", host2);  

  14. props.put("hosts.h4", host3);  

  15.   

  16. // create the client with failover properties  

  17. RpcClient client = RpcClientFactory.getInstance(props);  

下面是測(cè)試的開(kāi)發(fā)例子

[java] view plain copy

  1. import org.apache.flume.Event;  

  2. import org.apache.flume.EventDeliveryException;  

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6.   

  7. import java.nio.charset.Charset;  

  8. import java.util.Properties;  

  9.   

  10. public class Failover_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade2 client = new MyRpcClientFacade2();  

  13.         // Initialize client with the remote Flume agent's host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Hello Flume!";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade2 {  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.         // Setup the RPC connection  

  34.         // Use the following method to create a thrift client (instead of the above line):  

  35.         // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  36.      // Setup properties for the failover  

  37.         Properties props = new Properties();  

  38.         props.put("client.type""default_failover");  

  39.   

  40.         // List of hosts (space-separated list of user-chosen host aliases)  

  41.         props.put("hosts""h2 h3 h4");  

  42.   

  43.         // host/port pair for each host alias  

  44.         String host1 = "192.168.233.128:50000";  

  45.         String host2 = "192.168.233.128:50001";  

  46.         String host3 = "192.168.233.128:50002";  

  47.         props.put("hosts.h2", host1);  

  48.         props.put("hosts.h3", host2);  

  49.         props.put("hosts.h4", host3);  

  50.   

  51.         // create the client with failover properties  

  52.         client = RpcClientFactory.getInstance(props);  

  53.       }  

  54.   

  55.       public void sendDataToFlume(String data) {  

  56.         // Create a Flume Event object that encapsulates the sample data  

  57.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  58.   

  59.         // Send the event  

  60.         try {  

  61.           client.append(event);  

  62.         } catch (EventDeliveryException e) {  

  63.           // clean up and recreate the client  

  64.           client.close();  

  65.           client = null;  

  66.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  67.           // Use the following method to create a thrift client (instead of the above line):  

  68.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  69.         }  

  70.       }  

  71.   

  72.       public void cleanUp() {  

  73.         // Close the RPC connection  

  74.         client.close();  

  75.       }  

  76. }  

這邊代碼設(shè)三個(gè)host用于故障轉(zhuǎn)移,這里偷懶,用同一個(gè)主機(jī)的3個(gè)端口模擬。代碼還是將Hello Flume 發(fā)送10遍給第一個(gè)flume代理,當(dāng)?shù)谝粋€(gè)代理故障的時(shí)候,則發(fā)送給第二個(gè)代理,以順序進(jìn)行故障轉(zhuǎn)移。

下面是代理配置沿用之前的那個(gè),并對(duì)配置文件進(jìn)行拷貝,

cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf

分別修改avro_client_case21.conf與avro_client_case22.conf中的

a1.sources.r1.port= 50001 與a1.sources.r1.port = 50002

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

在eclipse 里運(yùn)行JAVA程序Failover_Client.java,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。

#在啟動(dòng)源發(fā)送的3個(gè)代理終端查看console輸出

我們可以看到第一個(gè)代理終端收到了,數(shù)據(jù)而其他2個(gè)終端沒(méi)有數(shù)據(jù)。

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

然后我們把第一個(gè)終端的進(jìn)程關(guān)掉,再運(yùn)行一遍client程序,然后會(huì)發(fā)現(xiàn)這個(gè)時(shí)候是發(fā)生到第二個(gè)終端中。當(dāng)?shù)诙€(gè)終端也關(guān)閉的時(shí)候,再發(fā)送數(shù)據(jù),則是發(fā)送到最后一個(gè)終端。這里我們可以看到,故障轉(zhuǎn)移的代理主機(jī)轉(zhuǎn)移是采用順序序列的。

三、LoadBalancing RPC client

Flume Client SDK也支持在多個(gè)host之間使用負(fù)載均衡的Rpc Client。這種類型的client帶有一個(gè)通過(guò)空格分隔的host:port主機(jī)列表并構(gòu)成了一個(gè)負(fù)載均衡組。這個(gè)client可以指定一個(gè)負(fù)載均衡的策略,既可以隨機(jī)的選擇一個(gè)配置的host,也可以循環(huán)選擇一個(gè)host。當(dāng)然你也可以自己編寫(xiě)一個(gè)類實(shí)現(xiàn)LoadBalancingRpcClient$HostSelector接口以至于用戶可以使用自己編寫(xiě)的選擇順序。在這種情況下,用戶自定義的類需要被指定為host-selector屬性的值。LoadBalancing RPC Client當(dāng)前不支持thrift。

如果開(kāi)啟了backoff,那么client失敗將被放入黑名單中,只有過(guò)了被指定的超時(shí)之間之后這個(gè)被選擇的失敗的主機(jī)才會(huì)從黑名單中被排除。當(dāng)超時(shí)到了,如果主機(jī)還是沒(méi)有反應(yīng),那么這被認(rèn)為是一個(gè)連續(xù)的失敗并且超時(shí)時(shí)間會(huì)成倍的增長(zhǎng),以避免可能陷入對(duì)反應(yīng)遲鈍主機(jī)的長(zhǎng)時(shí)間等待中。

這backoff的最大超時(shí)時(shí)間可以通過(guò)maxBackoff屬性來(lái)配置,單位是毫秒。在默認(rèn)情況下maxBackoff的值是30秒(在orderSelector類里面指定)。

下面是官網(wǎng)例子

[java] view plain copy

  1. // Setup properties for the load balancing  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_loadbalance");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h2 h3 h4");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h2", host1);  

  13. props.put("hosts.h3", host2);  

  14. props.put("hosts.h4", host3);  

  15.   

  16. props.put("host-selector""random"); // For random host selection  

  17. // props.put("host-selector", "round_robin"); // For round-robin host  

  18. //                                            // selection  

  19. props.put("backoff""true"); // Disabled by default.  

  20.   

  21. props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  22.                                   // becomes 30000 ms  

  23.   

  24. // Create the client with load balancing properties  

  25. RpcClient client = RpcClientFactory.getInstance(props);  

下面是測(cè)試的開(kāi)發(fā)例子

[java] view plain copy

  1. import java.nio.charset.Charset;  

  2.   

  3. import org.apache.flume.Event;  

  4. import org.apache.flume.EventDeliveryException;  

  5. import org.apache.flume.api.RpcClient;  

  6. import org.apache.flume.api.RpcClientFactory;  

  7. import org.apache.flume.event.EventBuilder;  

  8. import java.util.Properties;  

  9.   

  10. public class Load_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade3 client = new MyRpcClientFacade3();  

  13.         // Initialize client with the remote Flume agent's host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Flume Load_Client";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade3{  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.           Properties props = new Properties();  

  34.           props.put("client.type""default_loadbalance");  

  35.   

  36.           // List of hosts (space-separated list of user-chosen host aliases)  

  37.           props.put("hosts""h2 h3 h4");  

  38.   

  39.           // host/port pair for each host alias  

  40.           String host1 = "192.168.233.128:50000";  

  41.           String host2 = "192.168.233.128:50001";  

  42.           String host3 = "192.168.233.128:50002";  

  43.           props.put("hosts.h2", host1);  

  44.           props.put("hosts.h3", host2);  

  45.           props.put("hosts.h4", host3);  

  46.   

  47.           props.put("host-selector""random"); // For random host selection  

  48.           // props.put("host-selector", "round_robin"); // For round-robin host  

  49. //                                                    // selection  

  50.           props.put("backoff""true"); // Disabled by default.  

  51.   

  52.           props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  53.                                             // becomes 30000 ms  

  54.   

  55.           // Create the client with load balancing properties  

  56.           client = RpcClientFactory.getInstance(props);  

  57.       }  

  58.   

  59.       public void sendDataToFlume(String data) {  

  60.         // Create a Flume Event object that encapsulates the sample data  

  61.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  62.   

  63.         // Send the event  

  64.         try {  

  65.           client.append(event);  

  66.         } catch (EventDeliveryException e) {  

  67.           // clean up and recreate the client  

  68.           client.close();  

  69.           client = null;  

  70.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  71.           // Use the following method to create a thrift client (instead of the above line):  

  72.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  73.         }  

  74.       }  

  75.   

  76.       public void cleanUp() {  

  77.         // Close the RPC connection  

  78.         client.close();  

  79.       }  

  80. }  

這里采用隨機(jī)的負(fù)載均衡props.put("host-selector","random") 。測(cè)試的時(shí)候沿用之前的3個(gè)接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,并將他們起起來(lái)。

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

在eclipse 里運(yùn)行JAVA程序Failover_Client.java,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。

#在啟動(dòng)源發(fā)送的3個(gè)代理終端查看console輸出

下面是Host1,收到了2條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

下面是Host2,收到了2條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

下面是Host3,收到了6條數(shù)據(jù)。

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

可以看到我們開(kāi)發(fā)例子中,host-selector選擇的是隨機(jī),因此程序也是隨機(jī)發(fā)送數(shù)據(jù)。下面我們測(cè)試輪詢r(jià)ound_robin選項(xiàng)。

程序里我們修改這句

//props.put("host-selector","random"); // For random host selection

props.put("host-selector", "round_robin");// Forround-robin host

再運(yùn)行Java 程序

下面是Host1,收到了4條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

下面是Host2,收到了3條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開(kāi)發(fā)

同樣Host3,收到了3條數(shù)據(jù),這邊就不放圖了。輪詢就是按照順序放圖。

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI