1. Installation
Run the following command to install it:
pip install pyhdfs
Note: when operating HDFS on a virtual machine remotely from a Windows host, you need to add the VM's IP-to-hostname mapping to the local hosts file (on Windows: C:\Windows\System32\drivers\etc\hosts), for example:
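A minimal example entry (the hostname hdfs-node is hypothetical; substitute your VM's actual hostname):

192.168.1.204  hdfs-node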
2. Common pyhdfs operations on HDFS
- Create a directory under a specified path on HDFS, then check whether it exists
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    fs.mkdirs("/test_01")
    file_or_dirs = fs.listdir("/")
    print(file_or_dirs)
Output:
['test_01']
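Worth noting: mkdirs also creates any missing parent directories in a single call, much like mkdir -p. A minimal sketch with a hypothetical path:

fs.mkdirs("/a/b/c")           # creates /a, /a/b, and /a/b/c as needed
print(fs.exists("/a/b/c"))    # True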
- Get the user's home directory on HDFS
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    home_dir = fs.get_home_directory()
    print(home_dir)
Output:
/user/root
- Get the active NameNode
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    active_namenode = fs.get_active_namenode()
    print(active_namenode)
Output (only one address, since this is a single-node environment):
192.168.1.204:50070
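get_active_namenode is mainly useful against an HA cluster. pyhdfs accepts several NameNodes, either as a list or as a comma-separated string, and will try the others if one is unreachable; a sketch with hypothetical hostnames:

fs = pyhdfs.HdfsClient(
    hosts=["nn1.example.com:50070", "nn2.example.com:50070"],  # hypothetical HA pair
    user_name="root",
)
print(fs.get_active_namenode())  # address of whichever NameNode is currently active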
- Create a file on HDFS and write content to it:
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    all_files = fs.listdir("/")
    print(all_files)
    # overwrite=True replaces the file if it already exists
    fs.create("/demo_01.txt", b"hello world", overwrite=True)
    all_files = fs.listdir("/")
    print(all_files)
Output:
['test_01']
['demo_01.txt', 'test_01']
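If overwrite=True is omitted and the file already exists, the call raises an error. A hedged sketch of handling it (assuming pyhdfs exposes HdfsFileAlreadyExistsException, mirroring the Hadoop exception of the same name):

try:
    fs.create("/demo_01.txt", b"hello world")   # no overwrite flag
except pyhdfs.HdfsFileAlreadyExistsException as e:
    print("file already exists:", e)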
- Delete a file
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    all_files = fs.listdir("/")
    print(all_files)
    fs.delete("/demo_01.txt")
    all_files = fs.listdir("/")
    print(all_files)
Output:
['demo_01.txt', 'test_01']
['test_01']
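delete also works on directories, but a non-empty directory requires recursive=True (a WebHDFS DELETE parameter that pyhdfs passes through); a sketch with a hypothetical path:

fs.delete("/some_dir", recursive=True)   # remove the directory and everything in it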
- Check whether a file exists
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    print(fs.exists("/demo_01.txt"))
    all_files = fs.listdir("/")
    print(all_files)
    fs.create("/demo_01.txt", b"hello world")
    print(fs.exists("/demo_01.txt"))
    all_files = fs.listdir("/")
    print(all_files)
Output:
False
['test_01']
True
['demo_01.txt', 'test_01']
- Read a file's contents:
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    f = fs.open("/demo_01.txt")
    ctx = f.read().decode("utf-8")
    print(ctx)
    f.close()
Output:
hello world
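open returns a file-like object whose bytes must be decoded by the caller, and it should be closed when done. A small convenience sketch (the helper name read_text is our own) that uses only the calls shown above and guarantees the handle is closed even on error:

def read_text(fs, path, encoding="utf-8"):
    # read an HDFS file fully and decode it; close the handle in all cases
    f = fs.open(path)
    try:
        return f.read().decode(encoding)
    finally:
        f.close()

print(read_text(fs, "/demo_01.txt"))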
- Append content to a file
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    f = fs.open("/demo_01.txt")
    ctx = f.read().decode("utf-8")
    print(ctx)
    f.close()
    fs.append("/demo_01.txt", b"\nhello hadoop!")
    print("-------------------------")
    f = fs.open("/demo_01.txt")
    ctx = f.read().decode("utf-8")
    print(ctx)
    f.close()
Output (the file already contained one appended line from an earlier run of this script, so the first read shows it once and the second read shows it twice):
hello world
hello hadoop!
-------------------------
hello world
hello hadoop!
hello hadoop!
- View a file's attributes
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    status = fs.get_file_status("/demo_01.txt")
    print(status)
Output:
FileStatus(accessTime=1623756299241, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729466, owner='root', pathSuffix='', permission='644', replication=1, storagePolicy=0, type='FILE')
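The fields shown in the repr can be read as attributes of the FileStatus object (assumed from the output above); accessTime and modificationTime are milliseconds since the epoch, so divide by 1000 before converting:

from datetime import datetime

print(status.length)                                           # 39
print(datetime.fromtimestamp(status.modificationTime / 1000))  # human-readable time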
- View the status of a path
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    status = fs.list_status("/demo_01.txt")
    print(status)
Output:
[FileStatus(accessTime=1623756299241, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729466, owner='root', pathSuffix='', permission='644', replication=1, storagePolicy=0, type='FILE')]
The difference between list_status and get_file_status is that list_status can also be called on a directory, returning one FileStatus entry per child, while get_file_status returns a single FileStatus; see the sketch below.
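A hedged sketch of list_status on a directory (field names taken from the repr above):

for st in fs.list_status("/"):
    print(st.pathSuffix, st.type)   # e.g. demo_01.txt FILE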
- Rename a file or directory
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    print(fs.listdir("/"))
    fs.rename("/test_01", "/test_02")
    print(fs.listdir("/"))
Output:
['demo_01.txt', 'test_01']
['demo_01.txt', 'test_02']
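rename doubles as a move, as it does in HDFS generally: pointing the destination at another directory relocates the file. A one-line sketch with hypothetical paths:

fs.rename("/demo_01.txt", "/test_02/demo_01.txt")   # moves the file into /test_02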
- Copy a local file to HDFS
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    print(fs.listdir("/"))
    fs.copy_from_local("demo_02.txt", "/demo_02.txt")
    print(fs.listdir("/"))
Output:
['demo_01.txt', 'test_02']
['demo_01.txt', 'demo_02.txt', 'test_02']
- Copy a file from HDFS to the local filesystem
import pyhdfs
import os

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    print(os.listdir("."))
    fs.copy_to_local("/demo_01.txt", "demo_01.txt")
    print(os.listdir("."))
Output:
['app1', 'db.sqlite3', 'demo_02.txt', 'manage.py', 'mysite1', 'static', 'templates', 'test.py']
['app1', 'db.sqlite3', 'demo_01.txt', 'demo_02.txt', 'manage.py', 'mysite1', 'static', 'templates', 'test.py']
- Get summary information for a path (directory count, file count, etc.)
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    summary = fs.get_content_summary("/")
    print(summary)
Output:
ContentSummary(directoryCount=2, ecPolicy='', fileCount=2, length=56, quota=9223372036854775807, snapshotDirectoryCount=0, snapshotFileCount=0, snapshotLength=0, snapshotSpaceConsumed=0, spaceConsumed=56, spaceQuota=-1, typeQuota={})
- Get a file's checksum
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    check_sum = fs.get_file_checksum("/demo_01.txt")
    print(check_sum)
Output:
FileChecksum(algorithm='MD5-of-0MD5-of-512CRC32C', bytes='00000200000000000000000036e110bcf9265d7f5fdd153161b162d800000000', length=28)
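One use for the checksum is verifying that two HDFS files hold identical content. A hedged sketch comparing the bytes field (the field name comes from the repr above; the second path is hypothetical, and the comparison is only meaningful for files written with the same block size, since the algorithm hashes per-block CRCs):

a = fs.get_file_checksum("/demo_01.txt")
b = fs.get_file_checksum("/demo_01_copy.txt")   # hypothetical second file
print(a.bytes == b.bytes)                        # True if the contents match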
- Set a file's owner
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    print(fs.get_file_status("/demo_01.txt"))
    fs.set_owner("/demo_01.txt", owner="hdfs")
    print(fs.get_file_status("/demo_01.txt"))
Output:
FileStatus(accessTime=1623761641609, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729466, owner='root', pathSuffix='', permission='644', replication=1, storagePolicy=0, type='FILE')
FileStatus(accessTime=1623761641609, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729466, owner='hdfs', pathSuffix='', permission='644', replication=1, storagePolicy=0, type='FILE')
- Set a file's replication factor
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    print(fs.get_file_status("/demo_01.txt"))
    fs.set_replication("/demo_01.txt", replication=5)
    print(fs.get_file_status("/demo_01.txt"))
Output:
FileStatus(accessTime=1623761641609, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729466, owner='root', pathSuffix='', permission='644', replication=1, storagePolicy=0, type='FILE')
FileStatus(accessTime=1623761641609, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729466, owner='root', pathSuffix='', permission='644', replication=5, storagePolicy=0, type='FILE')
- Set a file's modification time and access time; both are long integers counting milliseconds since January 1, 1970 (see the sketch after the example below for computing a current value)
import pyhdfs

if __name__ == "__main__":
    fs = pyhdfs.HdfsClient(hosts="192.168.1.204:50070", user_name="root")
    print(fs.get_file_status("/demo_01.txt"))
    fs.set_times("/demo_01.txt", modificationTime=1623756729499, accessTime=1623761641699)
    print(fs.get_file_status("/demo_01.txt"))
Output:
FileStatus(accessTime=1623761641609, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729466, owner='root', pathSuffix='', permission='644', replication=5, storagePolicy=0, type='FILE')
FileStatus(accessTime=1623761641699, blockSize=134217728, childrenNum=0, fileId=16389, group='supergroup', length=39, modificationTime=1623756729499, owner='root', pathSuffix='', permission='644', replication=5, storagePolicy=0, type='FILE')
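Since both timestamps are epoch milliseconds, a current value can be derived from time.time(), which returns seconds as a float; a minimal sketch (keyword casing follows the example above):

import time

now_ms = int(time.time() * 1000)   # current time in epoch milliseconds
fs.set_times("/demo_01.txt", modificationTime=now_ms, accessTime=now_ms)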
The above covers essentially all of the operations pyhdfs provides.