溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

詳解Python中API如何操作Hadoop hdfs

發(fā)布時(shí)間:2020-07-18 10:55:26 來源:億速云 閱讀:1053 作者:小豬 欄目:開發(fā)技術(shù)

小編這次要給大家分享的是詳解Python中API如何操作Hadoop hdfs,文章內(nèi)容豐富,感興趣的小伙伴可以來了解一下,希望大家閱讀完這篇文章之后能夠有所收獲。

1:安裝

由于是windows環(huán)境(linux其實(shí)也一樣),只要有pip或者setup_install安裝起來都是很方便的

>pip install hdfs

2:Client——創(chuàng)建集群連接

> from hdfs import *
> client = Client("http://s100:50070")

其他參數(shù)說明:

classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)

url:ip:端口

root:制定的hdfs根目錄

proxy:制定登陸的用戶身份

timeout:設(shè)置的超時(shí)時(shí)間

session:連接標(biāo)識

client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)
>>> client.list("/")
[u'home',u'input', u'output', u'tmp']

3:dir——查看支持的方法

>dir(client)

4:status——獲取路徑的具體信息

其他參數(shù):

status(hdfs_path, strict=True)

hdfs_path:就是hdfs路徑

strict:設(shè)置為True時(shí),如果hdfs_path路徑不存在就會拋出異常,如果設(shè)置為False,如果路徑為不存在,則返回None

5:list——獲取指定路徑的子目錄信息

>client.list("/")
[u'home',u'input', u'output', u'tmp']

其他參數(shù):

list(hdfs_path, status=False)

status:為True時(shí),也返回子目錄的狀態(tài)信息,默認(rèn)為Flase

6:makedirs——創(chuàng)建目錄

>client.makedirs("/123")

其他參數(shù):makedirs(hdfs_path, permission=None)

permission:設(shè)置權(quán)限

>client.makedirs("/test",permission=777)

7: rename—重命名

>client.rename("/123","/test")

8:delete—刪除

>client.delete("/test")

其他參數(shù):

delete(hdfs_path, recursive=False)

recursive:刪除文件和其子目錄,設(shè)置為False如果不存在,則會拋出異常,默認(rèn)為False

9:upload——上傳數(shù)據(jù)

>client.upload("/test","F:\[PPT]Google Protocol Buffers.pdf");

其他參數(shù):

upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None,

chunk_size=65536,progress=None, cleanup=True, **kwargs)

overwrite:是否是覆蓋性上傳文件

n_threads:啟動的線程數(shù)目

temp_dir:當(dāng)overwrite=true時(shí),遠(yuǎn)程文件一旦存在,則會在上傳完之后進(jìn)行交換

chunk_size:文件上傳的大小區(qū)間

progress:回調(diào)函數(shù)來跟蹤進(jìn)度,為每一chunk_size字節(jié)。它將傳遞兩個(gè)參數(shù),文件上傳的路徑和傳輸?shù)淖止?jié)數(shù)。一旦完成,-1將作為第二個(gè)參數(shù)

cleanup:如果在上傳任何文件時(shí)發(fā)生錯(cuò)誤,則刪除該文件

10:download——下載

>client.download("/test/NOTICE.txt","/home")

11:read——讀取文件

withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
print reader.read()

其他參數(shù):

read(*args, **kwds)

hdfs_path:hdfs路徑

offset:設(shè)置開始的字節(jié)位置

length:讀取的長度(字節(jié)為單位)

buffer_size:用于傳輸數(shù)據(jù)的字節(jié)的緩沖區(qū)的大小。默認(rèn)值設(shè)置在HDFS配置。

encoding:制定編碼

chunk_size:如果設(shè)置為正數(shù),上下文管理器將返回一個(gè)發(fā)生器產(chǎn)生的每一chunk_size字節(jié)而不是一個(gè)類似文件的對象

delimiter:如果設(shè)置,上下文管理器將返回一個(gè)發(fā)生器產(chǎn)生每次遇到分隔符。此參數(shù)要求指定的編碼。

progress:回調(diào)函數(shù)來跟蹤進(jìn)度,為每一chunk_size字節(jié)(不可用,如果塊大小不是指定)。它將傳遞兩個(gè)參數(shù),文件上傳的路徑和傳輸?shù)淖止?jié)數(shù)。稱為一次與- 1作為第二個(gè)參數(shù)。

問題:

1.

hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x

解決辦法是:在配置文件hdfs-site.xml中加入

<property> 
 <name>dfs.permissions</name> 
 <value>false</value> 
</property>

/usr/local/hadoop-2.6.4/bin/hadoopjar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar\-input <輸入目錄> \ # 可以指定多個(gè)輸入路徑,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'

-inputformat<輸入格式 JavaClassName> \-output <輸出目錄>\-outputformat <輸出格式 JavaClassName> \-mapper <mapper executable orJavaClassName> \-reducer <reducer executable or JavaClassName>\-combiner <combiner executable or JavaClassName> \-partitioner<JavaClassName> \-cmdenv <name=value> \ # 可以傳遞環(huán)境變量,可以當(dāng)作參數(shù)傳入到任務(wù)中,可以配置多個(gè)

-file <依賴的文件> \ #配置文件,字典等依賴

-D<name=value> \ # 作業(yè)的屬性配置

Map.py:

#!/usr/local/bin/python
import sys
for line in sys.stdin:
 ss = line.strip().split(' ')
 for s in ss:
 if s.strip()!= "":
  print "%s\t%s"% (s, 1)

Reduce.py:

#!/usr/local/bin/python

import sys
current_word = None
count_pool = []
sum = 0
for line in sys.stdin:
 word, val = line.strip().split('\t')
 if current_word== None:
 current_word = word
 if current_word!= word:
 for count in count_pool:
  sum += count
 print "%s\t%s"% (current_word, sum)
 current_word = word
 count_pool = []
 sum = 0
 count_pool.append(int(val))
for count in count_pool:
 sum += count
print "%s\t%s"% (current_word, str(sum))
Run.sh:

HADOOP_CMD="/data/hadoop-2.7.0/bin/hadoop"
STREAM_JAR_PATH="/data/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH

# Step 1.

$HADOOP_CMD jar$STREAM_JAR_PATH \
 -input $INPUT_FILE_PATH_1 \
 -output $OUTPUT_PATH \
 -mapper"python map.py" \
 -reducer "pythonred.py" \
 -file ./map.py \
 -file ./red.py

目的:通過python模擬mr,計(jì)算每年的最高氣溫。

1. 查看數(shù)據(jù)文件,需要截取年份和氣溫,生成key-value對。

[tianyc@TeletekHbase python]$ cat test.dat 
0067011990999991950051507004...9999999N9+00001+99999999999... 
0043011990999991950051512004...9999999N9+00221+99999999999... 
0043011990999991950051518004...9999999N9-00111+99999999999... 
0043012650999991949032412004...0500001N9+01111+99999999999... 
0043012650999991949032418004...0500001N9+00781+99999999999...

2. 編寫map,打印key-value對

[tianyc@TeletekHbase python]$ cat map.py 
import re
import sys
for line in sys.stdin:
 val=line.strip()
 (year,temp)=(val[15:19],val[40:45])
 print "%s\t%s" % (year,temp)

[tianyc@TeletekHbase python]$ cat test.dat|python map.py 
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078

3. 將結(jié)果排序

[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort
1949 +0078
1949 +0111
1950 +0000
1950 -0011
1950 +0022

4. 編寫redurce,對map中間結(jié)果進(jìn)行處理,生成最終結(jié)果

[tianyc@TeletekHbase python]$ cat red.py 
import sys
(last_key,max_val)=(None,0)
for line in sys.stdin:
 (key,val)=line.strip().split('\t')
 if last_key and last_key!=key:
 print '%s\t%s' % (last_key, max_val)
 (last_key, max_val)=(key,int(val))
else:
 (last_key, max_val)=(key,max(max_val,int(val)))
if last_key:
 print '%s\t%s' % (last_key, max_val)

5. 執(zhí)行。

[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort|python red.py 
1949 111
1950 22

使用python語言進(jìn)行MapReduce程序開發(fā)主要分為兩個(gè)步驟,一是編寫程序,二是用Hadoop Streaming命令提交任務(wù)。

還是以詞頻統(tǒng)計(jì)為例

一、程序開發(fā)

1、Mapper

for line in sys.stdin:
 filelds = line.strip.split(' ')
 for item in fileds:
 print item+' '+'1'

2、Reducer

import sys
result={}
for line in sys.stdin:
 kvs = line.strip().split(' ')
 k = kvs[0]
 v = kvs[1]
 if k in result:
  result[k]+=1
 else:
  result[k] = 1
 for k,v in result.items():
 print k+' '+v
....

寫完發(fā)現(xiàn)其實(shí)只用map就可以處理了...reduce只用cat就好了

3、運(yùn)行腳本

1)Streaming簡介

Hadoop的MapReduce和HDFS均采用Java進(jìn)行實(shí)現(xiàn),默認(rèn)提供Java編程接口,用戶通過這些編程接口,可以定義map、reduce函數(shù)等等?!?/p>

但是如果希望使用其他語言編寫map、reduce函數(shù)怎么辦呢?

Hadoop提供了一個(gè)框架Streaming,Streaming的原理是用Java實(shí)現(xiàn)一個(gè)包裝用戶程序的MapReduce程序,該程序負(fù)責(zé)調(diào)用hadoop提供的Java編程接口。

2)運(yùn)行命令

/.../bin/hadoop streaming
-input /..../input
-output /..../output
-mapper "mapper.py"
-reducer "reducer.py"
-file mapper.py
-file reducer.py
-D mapred.job.name ="wordcount"
-D mapred.reduce.tasks = "1"

3)Streaming常用命令

(1)-input <path>:指定作業(yè)輸入,path可以是文件或者目錄,可以使用*通配符,-input選項(xiàng)可以使用多次指定多個(gè)文件或目錄作為輸入。

(2)-output <path>:指定作業(yè)輸出目錄,path必須不存在,而且執(zhí)行作業(yè)的用戶必須有創(chuàng)建該目錄的權(quán)限,-output只能使用一次。

(3)-mapper:指定mapper可執(zhí)行程序或Java類,必須指定且唯一。

(4)-reducer:指定reducer可執(zhí)行程序或Java類,必須指定且唯一。

(5)-file, -cacheFile, -cacheArchive:分別用于向計(jì)算節(jié)點(diǎn)分發(fā)本地文件、HDFS文件和HDFS壓縮文件,具體使用方法參考文件分發(fā)與打包。

(6)numReduceTasks:指定reducer的個(gè)數(shù),如果設(shè)置-numReduceTasks 0或者-reducer NONE則沒有reducer程序,mapper的輸出直接作為整個(gè)作業(yè)的輸出。

(7)-jobconf | -D NAME=VALUE:指定作業(yè)參數(shù),NAME是參數(shù)名,VALUE是參數(shù)值,可以指定的參數(shù)參考hadoop-default.xml。

-jobconf mapred.job.name='My Job Name'設(shè)置作業(yè)名

-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW設(shè)置作業(yè)優(yōu)先級

-jobconf mapred.job.map.capacity=M設(shè)置同時(shí)最多運(yùn)行M個(gè)map任務(wù)

-jobconf mapred.job.reduce.capacity=N設(shè)置同時(shí)最多運(yùn)行N個(gè)reduce任務(wù)

-jobconf mapred.map.tasks 設(shè)置map任務(wù)個(gè)數(shù)

-jobconf mapred.reduce.tasks 設(shè)置reduce任務(wù)個(gè)數(shù)

-jobconf mapred.compress.map.output 設(shè)置map的輸出是否壓縮

-jobconf mapred.map.output.compression.codec 設(shè)置map的輸出壓縮方式

-jobconf mapred.output.compress 設(shè)置reduce的輸出是否壓縮

-jobconf mapred.output.compression.codec 設(shè)置reduce的輸出壓縮方式

-jobconf stream.map.output.field.separator 設(shè)置map輸出分隔符

例子:

-D stream.map.output.field.separator=: \ 以冒號進(jìn)行分隔

-D stream.num.map.output.key.fields=2 \ 指定在第二個(gè)冒號處進(jìn)行分隔,也就是第二個(gè)冒號之前的作為key,之后的作為value

(8)-combiner:指定combiner Java類,對應(yīng)的Java類文件打包成jar文件后用-file分發(fā)。

(9)-partitioner:指定partitioner Java類,Streaming提供了一些實(shí)用的partitioner實(shí)現(xiàn),參考KeyBasedFiledPartitoner和IntHashPartitioner。

(10)-inputformat, -outputformat:指定inputformat和outputformat Java類,用于讀取輸入數(shù)據(jù)和寫入輸出數(shù)據(jù),分別要實(shí)現(xiàn)InputFormat和OutputFormat接口。如果不指定,默認(rèn)使用TextInputFormat和TextOutputFormat。

(11)cmdenv NAME=VALUE:給mapper和reducer程序傳遞額外的環(huán)境變量,NAME是變量名,VALUE是變量值。

(12)-mapdebug, -reducedebug:分別指定mapper和reducer程序失敗時(shí)運(yùn)行的debug程序。

(13)-verbose:指定輸出詳細(xì)信息,例如分發(fā)哪些文件,實(shí)際作業(yè)配置參數(shù)值等,可以用于調(diào)試。

看完這篇關(guān)于詳解Python中API如何操作Hadoop hdfs的文章,如果覺得文章內(nèi)容寫得不錯(cuò)的話,可以把它分享出去給更多人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI