调用JAVA API 对 HDFS 进行文件的读取、写入、上传、下载、删除等操作

title: 调用JAVA API 对 HDFS 进行文件的读取、写入、上传、下载、删除等操作
categories: java HADOOP
tags:

  • java HADOOP
    abbrlink: 18170
    date: 2018-02-22 00:00:00

[ 调用JAVA API 对 HDFS 进行文件的读取、写入、上传、下载、删除等操作

](/qq_37279279/article/details/79291461)

Hadoop文件系统
基本的文件系统命令操作, 通过hadoop fs -help可以获取所有的命令的详细帮助文件。

Java抽象类org.apache.hadoop.fs.FileSystem定义了hadoop的一个文件系统接口。该类是一个抽象类,通过以下两种静态工厂方法可以过去FileSystem实例:
public static FileSystem.get(Configuration conf) throws IOException
public static FileSystem.get(URI uri, Configuration conf) throws IOException

具体方法实现:
1、public boolean mkdirs(Path f) throws IOException
一次性新建所有目录(包括父目录), f是完整的目录路径。

2、public FSOutputStream create(Path f) throws IOException
创建指定path对象的一个文件,返回一个用于写入数据的输出流
create()有多个重载版本,允许我们指定是否强制覆盖已有的文件、文件备份数量、写入文件缓冲区大小、文件块大小以及文件权限。

3、public boolean copyFromLocal(Path src, Path dst) throws IOException
将本地文件拷贝到文件系统

4、public boolean exists(Path f) throws IOException
检查文件或目录是否存在

5、public boolean delete(Path f, Boolean recursive)
永久性删除指定的文件或目录,如果f是一个空目录或者文件,那么recursive的值就会被忽略。只有recursive=true时,一个非空目录及其内容才会被删除。

6、FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息。

[java] view plain copy

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.*;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.io.IOUtils;
    1. import java.io.*;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. import java.util.ArrayList;
  8. import java.util.List;
    1. public class HdfsOperate {
  9. ArrayList hdfsfiles;
    1. public HdfsOperate() {
  10. this .hdfsfiles = new ArrayList();
  11. }
    1. /**
    • 获取hdfs路径下的文件列表
  12. *
    • @param srcpath
    • @return
  13. */
  14. public String[] getFileList(String srcpath) {
  15. try {
  16. Configuration conf = new Configuration();
  17. Path path = new Path(srcpath);
  18. FileSystem fs = path.getFileSystem(conf);
  19. List files = new ArrayList();
  20. if (fs.exists(path) && fs.isDirectory(path)) {
  21. for (FileStatus status : fs.listStatus(path)) {
  22. files.add(status.getPath().toString());
  23. }
  24. }
  25. //fs.close();
  26. return files.toArray( new String[]{});
  27. } catch (IOException e) {
  28. } catch (Exception e) {
  29. }
  30. return null ;
  31. }
    1. /**
    • 给定文件名和文件内容,创建hdfs文件
  32. *
    • @param dst
    • @param contents
    • @throws IOException
  33. */
  34. public void createFile(String dst, byte [] contents)
  35. throws IOException {
  36. Configuration conf = new Configuration();
  37. Path dstPath = new Path(dst);
  38. FileSystem fs = dstPath.getFileSystem(conf);
    1. FSDataOutputStream outputStream = fs.create(dstPath);
  39. outputStream.write(contents);
  40. outputStream.close();
  41. System.out.println( “create file “ + dst + “ success!” );
  42. //fs.close();
  43. }
    1. /**
    • 删除hdfs文件
  44. *
    • @param filePath
    • @throws IOException
  45. */
  46. public void delete(String filePath) throws IOException {
  47. Configuration conf = new Configuration();
  48. Path path = new Path(filePath);
  49. FileSystem fs = path.getFileSystem(conf);
    1. boolean isok = fs.deleteOnExit(path);
  50. if (isok) {
  51. System.out.println( “delete file “ + filePath + “ success!” );
  52. } else {
  53. System.out.println( “delete file “ + filePath + “ failure” );
  54. }
  55. //fs.close();
  56. }
    1. /**
    • 创建hdfs目录
  57. *
    • @param path
    • @throws IOException
  58. */
  59. public void mkdir(String path) throws IOException {
  60. Configuration conf = new Configuration();
  61. Path srcPath = new Path(path);
  62. FileSystem fs = srcPath.getFileSystem(conf);
    1. boolean isok = fs.mkdirs(srcPath);
  63. if (isok) {
  64. System.out.println( “create dir ok!” );
  65. } else {
  66. System.out.println( “create dir failure” );
  67. }
  68. //fs.close();
  69. }
    1. /**
    • 读取hdfs文件内容,并在控制台打印出来
  70. *
    • @param filePath
    • @throws IOException
  71. */
  72. public void readFile(String filePath) throws IOException {
  73. Configuration conf = new Configuration();
  74. Path srcPath = new Path(filePath);
  75. FileSystem fs = null ;
  76. URI uri;
  77. try {
  78. uri = new URI(filePath);
  79. fs = FileSystem.get(uri, conf);
  80. } catch (URISyntaxException e) {
  81. e.printStackTrace();
  82. }
  83. InputStream in = null ;
  84. try {
  85. in = fs.open(srcPath);
  86. IOUtils.copyBytes(in, System.out, 4096 , false );
  87. } finally {
  88. IOUtils.closeStream(in);
  89. }
  90. }
    1. /**
    • 下载hdfs文件到本地目录
  91. *
    • @param dstPath
    • @param srcPath
    • @throws Exception
  92. */
  93. public void downloadFile(String dstPath, String srcPath) throws Exception {
  94. Path path = new Path(srcPath);
  95. Configuration conf = new Configuration();
  96. FileSystem hdfs = path.getFileSystem(conf);
    1. File rootfile = new File(dstPath);
  97. if (!rootfile.exists()) {
  98. rootfile.mkdirs();
  99. }
    1. if (hdfs.isFile(path)) {
  100. //只下载非txt文件
  101. String fileName = path.getName();
  102. if (!fileName.toLowerCase().endsWith( “txt” )) {
  103. FSDataInputStream in = null ;
  104. FileOutputStream out = null ;
  105. try {
  106. in = hdfs.open(path);
  107. File srcfile = new File(rootfile, path.getName());
  108. if (!srcfile.exists()) srcfile.createNewFile();
  109. out = new FileOutputStream(srcfile);
  110. IOUtils.copyBytes(in, out, 4096 , false );
  111. } finally {
  112. IOUtils.closeStream(in);
  113. IOUtils.closeStream(out);
  114. }
  115. //下载完后,在hdfs上将原文件删除
  116. this .delete(path.toString());
  117. }
  118. } else if (hdfs.isDirectory(path)) {
  119. File dstDir = new File(dstPath);
  120. if (!dstDir.exists()) {
  121. dstDir.mkdirs();
  122. }
  123. //在本地目录上加一层子目录
  124. String filePath = path.toString(); //目录
  125. String subPath[] = filePath.split( “/“ );
  126. String newdstPath = dstPath + subPath[subPath.length - 1 ] + “/“ ;
  127. System.out.println( “newdstPath=======” + newdstPath);
  128. if (hdfs.exists(path) && hdfs.isDirectory(path)) {
  129. FileStatus[] srcFileStatus = hdfs.listStatus(path);
  130. if (srcFileStatus != null ) {
  131. for (FileStatus status : hdfs.listStatus(path)) {
  132. //下载子目录下文件
  133. downloadFile(newdstPath, status.getPath().toString());
  134. }
  135. }
  136. }
  137. }
  138. }
    1. /**
    • 下载hdfs文件内容,保存到内存对象中
  139. *
    • @param srcPath
    • @throws Exception
  140. */
  141. public void downloadFileByte(String srcPath) throws Exception {
  142. Path path = new Path(srcPath);
  143. FileSystem hdfs = null ;
  144. Configuration conf = new Configuration();
  145. hdfs = FileSystem.get(URI.create(srcPath), conf);
  146. if (hdfs.exists(path)) {
  147. if (hdfs.isFile(path)) {
  148. //如果是文件
  149. FSDataInputStream in = null ;
  150. FileOutputStream out = null ;
  151. try {
  152. in = hdfs.open( new Path(srcPath));
  153. byte [] t = new byte [in.available()];
  154. in.read(t);
  155. hdfsfiles.add( new HdfsFile(path.getName(), srcPath, t));
  156. } finally {
  157. IOUtils.closeStream(in);
  158. IOUtils.closeStream(out);
  159. }
  160. } else {
  161. //如果是目录
  162. FileStatus[] srcFileStatus = hdfs.listStatus( new Path(srcPath));
  163. for ( int i = 0 ; i < srcFileStatus.length; i++) {
  164. String srcFile = srcFileStatus[i].getPath().toString();
  165. downloadFileByte(srcFile);
  166. }
  167. }
  168. }
  169. }
    1. public ArrayList getHdfsfiles() {
  170. return hdfsfiles;
  171. }
    1. /**
    • 将本地目录或文件上传的hdfs
  172. *
    • @param localSrc
    • @param dst
    • @throws Exception
  173. */
  174. public void uploadFile(String localSrc, String dst) throws Exception {
    1. Configuration conf = new Configuration();
  175. File srcFile = new File(localSrc);
  176. if (srcFile.isDirectory()) {
  177. copyDirectory(localSrc, dst, conf);
  178. } else {
  179. copyFile(localSrc, dst, conf);
  180. }
  181. }
    1. /**
    • 拷贝本地文件hdfs目录下
  182. *
    • @param localSrc
    • @param dst
    • @param conf
    • @return
    • @throws Exception
  183. */
  184. private boolean copyFile(String localSrc, String dst, Configuration conf) throws Exception {
  185. File file = new File(localSrc);
  186. dst = dst + file.getName();
  187. Path path = new Path(dst);
  188. FileSystem fs = path.getFileSystem(conf); //FileSystem.get(conf);
  189. fs.exists(path);
  190. InputStream in = new BufferedInputStream( new FileInputStream(file));
  191. OutputStream out = fs.create( new Path(dst));
  192. IOUtils.copyBytes(in, out, 4096 , true );
  193. in.close();
  194. return true ;
  195. }
    1. /**
    • 拷贝本地目录到hdfs
    • @param src
    • @param dst
    • @param conf
    • @return
    • @throws Exception
  196. */
  197. private boolean copyDirectory(String src, String dst, Configuration conf) throws Exception {
  198. Path path = new Path(dst);
  199. FileSystem fs = path.getFileSystem(conf);
  200. if (!fs.exists(path)) {
  201. fs.mkdirs(path);
  202. }
  203. File file = new File(src);
    1. File[] files = file.listFiles();
  204. for ( int i = 0 ; i < files.length; i++) {
  205. File f = files[i];
  206. if (f.isDirectory()) {
  207. String fname = f.getName();
  208. if (dst.endsWith( “/“ )) {
  209. copyDirectory(f.getPath(), dst + fname + “/“ , conf);
  210. } else {
  211. copyDirectory(f.getPath(), dst + “/“ + fname + “/“ , conf);
  212. }
  213. } else {
  214. copyFile(f.getPath(), dst, conf);
  215. }
  216. }
  217. return true ;
  218. }
  219. }
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2018-2020 丁振莹
  • 访问人数: | 浏览次数:

你的每一分支持,是我努力下去的最大的力量 ٩(๑❛ᴗ❛๑)۶

支付宝
微信