您好,登錄后才能下訂單哦!
[TOC]
1)客戶端通過本地通過RPC與namenode建立rpc通信,然后請求上傳文件
2)namenode收到請求后,會(huì)檢查是否能創(chuàng)建該文件(比如校驗(yàn)用戶是否有該權(quán)限,文件是否已經(jīng)存在等)。如果檢查通過,namenode就會(huì)開始記錄該新文件的元信息(先寫入到edits文件,然后更新內(nèi)存中的metadata),并響應(yīng)client可以開始上傳。
3)client 在本地將文件進(jìn)行切塊(按照指定的block大?。?。然后請求namemode上傳第一個(gè)block。
4)namenode根據(jù)策略以及每個(gè)datanode的情況,返回3個(gè)datanode地址給client(這里默認(rèn)3副本)。
5)client與請求namenode返回的3個(gè)datanode建立pipeline,即 client請求dn1,dn1請求dn2,dn2請求dn3,這樣一個(gè)串行通道。
6)3個(gè)datanode逐級響應(yīng),最終響應(yīng)給client。表示可以傳輸數(shù)據(jù)
7)client會(huì)將每個(gè)block還會(huì)分割成一個(gè)個(gè)packet,然后放入 data queue中,等待上傳。每傳輸一個(gè)packet,就會(huì)將packet加入到另外一個(gè) ack queue中,等到pipeline中的datanode響應(yīng)傳輸完成后,就會(huì)講相應(yīng)的packet從ack queue中移除。
8)后面就是重復(fù)上面的流程,直到client關(guān)閉通道,并將所有的queue中的packet刷寫到pipeline中之后,datanode就會(huì)標(biāo)記文件已完成。
注意:client完成寫入之后,此時(shí)block 才是可見的,正在寫的block是不可見的。當(dāng)調(diào)用sync方法時(shí)(將緩沖區(qū)數(shù)據(jù)刷寫到磁盤中),client才確認(rèn)寫入已經(jīng)完成。client關(guān)閉流時(shí)調(diào)用 的close方法,底層就會(huì)調(diào)用sync。是否需要手動(dòng)調(diào)用取決你根據(jù)程序需 要在數(shù)據(jù)健壯性和吞吐率之間的權(quán)衡。
問題:傳輸過程中,某個(gè)datanode發(fā)生錯(cuò)誤,hdfs是怎么解決?
1)pipeline關(guān)閉掉
2)為了防止丟包,ack queue中的packet會(huì)同步到data queue中。重新進(jìn)行下一次傳輸。
3)把產(chǎn)生錯(cuò)誤的datanode上當(dāng)前在寫,但未完成的block刪除掉
4)剩下的block寫到剩余兩個(gè)正常的datanode中。
5)namenode會(huì)自動(dòng)尋找另外合適的一個(gè)datanode復(fù)制另外兩個(gè)datanode中刷寫的block,完成3副本的寫入。當(dāng)然,這個(gè)操作namenode的內(nèi)部機(jī)制,對client來說是無感知的。
namenode使用兩種文件保存元數(shù)據(jù),fsimag和edits文件。
fsimage:元數(shù)據(jù)鏡像文件,存儲(chǔ)某一時(shí)間段內(nèi)的namenode的內(nèi)存元數(shù)據(jù)信息
edits:操作日志文件。
fstime:保存最近一次checkpoint的時(shí)間。
更詳細(xì)的 fsimage和edits文件講解,請看 “hdfs體系架構(gòu)”
? namenode所有的元數(shù)據(jù)信息從啟動(dòng)時(shí)就已經(jīng)全部加載到內(nèi)存中(為了提高查詢性能),用于處理讀請求的查詢操作。到有寫操作時(shí),namenode會(huì)先向edits文件中寫入操作日志,完成后才會(huì)修改內(nèi)存中的metadata,這個(gè)主要是保證元數(shù)據(jù)已經(jīng)存儲(chǔ)到磁盤中不丟失。
? hdfs內(nèi)部維護(hù)的fsimage文件其實(shí)就是內(nèi)存中的metadata的鏡像,但是兩者并不是實(shí)時(shí)一致的。fsimage的更新是通過合并edits來實(shí)現(xiàn)的。而這個(gè)合并操作是 secondaryNameNode完成的,主要流程如下:
1)首先是 SNN通知 NN切換edits文件,主要是保證合并過程有新的寫入操作時(shí)能夠正常寫入edits文件。
2)SNN通過http請求從NN獲取 fsimage和edits文件。
3)SNN將fsiamge載入內(nèi)存,開始合并edits到fsimage,生成新的fsimage
4)SNN將新的fsimage發(fā)送給NN
5)NN用新的fsimage,替換舊的fsimage。
? 寫入操作時(shí),默認(rèn)3副本,那么副本分布在哪些datanode節(jié)點(diǎn)上,會(huì)影響寫入速度。在hdfs的網(wǎng)絡(luò)拓?fù)渲校心敲此姆N物理范圍,同一節(jié)點(diǎn)、同一機(jī)架上的不同節(jié)點(diǎn)、同一機(jī)房中不同節(jié)點(diǎn)、不同機(jī)房中的不同節(jié)點(diǎn)。這4中物理范圍表示節(jié)點(diǎn)間的距離逐漸增大。這種物理距離越遠(yuǎn)會(huì)影響副本之間所在節(jié)點(diǎn)之間的傳輸效率,即傳輸效率越低。
上面說到副本的選擇的節(jié)點(diǎn)的位置會(huì)影響寫效率,那么hdfs是如何選擇節(jié)點(diǎn)位置的。
(1)舊版本的方式
路徑是 r1/n1 --> r2/n1 --> r2/n2
(2)新版本方式
路徑是 r1/n1 --> r1/n2 --> r2/n2(后面這個(gè)其實(shí)任意都行,主要處于不同機(jī)架就好)
這種方式比第一種要好,因?yàn)檫@種方式數(shù)據(jù)經(jīng)過的總路徑更短了,只要一個(gè)副本需要跨機(jī)架傳輸,而上面的則有兩個(gè)副本需要跨機(jī)架傳輸。
下面的分析過程基于 hadoop2.8.4 的源碼分析的。
一般來說,會(huì)先通過 FileSystem.get() 獲取到操作hdfs 的客戶端對象,后面所有的操作都通過調(diào)用該對象的方法完成的。
FileSystem client = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf);
接著我們看看 FileSystem.get() 的實(shí)現(xiàn)
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) {
return get(conf);
} else {
if (scheme != null && authority == null) {
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
return get(defaultUri, conf);
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
/*
這里是關(guān)鍵代碼,表示進(jìn)入 CACHE.get() 方法
*/
return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf);
}
}
CACHE是FileSystem的一個(gè)靜態(tài)內(nèi)部類Cache 的對象。繼續(xù)看看 CACHE.get()方法
FileSystem get(URI uri, Configuration conf) throws IOException {
FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
//進(jìn)入CACHE對象的 getInternal() 方法
return this.getInternal(uri, conf, key);
}
進(jìn)入CACHE對象的 getInternal() 方法
private FileSystem getInternal(URI uri, Configuration conf, FileSystem.Cache.Key key) throws IOException {
FileSystem fs;
synchronized(this) {
/*
獲取map中的filesytem對象,表示之前已經(jīng)初始化了filesystem對象,并存儲(chǔ)到map集合中,現(xiàn)在直接從map中獲取就好。
*/
fs = (FileSystem)this.map.get(key);
}
if (fs != null) {
//如果fs存在,就直接返回存在的filesytem實(shí)例即可
return fs;
} else {
//如果是初次使用filesystem,就得創(chuàng)建并初始化
fs = FileSystem.createFileSystem(uri, conf);
synchronized(this) {
FileSystem oldfs = (FileSystem)this.map.get(key);
if (oldfs != null) {
fs.close();
return oldfs;
} else {
if (this.map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(this.clientFinalizer, 10);
}
fs.key = key;
this.map.put(key, fs);
if (conf.getBoolean("fs.automatic.close", true)) {
this.toAutoClose.add(key);
}
return fs;
}
}
}
}
我們看到了上面有兩種方式,一種是如果filesytem對象已存在,那么直接從map獲取并返回對象即可。如果不存在,就調(diào)用 FileSystem.createFileSystem() 方法創(chuàng)建,創(chuàng)建完成后返回fs。下面看看這個(gè)方法.
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
Tracer tracer = FsTracer.get(conf);
TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
scope.addKVAnnotation("scheme", uri.getScheme());
FileSystem var6;
try {
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
//這是關(guān)鍵性的代碼,看名字就知道,對filesytem 進(jìn)行初始化
fs.initialize(uri, conf);
var6 = fs;
} finally {
scope.close();
}
return var6;
}
我們要注意,F(xiàn)ileSystem這個(gè)類是抽象類,它的實(shí)現(xiàn)子類是 DistributedFileSystem,所以雖然 fs是FileSystem類型的,但是對象本身是DistributedFileSystem類型的,也就是java 的多態(tài)特性。所以fs.initialize() 調(diào)用的實(shí)際上是 DistributedFileSystem中initialize()方法。下面看看這個(gè)方法
/*
DistributedFileSystem.class
*/
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
this.setConf(conf);
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: " + uri);
} else {
this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");
//這是關(guān)鍵性代碼,創(chuàng)建了一個(gè)DFSClient對象,顧名思義就是RPC的客戶端
this.dfs = new DFSClient(uri, conf, this.statistics);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = this.getHomeDirectory();
this.storageStatistics = (DFSOpsCountStatistics)GlobalStorageStatistics.INSTANCE.put("DFSOpsCountStatistics", new StorageStatisticsProvider() {
public StorageStatistics provide() {
return new DFSOpsCountStatistics();
}
});
}
}
看到上面創(chuàng)建了一個(gè) DFSClient() 對象,賦值給了 this.dfs。下面看看這個(gè)類的構(gòu)造方法。
/*
DFSClient.class
*/
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
.............................
/*源碼比較長,所以截取重要的部分顯示*/
//這是一個(gè)關(guān)鍵性變量,其實(shí)就是namenode代理對象,只不過還沒有創(chuàng)建對象
ProxyAndInfo<ClientProtocol> proxyInfo = null;
...............................
//下面開始創(chuàng)建namenode代理對象
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = (ClientProtocol)proxyInfo.getProxy();
} else if (rpcNamenode != null) {
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
this.dtService = null;
} else {
Preconditions.checkArgument(nameNodeUri != null, "null URI");
//這里創(chuàng)建代理對象信息
proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
//這里可以看到直接通過 proxyInfo.getProxy()獲取namenode代理對象,并將引用賦值給 this.namenode。并且類型是 ClientProtocol 類型的。
this.namenode = (ClientProtocol)proxyInfo.getProxy();
}
/*下面省略一堆代碼*/
}
可以看到上面已經(jīng)通過 this.namenode = (ClientProtocol)proxyInfo.getProxy(); 獲取到了 namenode 的代理對象,也就是rpc的客戶端對象。下面看看 ClientProtocol 這個(gè)是啥東西,因?yàn)榇韺ο笫沁@個(gè)類型的。
/*
ClientProtocol.class
這是個(gè)接口
*/
public interface ClientProtocol {
long versionID = 69L;
/*
下面主要是定義很多個(gè)抽象方法,主要就是用于對hdfs進(jìn)行操作的接口,比如,open,create等這些常用方法。
*/
}
下面看看 proxyInfo創(chuàng)建代理對象的方法
/*
NameNodeProxiesClient
*/
public static NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException {
AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth);
if (failoverProxyProvider == null) {
//創(chuàng)建無HA的代理對象
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
Text dtService = SecurityUtil.buildTokenService(nnAddr);
//創(chuàng)建proxy對象
ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
//ProxyAndInfo是一個(gè)靜態(tài)內(nèi)部類,將前面的proxy通過該類封裝后返回,可通過該類的 getProxy 方法返回已創(chuàng)建的proxy對象
return new NameNodeProxiesClient.ProxyAndInfo(proxy, dtService, nnAddr);
} else {
//創(chuàng)建有HA的代理對象
return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider);
}
}
可以看到上面是已經(jīng)創(chuàng)建了 proxy對象,并返回,而且我們也可以看到,創(chuàng)建的proxy對象就是clientProtocol類型的。下面看看創(chuàng)建proxy對象的方法 createNonHAProxyWithClientProtocol()
/*
NameNodeProxiesClient
*/
public static ClientProtocol createNonHAProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class.getName());
long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
//這里是核心代碼,可以明顯看到調(diào)用 RPC 模塊中的方法創(chuàng)建proxy對象
ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB)RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
if (withRetries) {
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap();
ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
return (ClientProtocol)RetryProxy.create(ClientProtocol.class, new DefaultFailoverProxyProvider(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy);
} else {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}
所以至此我們可以發(fā)現(xiàn),客戶端和namenode之間通信的方式就是通過RPC實(shí)現(xiàn)的。
總結(jié)來說,方法的調(diào)用時(shí)序圖如下:
一般來說,上傳操作,首先得
OutputStream os = fs.create(new Path("xxxx"));
即創(chuàng)建文件,然后再上傳文件數(shù)據(jù)。上傳數(shù)據(jù)的流程和普通的流操作沒什么區(qū)別。
下面看看這個(gè) create方法。
/*
FileSystem.class
*/
public abstract FSDataOutputStream create(Path var1, FsPermission var2, boolean var3, int var4, short var5, long var6, Progressable var8) throws IOException;
可以看到這是個(gè)抽象方法,前面也說到,它的實(shí)現(xiàn)子類是 DistributedFileSystem,這里這里是調(diào)用子類的 create方法,繼續(xù)看
/*
DistributedFileSystem.class
*/
public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException {
this.statistics.incrementWriteOps(1);
this.storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = this.fixRelativePart(f);
return (FSDataOutputStream)(new FileSystemLinkResolver<FSDataOutputStream>() {
public FSDataOutputStream doCall(Path p) throws IOException {
//這里是核心代碼,this.dfs前面說到了就是存儲(chǔ)了DFSClient對象的引用的??梢酝ㄟ^該客戶端調(diào)用很多操作hdfs的方法。這里調(diào)用create方法,創(chuàng)建了一個(gè) DFSOutputStream 對象。輸出流對象
DFSOutputStream dfsos = DistributedFileSystem.this.dfs.create(DistributedFileSystem.this.getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt);
//這里將上面創(chuàng)建的dfsos進(jìn)行包裝并返回
return DistributedFileSystem.this.dfs.createWrappedOutputStream(dfsos, DistributedFileSystem.this.statistics);
}
public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt);
}
}).resolve(this, absF);
}
可以看見上面創(chuàng)建返回了 DFSOutputStream 輸出流對象。下面看看DFSClient.create方法的實(shí)現(xiàn)代碼。
/*
DFSClient.class
*/
public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException {
this.checkOpen();
FsPermission masked = this.applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
//創(chuàng)建輸出流對象
DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, this.dfsClientConf.createChecksum(checksumOpt), this.getFavoredNodesStr(favoredNodes));
this.beginFileLease(result.getFileId(), result);
return result;
}
繼續(xù)看 DFSOutputStream.newStreamForCreate 這個(gè)方法.
/*
DistributedFileSystem.class
*/
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src);
Throwable var12 = null;
try {
HdfsFileStatus stat = null;
boolean shouldRetry = true;
int retryCount = 10;
while(true) {
if (shouldRetry) {
shouldRetry = false;
try {
//這里是核心代碼,可以看見是調(diào)用 dfsclient.namenode這個(gè)代理對象中的create方法創(chuàng)建文件,并返回狀態(tài)
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS);
} catch (RemoteException var27) {
IOException e = var27.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class});
if (e instanceof RetryStartFileException) {
if (retryCount <= 0) {
throw new IOException("Too many retries because of encryption zone operations", e);
}
shouldRetry = true;
--retryCount;
continue;
}
throw e;
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
//這里將上面創(chuàng)建文件的狀態(tài)傳入輸出流作為參數(shù)
DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
//看見一個(gè)神奇的方法
out.start();
DFSOutputStream var30 = out;
//返回輸出流
return var30;
}
} catch (Throwable var28) {
var12 = var28;
throw var28;
} finally {
if (ignored != null) {
if (var12 != null) {
try {
ignored.close();
} catch (Throwable var26) {
var12.addSuppressed(var26);
}
} else {
ignored.close();
}
}
}
}
上面看到 DFSOutputStream 對象居然有一個(gè) start方法,來看看先。
/*
DFSOutputStream.class
*/
protected synchronized void start() {
this.getStreamer().start();
}
// 繼續(xù)看 this.getStreamer() 這個(gè)方法,可以看到這個(gè)方法返回的是DataStreamer,繼續(xù)看這個(gè)類
protected DataStreamer getStreamer() {
return this.streamer;
}
/*
DataStreamer.class
*/
//可以看到這個(gè)類繼承了 Daemon類,而Daemon本身是繼承了 Thread類
class DataStreamer extends Daemon { }
由此可得知,DFSOutputStream 這個(gè)類本身并沒有繼承 Thread類,但是使用DataStreamer這個(gè)繼承了 Thread類的來新建線程傳輸數(shù)據(jù),不占用當(dāng)前線程。而在 DataStreamer 這個(gè)類中,重寫了 Thread標(biāo)志性的 run 方法。傳輸數(shù)據(jù)就是在這里完成的。前面說到的 hdfs的 pipeline 也是這個(gè)run方法中實(shí)現(xiàn)的,里面是一個(gè)while死循環(huán),知道傳輸完數(shù)據(jù)為止,或者客戶端關(guān)閉。代碼過長,就不看了。反正看到這里已經(jīng)成功獲取了 client的輸出流對象,后面就是傳統(tǒng)的輸入流和輸出流的對接了,這里不細(xì)講了。
方法時(shí)序圖如下:
1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信
2、調(diào)用FileSystem的create()方法,由于實(shí)現(xiàn)類為DistributedFileSystem,所有是調(diào)用該類中的create()方法
3、DistributedFileSystem持有DFSClient的引用,繼續(xù)調(diào)用DFSClient中的create()方法
4、DFSOutputStream提供的靜態(tài)newStreamForCreate()方法中調(diào)用NameNodeRpcServer服務(wù)端的create()方法并創(chuàng)建DFSOutputStream輸出流對象返回
5、通過hadoop提供的IOUtil工具類將輸出流輸出到本地
1、基本流程
1)客戶端向namenode請求下載文件,namenode在內(nèi)存的metadata查找對應(yīng)的文件的元數(shù)據(jù),如果無則返回?zé)o,有則返回對應(yīng)文件的block位置信息。而且,namenode會(huì)根據(jù)客戶端所在的位置,根據(jù)datanode以及client之間的距離大小,將返回的 block 的副本的datanode節(jié)點(diǎn)從距離小到大排序,距離最近的datanode則排在第一位。
2)client通過機(jī)架感知策略,選擇最近的datanode進(jìn)行block請求讀取
3)datanode開始傳輸數(shù)據(jù)給client,以packet為單位,并做校驗(yàn)
4)客戶端接收packet之后,本地緩存,然后再往本地路徑寫入該block。
5)第二塊,第三塊block重復(fù)以上過程
注意:
如果在讀數(shù)據(jù)的時(shí)候, DFSInputStream和datanode的通訊發(fā)生異常,就會(huì)嘗試正在讀的block的排序第二近的datanode,并且會(huì)記錄哪個(gè) datanode發(fā)生錯(cuò)誤,剩余的blocks讀的時(shí)候就會(huì)直接跳過該datanode。 DFSInputStream也會(huì)檢查block數(shù)據(jù)校驗(yàn)和,如果發(fā)現(xiàn)一個(gè)壞的block,就會(huì)先報(bào)告到namenode節(jié)點(diǎn),然后 DFSInputStream在其他的datanode上讀該block的鏡像。
client的初始化代碼是一樣的,這里不重復(fù)分析了。直接看下載
首先通過 open方法獲取目標(biāo)文件的輸入流對象。
FSDataInputStream fis = client.open(getPath);
下面看看這個(gè)open方法
/*
FileSystem.class
*/
public FSDataInputStream open(Path f) throws IOException {
return this.open(f, this.getConf().getInt("io.file.buffer.size", 4096));
}
public abstract FSDataInputStream open(Path var1, int var2) throws IOException;
可以看到,依舊是抽象方法,所以依舊是調(diào)用 DistributedFileSystem的open方法。
/*
DistributedFileSystem.class
*/
public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
this.statistics.incrementReadOps(1);
this.storageStatistics.incrementOpCounter(OpType.OPEN);
Path absF = this.fixRelativePart(f);
return (FSDataInputStream)(new FileSystemLinkResolver<FSDataInputStream>() {
public FSDataInputStream doCall(Path p) throws IOException {
//核心代碼,這里調(diào)用dfsclient的open方法穿件輸入流
DFSInputStream dfsis = DistributedFileSystem.this.dfs.open(DistributedFileSystem.this.getPathName(p), bufferSize, DistributedFileSystem.this.verifyChecksum);
return DistributedFileSystem.this.dfs.createWrappedInputStream(dfsis);
}
public FSDataInputStream next(FileSystem fs, Path p) throws IOException {
return fs.open(p, bufferSize);
}
}).resolve(this, absF);
}
熟悉的套路,依舊調(diào)用 dfsclient的open方法,創(chuàng)建輸入流,下面看看這個(gè)open方法
/*
DFSClient.class
*/
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) throws IOException {
this.checkOpen();
TraceScope ignored = this.newPathTraceScope("newDFSInputStream", src);
Throwable var5 = null;
DFSInputStream var6;
try {
//這里直接創(chuàng)建一個(gè)輸入流對象,如果按照上面上傳文件的套路來說,應(yīng)該是 dfsclient.namenode.open(xxx)才對的,這里并沒有
var6 = new DFSInputStream(this, src, verifyChecksum, (LocatedBlocks)null);
} catch (Throwable var15) {
var5 = var15;
throw var15;
} finally {
if (ignored != null) {
if (var5 != null) {
try {
ignored.close();
} catch (Throwable var14) {
var5.addSuppressed(var14);
}
} else {
ignored.close();
}
}
}
return var6;
}
上面并沒有調(diào)用DFSClient.open,而是將DFSClient作為參數(shù)傳入DFSInputStream。下面看看 DFSInputStream 這個(gè)神奇的類。
/*
DFSInputStream.class
*/
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException {
//將dfsclinet保存到當(dāng)前類中
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized(this.infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
this.locatedBlocks = locatedBlocks;
//核心方法,開始獲取block信息,如有多少個(gè)block,以及每個(gè)block所在的datanode節(jié)點(diǎn)名
this.openInfo(false);
}
下面看看 openInfo() 方法
/*
DFSInputStream.class
*/
void openInfo(boolean refreshLocatedBlocks) throws IOException {
DfsClientConf conf = this.dfsClient.getConf();
synchronized(this.infoLock) {
//獲取block的位置信息以及最后一個(gè)block的長度(因?yàn)樽詈笠粋€(gè)block肯定不是完整的128M的長度)
this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength;
for(retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); retriesForLastBlockLength > 0 && this.lastBlockBeingWrittenLength == -1L; --retriesForLastBlockLength) {
DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times");
this.waitFor(conf.getRetryIntervalForGetLastBlockLength());
this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(true);
}
if (this.lastBlockBeingWrittenLength == -1L && retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
}
}
}
下面看看 fetchLocatedBlocksAndGetLastBlockLength 這個(gè)獲取block信息的方法
/*
DFSInputStream.class
*/
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException {
LocatedBlocks newInfo = this.locatedBlocks;
if (this.locatedBlocks == null || refresh) {
//可以看到這里是調(diào)用 dfsclient中的方法倆獲取block信息
newInfo = this.dfsClient.getLocatedBlocks(this.src, 0L);
}
DFSClient.LOG.debug("newInfo = {}", newInfo);
if (newInfo == null) {
throw new IOException("Cannot open filename " + this.src);
} else {
if (this.locatedBlocks != null) {
Iterator<LocatedBlock> oldIter = this.locatedBlocks.getLocatedBlocks().iterator();
Iterator newIter = newInfo.getLocatedBlocks().iterator();
while(oldIter.hasNext() && newIter.hasNext()) {
if (!((LocatedBlock)oldIter.next()).getBlock().equals(((LocatedBlock)newIter.next()).getBlock())) {
throw new IOException("Blocklist for " + this.src + " has changed!");
}
}
}
this.locatedBlocks = newInfo;
long lastBlockBeingWrittenLength = 0L;
if (!this.locatedBlocks.isLastBlockComplete()) {
LocatedBlock last = this.locatedBlocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0L) {
return 0L;
}
return -1L;
}
long len = this.readBlockLength(last);
last.getBlock().setNumBytes(len);
lastBlockBeingWrittenLength = len;
}
}
this.fileEncryptionInfo = this.locatedBlocks.getFileEncryptionInfo();
return lastBlockBeingWrittenLength;
}
}
看到上面又回到調(diào)用 dfsClient.getLocatedBlocks,看看這個(gè)方法
/*
DFSClient.class
*/
public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException {
return this.getLocatedBlocks(src, start, this.dfsClientConf.getPrefetchSize());
}
//繼續(xù)調(diào)用下面這個(gè)方法
public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
TraceScope ignored = this.newPathTraceScope("getBlockLocations", src);
Throwable var7 = null;
LocatedBlocks var8;
try {
//調(diào)用這個(gè)靜態(tài)方法獲取 block位置信息
var8 = callGetBlockLocations(this.namenode, src, start, length);
} catch (Throwable var17) {
var7 = var17;
throw var17;
} finally {
if (ignored != null) {
if (var7 != null) {
try {
ignored.close();
} catch (Throwable var16) {
var7.addSuppressed(var16);
}
} else {
ignored.close();
}
}
}
return var8;
}
//繼續(xù)看
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException {
try {
//熟悉的味道,通過 namenode 的代理對象獲取block信息
return namenode.getBlockLocations(src, start, length);
} catch (RemoteException var7) {
throw var7.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class});
}
}
上面可以看到,仍舊是通過 namenode代理對象發(fā)起操作,下面看看 namenode.getBlockLocations。因?yàn)榇韺ο蟮念愋褪?ClientProtocol類型的,是個(gè)接口,所以得到實(shí)現(xiàn)子類中查看 ,ClientNamenodeProtocolTranslatorPB這個(gè)類。
/*
ClientNamenodeProtocolTranslatorPB.class
*/
public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder().setSrc(src).setOffset(offset).setLength(length).build();
try {
//熟悉的味道,調(diào)用 rcpProxy 向namenode server 發(fā)起操作。
GetBlockLocationsResponseProto resp = this.rpcProxy.getBlockLocations((RpcController)null, req);
return resp.hasLocations() ? PBHelperClient.convert(resp.getLocations()) : null;
} catch (ServiceException var8) {
throw ProtobufHelper.getRemoteException(var8);
}
}
看到這里,下面就是RPC底層的操作了。
方法時(shí)序圖如下:
1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信(與前面一樣)
2、調(diào)用FileSystem的open()方法,由于實(shí)現(xiàn)類為DistributedFileSystem,所有是調(diào)用該類中的open()方法
3、DistributedFileSystem持有DFSClient的引用,繼續(xù)調(diào)用DFSClient中的open()方法
4、實(shí)例化DFSInputStream輸入流
5、調(diào)用openinfo()方法
6、調(diào)用fetchLocatedBlocksAndGetLastBlockLength()方法,抓取block信息并獲取最后block長度
7、調(diào)用DFSClient中的getLocatedBlocks()方法,獲取block信息
8、在callGetBlockLocations()方法中通過NameNode代理對象調(diào)用NameNodeRpcServer的getBlockLocations()方法
9、將block信息寫入輸出流,在8中會(huì)將 block 位置信息保存到DFSInputStream輸入流對象中的成員變量中
10、交給IOUtil,下載文件到本地
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。