HDFS下载数据之源码分析-FileSystem.get(conf)_block01

水深无声 2022-01-13 22:29 313阅读 0赞

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

首先来看一下, FileSystem(org.apache.hadoop.fs.FileSystem), 这是一个抽象类, 是所有文件系统的父类.

而我们要从HDFS(Hadoop Distributed FileSystem)下载数据, 应该获取一个DistributedFileSystem的实例,那么如何获取一个DistributedFileSystem的实例呢?

  1. FileSystem fs = FileSystem.get(new Configuration());

在FileSystem中有3个重载的get()方法

  1. // 1.通过配置文件获取一个FileSystem实例
  2. public static FileSystem get(Configuration conf)
  3. // 2.通过指定的FileSystem的URI, 配置文件获取一个FileSystem实例
  4. public static FileSystem get(URI uri, Configuration conf)
  5. // 3.通过指定的FileSystem的URI, 配置文件, FileSystem用户名获取一个FileSystem实例
  6. public static FileSystem get(final URI uri, final Configuration conf, final String user)

先调用FileSystem.get(Configuration conf)方法,再调用重载方法FileSystem.get(URI uri, Configuration conf)

  1. public static FileSystem get(URI uri, Configuration conf) throws IOException {
  2. // scheme是FileSystem具体的URI方案如: file, hdfs, webhdfs, har等等
  3. String scheme = uri.getScheme(); // scheme = hdfs
  4. // authority是NameNode的主机名, 端口号
  5. String authority = uri.getAuthority(); // authority = node1:9000
  6. ...
  7. // disableCacheName = fs.hdfs.impl.disable.cache
  8. String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  9. // 读取配置文件, 判断是否禁用缓存
  10. if (conf.getBoolean(disableCacheName, false)) { // 若禁用缓存
  11. return createFileSystem(uri, conf); // 直接调用创建FileSystem实例的方法
  12. }
  13. // 不禁用缓存, 先从FileSystem的静态成员变量CACHE中获取FileSystem实例
  14. return CACHE.get(uri, conf);
  15. }

再调用FileSystem$Cache.get(URI uri, Configuration conf)方法(Cache是FileSystem的静态内部类)

  1. FileSystem get(URI uri, Configuration conf) throws IOException{
  2. Key key = new Key(uri, conf); // key = (root (auth:SIMPLE))@hdfs://node1:9000
  3. return getInternal(uri, conf, key);
  4. }

再调用FileSystem$Cache.getInternal(URI uri, Configuration conf, FileSystem$Cache$Key key)方法(Key又是Cache的静态内部类)

  1. private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
  2. FileSystem fs;
  3. synchronized (this) {
  4. // map是Cache中用来缓存FileSystem实例的成员变量, 其类型为HashMap<Key, FileSystem>
  5. fs = map.get(key);
  6. }
  7. if (fs != null) { // 如果从缓存map中获取到了相应的FileSystem实例
  8. return fs; // 则返回这个实例
  9. }
  10. // 否则, 调用FileSystem.createFileSystem(URI uri, Configuration conf)方法, 创建FileSystem实例
  11. fs = createFileSystem(uri, conf);
  12. /* 分割线1, 期待着createFileSystem()方法的返回 */
  13. synchronized (this) { // refetch the lock again
  14. /*
  15. * 在多线程环境下, 可能另一个客户端(另一个线程)创建好了一个DistributedFileSystem实例, 并缓存到了map中
  16. * 所以, 这时候就把当前客户端新创建的DistributedFileSystem实例注销
  17. * 其实这是一个特殊的单例模式, 一个key映射一个DistributedFileSystem实例
  18. */
  19. FileSystem oldfs = map.get(key);
  20. if (oldfs != null) { // a file system is created while lock is releasing
  21. fs.close(); // close the new file system
  22. return oldfs; // return the old file system
  23. }
  24. /*
  25. * now insert the new file system into the map
  26. * 缓存当前新创建的DistributedFileSystem实例到map中
  27. */
  28. fs.key = key;
  29. map.put(key, fs);
  30. ...
  31. return fs;
  32. }
  33. }

来自分割线1, 先调用FileSystem.createFileSystem(URI uri, Configuration conf)方法

  1. private static FileSystem createFileSystem(URI uri, Configuration conf
  2. ) throws IOException {
  3. // 通过读取配置文件, 获取FileSystem具体的URI模式: hdfs的类对象
  4. Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // clazz = org.apache.hadoop.hdfs.DistributedFileSystem
  5. ...
  6. // 反射出一个DistributedFileSystem实例
  7. FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
  8. // 对DistributedFileSystem实例初始化
  9. fs.initialize(uri, conf);
  10. return fs;
  11. }

在调用DistributedFileSystem.initialize(URI uri, Configuration conf)方法之前, 先来看一下DistributedFileSystem类吧.

DistributedFileSystem是抽象类FileSystem的子类实现:

  1. public class DistributedFileSystem extends FileSystem {
  2. ...
  3. DFSClient dfs; // DistributedFileSystem持有一个DFSClient类型的成员变量dfs, 最重要的成员变量!
  4. ...
  5. }

调用DistributedFileSystem.initialize(URI uri, Configuration conf)方法

  1. public void initialize(URI uri, Configuration conf) throws IOException {
  2. ...
  3. // new一个DFSClient实例, 成员变量dfs引用这个DFSClient实例
  4. this.dfs = new DFSClient(uri, conf, statistics );
  5. /* 分割线2, 期待着new DFSClient()的返回 */
  6. ...
  7. }

在new DFSClient实例之前, 先来看一下DFSClient类吧! 看一下到底要为哪些成员变量赋值

  1. public class DFSClient implements java.io.Closeable, RemotePeerFactory {
  2. ...
  3. final ClientProtocol namenode; //DFSClient持有一个ClientProtocol类型的成员变量namenode, 一个RPC代理对象
  4. /* The service used for delegation tokens */
  5. private Text dtService;
  6. ...
  7. }

来自分割线2, 调用DFSClient的构造函数DFSClient(URI nameNodeUri, Configuration conf, FileSystem$Statistics statistics), 再调用重载构造函数DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem$Statistics statistics)

  1. public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf,
  2. FileSystem.Statistics stats) throws IOException {
  3. ...
  4. NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
  5. if (numResponseToDrop > 0) { // numResponseToDrop = 0
  6. // This case is used for testing.
  7. LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
  8. + " is set to " + numResponseToDrop
  9. + ", this hacked client will proactively drop responses");
  10. proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
  11. nameNodeUri, ClientProtocol.class, numResponseToDrop);
  12. }
  13. if (proxyInfo != null) { // proxyInfo = null
  14. this.dtService = proxyInfo.getDelegationTokenService();
  15. this.namenode = proxyInfo.getProxy();
  16. } else if (rpcNamenode != null) { // rpcNamenode = null
  17. // This case is used for testing.
  18. Preconditions.checkArgument(nameNodeUri == null);
  19. this.namenode = rpcNamenode;
  20. dtService = null;
  21. } else { // 前面两个if只在测试的情况下成立, 这个else的代码块才是重点
  22. ...
  23. /*
  24. * 创建一个NameNodeProxies.ProxyAndInfo<ClientProtocol>类型的对象, proxyInfo引用这个对象
  25. * createProxy(conf, nameNodeUri, ClientProtocol.class)方法是不是和RPC.getProxy(Class<T> protocol,
  26. * long clientVersion, InetSocketAddress addr, Configuration conf)很像?
  27. * 没错! 你没看错! 这说明createProxy()方法内部一定会调用RPC的相关方法
  28. * conf 都是Configuration类型的conf
  29. * nameNodeUri = hdfs://node1:9000 这不就是InetSocketAddress类型的addr的hostName和port
  30. * ClientProtocol.class 都是RPC protocol接口的类对象
  31. * ClientProtocol is used by user code via DistributedFileSystem class to communicate
  32. * with the NameNode
  33. * ClientProtocol是DistributedFileSystem用来与NameNode通信的
  34. */
  35. proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
  36. /* 分割线3, 期待着createProxy()方法的返回 */
  37. this.dtService = proxyInfo.getDelegationTokenService();
  38. this.namenode = proxyInfo.getProxy();
  39. }
  40. ...
  41. }

来自分割线3, 调用NameNodeProxies.createProxy(Configuration conf, URI nameNodeUri, Class xface)方法

  1. /**
  2. * Creates the namenode proxy with the passed protocol. This will handle
  3. * creation of either HA- or non-HA-enabled proxy objects, depending upon
  4. * if the provided URI is a configured logical URI.
  5. * 通过传过来的protocol参数, 创建namenode的代理对象. 至于是HA还是非HA的namenode代理对象,
  6. * 这取决于实际搭建的Hadoop环境
  7. **/
  8. public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface)
  9. throws IOException {
  10. // 获取Hadoop实际环境中HA的配置
  11. Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
  12. getFailoverProxyProviderClass(conf, nameNodeUri, xface);
  13. if (failoverProxyProviderClass == null) { // 非HA,这里是Hadoop的伪分布式搭建
  14. // Non-HA case, 创建一个非HA的namenode代理对象
  15. return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
  16. UserGroupInformation.getCurrentUser(), true);
  17. } else { // HA
  18. // HA case
  19. FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
  20. .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
  21. nameNodeUri);
  22. Conf config = new Conf(conf);
  23. T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
  24. RetryPolicies.failoverOnNetworkException(
  25. RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
  26. config.maxRetryAttempts, config.failoverSleepBaseMillis,
  27. config.failoverSleepMaxMillis));
  28. Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
  29. // 返回一个proxy, dtService的封装对象proxyInfo
  30. return new ProxyAndInfo<T>(proxy, dtService);
  31. }
  32. }

调用NameNodeProxies.createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries)方法

  1. public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr,
  2. Class<T> xface, UserGroupInformation ugi, boolean withRetries) throws IOException {
  3. Text dtService = SecurityUtil.buildTokenService(nnAddr); //dtService = 192.168.8.101:9000
  4. T proxy;
  5. if (xface == ClientProtocol.class) { // xface = ClientProtocol.class
  6. // 创建一个namenode代理对象
  7. proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries);
  8. /* 分割线4, 期待着createNNProxyWithClientProtocol()方法返回 */
  9. } else if (...) { // 其他xface类型(如NamenodeProtocol等)的处理, 此处省略
  10. ...
  11. }
  12. // 把proxy, dtService封装成一个ProxyAndInfo对象, 并返回
  13. return new ProxyAndInfo<T>(proxy, dtService);
  14. }

block02戳我

转载于:https://my.oschina.net/u/2503731/blog/663705

发表评论

表情:
评论列表 (有 0 条评论,313人围观)

还没有评论,来说两句吧...

相关阅读