溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何使用Python實(shí)現(xiàn)Hadoop MapReduce程序

發(fā)布時(shí)間:2021-11-10 18:47:51 來源:億速云 閱讀:207 作者:柒染 欄目:云計(jì)算

如何使用Python實(shí)現(xiàn)Hadoop MapReduce程序,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

筆者的機(jī)器運(yùn)行效果如下(輸入數(shù)據(jù)是find的幫助手冊(cè),和筆者預(yù)期一樣,the是最多的):如何使用Python實(shí)現(xiàn)Hadoop MapReduce程序

--------------------------------------以下是原帖---------------------------------

  • 在這個(gè)實(shí)例中,我將會(huì)向大家介紹如何使用Python 為 Hadoop編寫一個(gè)簡(jiǎn)單的MapReduce

    程序。
    盡管Hadoop 框架是使用Java編寫的但是我們?nèi)匀恍枰褂孟馛++、Python等語言來實(shí)現(xiàn) Hadoop程序。盡管Hadoop官方網(wǎng)站給的示例程序是使用Jython編寫并打包成Jar文件,這樣顯然造成了不便,其實(shí),不一定非要這樣來實(shí)現(xiàn),我們可以使用Python與Hadoop 關(guān)聯(lián)進(jìn)行編程,看看位于/src/examples/python/WordCount.py  的例子,你將了解到我在說什么。

    我們想要做什么?

    我們將編寫一個(gè)簡(jiǎn)單的 MapReduce 程序,使用的是C-Python,而不是Jython編寫后打包成jar包的程序。
    我們的這個(gè)例子將模仿 WordCount 并使用Python來實(shí)現(xiàn),例子通過讀取文本文件來統(tǒng)計(jì)出單詞的出現(xiàn)次數(shù)。結(jié)果也以文本形式輸出,每一行包含一個(gè)單詞和單詞出現(xiàn)的次數(shù),兩者中間使用制表符來想間隔。

    先決條件

    編寫這個(gè)程序之前,你學(xué)要架設(shè)好Hadoop 集群,這樣才能不會(huì)在后期工作抓瞎。如果你沒有架設(shè)好,那么在后面有個(gè)簡(jiǎn)明教程來教你在Ubuntu Linux 上搭建(同樣適用于其他發(fā)行版linux、unix)

    如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立單節(jié)點(diǎn)的 Hadoop 集群

    如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多節(jié)點(diǎn)的 Hadoop 集群


    Python的MapReduce代碼

    使用Python編寫MapReduce代碼的技巧就在于我們使用了 HadoopStreaming 來幫助我們?cè)贛ap 和 Reduce間傳遞數(shù)據(jù)通過STDIN (標(biāo)準(zhǔn)輸入)和STDOUT (標(biāo)準(zhǔn)輸出).我們僅僅使用Python的sys.stdin來輸入數(shù)據(jù),使用sys.stdout輸出數(shù)據(jù),這樣做是因?yàn)镠adoopStreaming會(huì)幫我們辦好其他事。這是真的,別不相信!

    Map: mapper.py


    將下列的代碼保存在/home/hadoop/mapper.py中,他將從STDIN讀取數(shù)據(jù)并將單詞成行分隔開,生成一個(gè)列表映射單詞與發(fā)生次數(shù)的關(guān)系:
    注意:要確保這個(gè)腳本有足夠權(quán)限(chmod +x /home/hadoop/mapper.py)。

    #!/usr/bin/env python
     
    import sys
     
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        # split the line into words
        words = line.split()
        # increase counters
        for word in words:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; the trivial word count is 1
            print '%s\\t%s' % (word, 1)

    在這個(gè)腳本中,并不計(jì)算出單詞出現(xiàn)的總數(shù),它將輸出 "<word> 1">

    Reduce: reducer.py


    將代碼存儲(chǔ)在/home/hadoop/reducer.py 中,這個(gè)腳本的作用是從mapper.py 的STDIN中讀取結(jié)果,然后計(jì)算每個(gè)單詞出現(xiàn)次數(shù)的總和,并輸出結(jié)果到STDOUT。
    同樣,要注意腳本權(quán)限:chmod +x /home/hadoop/reducer.py

    #!/usr/bin/env python
     
    from operator import itemgetter
    import sys
     
    # maps words to their counts
    word2count = {}
     
    # input comes from STDIN
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
     
        # parse the input we got from mapper.py
        word, count = line.split('\\t', 1)
        # convert count (currently a string) to int
        try:
            count = int(count)
            word2count[word] = word2count.get(word, 0) + count
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            pass
     
    # sort the words lexigraphically;
    #
    # this step is NOT required, we just do it so that our
    # final output will look more like the official Hadoop
    # word count examples
    sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
     
    # write the results to STDOUT (standard output)
    for word, count in sorted_word2count:
        print '%s\\t%s'% (word, count)


    測(cè)試你的代碼(cat data | map | sort | reduce)


    我建議你在運(yùn)行MapReduce job測(cè)試前嘗試手工測(cè)試你的mapper.py 和 reducer.py腳本,以免得不到任何返回結(jié)果
    這里有一些建議,關(guān)于如何測(cè)試你的Map和Reduce的功能:

    ——————————————————————————————————————————————

    \r\n

     # very basic test
     hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
     foo     1
     foo     1
     quux    1
     labs    1
     foo     1
     bar     1
    ——————————————————————————————————————————————
     hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.py
     bar     1
     foo     3
     labs    1
    ——————————————————————————————————————————————
    
     # using on[object Object]e of the ebooks as example input
     # (see below on where to get the ebooks)
     hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
     The     1
     Project 1
     Gutenberg       1
     EBook   1
     of      1
     [...] 
     (you get the idea)
    
     quux    2
    
     quux    1


    ——————————————————————————————————————————————
    
    
    
    為了這個(gè)例子,我們將需要三種電子書:

    下載他們,并使用us-ascii編碼存儲(chǔ) 解壓后的文件,保存在臨時(shí)目錄,比如/tmp/gutenberg.

     hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
     total 3592
     -rw-r--r-- 1 hadoop hadoop  674425 2007-01-22 12:56 20417-8.txt
     -rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
     -rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
     hadoop@ubuntu:~$
    在我們運(yùn)行MapReduce job 前,我們需要將本地的文件復(fù)制到HDFS中:
    
     hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg gutenberg
     hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
     Found 1 items
     /user/hadoop/gutenberg  <dir>
     hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
     Found 3 items
     /user/hadoop/gutenberg/20417-8.txt      <r 1>   674425
     /user/hadoop/gutenberg/7ldvc10.txt      <r 1>   1423808
     /user/hadoop/gutenberg/ulyss12.txt      <r 1>   1561677
    
    
    
    現(xiàn)在,一切準(zhǔn)備就緒,我們將在運(yùn)行Python MapReduce job 在Hadoop集群上。像我上面所說的,我們使用的是
     幫助我們傳遞數(shù)據(jù)在Map和Reduce間并通過STDIN和STDOUT,進(jìn)行標(biāo)準(zhǔn)化輸入輸出。
    
     
     hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
     -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* 
    -output gutenberg-output
    在運(yùn)行中,如果你想更改Hadoop的一些設(shè)置,如增加Reduce任務(wù)的數(shù)量,你可以使用“-hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar 
     -mapper ...
    
    一個(gè)重要的備忘是關(guān)于 
    這個(gè)任務(wù)將會(huì)讀取HDFS目錄下的HDFS目錄下的
    目錄。
    之前執(zhí)行的結(jié)果如下:
    
    hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar 
    -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* 
    -output gutenberg-output
     
    additionalConfSpec_:null
     null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
     packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
     [] /tmp/streamjob54544.jar tmpDir=null
     [...] INFO mapred.FileInputFormat: Total input paths to process : 7
     [...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
     [...] INFO streaming.StreamJob: Running job: job_200803031615_0021
     [...]
     [...] INFO streaming.StreamJob:  map 0%  reduce 0%
     [...] INFO streaming.StreamJob:  map 43%  reduce 0%
     [...] INFO streaming.StreamJob:  map 86%  reduce 0%
     [...] INFO streaming.StreamJob:  map 100%  reduce 0%
     [...] INFO streaming.StreamJob:  map 100%  reduce 33%
     [...] INFO streaming.StreamJob:  map 100%  reduce 70%
     [...] INFO streaming.StreamJob:  map 100%  reduce 77%
     [...] INFO streaming.StreamJob:  map 100%  reduce 100%
     [...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
    
    
     [...] INFO streaming.StreamJob: Output: gutenberg-output  hadoop@ubuntu:/usr/local/hadoop$ 
    
    
    正如你所見到的上面的輸出結(jié)果,Hadoop 同時(shí)還提供了一個(gè)基本的WEB接口顯示統(tǒng)計(jì)結(jié)果和信息。
    當(dāng)Hadoop集群在執(zhí)行時(shí),你可以使用瀏覽器訪問   ,如圖:
    
    
    
    
    檢查結(jié)果是否輸出并存儲(chǔ)在HDFS目錄下的中:
    
     hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-output
     Found 1 items
     /user/hadoop/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
     hadoop@ubuntu:/usr/local/hadoop$ 
    
    可以使用 命令檢查文件目錄
    
     hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000
     "(Lo)cra"       1
     "1490   1
     "1498," 1
     "35"    1
     "40,"   1
     "A      2
     "AS-IS".        2
     "A_     1
     "Absoluti       1
     [...]
     hadoop@ubuntu:/usr/local/hadoop$
    
    注意比輸出,上面結(jié)果的(")符號(hào)不是Hadoop插入的。

看完上述內(nèi)容,你們掌握如何使用Python實(shí)現(xiàn)Hadoop MapReduce程序的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI