文本处理中的常用预处理操作-文件篇

这些年也做过不少的NLP任务,在做各种不同的NLP任务时,需要用到不同的数据处理方法,但有些方法是较为通用的。

这里把一些较为常用的NLP预处理方式做一下总结。

有些是自己写的,有些是从公开代码库借鉴。代码已上传到github上的nlp-base库

本篇主要总结用到的关于文件层次的操作,后续会继续补充

导入需要用到的包

1
2
3
4
5
6
import subprocess
import logging
import sys
import re
import os
import numpy as np

文件操作相关

读取文件

这里提供读取普通文件、停用词文件、读取idf文件、读取word2vec的文本文件的操作。word2vec文件的读取也可以选则使用gensim来读,但是操作就没那么随心了。

读取普通文件

1
2
3
4
5
def read_file(file_input):
"""读取文件,不做任何处理"""
with open(file_input, encoding="utf-8", errors="ignore") as fin:
content = [line.strip() for line in fin]
return content

读取停用词文件

也可以是其他词表之类的单个词为一行,且需要去重的文件

1
2
3
4
def load_dic(file_input):
"""读取文件,返回set"""
with open(file_input, encoding="utf-8", errors="ignore") as fin:
return set([i.strip() for i in fin])

读取idf文件

格式如下:

word idf

凳子 8.36728816325

1
2
3
4
5
6
7
8
def load_idf(file_input):
"""读取idf词典"""
words_idf = {}
with open(file_input, encoding='utf-8', errors="ignore") as fin:
for i in fin:
word, idf = i.strip().split()
words_idf[word] = float(idf)
return words_idf

读取词向量文件

格式如下:一般词向量文件第一行会显示info

word vec

的 1 2 3 4 5 6 7 …

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def load_word2vec(vec_file, word_dim):
"读取词向量文件"
"param:vec_file:词向量文件路径"
"word_dim:向量维度"
"return:词与array类型向量的字典"
word2vec = {}
with open(vec_file, encoding='utf-8', errors="ignore") as fin:
fin.readline()
for i in fin:
line = i.strip().split(" ",1)
if len(line) != 2:
logging.error("vec file maybe err, please check!")
continue
word, vec = line[0], line[1]
try:
word2vec[word] = np.array([float(i) for i in vec.split()]).reshape(1, word_dim)
except Exception as e:
raise ValueError("vec size is not equal %s, error is %s"%(word_dim,e))
return word2vec

获取文件行数

1
2
3
4
5
# 这里使用wc指令来获取行数,不需要对文件进行再次的open
def get_file_len(file_in):
"通过wc指令获取文件行数"
result = subprocess.run(["wc", "-l", file_in], capture_output=True)
return int(result.stdout.strip().split()[0])

分割文件

当文件过大时,一次性不能将全部文件装入内存,尤其在做大模型的Embedding操作预处理时。

我们需要将文件切割为小文件。这里提供两种切割方法:按照子文件个数或行数进行切割

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def split_by_file_num(file_in, num=10, line_num="100", mem="5k", method="num", need_remaining=False):
"按照文件数分割大文件为小文件"
"param:"
"file_in:输入的大文件"
"n:分割为n个小文件"
"each_file_num:按照行数分割时,每个文件的行数"
"need_remaining:末尾不足平均行数的文本是否存为第n+1个文件"
"method:分割方式"
" --num:按照文件个数分割--mem:按照文件大小分割--line_no:按照行数分割"

"注意:"
"这里把输出的文件直接放在原路径下,文件名和原文件名相同并加后缀。"
"如果要改输出可以自己改改代码或者提issue我有时间改改"
"按照num分割文件速度比较慢,后面有时间优化下速度"
try:
n = int(num)
line_num = str(int(line_num))
except Exception as e:
raise ValueError(f"n or line_num must be a number, please check it!")

if file_in.startswith("."):
prefix = "."+"".join(file_in.split(".")[:-1])
else:
prefix = "".join(file_in.split(".")[:-1])

logging.info("file prefix is ", prefix)

len_file = get_file_len(file_in)

if method == "line_num": # 按照行数进行分割,这里使用shell的split函数,分割后的文件会以双字母结尾
subprocess.run(["split", "-l", line_num, file_in, prefix], capture_output=True)
elif method == "mem": # 按照大小切割,这里使用shell的split函数,分割后的文件会以双字母结尾
subprocess.run(["split", "-b", mem, file_in, prefix], capture_output=True)
elif method == "num": # 按照文件个数进行分割
each_file_len = len_file // n # 每个文件行数
idx = 0
all_text, sub_text = [], []

with open(file_in, encoding='utf-8') as fin:
for line_num, line_text in enumerate(fin):
sub_text.append(line_text)
if line_num == each_file_len * (idx + 1):
idx += 1
all_text.append(sub_text)
sub_text = []
logging.warning(f"file {idx} append success")

if need_remaining:
all_text.append(sub_text) # 多余的不足平均行数的文本
logging.warning(f"file {idx} append success")

for idx, text in enumerate(all_text):
write_file = f"{prefix}_{idx+1}.txt"
with open(write_file, 'w', encoding='utf-8') as fout:
for i in text:
fout.write(i)
else:
raise ValueError("please input right method one of num,line_num,mem")

合并文件

有时候会遇到需要将某个文件夹下的文件合并为一个大文件的需求。

这里提供递归读取合并和只合并文件夹下单层目录的两种选择,主要使用的函数是os下的walk函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def merge_and_write(base_dir, write_path, is_dfs=False):
"""将一个文件夹下的多个文件写入一个文件
param:
base_dir:原文件路径
write_dir:写入文件路径
if_dfs:是否递归写入
true:将文件夹内所有文件夹下的文件全部写入
false:只写入该文件夹下的文件,不写子文件夹下的文件
return:
None"""
with open(write_path, 'w', encoding='utf-8') as fout:
for dirpath, dirname, filenames in os.walk(base_dir):
for sub_file in filenames:
file_path = os.path.join(dirpath, sub_file)
for i in read_file(file_path):
fout.write(f"{i}\n")
logging.info(f"writing file {file_path} success!!!")
if is_dfs:
break