Filesystem closed报错问题处理

左手的ㄟ右手 2024-03-27 11:33 160阅读 0赞

使用HDFS的时候

  1. final Configuration conf = new Configuration();
  2. final FileSystem fs = FileSystem.get(URI.create(hdfsFile), conf);
  3. final Path path = new Path(hdfsFile);
  4. if (fs.exists(path)) {
  5. final FSDataInputStream is = fs.open(path);
  6. final FileStatus stat = fs.getFileStatus(path);
  7. final byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
  8. is.readFully(0, buffer);
  9. is.close();
  10. fs.close();
  11. return buffer;
  12. } else {
  13. throw new Exception("the file is not found .");
  14. }

在高并发情况下会报错:

5fc4e797bc944482897414aa0b8db90d.png

  1. java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interrupted while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.16.3.2:52305 remote=/10.16.3.2:59000]. 60000 millis timeout left.; Host Details : local host is: "hadoop-test/10.16.3.2"; destination host is: "hadoop-alone-test":59000;
  2. at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
  3. at org.apache.hadoop.ipc.Client.call(Client.java:1479)
  4. at org.apache.hadoop.ipc.Client.call(Client.java:1412)
  5. at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
  6. at com.sun.proxy.$Proxy109.getListing(Unknown Source)

问题原因为:多线程访问问题,线程A、B同时获取filesystem后使用,线程B使用完后调用了filesystem.close()方法,这个时候线程A还在操作filesystem,所以报错上面种种异常

二、解决办法

禁用FileSystem缓存

  1. Configuration conf = new Configuration();
  2. conf.set("fs.hdfs.impl.disable.cache", "true");

三、问题原因

FileSystem.get源码分析

那么明明使用了两个集群,为什么会使用到Cache呢,分析FileSystem.get源码便知道原因了

  1. public static FileSystem get(URI uri, Configuration conf) throws IOException {
  2. String scheme = uri.getScheme();
  3. String authority = uri.getAuthority();
  4. if (scheme == null && authority == null) { // use default FS
  5. return get(conf);
  6. }
  7. if (scheme != null && authority == null) { // no authority
  8. URI defaultUri = getDefaultUri(conf);
  9. if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
  10. && defaultUri.getAuthority() != null) { // & default has authority
  11. return get(defaultUri, conf); // return default
  12. }
  13. }
  14. String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  15. if (conf.getBoolean(disableCacheName, false)) {
  16. LOGGER.debug("Bypassing cache to create filesystem {}", uri);
  17. return createFileSystem(uri, conf);
  18. }
  19. return CACHE.get(uri, conf);
  20. }

应用在获取FileSystem时,提供了完整的hdfs目录,同时没有设置fs.hdfs.impl.disable.cache为true,所以创建slave集群的filesystem对象时,会使用CACHE.get(uri, conf)获取,Cache内部使用一个HashMap来维护filesystem对象,很容易想到,当HashMap的key相同时,便返回了同一个filesystem对象,那么Cache中的key是什么样的呢,代码如下:

  1. FileSystem get(URI uri, Configuration conf) throws IOException{
  2. Key key = new Key(uri, conf);
  3. return getInternal(uri, conf, key);
  4. }
  5. static class Key {
  6. final String scheme;
  7. final String authority;
  8. final UserGroupInformation ugi;
  9. final long unique; // an artificial way to make a key unique
  10. Key(URI uri, Configuration conf) throws IOException {
  11. this(uri, conf, 0);
  12. }
  13. Key(URI uri, Configuration conf, long unique) throws IOException {
  14. scheme = uri.getScheme()==null ?
  15. "" : StringUtils.toLowerCase(uri.getScheme());
  16. authority = uri.getAuthority()==null ?
  17. "" : StringUtils.toLowerCase(uri.getAuthority());
  18. this.unique = unique;
  19. this.ugi = UserGroupInformation.getCurrentUser();
  20. }
  21. @Override
  22. public int hashCode() {
  23. return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
  24. }
  25. static boolean isEqual(Object a, Object b) {
  26. return a == b || (a != null && a.equals(b));
  27. }
  28. @Override
  29. public boolean equals(Object obj) {
  30. if (obj == this) {
  31. return true;
  32. }
  33. if (obj instanceof Key) {
  34. Key that = (Key)obj;
  35. return isEqual(this.scheme, that.scheme)
  36. && isEqual(this.authority, that.authority)
  37. && isEqual(this.ugi, that.ugi)
  38. && (this.unique == that.unique);
  39. }
  40. return false;
  41. }
  42. @Override
  43. public String toString() {
  44. return "("+ugi.toString() + ")@" + scheme + "://" + authority;
  45. }
  46. }
  47. }

可以看到Key由四个要素构成,其中前2个跟URI相关,我们使用的为一个hdfs://nameservice1,ugi为安全认证的用户,使用的是同一个,unique为0,因此Key相同,第二次获取filesystem对象时,直接返回了第一次创建的filesystem对象,最终造成了应用虽然使用了不同的集群配置文件,但最中获取的是同一个filesystem对象。

解决

fs.hdfs.impl.disable.cache参数本身不建议修改,修改集群的fs.defaultFS,使不同集群的fs.defaultFS不一样

参考:

多个HDFS集群的fs.defaultFS配置一样,造成应用一直连接同一个集群的问题分析 - 远去的列车 - 博客园
Filesystem closed错误排查 - 简书

java.io.IOException: Filesystem closed - 简书

java.io.IOException: Filesystem closed_bitcarmanlee的博客-CSDN博客_filesystem closed

发表评论

表情:
评论列表 (有 0 条评论,160人围观)

还没有评论,来说两句吧...

相关阅读