import email
import os
import secrets
from datetime import datetime
from email import policy
from email.parser import BytesParser
def generate_unique_id():
    """Return an 18-digit identifier made of a timestamp and a random suffix.

    The first 12 characters are the current local time formatted as
    ``yymmddHHMMSS`` (second resolution); the last 6 characters are random
    decimal digits drawn from the ``secrets`` module.

    Returns:
        str: A string of 18 decimal digits.
    """
    # Timestamp part, accurate to the second.
    current_time = datetime.now().strftime('%y%m%d%H%M%S')
    # Six random digits; zero-padding keeps the width fixed for small values.
    random_part = f"{secrets.randbelow(10 ** 6):06d}"
    return f"{current_time}{random_part}"
def download_attachments(eml_file_path, filenames, output_folder):
    """Save every named attachment of one .eml file into ``output_folder``.

    The message is parsed with ``policy.default`` and its whole MIME tree is
    walked, so attachments nested inside inner multipart containers are
    handled as well as top-level ones.  Each attachment is written as
    ``<generate_unique_id()>.pdf``; the name stored in the e-mail is only
    printed and used to decide whether the part is processed, never as the
    on-disk name.

    Args:
        eml_file_path: Path of the .eml file to read.  If the path does not
            exist the function returns without doing anything.
        filenames: Not used by this function; kept so existing callers keep
            working.
        output_folder: Directory that receives the saved files.  It is not
            created here.

    Returns:
        None.
    """
    if not os.path.exists(eml_file_path):
        return

    # Parse the message, then release the file handle before writing output.
    with open(eml_file_path, 'rb') as file:
        msg = BytesParser(policy=policy.default).parse(file)

    # walk() visits every part of the MIME tree, not just direct children.
    for part in msg.walk():
        if part.get_content_disposition() != 'attachment':
            continue

        filename = part.get_filename()
        print(f"filename{filename}")
        if not filename:
            # Attachments without a stored name are skipped.
            continue

        payload = part.get_payload(decode=True)
        if payload is None:
            # Container parts (e.g. an attached message) carry no bytes of
            # their own; their children are reached by walk() instead.
            continue

        # Build the destination path from a generated name.
        random_number = generate_unique_id() + ".pdf"
        attachment_path = os.path.join(output_folder, random_number)
        print(f"创建附件保存路径{attachment_path}")

        # Write the decoded attachment bytes.
        with open(attachment_path, 'wb') as f:
            f.write(payload)
        print(f"附件已保存: {attachment_path}")
def batch_extract_eml_attachments(eml_folder, output_folder):
    """Extract attachments from every .eml file directly inside a folder.

    Args:
        eml_folder: Directory to scan.  Only its immediate entries are
            considered; entries are matched by a case-insensitive ``.eml``
            suffix and must be regular files.
        output_folder: Directory that receives the attachments.  It is
            created (including missing parents) when it does not exist.

    Returns:
        None.
    """
    print(output_folder)
    # exist_ok avoids the race between checking for the folder and creating it.
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(eml_folder):
        if not filename.lower().endswith(".eml"):
            continue
        eml_path = os.path.join(eml_folder, filename)
        if not os.path.isfile(eml_path):
            # A directory may carry an .eml suffix; it cannot be read as a message.
            continue
        print(f"\n处理文件: {filename}")
        download_attachments(eml_path, filename, output_folder)
if __name__ == "__main__":
    # Directory holding the .eml files to process.
    source_dir = "/media/ai/8E5C87DF5C87C087/4.30/1"
    # Directory that receives the extracted attachments.
    target_dir = "./attachments"
    # Run the batch extraction, then report completion.
    batch_extract_eml_attachments(eml_folder=source_dir, output_folder=target_dir)
    print("\n所有附件提取完成!")