溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

ES學(xué)習(xí)筆記-elasticsearch-hadoop導(dǎo)入hive數(shù)據(jù)到es的實現(xiàn)探究

發(fā)布時間:2020-08-25 15:35:31 來源:網(wǎng)絡(luò) 閱讀:7002 作者:sbp810050504 欄目:大數(shù)據(jù)

各個業(yè)務(wù)數(shù)據(jù)“匯總到hive, 經(jīng)過ETL處理后, 導(dǎo)出到數(shù)據(jù)庫“是大數(shù)據(jù)產(chǎn)品的典型業(yè)務(wù)流程。這其中,sqoop(離線)和kafka(實時)幾乎是數(shù)據(jù)總線的標配了。

但是有些業(yè)務(wù)也有不標準的,比如hive數(shù)據(jù)導(dǎo)入到ES. hive數(shù)據(jù)導(dǎo)入到ES, 官方組件是elasticsearch-hadoop. 其用法在前面的博客中已有介紹。 那么其實現(xiàn)原理是怎樣的呢? 或者說, es-hadoop這家伙到底是怎么把hive表的數(shù)據(jù)弄到es中去的? 為了弄清楚這個問題, 我們首先需要有一個本地的源碼環(huán)境。

s1: 下載elasticsearch-hadoop源碼。

git clone https://github.com/elastic/elasticsearch-hadoop.git

s2: 編譯源碼。直接編譯master即可。

gradlew distZip

s3: 編譯成功后,導(dǎo)入到intellij。 這里注意導(dǎo)入build.gradle文件,就像maven項目導(dǎo)入pom文件一樣。

s4: 在intellij中編譯一次項目。

s5: 在本地啟動一個es, 默認的端口即可。

s6: 運行測試用例AbstractHiveSaveTest.testBasicSave()。 直接運行是會報錯的, 需要略微修改一下代碼,添加一個類的屬性:

    @Cla***ule
    public static ExternalResource hive = HiveSuite.hive;

如果是在windows環(huán)境下,需要新建packageorg.apache.hadoop.io.nativeio, 然后在該package下建立NativeIO.java類。 修改代碼如下:

// old
    public static boolean access(String path, Acce***ight desiredAccess)
        throws IOException {
      return access0(path, desiredAccess.acce***ight());
    }

// new 
    public static boolean access(String path, Acce***ight desiredAccess)
        throws IOException {
      return true;
    }

這樣就運行起來了一個本地的hive到es的代碼??梢詃ebug,了解詳細流程了。

在elasticsearch-hadoop這個比較龐大的項目中,修改代碼也比較麻煩,因此可以單獨建立一個項目hive-shgy, 然后改造這個測試類, 跑通testBasicSave()。

由于對gradle不熟悉, 還是建立maven項目, 項目的依賴如下:

    <repositories>
        <repository>
            <id>spring-libs</id>
            <url>http://repo.spring.io/libs-milestone/</url>
        </repository>
    </repositories>
    <dependencies>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
            <version>2.6.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>     <!-- 橋接:告訴Slf4j使用Log4j2 -->
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.6.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.6</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-cli</artifactId>
            <version>1.2.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>6.3.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

這里用到了log4j2, 所以日志類放在前面。

接下來遷移測試代碼。遷移的原則是 若無必要,不新增類。 如果只用到了類的一個方法,那么只遷移一個方法。 這里的測試代碼遷移,其實就是圍繞HiveEmbeddedServer2來構(gòu)建的。個人感覺這里比較巧妙的是,通過HiveEmbeddedServer2啟動了一個嵌入式的hive實例。能夠執(zhí)行hive sql, 而且是在一個jvm中,對于研究hive的實現(xiàn)原理來說,太酷了。

基礎(chǔ)的環(huán)境搭建好后,就可以研究elasticsearch-hadoop的源碼了, 先看源碼的結(jié)構(gòu):

elasticsearch-hadoop/hive/src/main/java/org/elasticsearch/hadoop/hive$ tree .
.
├── EsHiveInputFormat.java
├── EsHiveOutputFormat.java
├── EsSerDe.java
├── EsStorageHandler.java
├── HiveBytesArrayWritable.java
├── HiveBytesConverter.java
├── HiveConstants.java
├── HiveFieldExtractor.java
├── HiveType.java
├── HiveUtils.java
├── HiveValueReader.java
├── HiveValueWriter.java
├── HiveWritableValueWriter.java
└── package-info.java

0 directories, 14 files

這里簡要描述一下elasticsearch-hadoop將hive數(shù)據(jù)同步到es的原理, Hive開放了StorageHandler的接口。通過StoreageHandler, 可以使用SQL將數(shù)據(jù)寫入到es,同時也可以使用SQL讀取ES中的數(shù)據(jù)。 所以, 整個es-hive, 其入口類為EsStorageHandler, 這就是整個功能的框架。 了解了EsStorageHandler后,接下來很重要的一個類就是EsSerDe, 是序列化反序列化的功能組件。它是一個橋梁,通過它實現(xiàn)ES數(shù)據(jù)類型和Hive數(shù)據(jù)類型的轉(zhuǎn)換。 核心類就是這兩個了。

了解了代碼的原理及結(jié)構(gòu),就可以自己仿照實現(xiàn)hive數(shù)據(jù)同步到mongo, hive數(shù)據(jù)同步到redis 等其他的功能了。 這樣做的好處是業(yè)務(wù)無關(guān), 一次開發(fā),多次使用。方便管理維護。

最后總結(jié)一下,本文沒有直接給出答案, 而是記錄了尋找答案的過程。 通過這個過程,學(xué)會將hive數(shù)據(jù)同步到其他NoSQL中,這個實踐比理解源碼更重要。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI