溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

HDFS中FileSystem是什么類(lèi)

發(fā)布時(shí)間:2021-12-09 14:04:38 來(lái)源:億速云 閱讀:172 作者:小新 欄目:云計(jì)算

小編給大家分享一下HDFS中FileSystem是什么類(lèi),相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

首先來(lái)看一下, FileSystem(org.apache.hadoop.fs.FileSystem), 這是一個(gè)抽象類(lèi), 是所有文件系統(tǒng)的父類(lèi).

而我們要從HDFS(Hadoop Distributed FileSystem)下載數(shù)據(jù), 應(yīng)該獲取一個(gè)DistributedFileSystem的實(shí)例,那么如何獲取一個(gè)DistributedFileSystem的實(shí)例呢?

FileSystem fs = FileSystem.get(new Configuration());

在FileSystem中有3個(gè)重載的get()方法

// 1.通過(guò)配置文件獲取一個(gè)FileSystem實(shí)例
public static FileSystem get(Configuration conf)
// 2.通過(guò)指定的FileSystem的URI, 配置文件獲取一個(gè)FileSystem實(shí)例
public static FileSystem get(URI uri, Configuration conf)
// 3.通過(guò)指定的FileSystem的URI, 配置文件, FileSystem用戶(hù)名獲取一個(gè)FileSystem實(shí)例
public static FileSystem get(final URI uri, final Configuration conf, final String user)

先調(diào)用FileSystem.get(Configuration conf)方法,再調(diào)用重載方法FileSystem.get(URI uri, Configuration conf)

public static FileSystem get(URI uri, Configuration conf) throws IOException {
    // schem是FileSystem具體的URI方案如: file, hdfs, Webhdfs, har等等
    String scheme = uri.getScheme();    // scheme = hdfs
    // authority是NameNode的主機(jī)名, 端口號(hào)
    String authority = uri.getAuthority();    // authority = node1:9000
    ...
    // disableCacheName = fs.hdfs.impl.disable.cache
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);    
    // 讀取配置文件, 判斷是否禁用緩存
    if (conf.getBoolean(disableCacheName, false)) {    // 若禁用緩存
      return createFileSystem(uri, conf);    // 直接調(diào)用創(chuàng)建FileSystem實(shí)例的方法
    }
    // 不禁用緩存, 先從FileSystem的靜態(tài)成員變量CACHE中獲取FileSystem實(shí)例
    return CACHE.get(uri, conf);
}

再調(diào)用FileSystem$Cache.get(URI uri, Configuration conf)方法(Cache是FileSystem的靜態(tài)內(nèi)部類(lèi))

FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);    // key = (root (auth:SIMPLE))@hdfs://node1:9000
      return getInternal(uri, conf, key);
}

再調(diào)用FileSystem$Cache.getInternal(URI uri, Configuration conf, FileSystem$Cache$Key key)方法(Key又是Cache的靜態(tài)內(nèi)部類(lèi))

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      synchronized (this) {
        // map是Cache中用來(lái)緩存FileSystem實(shí)例的成員變量, 其類(lèi)型為HashMap<Key, FileSystem>
        fs = map.get(key);
      }
      if (fs != null) {    // 如果從緩存map中獲取到了相應(yīng)的FileSystem實(shí)例
        return fs;    // 則返回這個(gè)實(shí)例
      }
      // 否則, 調(diào)用FileSystem.createFileSystem(URI uri, Configuration conf)方法, 創(chuàng)建FileSystem實(shí)例
      fs = createFileSystem(uri, conf);
      /* 分割線(xiàn)1, 期待著createFileSystem()方法的返回 */
      synchronized (this) { // refetch the lock again
        /*
         * 在多線(xiàn)程環(huán)境下, 可能另一個(gè)客戶(hù)端(另一個(gè)線(xiàn)程)創(chuàng)建好了一個(gè)DistributedFileSystem實(shí)例, 并緩存到了map中
         * 所以, 這時(shí)候就把當(dāng)前客戶(hù)端新創(chuàng)建的DistributedFileSystem實(shí)例注銷(xiāo)
         * 其實(shí)這是一個(gè)特殊的單例模式, 一個(gè)key映射一個(gè)DistributedFileSystem實(shí)例
         */
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }
        /*
         * now insert the new file system into the map
         * 緩存當(dāng)前新創(chuàng)建的DistributedFileSystem實(shí)例到map中
         */
        fs.key = key;
        map.put(key, fs);
        ...
        return fs;
      }
}

來(lái)自分割線(xiàn)1, 先調(diào)用FileSystem.createFileSystem(URI uri, Configuration conf)方法

private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    // 通過(guò)讀取配置文件, 獲取FileSystem具體的URI模式: hdfs的類(lèi)對(duì)象
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // clazz = org.apache.hadoop.hdfs.DistributedFileSystem
    ...
    // 反射出一個(gè)DistributedFileSystem實(shí)例
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    // 對(duì)DistributedFileSystem實(shí)例初始化
    fs.initialize(uri, conf);
    return fs;
}

在調(diào)用DistributedFileSystem.initialize(URI uri, Configuration conf)方法之前, 先來(lái)看一下DistributedFileSystem類(lèi)吧.

DistributedFileSystem是抽象類(lèi)FileSystem的子類(lèi)實(shí)現(xiàn),

public class DistributedFileSystem extends FileSystem {
  ...
  DFSClient dfs;    // DistributedFileSystem持有一個(gè)DFSClient類(lèi)型的成員變量dfs, 最重要的成員變量!
  ...
}

調(diào)用DistributedFileSystem.initialize(URI uri, Configuration conf)方法

public void initialize(URI uri, Configuration conf) throws IOException {
    ...
    // new一個(gè)DFSClient實(shí)例, 成員變量dfs引用這個(gè)DFSClient實(shí)例
    this.dfs = new DFSClient(uri, conf, statistics );
    /* 分割線(xiàn)2, 期待著new DFSClient()的返回 */
    ...
}

在new DFSClient實(shí)例之前, 先來(lái)看一下DFSClient類(lèi)吧! 看一下到底要為哪些成員變量賦值

public class DFSClient implements java.io.Closeable, RemotePeerFactory {
  ...
  final ClientProtocol namenode;    //DFSClient持有一個(gè)ClientProtocol類(lèi)型的成員變量namenode, 一個(gè)RPC代理對(duì)象
  /* The service used for delegation tokens */
  private Text dtService;
  ...
}

來(lái)自分割線(xiàn)2, 調(diào)用DFSClient的構(gòu)造函數(shù)DFSClient(URI nameNodeUri, Configuration conf, FileSystem$Statistics statistics), 再調(diào)用重載構(gòu)造函數(shù)DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem$Statistics statistics)

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, 
    FileSystem.Statistics stats) throws IOException {
    ...
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
    if (numResponseToDrop > 0) {    // numResponseToDrop = 0
      // This case is used for testing.
      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
          + " is set to " + numResponseToDrop
          + ", this hacked client will proactively drop responses");
      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
          nameNodeUri, ClientProtocol.class, numResponseToDrop);
    }
    
    if (proxyInfo != null) { // proxyInfo = null
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    } else if (rpcNamenode != null) { // rpcNamenode = null
      // This case is used for testing.
      Preconditions.checkArgument(nameNodeUri == null);
      this.namenode = rpcNamenode;
      dtService = null;
    } else {    // 前面兩個(gè)if只在測(cè)試的情況下成立, 這個(gè)else的代碼塊才是重點(diǎn)
      ...
      /*
       * 創(chuàng)建一個(gè)NameNodeProxies.ProxyAndInfo<ClientProtocol>類(lèi)型的對(duì)象, proxyInfo引用這個(gè)對(duì)象 
       * createProxy(conf, nameNodeUri, ClientProtocol.class)方法是不是和RPC.getProxy(Class<T> protocol,
       * long clientVersion, InetSocketAddress addr, Configuration conf)很像?
       * 沒(méi)錯(cuò)! 你沒(méi)看錯(cuò)! 這說(shuō)明createProxy()方法內(nèi)部一定會(huì)調(diào)用RPC的相關(guān)方法
       * conf    都是Configuration類(lèi)型的conf
       * nameNodeUri = hdfs://node1:9000    這不就是InetSocketAddress類(lèi)型的addr的hostName和port
       * ClientProtocol.class    都是RPC protocol接口的類(lèi)對(duì)象
       * ClientProtocol is used by user code via DistributedFileSystem class to communicate 
       * with the NameNode
       * ClientProtocol是DistributedFileSystem用來(lái)與NameNode通信的
       */
      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
      /* 分割線(xiàn)3, 期待著createProxy()方法的返回 */
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    }
    ...
}

來(lái)自分割線(xiàn)3, 調(diào)用NameNodeProxies.createProxy(Configuration conf, URI nameNodeUri, Class<T> xface)方法

/**
   * Creates the namenode proxy with the passed protocol. This will handle
   * creation of either HA- or non-HA-enabled proxy objects, depending upon
   * if the provided URI is a configured logical URI.
   * 通過(guò)傳過(guò)來(lái)的protocol參數(shù), 創(chuàng)建namenode的代理對(duì)象. 至于是HA還是非HA的namenode代理對(duì)象, 
   * 這取決于實(shí)際搭建的Hadoop環(huán)境
   **/
public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface)
    throws IOException {
    // 獲取Hadoop實(shí)際環(huán)境中HA的配置
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
        getFailoverProxyProviderClass(conf, nameNodeUri, xface);

    if (failoverProxyProviderClass == null) {    // 非HA,這里是Hadoop的偽分布式搭建
      // Non-HA case, 創(chuàng)建一個(gè)非HA的namenode代理對(duì)象
      return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
          UserGroupInformation.getCurrentUser(), true);
    } else {    // HA
      // HA case
      FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
          .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
              nameNodeUri);
      Conf config = new Conf(conf);
      T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
          RetryPolicies.failoverOnNetworkException(
              RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
              config.maxRetryAttempts, config.failoverSleepBaseMillis,
              config.failoverSleepMaxMillis));
      
      Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
      // 返回一個(gè)proxy, dtService的封裝對(duì)象proxyInfo
      return new ProxyAndInfo<T>(proxy, dtService);
    }
}

調(diào)用NameNodeProxies.createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries)方法

public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr,
    Class<T> xface, UserGroupInformation ugi, boolean withRetries) throws IOException {
    Text dtService = SecurityUtil.buildTokenService(nnAddr);    //dtService = 192.168.8.101:9000
    T proxy;
    if (xface == ClientProtocol.class) {    // xface = ClientProtocol.class
      // 創(chuàng)建一個(gè)namenode代理對(duì)象
      proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries);
      /* 分割線(xiàn)4, 期待著createNNProxyWithClientProtocol()方法返回 */
    } else if {
      ...
    }
    // 把proxy, dtService封裝成一個(gè)ProxyAndInfo對(duì)象, 并返回
    return new ProxyAndInfo<T>(proxy, dtService);
  }

以上是“HDFS中FileSystem是什么類(lèi)”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI