溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

七、HDFS上傳和下載原理(有源碼解析)

發(fā)布時(shí)間:2020-06-05 17:20:20 來源:網(wǎng)絡(luò) 閱讀:1291 作者:隔壁小白 欄目:大數(shù)據(jù)

[TOC]

一、HDFS文件上傳基本原理

1、基本流程

七、HDFS上傳和下載原理(有源碼解析)

1)客戶端通過本地通過RPC與namenode建立rpc通信,然后請求上傳文件
2)namenode收到請求后,會(huì)檢查是否能創(chuàng)建該文件(比如校驗(yàn)用戶是否有該權(quán)限,文件是否已經(jīng)存在等)。如果檢查通過,namenode就會(huì)開始記錄該新文件的元信息(先寫入到edits文件,然后更新內(nèi)存中的metadata),并響應(yīng)client可以開始上傳。
3)client 在本地將文件進(jìn)行切塊(按照指定的block大?。?。然后請求namemode上傳第一個(gè)block。
4)namenode根據(jù)策略以及每個(gè)datanode的情況,返回3個(gè)datanode地址給client(這里默認(rèn)3副本)。
5)client與請求namenode返回的3個(gè)datanode建立pipeline,即 client請求dn1,dn1請求dn2,dn2請求dn3,這樣一個(gè)串行通道。
6)3個(gè)datanode逐級響應(yīng),最終響應(yīng)給client。表示可以傳輸數(shù)據(jù)
7)client會(huì)將每個(gè)block還會(huì)分割成一個(gè)個(gè)packet,然后放入 data queue中,等待上傳。每傳輸一個(gè)packet,就會(huì)將packet加入到另外一個(gè) ack queue中,等到pipeline中的datanode響應(yīng)傳輸完成后,就會(huì)講相應(yīng)的packet從ack queue中移除。
8)后面就是重復(fù)上面的流程,直到client關(guān)閉通道,并將所有的queue中的packet刷寫到pipeline中之后,datanode就會(huì)標(biāo)記文件已完成。

注意:client完成寫入之后,此時(shí)block 才是可見的,正在寫的block是不可見的。當(dāng)調(diào)用sync方法時(shí)(將緩沖區(qū)數(shù)據(jù)刷寫到磁盤中),client才確認(rèn)寫入已經(jīng)完成。client關(guān)閉流時(shí)調(diào)用 的close方法,底層就會(huì)調(diào)用sync。是否需要手動(dòng)調(diào)用取決你根據(jù)程序需 要在數(shù)據(jù)健壯性和吞吐率之間的權(quán)衡。

2、datanode發(fā)生錯(cuò)誤的解決方式

問題:傳輸過程中,某個(gè)datanode發(fā)生錯(cuò)誤,hdfs是怎么解決?
1)pipeline關(guān)閉掉
2)為了防止丟包,ack queue中的packet會(huì)同步到data queue中。重新進(jìn)行下一次傳輸。
3)把產(chǎn)生錯(cuò)誤的datanode上當(dāng)前在寫,但未完成的block刪除掉
4)剩下的block寫到剩余兩個(gè)正常的datanode中。
5)namenode會(huì)自動(dòng)尋找另外合適的一個(gè)datanode復(fù)制另外兩個(gè)datanode中刷寫的block,完成3副本的寫入。當(dāng)然,這個(gè)操作namenode的內(nèi)部機(jī)制,對client來說是無感知的。

3、元數(shù)據(jù)存儲(chǔ)

namenode使用兩種文件保存元數(shù)據(jù),fsimag和edits文件。
fsimage:元數(shù)據(jù)鏡像文件,存儲(chǔ)某一時(shí)間段內(nèi)的namenode的內(nèi)存元數(shù)據(jù)信息
edits:操作日志文件。
fstime:保存最近一次checkpoint的時(shí)間。
更詳細(xì)的 fsimage和edits文件講解,請看 “hdfs體系架構(gòu)”

4、元數(shù)據(jù)的合并

? namenode所有的元數(shù)據(jù)信息從啟動(dòng)時(shí)就已經(jīng)全部加載到內(nèi)存中(為了提高查詢性能),用于處理讀請求的查詢操作。到有寫操作時(shí),namenode會(huì)先向edits文件中寫入操作日志,完成后才會(huì)修改內(nèi)存中的metadata,這個(gè)主要是保證元數(shù)據(jù)已經(jīng)存儲(chǔ)到磁盤中不丟失。
? hdfs內(nèi)部維護(hù)的fsimage文件其實(shí)就是內(nèi)存中的metadata的鏡像,但是兩者并不是實(shí)時(shí)一致的。fsimage的更新是通過合并edits來實(shí)現(xiàn)的。而這個(gè)合并操作是 secondaryNameNode完成的,主要流程如下:
七、HDFS上傳和下載原理(有源碼解析)

1)首先是 SNN通知 NN切換edits文件,主要是保證合并過程有新的寫入操作時(shí)能夠正常寫入edits文件。
2)SNN通過http請求從NN獲取 fsimage和edits文件。
3)SNN將fsiamge載入內(nèi)存,開始合并edits到fsimage,生成新的fsimage
4)SNN將新的fsimage發(fā)送給NN
5)NN用新的fsimage,替換舊的fsimage。

4、寫入時(shí)的網(wǎng)絡(luò)拓?fù)溥x擇

? 寫入操作時(shí),默認(rèn)3副本,那么副本分布在哪些datanode節(jié)點(diǎn)上,會(huì)影響寫入速度。在hdfs的網(wǎng)絡(luò)拓?fù)渲校心敲此姆N物理范圍,同一節(jié)點(diǎn)、同一機(jī)架上的不同節(jié)點(diǎn)、同一機(jī)房中不同節(jié)點(diǎn)、不同機(jī)房中的不同節(jié)點(diǎn)。這4中物理范圍表示節(jié)點(diǎn)間的距離逐漸增大。這種物理距離越遠(yuǎn)會(huì)影響副本之間所在節(jié)點(diǎn)之間的傳輸效率,即傳輸效率越低。

5、機(jī)架感知

上面說到副本的選擇的節(jié)點(diǎn)的位置會(huì)影響寫效率,那么hdfs是如何選擇節(jié)點(diǎn)位置的。
(1)舊版本的方式
七、HDFS上傳和下載原理(有源碼解析)

路徑是 r1/n1 --> r2/n1 --> r2/n2

(2)新版本方式
七、HDFS上傳和下載原理(有源碼解析)

路徑是 r1/n1 --> r1/n2 --> r2/n2(后面這個(gè)其實(shí)任意都行,主要處于不同機(jī)架就好)
這種方式比第一種要好,因?yàn)檫@種方式數(shù)據(jù)經(jīng)過的總路徑更短了,只要一個(gè)副本需要跨機(jī)架傳輸,而上面的則有兩個(gè)副本需要跨機(jī)架傳輸。

二、HDFS上傳文件源碼分析

下面的分析過程基于 hadoop2.8.4 的源碼分析的。

1、client初始化源碼分析

一般來說,會(huì)先通過 FileSystem.get() 獲取到操作hdfs 的客戶端對象,后面所有的操作都通過調(diào)用該對象的方法完成的。

FileSystem client = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf);

接著我們看看 FileSystem.get() 的實(shí)現(xiàn)

public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        if (scheme == null && authority == null) {
            return get(conf);
        } else {
            if (scheme != null && authority == null) {
                URI defaultUri = getDefaultUri(conf);
                if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
                    return get(defaultUri, conf);
                }
            }

            String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);

  /*
  這里是關(guān)鍵代碼,表示進(jìn)入 CACHE.get() 方法
  */
            return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf);
        }
    }

CACHE是FileSystem的一個(gè)靜態(tài)內(nèi)部類Cache 的對象。繼續(xù)看看 CACHE.get()方法

FileSystem get(URI uri, Configuration conf) throws IOException {
            FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
            //進(jìn)入CACHE對象的 getInternal() 方法
            return this.getInternal(uri, conf, key);
        }

進(jìn)入CACHE對象的 getInternal() 方法

 private FileSystem getInternal(URI uri, Configuration conf, FileSystem.Cache.Key key) throws IOException {
            FileSystem fs;
            synchronized(this) {
            /*
            獲取map中的filesytem對象,表示之前已經(jīng)初始化了filesystem對象,并存儲(chǔ)到map集合中,現(xiàn)在直接從map中獲取就好。      
            */
                fs = (FileSystem)this.map.get(key);
            }

            if (fs != null) {
                //如果fs存在,就直接返回存在的filesytem實(shí)例即可
                return fs;
            } else {
            //如果是初次使用filesystem,就得創(chuàng)建并初始化
                fs = FileSystem.createFileSystem(uri, conf);
                synchronized(this) {
                    FileSystem oldfs = (FileSystem)this.map.get(key);
                    if (oldfs != null) {
                        fs.close();
                        return oldfs;
                    } else {
                        if (this.map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
                            ShutdownHookManager.get().addShutdownHook(this.clientFinalizer, 10);
                        }

                        fs.key = key;
                        this.map.put(key, fs);
                        if (conf.getBoolean("fs.automatic.close", true)) {
                            this.toAutoClose.add(key);
                        }

                        return fs;
                    }
                }
            }
        }

我們看到了上面有兩種方式,一種是如果filesytem對象已存在,那么直接從map獲取并返回對象即可。如果不存在,就調(diào)用 FileSystem.createFileSystem() 方法創(chuàng)建,創(chuàng)建完成后返回fs。下面看看這個(gè)方法.

private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
        Tracer tracer = FsTracer.get(conf);
        TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
        scope.addKVAnnotation("scheme", uri.getScheme());

        FileSystem var6;
        try {
            Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
            FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);

            //這是關(guān)鍵性的代碼,看名字就知道,對filesytem 進(jìn)行初始化
            fs.initialize(uri, conf);
            var6 = fs;
        } finally {
            scope.close();
        }

        return var6;
    }

我們要注意,F(xiàn)ileSystem這個(gè)類是抽象類,它的實(shí)現(xiàn)子類是 DistributedFileSystem,所以雖然 fs是FileSystem類型的,但是對象本身是DistributedFileSystem類型的,也就是java 的多態(tài)特性。所以fs.initialize() 調(diào)用的實(shí)際上是 DistributedFileSystem中initialize()方法。下面看看這個(gè)方法

/*
DistributedFileSystem.class
*/

public void initialize(URI uri, Configuration conf) throws IOException {
        super.initialize(uri, conf);
        this.setConf(conf);
        String host = uri.getHost();
        if (host == null) {
            throw new IOException("Incomplete HDFS URI, no host: " + uri);
        } else {
            this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");

            //這是關(guān)鍵性代碼,創(chuàng)建了一個(gè)DFSClient對象,顧名思義就是RPC的客戶端
            this.dfs = new DFSClient(uri, conf, this.statistics);
            this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
            this.workingDir = this.getHomeDirectory();
            this.storageStatistics = (DFSOpsCountStatistics)GlobalStorageStatistics.INSTANCE.put("DFSOpsCountStatistics", new StorageStatisticsProvider() {
                public StorageStatistics provide() {
                    return new DFSOpsCountStatistics();
                }
            });
        }
    }

看到上面創(chuàng)建了一個(gè) DFSClient() 對象,賦值給了 this.dfs。下面看看這個(gè)類的構(gòu)造方法。

/*
DFSClient.class
*/

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
        .............................
        /*源碼比較長,所以截取重要的部分顯示*/

        //這是一個(gè)關(guān)鍵性變量,其實(shí)就是namenode代理對象,只不過還沒有創(chuàng)建對象
        ProxyAndInfo<ClientProtocol> proxyInfo = null;
        ...............................

        //下面開始創(chuàng)建namenode代理對象
        if (proxyInfo != null) {
            this.dtService = proxyInfo.getDelegationTokenService();
            this.namenode = (ClientProtocol)proxyInfo.getProxy();
        } else if (rpcNamenode != null) {
            Preconditions.checkArgument(nameNodeUri == null);
            this.namenode = rpcNamenode;
            this.dtService = null;
        } else {
            Preconditions.checkArgument(nameNodeUri != null, "null URI");
            //這里創(chuàng)建代理對象信息
            proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth);
            this.dtService = proxyInfo.getDelegationTokenService();

            //這里可以看到直接通過 proxyInfo.getProxy()獲取namenode代理對象,并將引用賦值給 this.namenode。并且類型是 ClientProtocol 類型的。
            this.namenode = (ClientProtocol)proxyInfo.getProxy();
        }

    /*下面省略一堆代碼*/    

    }

可以看到上面已經(jīng)通過 this.namenode = (ClientProtocol)proxyInfo.getProxy(); 獲取到了 namenode 的代理對象,也就是rpc的客戶端對象。下面看看 ClientProtocol 這個(gè)是啥東西,因?yàn)榇韺ο笫沁@個(gè)類型的。

/*
ClientProtocol.class

這是個(gè)接口
*/
public interface ClientProtocol {
    long versionID = 69L;
    /*

    下面主要是定義很多個(gè)抽象方法,主要就是用于對hdfs進(jìn)行操作的接口,比如,open,create等這些常用方法。
    */
}

下面看看 proxyInfo創(chuàng)建代理對象的方法

/*
NameNodeProxiesClient
*/

    public static NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth);
        if (failoverProxyProvider == null) {

        //創(chuàng)建無HA的代理對象
            InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
            Text dtService = SecurityUtil.buildTokenService(nnAddr);
            //創(chuàng)建proxy對象
            ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);

            //ProxyAndInfo是一個(gè)靜態(tài)內(nèi)部類,將前面的proxy通過該類封裝后返回,可通過該類的 getProxy 方法返回已創(chuàng)建的proxy對象
            return new NameNodeProxiesClient.ProxyAndInfo(proxy, dtService, nnAddr);
        } else {

        //創(chuàng)建有HA的代理對象
            return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider);
        }
    }

可以看到上面是已經(jīng)創(chuàng)建了 proxy對象,并返回,而且我們也可以看到,創(chuàng)建的proxy對象就是clientProtocol類型的。下面看看創(chuàng)建proxy對象的方法 createNonHAProxyWithClientProtocol()

/*
NameNodeProxiesClient
*/

   public static ClientProtocol createNonHAProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
        RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class.getName());
        long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);

        //這里是核心代碼,可以明顯看到調(diào)用 RPC 模塊中的方法創(chuàng)建proxy對象
        ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB)RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
        if (withRetries) {
            Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap();
            ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
            return (ClientProtocol)RetryProxy.create(ClientProtocol.class, new DefaultFailoverProxyProvider(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy);
        } else {
            return new ClientNamenodeProtocolTranslatorPB(proxy);
        }
    }

所以至此我們可以發(fā)現(xiàn),客戶端和namenode之間通信的方式就是通過RPC實(shí)現(xiàn)的。

總結(jié)來說,方法的調(diào)用時(shí)序圖如下:
七、HDFS上傳和下載原理(有源碼解析)

2、上傳源碼分析

一般來說,上傳操作,首先得

OutputStream os = fs.create(new Path("xxxx"));

即創(chuàng)建文件,然后再上傳文件數(shù)據(jù)。上傳數(shù)據(jù)的流程和普通的流操作沒什么區(qū)別。
下面看看這個(gè) create方法。

/*
FileSystem.class
*/

    public abstract FSDataOutputStream create(Path var1, FsPermission var2, boolean var3, int var4, short var5, long var6, Progressable var8) throws IOException;

可以看到這是個(gè)抽象方法,前面也說到,它的實(shí)現(xiàn)子類是 DistributedFileSystem,這里這里是調(diào)用子類的 create方法,繼續(xù)看

/*
DistributedFileSystem.class
*/    

    public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException {
        this.statistics.incrementWriteOps(1);
        this.storageStatistics.incrementOpCounter(OpType.CREATE);
        Path absF = this.fixRelativePart(f);
        return (FSDataOutputStream)(new FileSystemLinkResolver<FSDataOutputStream>() {
            public FSDataOutputStream doCall(Path p) throws IOException {

                //這里是核心代碼,this.dfs前面說到了就是存儲(chǔ)了DFSClient對象的引用的??梢酝ㄟ^該客戶端調(diào)用很多操作hdfs的方法。這里調(diào)用create方法,創(chuàng)建了一個(gè) DFSOutputStream 對象。輸出流對象
                DFSOutputStream dfsos = DistributedFileSystem.this.dfs.create(DistributedFileSystem.this.getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt);

                //這里將上面創(chuàng)建的dfsos進(jìn)行包裝并返回
                return DistributedFileSystem.this.dfs.createWrappedOutputStream(dfsos, DistributedFileSystem.this.statistics);
            }

            public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
                return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt);
            }
        }).resolve(this, absF);
    }

可以看見上面創(chuàng)建返回了 DFSOutputStream 輸出流對象。下面看看DFSClient.create方法的實(shí)現(xiàn)代碼。

/*
DFSClient.class
*/ 

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException {
        this.checkOpen();
        FsPermission masked = this.applyUMask(permission);
        LOG.debug("{}: masked={}", src, masked);

        //創(chuàng)建輸出流對象
        DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, this.dfsClientConf.createChecksum(checksumOpt), this.getFavoredNodesStr(favoredNodes));
        this.beginFileLease(result.getFileId(), result);
        return result;
    }

繼續(xù)看 DFSOutputStream.newStreamForCreate 這個(gè)方法.

/*
DistributedFileSystem.class
*/ 

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
        TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src);
        Throwable var12 = null;

        try {
            HdfsFileStatus stat = null;
            boolean shouldRetry = true;
            int retryCount = 10;

            while(true) {
                if (shouldRetry) {
                    shouldRetry = false;

                    try {

                    //這里是核心代碼,可以看見是調(diào)用 dfsclient.namenode這個(gè)代理對象中的create方法創(chuàng)建文件,并返回狀態(tài)
                        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS);
                    } catch (RemoteException var27) {
                        IOException e = var27.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class});
                        if (e instanceof RetryStartFileException) {
                            if (retryCount <= 0) {
                                throw new IOException("Too many retries because of encryption zone operations", e);
                            }

                            shouldRetry = true;
                            --retryCount;
                            continue;
                        }

                        throw e;
                    }
                }

                Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");

                //這里將上面創(chuàng)建文件的狀態(tài)傳入輸出流作為參數(shù)
                DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
                //看見一個(gè)神奇的方法
                out.start();
                DFSOutputStream var30 = out;

                //返回輸出流
                return var30;
            }
        } catch (Throwable var28) {
            var12 = var28;
            throw var28;
        } finally {
            if (ignored != null) {
                if (var12 != null) {
                    try {
                        ignored.close();
                    } catch (Throwable var26) {
                        var12.addSuppressed(var26);
                    }
                } else {
                    ignored.close();
                }
            }

        }
    }

上面看到 DFSOutputStream 對象居然有一個(gè) start方法,來看看先。

/*
DFSOutputStream.class
*/ 
   protected synchronized void start() {
        this.getStreamer().start();
    }

// 繼續(xù)看 this.getStreamer() 這個(gè)方法,可以看到這個(gè)方法返回的是DataStreamer,繼續(xù)看這個(gè)類
 protected DataStreamer getStreamer() {
        return this.streamer;
    }

/*
DataStreamer.class
*/
//可以看到這個(gè)類繼承了 Daemon類,而Daemon本身是繼承了 Thread類
class DataStreamer extends Daemon {  }

由此可得知,DFSOutputStream 這個(gè)類本身并沒有繼承 Thread類,但是使用DataStreamer這個(gè)繼承了 Thread類的來新建線程傳輸數(shù)據(jù),不占用當(dāng)前線程。而在 DataStreamer 這個(gè)類中,重寫了 Thread標(biāo)志性的 run 方法。傳輸數(shù)據(jù)就是在這里完成的。前面說到的 hdfs的 pipeline 也是這個(gè)run方法中實(shí)現(xiàn)的,里面是一個(gè)while死循環(huán),知道傳輸完數(shù)據(jù)為止,或者客戶端關(guān)閉。代碼過長,就不看了。反正看到這里已經(jīng)成功獲取了 client的輸出流對象,后面就是傳統(tǒng)的輸入流和輸出流的對接了,這里不細(xì)講了。

方法時(shí)序圖如下:
七、HDFS上傳和下載原理(有源碼解析)

1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信
2、調(diào)用FileSystem的create()方法,由于實(shí)現(xiàn)類為DistributedFileSystem,所有是調(diào)用該類中的create()方法
3、DistributedFileSystem持有DFSClient的引用,繼續(xù)調(diào)用DFSClient中的create()方法
4、DFSOutputStream提供的靜態(tài)newStreamForCreate()方法中調(diào)用NameNodeRpcServer服務(wù)端的create()方法并創(chuàng)建DFSOutputStream輸出流對象返回
5、通過hadoop提供的IOUtil工具類將輸出流輸出到本地

三、HDFS下載基本原理

1、基本流程
七、HDFS上傳和下載原理(有源碼解析)

1)客戶端向namenode請求下載文件,namenode在內(nèi)存的metadata查找對應(yīng)的文件的元數(shù)據(jù),如果無則返回?zé)o,有則返回對應(yīng)文件的block位置信息。而且,namenode會(huì)根據(jù)客戶端所在的位置,根據(jù)datanode以及client之間的距離大小,將返回的 block 的副本的datanode節(jié)點(diǎn)從距離小到大排序,距離最近的datanode則排在第一位。
2)client通過機(jī)架感知策略,選擇最近的datanode進(jìn)行block請求讀取
3)datanode開始傳輸數(shù)據(jù)給client,以packet為單位,并做校驗(yàn)
4)客戶端接收packet之后,本地緩存,然后再往本地路徑寫入該block。
5)第二塊,第三塊block重復(fù)以上過程

注意:

如果在讀數(shù)據(jù)的時(shí)候, DFSInputStream和datanode的通訊發(fā)生異常,就會(huì)嘗試正在讀的block的排序第二近的datanode,并且會(huì)記錄哪個(gè) datanode發(fā)生錯(cuò)誤,剩余的blocks讀的時(shí)候就會(huì)直接跳過該datanode。 DFSInputStream也會(huì)檢查block數(shù)據(jù)校驗(yàn)和,如果發(fā)現(xiàn)一個(gè)壞的block,就會(huì)先報(bào)告到namenode節(jié)點(diǎn),然后 DFSInputStream在其他的datanode上讀該block的鏡像。

四、HDFS下載源碼分析

client的初始化代碼是一樣的,這里不重復(fù)分析了。直接看下載
首先通過 open方法獲取目標(biāo)文件的輸入流對象。

FSDataInputStream fis = client.open(getPath);

下面看看這個(gè)open方法

/*
FileSystem.class
*/

public FSDataInputStream open(Path f) throws IOException {
        return this.open(f, this.getConf().getInt("io.file.buffer.size", 4096));
    }

public abstract FSDataInputStream open(Path var1, int var2) throws IOException;

可以看到,依舊是抽象方法,所以依舊是調(diào)用 DistributedFileSystem的open方法。

/*
DistributedFileSystem.class
*/
    public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
        this.statistics.incrementReadOps(1);
        this.storageStatistics.incrementOpCounter(OpType.OPEN);
        Path absF = this.fixRelativePart(f);
        return (FSDataInputStream)(new FileSystemLinkResolver<FSDataInputStream>() {
            public FSDataInputStream doCall(Path p) throws IOException {

            //核心代碼,這里調(diào)用dfsclient的open方法穿件輸入流
                DFSInputStream dfsis = DistributedFileSystem.this.dfs.open(DistributedFileSystem.this.getPathName(p), bufferSize, DistributedFileSystem.this.verifyChecksum);
                return DistributedFileSystem.this.dfs.createWrappedInputStream(dfsis);
            }

            public FSDataInputStream next(FileSystem fs, Path p) throws IOException {
                return fs.open(p, bufferSize);
            }
        }).resolve(this, absF);
    }

熟悉的套路,依舊調(diào)用 dfsclient的open方法,創(chuàng)建輸入流,下面看看這個(gè)open方法

/*
DFSClient.class
*/

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) throws IOException {
        this.checkOpen();
        TraceScope ignored = this.newPathTraceScope("newDFSInputStream", src);
        Throwable var5 = null;

        DFSInputStream var6;
        try {

         //這里直接創(chuàng)建一個(gè)輸入流對象,如果按照上面上傳文件的套路來說,應(yīng)該是  dfsclient.namenode.open(xxx)才對的,這里并沒有
            var6 = new DFSInputStream(this, src, verifyChecksum, (LocatedBlocks)null);
        } catch (Throwable var15) {
            var5 = var15;
            throw var15;
        } finally {
            if (ignored != null) {
                if (var5 != null) {
                    try {
                        ignored.close();
                    } catch (Throwable var14) {
                        var5.addSuppressed(var14);
                    }
                } else {
                    ignored.close();
                }
            }

        }

        return var6;
    }

上面并沒有調(diào)用DFSClient.open,而是將DFSClient作為參數(shù)傳入DFSInputStream。下面看看 DFSInputStream 這個(gè)神奇的類。

/*
DFSInputStream.class
*/

DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException {
        //將dfsclinet保存到當(dāng)前類中
        this.dfsClient = dfsClient;
        this.verifyChecksum = verifyChecksum;
        this.src = src;
        synchronized(this.infoLock) {
            this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
        }

        this.locatedBlocks = locatedBlocks;
        //核心方法,開始獲取block信息,如有多少個(gè)block,以及每個(gè)block所在的datanode節(jié)點(diǎn)名
        this.openInfo(false);
    }

下面看看 openInfo() 方法

/*
DFSInputStream.class
*/

    void openInfo(boolean refreshLocatedBlocks) throws IOException {
        DfsClientConf conf = this.dfsClient.getConf();
        synchronized(this.infoLock) {

            //獲取block的位置信息以及最后一個(gè)block的長度(因?yàn)樽詈笠粋€(gè)block肯定不是完整的128M的長度)
            this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);

            int retriesForLastBlockLength;
            for(retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); retriesForLastBlockLength > 0 && this.lastBlockBeingWrittenLength == -1L; --retriesForLastBlockLength) {
                DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times");
                this.waitFor(conf.getRetryIntervalForGetLastBlockLength());
                this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(true);
            }

            if (this.lastBlockBeingWrittenLength == -1L && retriesForLastBlockLength == 0) {
                throw new IOException("Could not obtain the last block locations.");
            }
        }
    }

下面看看 fetchLocatedBlocksAndGetLastBlockLength 這個(gè)獲取block信息的方法

/*
DFSInputStream.class
*/

private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException {
        LocatedBlocks newInfo = this.locatedBlocks;
        if (this.locatedBlocks == null || refresh) {

            //可以看到這里是調(diào)用 dfsclient中的方法倆獲取block信息
            newInfo = this.dfsClient.getLocatedBlocks(this.src, 0L);
        }

        DFSClient.LOG.debug("newInfo = {}", newInfo);
        if (newInfo == null) {
            throw new IOException("Cannot open filename " + this.src);
        } else {
            if (this.locatedBlocks != null) {
                Iterator<LocatedBlock> oldIter = this.locatedBlocks.getLocatedBlocks().iterator();
                Iterator newIter = newInfo.getLocatedBlocks().iterator();

                while(oldIter.hasNext() && newIter.hasNext()) {
                    if (!((LocatedBlock)oldIter.next()).getBlock().equals(((LocatedBlock)newIter.next()).getBlock())) {
                        throw new IOException("Blocklist for " + this.src + " has changed!");
                    }
                }
            }

            this.locatedBlocks = newInfo;
            long lastBlockBeingWrittenLength = 0L;
            if (!this.locatedBlocks.isLastBlockComplete()) {
                LocatedBlock last = this.locatedBlocks.getLastLocatedBlock();
                if (last != null) {
                    if (last.getLocations().length == 0) {
                        if (last.getBlockSize() == 0L) {
                            return 0L;
                        }

                        return -1L;
                    }

                    long len = this.readBlockLength(last);
                    last.getBlock().setNumBytes(len);
                    lastBlockBeingWrittenLength = len;
                }
            }

            this.fileEncryptionInfo = this.locatedBlocks.getFileEncryptionInfo();
            return lastBlockBeingWrittenLength;
        }
    }

看到上面又回到調(diào)用 dfsClient.getLocatedBlocks,看看這個(gè)方法

/*
DFSClient.class
*/

public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException {
        return this.getLocatedBlocks(src, start, this.dfsClientConf.getPrefetchSize());
    }

//繼續(xù)調(diào)用下面這個(gè)方法
    public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
        TraceScope ignored = this.newPathTraceScope("getBlockLocations", src);
        Throwable var7 = null;

        LocatedBlocks var8;
        try {

        //調(diào)用這個(gè)靜態(tài)方法獲取 block位置信息
            var8 = callGetBlockLocations(this.namenode, src, start, length);
        } catch (Throwable var17) {
            var7 = var17;
            throw var17;
        } finally {
            if (ignored != null) {
                if (var7 != null) {
                    try {
                        ignored.close();
                    } catch (Throwable var16) {
                        var7.addSuppressed(var16);
                    }
                } else {
                    ignored.close();
                }
            }

        }

        return var8;
    }

//繼續(xù)看
    static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException {
        try {

        //熟悉的味道,通過 namenode 的代理對象獲取block信息
            return namenode.getBlockLocations(src, start, length);
        } catch (RemoteException var7) {
            throw var7.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class});
        }
    }

上面可以看到,仍舊是通過 namenode代理對象發(fā)起操作,下面看看 namenode.getBlockLocations。因?yàn)榇韺ο蟮念愋褪?ClientProtocol類型的,是個(gè)接口,所以得到實(shí)現(xiàn)子類中查看 ,ClientNamenodeProtocolTranslatorPB這個(gè)類。

/*
ClientNamenodeProtocolTranslatorPB.class
*/

public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
        GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder().setSrc(src).setOffset(offset).setLength(length).build();

        try {

            //熟悉的味道,調(diào)用 rcpProxy 向namenode server 發(fā)起操作。
            GetBlockLocationsResponseProto resp = this.rpcProxy.getBlockLocations((RpcController)null, req);
            return resp.hasLocations() ? PBHelperClient.convert(resp.getLocations()) : null;
        } catch (ServiceException var8) {
            throw ProtobufHelper.getRemoteException(var8);
        }
    }

看到這里,下面就是RPC底層的操作了。

方法時(shí)序圖如下:
七、HDFS上傳和下載原理(有源碼解析)1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信(與前面一樣)
2、調(diào)用FileSystem的open()方法,由于實(shí)現(xiàn)類為DistributedFileSystem,所有是調(diào)用該類中的open()方法
3、DistributedFileSystem持有DFSClient的引用,繼續(xù)調(diào)用DFSClient中的open()方法
4、實(shí)例化DFSInputStream輸入流
5、調(diào)用openinfo()方法
6、調(diào)用fetchLocatedBlocksAndGetLastBlockLength()方法,抓取block信息并獲取最后block長度
7、調(diào)用DFSClient中的getLocatedBlocks()方法,獲取block信息
8、在callGetBlockLocations()方法中通過NameNode代理對象調(diào)用NameNodeRpcServer的getBlockLocations()方法
9、將block信息寫入輸出流,在8中會(huì)將 block 位置信息保存到DFSInputStream輸入流對象中的成員變量中
10、交給IOUtil,下載文件到本地

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI