本文共 7209 字,大约阅读时间需要 24 分钟。
hadoop版本: 2.6.5
Java抽象类org.apache.hadoop.fs.FileSystem定义了hadoop的一个文件系统接口。该类是一个抽象类,通过以下两种静态工厂方法可以获取FileSystem实例:
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
常见方法介绍:
public boolean mkdirs(Path f) throws IOException
一次性新建所有目录(包括父目录), f是完整的目录路径。
public FSDataOutputStream create(Path f) throws IOException
public void copyFromLocalFile(Path src, Path dst) throws IOException
public boolean exists(Path f) throws IOException
public boolean delete(Path f, boolean recursive) throws IOException
FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息。
package com.tongfang.learn.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;
import java.net.URI;

/**
 * Integration tests exercising the basic HDFS {@link FileSystem} operations:
 * directory creation, file create/append/read, rename, delete, upload and
 * download (both convenience and stream based), directory listing, and
 * block-location lookup.
 *
 * <p>Requires a reachable HDFS cluster at {@link #HDFS_URL}; these are not
 * self-contained unit tests.
 */
public class HdfsTest {

    private static final String HDFS_URL = "hdfs://192.168.1.160:9000";

    FileSystem fileSystem = null;
    Configuration configuration = null;

    /**
     * Creates a directory on HDFS, including any missing parent directories.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testMkdir() throws IOException {
        fileSystem.mkdirs(new Path("/hdfs/test"));
    }

    /**
     * Creates a file and writes a short payload to it.
     * try-with-resources guarantees the stream is closed even when
     * {@code write}/{@code flush} throws (the original leaked it on error).
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testCreateFile() throws IOException {
        try (FSDataOutputStream out = fileSystem.create(new Path("/hdfs/test/a.txt"))) {
            out.write("hello, I am a file !!!".getBytes());
            out.flush();
        }
    }

    /**
     * Appends to an existing file. Depends on the
     * replace-datanode-on-failure settings applied in {@link #setUp()};
     * without them, append can fail on small clusters.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testAppendFile() throws IOException {
        try (FSDataOutputStream out = fileSystem.append(new Path("/hdfs/test/a.txt"))) {
            out.write("append to file".getBytes());
            out.flush();
        }
    }

    /**
     * Prints a file's content to stdout. System.out itself must not be
     * closed, so only the buffer size overload of copyBytes is used and
     * the input stream is closed by try-with-resources.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testCatFile() throws IOException {
        try (FSDataInputStream in = fileSystem.open(new Path("/hdfs/test/a.txt"))) {
            IOUtils.copyBytes(in, System.out, 1024);
        }
    }

    /**
     * Renames (moves) a file within HDFS.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testRename() throws IOException {
        Path oldPath = new Path("/hdfs/test/a.txt");
        Path newPath = new Path("/hdfs/test/b.txt");
        fileSystem.rename(oldPath, newPath);
    }

    /**
     * Recursively deletes a directory (recursive = true).
     */
    @Test
    public void testDelete() throws IOException {
        fileSystem.delete(new Path("/hdfs/test"), true);
    }

    /**
     * Uploads a local file to HDFS using the convenience API.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testCopyFromLocal() throws IOException {
        Path localPath = new Path("D:\\input.txt");
        Path hdfsPath = new Path("/hdfs/input.txt");
        fileSystem.copyFromLocalFile(localPath, hdfsPath);
    }

    /**
     * Uploads a local file via raw streams, printing a dot on each progress
     * callback. Fixed: the original never closed either stream; copyBytes is
     * now asked to close both (last argument {@code true}), on success or
     * failure.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testCopyFromLocalWithStream() throws IOException {
        InputStream in = new BufferedInputStream(new FileInputStream("D:\\input.zip"));
        FSDataOutputStream output = fileSystem.create(new Path("/hdfs/input.zip"),
                new Progressable() {
                    @Override
                    public void progress() {
                        System.out.print(".");
                    }
                });
        // close = true: IOUtils closes both streams when the copy ends
        IOUtils.copyBytes(in, output, 4096, true);
    }

    /**
     * Downloads an HDFS file to the local filesystem using the convenience
     * API. Arguments: delSrc = false (keep the HDFS copy),
     * useRawLocalFileSystem = true (skip the .crc checksum file).
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testCopyToLocal() throws IOException {
        Path localPath = new Path("D:\\local.txt");
        Path hdfsPath = new Path("/hdfs/input.txt");
        fileSystem.copyToLocalFile(false, hdfsPath, localPath, true);
    }

    /**
     * Downloads an HDFS file to the local filesystem via raw streams;
     * copyBytes closes both streams (last argument {@code true}).
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testCopyToLocalWithStream() throws IOException {
        InputStream input = fileSystem.open(new Path("/hdfs/input.zip"));
        OutputStream output = new FileOutputStream("D:\\input.zip");
        IOUtils.copyBytes(input, output, 4096, true);
    }

    /**
     * Lists the direct children of a directory, printing type, replication
     * factor, length, path and modification time for each entry.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testListFiles() throws IOException {
        FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs"));
        for (FileStatus status : statuses) {
            String isDir = status.isDirectory() ? "目录" : "文件";
            short replication = status.getReplication();
            long len = status.getLen();
            String path = status.getPath().toString();
            long modtime = status.getModificationTime();
            System.out.println(isDir + " " + replication + " " + len
                    + " " + path + " " + modtime);
        }
    }

    /**
     * Prints, for every block of a file, the hosts that store a replica of
     * that block.
     *
     * @throws IOException on communication failure with the cluster
     */
    @Test
    public void testFileBlockLocation() throws IOException {
        FileStatus status = fileSystem.getFileStatus(new Path("/hdfs/input.zip"));
        BlockLocation[] locations =
                fileSystem.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation location : locations) {
            // StringBuilder instead of String += in a loop (quadratic copies)
            StringBuilder allHosts = new StringBuilder();
            for (String host : location.getHosts()) {
                allHosts.append(host).append(' ');
            }
            System.out.println("block: " + location + " hosts: " + allHosts);
        }
    }

    @Before
    public void setUp() throws Exception {
        configuration = new Configuration();
        // Both settings are required on small clusters; without them,
        // append() may fail when a replacement datanode cannot be found.
        configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        configuration.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, "hadoop");
        System.out.println("testcase setup");
    }

    @After
    public void tearDown() {
        configuration = null;
        // Fixed: the original nulled the reference without closing the
        // FileSystem, leaking the client connection. closeStream swallows
        // close-time IOExceptions, keeping tearDown's signature unchanged.
        IOUtils.closeStream(fileSystem);
        fileSystem = null;
        System.out.println("testcase teardown");
    }
}
pom.xml:
4.0.0 com.tongfang.learn learn 1.0-SNAPSHOT learn http://www.example.com UTF-8 1.7 1.7 2.6.5 junit junit 4.11 test org.apache.hadoop hadoop-client ${hadoop.version} maven-clean-plugin 3.0.0 maven-resources-plugin 3.0.2 maven-compiler-plugin 3.7.0 maven-surefire-plugin 2.20.1 maven-jar-plugin 3.0.2 maven-install-plugin 2.5.2 maven-deploy-plugin 2.8.2
转载地址:http://fwjmb.baihongyu.com/