"""
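Store a dataframe into HDFS:
data_frame: the dataframe to store
hdfs_dir: path of the HDFS directory that receives the output files
result: the HDFS directory ends up holding several small 0_******.csv files, sep="|", up to 100,000 rows each (the last file may hold fewer)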
"""
def write_dataframe_to_hdfs(data_frame, hdfs_dir, chunk_size=100000, pool_size=10):
    """Split ``data_frame`` into row chunks and upload each chunk via a worker pool.

    Chunk ``i`` (0-based) is handed to ``frame_to_hdfs`` together with the
    target path ``<hdfs_dir>/0_<i zero-padded to 6 digits>.csv``. How the chunk
    is serialized is decided entirely by ``frame_to_hdfs``.

    Args:
        data_frame: Object supporting ``len()`` and ``[start:stop]`` row
            slicing (presumably a pandas DataFrame -- confirm with callers).
        hdfs_dir: Directory on the HDFS side that receives the chunk files.
        chunk_size: Maximum number of rows per chunk; the last chunk may be
            shorter. Defaults to 100000.
        pool_size: Number of pool workers used for the uploads. Defaults to 10.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        Exception: The first exception raised by ``frame_to_hdfs`` in a
            worker is re-raised here once all tasks have finished.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    total_rows = len(data_frame)
    if total_rows == 0:
        # Nothing to upload; avoid starting workers for no work.
        return

    pending = []
    # The context manager terminates the pool on exit, so workers are
    # released even if submitting a task fails part-way through the loop.
    with Pool(pool_size) as pool:
        for i, start in enumerate(range(0, total_rows, chunk_size)):
            print("processing: ", i)
            # Slicing clamps at the end, so the last chunk needs no special case.
            tmp_frame = data_frame[start:start + chunk_size]
            hdfs_path = os.path.join(hdfs_dir, f"0_{i:06d}.csv")
            print("hdfs_path1: ", hdfs_path)
            pending.append(pool.apply_async(frame_to_hdfs, (tmp_frame, hdfs_path)))
        # Wait for every submitted task before the pool is terminated.
        pool.close()
        pool.join()

    # apply_async stores worker exceptions in the AsyncResult; get() re-raises
    # them so a failed chunk upload is not silently ignored.
    for result in pending:
        result.get()