溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flume中hdfssink如何自定義EventSerializer序列化類

發(fā)布時間:2021-12-09 09:56:38 來源:億速云 閱讀:229 作者:小新 欄目:云計算

這篇文章將為大家詳細講解有關flume中hdfssink如何自定義EventSerializer序列化類,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

    因為之前做了hbasesink的序列化類,覺得寫hdfs的應該會很簡單,可是沒想到竟然不一樣。hdfs并沒有直接配置序列化類的選項需要根據(jù)fileType來選擇對相應序列化類,我們使用的datastream的類型,對應的類是HDFSDataStream,這個類默認的序列化類TEXT(這是個枚舉類型)

serializerType = context.getString("serializer", "TEXT");

枚舉的類如下:

public enum EventSerializerType {
  TEXT(BodyTextEventSerializer.Builder.class),
  HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),
  AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),
  CUSTOM(CUSTOMEventSerializer.Builder.class),//自定義的序列化類
  OTHER(null);

  private final Class<? extends EventSerializer.Builder> builderClass;

  EventSerializerType(Class<? extends EventSerializer.Builder> builderClass) {
    this.builderClass = builderClass;
  }

  public Class<? extends EventSerializer.Builder> getBuilderClass() {
    return builderClass;
  }

}

在里面加了自定義的類型和枚舉,在配置agent的時候配置好filetype和serializer即可,同樣需要編譯上傳。

自定義的序列化類如下:

public class CUSTOMEventSerializer implements EventSerializer {
	private final static Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class);
	private final String SPLITCHAR = "\001";//列分隔符
	// for legacy reasons, by default, append a newline to each event written
	// out
	private final String APPEND_NEWLINE = "appendNewline";
	private final boolean APPEND_NEWLINE_DFLT = true;

	private final OutputStream out;
	private final boolean appendNewline;

	private CUSTOMEventSerializer(OutputStream out, Context ctx) {
		this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);
		this.out = out;
	}

	@Override
	public boolean supportsReopen() {
		return true;
	}

	@Override
	public void afterCreate() {
		// noop
	}

	@Override
	public void afterReopen() {
		// noop
	}

	@Override
	public void beforeClose() {
		// noop
	}

	@Override
	public void write(Event e) throws IOException {
		// 獲取日志信息
		String log = new String(e.getBody(), StandardCharsets.UTF_8);
		logger.info("-----------logs-------" + log);
		// headers包含日志中項目編號和host信息
		Map<String, String> headers = e.getHeaders();
		String parsedLog = parseJson2Value(log, headers);
		out.write(parsedLog.getBytes());
		logger.info("-----------values-------" + parsedLog);
		logger.info("-----------valueSSSSSS-------" + parsedLog.getBytes());
		out.write('\n');
	}
	/**
	 * 
	 * @Title: parseJson2Value 
	 * @Description: 解析出json日志中的value。 
	 * @param log json格式日志
	 * @param headers event頭信息
	 * @return  
	 * @return String 解析后的日志
	 * @throws
	 */
	private String parseJson2Value(String log, Map<String, String> headers) {
		log.replace("\\", "/");
		String time = "";
		String path = "";
		Object value = "";
		StringBuilder values = new StringBuilder();
		ObjectMapper objectMapper = new ObjectMapper();
		try {
			Map<String,Object> m = objectMapper.readValue(log, Map.class);
			for(String key:m.keySet()){
				value = m.get(key);
				if (key.equals("uri")){
					//解析訪問路徑
					path = pasreUriToPath(value.toString());
				}
				if(key.equals("time")){
					time = value.toString().substring(10);
				}
				values.append(value).append(this.SPLITCHAR);
			}
		} catch (JsonParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JsonMappingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		// 解析headers中的項目編號和服務host
		String pcode = headers.get("pcode");
		String host = headers.get("host");
		values.append(path).append(this.SPLITCHAR).
		append(pcode).append(this.SPLITCHAR).
		append(host).append(this.SPLITCHAR).
		append(time).append(this.SPLITCHAR);
		//value字符串
		return values.toString();
	}

	@Override
	public void flush() throws IOException {
		// noop
	}

	public static class Builder implements EventSerializer.Builder {

		@Override
		public EventSerializer build(Context context, OutputStream out) {
			CUSTOMEventSerializer s = new CUSTOMEventSerializer(out, context);
			return s;
		}

	}
	/**
	 * 把請求uri轉換成具體的訪問路徑
	 * 
	 * @param uri 請求uri
	 * @return   訪問路徑
	 */
	protected String pasreUriToPath(String uri){
		if(uri == null || "".equals(uri.trim())){
			return uri;
		}
		int index = uri.indexOf("/");
		if(index > -1){
			uri = uri.substring(index);
		}
		index = uri.indexOf("?");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		index = uri.indexOf(";");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		index = uri.indexOf(" HTTP/1.1");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		index = uri.indexOf("HTTP/1.1");
		if(index > -1){
			uri = uri.substring(0, index);
		}
		return uri;
	}
}

關于“flume中hdfssink如何自定義EventSerializer序列化類”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI