import asyncio import imaplib import email import random import socket import string import time from email.header import decode_header from datetime import timezone, timedelta import email.utils import aiohttp import socks import requests import smtplib from email.mime.text import MIMEText from email.header import Header from functools import wraps from loguru import logger def retry(max_retries: int = 3, delay: float = 1.0, backoff: float = 1.0): """ 通用重试装饰器 :param max_retries: 最大重试次数 :param delay: 每次重试的初始延迟(秒) :param backoff: 每次重试延迟的递增倍数 """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): retries = 0 current_delay = delay while retries < max_retries: try: return func(*args, **kwargs) except Exception as e: retries += 1 if retries >= max_retries: logger.warning(f"函数 {func.__name__} 在尝试了 {max_retries} 次后失败,错误信息: {e}") return None # 重试次数用尽后返回 None logger.warning(f"正在重试 {func.__name__} {retries + 1}/{max_retries} 因错误: {e}") time.sleep(current_delay) current_delay *= backoff return None # 三次重试仍未成功,返回 None return wrapper return decorator def async_retry(max_retries: int = 3, delay: float = 1.0, backoff: float = 1.0): """ 支持异步函数的通用重试装饰器 :param max_retries: 最大重试次数 :param delay: 每次重试的初始延迟(秒) :param backoff: 每次重试延迟的递增倍数 """ def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): retries = 0 current_delay = delay while retries < max_retries: try: return await func(*args, **kwargs) # 直接执行原始方法 except Exception as e: retries += 1 if retries >= max_retries: logger.warning(f"函数 {func.__name__} 在尝试了 {max_retries} 次后失败,错误信息: {e}") return None # 重试次数用尽后返回 None logger.warning(f"正在重试 {func.__name__} {retries + 1}/{max_retries} 因错误: {e}") await asyncio.sleep(current_delay) # 异步延迟 current_delay *= backoff # 根据backoff递增延迟 return None # 三次重试仍未成功,返回 None return wrapper return decorator # 域名管理类 - 高内聚低耦合的域名管理方案 class DomainManager: """ 域名管理器 - 统一管理所有邮箱域名相关操作 实现高内聚低耦合的设计原则 """ def __init__(self): # 域名列表 - 只需要在这里添加新域名 self._domains = [ "gmail.com", "qianyouduo.com", "rxybb.com", "cqrxy.vip", "0n.lv", "qianyouduo.com", "ziyouzuan.com", "emaing.online", "emaing.fun", "emaing.asia", "isemaing.site", "emaing.cyou", "emaing.site", "emaing.icu", "emaing.store", "emaing.pw", "emaing.xyz", "qydkjgs.asia", "qydkj.homes", "qydkj.baby", "qydkj.cyou", "qydkjgs.autos", "qydkj.autos", "qydkjgs.cyou", "qydkjgs.homes", "qydgs.asia", "qydkj.asia", "qydgs.cyou", "lulanjing.asia", "lisihan.asia", "mmwan.asia", "xyttan.asia", "zpaily.asia", "youxinzhiguo.asia", "huijinfenmu.asia", "linghao.asia", "cqhc.asia", "huacun.asia", "huachen.asia", "yisabeier.asia", "xinxinr.cyou", "lilisi.asia", "xybbwan.cyou", "zhongjing.cyou", "zprxy.cyou", "cqhuacun.cyou", "huazong.icu", "huacun.cyou" ] def get_domain_by_type(self, mail_type: int) -> str: """ 根据邮箱类型获取域名 :param mail_type: 邮箱类型编号 :return: 对应的域名 """ if 0 <= mail_type < len(self._domains): return self._domains[mail_type] return self._domains[1] # 默认返回 qianyouduo.com def get_domain_type(self, domain: str) -> int: """ 根据域名获取类型编号 :param domain: 域名 :return: 对应的类型编号,如果不存在返回1 """ try: return self._domains.index(domain) except ValueError: return 1 # 默认返回 qianyouduo.com 的类型 def get_imap_server(self, mail_type: int) -> str: """ 根据邮箱类型获取IMAP服务器地址 :param mail_type: 邮箱类型编号 :return: IMAP服务器地址 """ domain = self.get_domain_by_type(mail_type) return f"imap.{domain}" def get_imap_server_by_domain(self, domain: str) -> str: """ 根据域名获取IMAP服务器地址 :param domain: 域名 :return: IMAP服务器地址 """ return f"imap.{domain}" def is_valid_domain(self, domain: str) -> bool: """ 检查域名是否在支持列表中 :param domain: 域名 :return: 是否支持该域名 """ return domain in self._domains def get_all_domains(self) -> list: """ 获取所有支持的域名列表 :return: 域名列表的副本 """ return self._domains.copy() def get_domain_count(self) -> int: """ 获取支持的域名总数 :return: 域名总数 """ return len(self._domains) def get_creatable_domains(self) -> list: """ 获取可用于创建邮箱的域名列表(排除gmail.com) :return: 可创建邮箱的域名列表 """ return [domain for domain in self._domains if domain != "gmail.com"] def get_creatable_domain_by_type(self, mail_type: int) -> str: """ 根据邮箱类型获取可创建的域名(排除gmail.com) :param mail_type: 邮箱类型编号 :return: 对应的域名,如果是gmail.com则返回默认域名 """ domain = self.get_domain_by_type(mail_type) if domain == "gmail.com": return self._domains[1] # 返回qianyouduo.com作为默认 return domain def get_random_creatable_domain(self) -> str: """ 随机获取一个可创建邮箱的域名(排除 gmail.com) 返回值: str: 随机选取的域名 """ creatable = self.get_creatable_domains() if not creatable: raise ValueError("无可用域名用于创建邮箱") return random.choice(creatable) # 邮箱模块 class Mail: def __init__(self): self.domain_manager = DomainManager() self.api_host = 'http://111.10.175.206:5020' def email_account_read(self, pk: int = None, account: str = None, status: bool = None, host: str = None, proxy_account: str = None, parent_account: str = None, order_by: str = None, level: int = None, update_time_start: str = None, update_time_end: str = None, res_count: bool = False, create_time_start: str = None, create_time_end: str = None, page: int = None, limit: int = None) -> dict: """ 读取mail账号 :param level: 邮箱等级(可选) :param status: 状态(可选) :param update_time_start: 更新时间起始(可选) :param update_time_end: 更新时间结束(可选) :param res_count: 返回总数 (可选) :param parent_account: 母邮箱账号 (可选) :param pk: 主键 (可选) :param account: 账号 (可选) :param host: 代理 (可选) :param proxy_account: 代理账号 (可选) :param order_by: 排序方式 (可选) id|create_time|update_time 前面加-表示倒序 :param create_time_start: 创建起始时间 (可选) :param create_time_end: 创建结束时间 (可选) :param page: 页码 (可选) :param limit: 每页数量 (可选) :return: 返回json 成功字段code=200 """ if pk is not None: url = f'{self.api_host}/mail/account/{pk}' return requests.get(url).json() url = f'{self.api_host}/mail/account' data = dict() if account is not None: data['account'] = account if status is not None: data['status'] = status if host is not None: data['host'] = host if proxy_account is not None: data['proxy_account'] = proxy_account if parent_account is not None: data['parent_account'] = parent_account if order_by is not None: data['order_by'] = order_by if level is not None: data['level'] = level if create_time_start is not None: data['create_time_start'] = create_time_start if create_time_end is not None: data['create_time_end'] = create_time_end if update_time_start is not None: data['update_time_start'] = update_time_start if update_time_end is not None: data['update_time_end'] = update_time_end if res_count: data['res_count'] = res_count if page is not None: data['page'] = page if limit is not None: data['limit'] = limit res = requests.get(url, params=data).json() if res.get('code') not in [200, 400, 404]: raise Exception(res) return res # 创建随机邮箱 @retry(max_retries=3, delay=1.0, backoff=1.0) def email_create_random(self, count: int = 8, pwd: str = 'Zpaily88', mail_type: int | None = None) -> str: """ 创建随机邮箱(随机域名,排除 gmail.com) :param count: 邮箱长度(默认8位) :param pwd: 邮箱密码(默认Zpaily88) :param mail_type: 指定邮箱类型编号;为 None 时随机选择可创建域名 :return: 邮箱账号 """ headers = { "Accept-Language": "zh-CN,zh;q=0.9", "Authorization": "Basic YWRtaW5AcWlhbnlvdWR1by5jb206WnBhaWx5ODgh", "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://mail.qianyouduo.com", "Pragma": "no-cache", "Referer": "https://mail.qianyouduo.com/admin/api/doc", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "accept": "*/*", "sec-ch-ua": "\"Google Chrome\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"macOS\"" } url = "https://mail.qianyouduo.com/admin/api/v1/boxes" name = ''.join(random.choices(string.ascii_letters + string.digits, k=count)).lower() # 随机选择可创建域名(排除 gmail.com);如指定类型则按类型选择 mail_end = ( self.domain_manager.get_creatable_domain_by_type(mail_type) if mail_type is not None else self.domain_manager.get_random_creatable_domain() ) data = { "name": name, "email": f"{name}@{mail_end}", "passwordPlaintext": pwd } response = requests.post(url, headers=headers, json=data) if 'Validation errors: [user] This combination of username and domain is already in database' in response.text: return f'{name}@{mail_end}' if response.status_code != 201: raise Exception(response.status_code) return f"{name}@{mail_end}" # 异步创建随机邮箱 @async_retry(max_retries=3, delay=1.0, backoff=1.0) async def _email_create_random(self, count: int = 8, pwd: str = 'Zpaily88', mail_type: int | None = None) -> str: """ 创建随机邮箱(随机域名,排除 gmail.com) :param count: 邮箱长度(默认8位) :param pwd: 邮箱密码(默认Zpaily88) :param mail_type: 指定邮箱类型编号;为 None 时随机选择可创建域名 :return:邮箱账号 """ headers = { "Accept-Language": "zh-CN,zh;q=0.9", "Authorization": "Basic YWRtaW5AcWlhbnlvdWR1by5jb206WnBhaWx5ODgh", "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://mail.qianyouduo.com", "Pragma": "no-cache", "Referer": "https://mail.qianyouduo.com/admin/api/doc", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "accept": "*/*", "sec-ch-ua": "\"Google Chrome\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"macOS\"" } url = "https://mail.qianyouduo.com/admin/api/v1/boxes" name = ''.join(random.choices(string.ascii_letters + string.digits, k=count)).lower() # 随机选择可创建域名(排除 gmail.com);如指定类型则按类型选择 mail_end = ( self.domain_manager.get_creatable_domain_by_type(mail_type) if mail_type is not None else self.domain_manager.get_random_creatable_domain() ) data = { "name": name, "email": f"{name}@{mail_end}", "passwordPlaintext": pwd } async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=data) as response: status = response.status text = await response.text() if 'Validation errors: [user] This combination of username and domain is already in database' in text: return f"{name}@{mail_end}" if status != 201: raise Exception(status) return f"{name}@{mail_end}" # 创建邮箱 @retry(max_retries=3, delay=1.0, backoff=1.0) def email_create(self, account: str, pwd: str = 'Zpaily88') -> str | None: """ 创建邮箱 :param account: 邮箱账号 :param pwd: 邮箱密码(默认Zpaily88) :return:邮箱账号 """ headers = { "Accept-Language": "zh-CN,zh;q=0.9", "Authorization": "Basic YWRtaW5AcWlhbnlvdWR1by5jb206WnBhaWx5ODgh", "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://mail.qianyouduo.com", "Pragma": "no-cache", "Referer": "https://mail.qianyouduo.com/admin/api/doc", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "accept": "*/*", "sec-ch-ua": "\"Google Chrome\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"macOS\"" } url = "https://mail.qianyouduo.com/admin/api/v1/boxes" name = account.split('@')[0] mail_end = account.split('@')[1] # 排除gmail.com域名 if mail_end == "gmail.com": return None # 验证域名是否支持 if not self.domain_manager.is_valid_domain(mail_end): raise ValueError(f"不支持的域名: {mail_end},支持的域名列表: {self.domain_manager.get_all_domains()}") data = { "name": name, "email": f"{name}@{mail_end}", "passwordPlaintext": pwd } response = requests.post(url, headers=headers, json=data) print(f'创建邮箱响应: {response.status_code}') if response.status_code not in [201, 400]: raise Exception(response.status_code) return f"{name}@{mail_end}" # 异步创建邮箱 @async_retry(max_retries=3, delay=1.0, backoff=1.0) async def _email_create(self, account: str, pwd: str = 'Zpaily88') -> str | None: """ 创建邮箱 :param account: 邮箱账号 :param pwd: 邮箱密码(默认Zpaily88) :return: 邮箱账号 """ headers = { "Accept-Language": "zh-CN,zh;q=0.9", "Authorization": "Basic YWRtaW5AcWlhbnlvdWR1by5jb206WnBhaWx5ODgh", "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://mail.qianyouduo.com", "Pragma": "no-cache", "Referer": "https://mail.qianyouduo.com/admin/api/doc", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "accept": "*/*", "sec-ch-ua": "\"Google Chrome\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"macOS\"" } url = "https://mail.qianyouduo.com/admin/api/v1/boxes" name = account.split('@')[0] mail_end = account.split('@')[1] # 排除gmail.com域名 if mail_end == "gmail.com": return None # 验证域名是否支持 if not self.domain_manager.is_valid_domain(mail_end): raise ValueError(f"不支持的域名: {mail_end},支持的域名列表: {self.domain_manager.get_all_domains()}") data = { "name": name, "email": f"{name}@{mail_end}", "passwordPlaintext": pwd } async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=data) as response: status = response.status if status not in [201, 400]: raise Exception(f'status code: {status}') return f"{name}@{mail_end}" # 删除邮箱 @retry(max_retries=3, delay=1.0, backoff=1.0) def email_delete(self, account: str) -> bool: """ 删除邮箱 :param account: 邮箱账号 :return: True表示删除成功,False表示删除失败 """ headers = { "Accept-Language": "zh-CN,zh;q=0.9", "Authorization": "Basic YWRtaW5AcWlhbnlvdWR1by5jb206WnBhaWx5ODgh", "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://mail.qianyouduo.com", "Pragma": "no-cache", "Referer": "https://mail.qianyouduo.com/admin/api/doc", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "accept": "*/*", "sec-ch-ua": "\"Google Chrome\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"macOS\"" } url = f"https://mail.qianyouduo.com/admin/api/v1/boxes/{account}" if '@gmail.com' in account: return False response = requests.delete(url, headers=headers) print(f'删除邮箱响应: --> {response.status_code}') if response.status_code not in [204, 404]: raise Exception(response.status_code) return True # 异步删除邮箱 @async_retry(max_retries=3, delay=1.0, backoff=1.0) async def _email_delete(self, account: str) -> bool: """ 删除邮箱 :param account: 邮箱账号 :return: True表示删除成功,False表示删除失败 """ headers = { "Accept-Language": "zh-CN,zh;q=0.9", "Authorization": "Basic YWRtaW5AcWlhbnlvdWR1by5jb206WnBhaWx5ODgh", "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://mail.qianyouduo.com", "Pragma": "no-cache", "Referer": "https://mail.qianyouduo.com/admin/api/doc", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "accept": "*/*", "sec-ch-ua": "\"Google Chrome\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"macOS\"" } url = f"https://mail.qianyouduo.com/admin/api/v1/boxes/{account}" if '@gmail.com' in account: return False async with aiohttp.ClientSession() as session: async with session.delete(url, headers=headers) as response: status = response.status if status not in [204, 404]: raise Exception(f'status code: {status}') return True # 处理邮件正文 @staticmethod def extract_body(msg): """ 提取邮件正文,优先返回 HTML 文本 - 更健壮的字符集解析:优先使用 part 的 charset 信息,失败回退到 utf-8 / latin-1 - 仅处理 inline 的 text/html 与 text/plain 内容 """ html_text = None plain_text = None def _decode_part(part): payload = part.get_payload(decode=True) if payload is None: return None # 优先从内容中解析 charset charset = (part.get_content_charset() or part.get_param('charset') or 'utf-8') try: return payload.decode(charset, errors='replace') except LookupError: # 未知编码时回退 try: return payload.decode('utf-8', errors='replace') except Exception: return payload.decode('latin-1', errors='replace') if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = part.get_content_disposition() if content_type == "text/html" and (not content_disposition or content_disposition == "inline"): html_text = _decode_part(part) or html_text elif content_type == "text/plain" and (not content_disposition or content_disposition == "inline"): plain_text = _decode_part(part) or plain_text else: content_type = msg.get_content_type() if content_type == "text/html": html_text = _decode_part(msg) elif content_type == "text/plain": plain_text = _decode_part(msg) # 优先返回 HTML 文本,如果没有 HTML 文本,则返回纯文本 return html_text or plain_text or "" # 转换邮件日期 @staticmethod def convert_to_china_time(date_str): """ 将邮件日期转换为10位时间戳(中国时区) - 保留原始邮件的时区信息;若无时区,则按 UTC 处理 - 异常时返回当前时间戳,避免解析失败导致崩溃 """ try: email_date = email.utils.parsedate_to_datetime(date_str) if email_date is None: return int(time.time()) if email_date.tzinfo is None: email_date = email_date.replace(tzinfo=timezone.utc) china_time = email_date.astimezone(timezone(timedelta(hours=8))) return int(china_time.timestamp()) except Exception: return int(time.time()) # 获取邮件 def email_read(self, user: str, from_: str, limit: int = 1, is_del: bool = False) -> list | None: """ 获取最新邮件 :param user: 母账号 :param from_: 发件人匹配关键字(可为邮箱或显示名,大小写不敏感) :param limit: 获取邮件数量(默认1封) :param is_del: 是否删除整个邮箱账号(非 Gmail 才会执行账号删除) :return: 返回邮件列表,每个元素格式为: { "title": "邮件标题", "from": "发件人", "date": "邮件日期(中国时区时间戳)", "content": "邮件正文", "code": 200 } """ user_li = user.split('@') domain = user_li[1] # 使用域名管理器获取邮箱类型 if not self.domain_manager.is_valid_domain(domain): return None mail_type = self.domain_manager.get_domain_type(domain) # 仅对 Gmail 进行点号归一化,其它域名按原样处理 local_part = user_li[0] if domain == "gmail.com": local_part = local_part.replace('.', '') user = local_part + '@' + user_li[1] proxy_host = None proxy_port = None proxy_user = None proxy_pwd = None if mail_type == 0: res = self.email_account_read(parent_account=user, status=True, level=0) if res['code'] != 200: return None pwd = res['items'][0]['parent_pwd'] proxy_host = res['items'][0]['host'] proxy_port = res['items'][0]['port'] proxy_user = res['items'][0]['proxy_account'] proxy_pwd = res['items'][0]['proxy_pwd'] else: pwd = 'Zpaily88' items = [] # 存储邮件列表 # 保存原始socket original_socket = None if proxy_host is not None and proxy_port is not None: original_socket = socket.socket if proxy_user is not None and proxy_pwd is not None: socks.setdefaultproxy(socks.SOCKS5, proxy_host, int(proxy_port), True, proxy_user, proxy_pwd) else: socks.setdefaultproxy(socks.SOCKS5, proxy_host, int(proxy_port), True) socket.socket = socks.socksocket imap_server = None had_error = False try: # 在设置代理后创建IMAP连接 imap_server = imaplib.IMAP4_SSL(self.domain_manager.get_imap_server(mail_type)) if not imap_server: had_error = True else: # pwd去除空格 pwd = pwd.replace(' ', '') # print(f'pwd: {pwd}') imap_server.login(user, pwd) status, _ = imap_server.select("INBOX") if status != 'OK': had_error = True else: status, email_ids = imap_server.search(None, "ALL") if status != 'OK': had_error = True else: email_id_list = email_ids[0].split() # 获取最近limit条邮件ID recent_ids = email_id_list[-20:] # 仍然获取最近20封以确保有足够的邮件可以筛选 found_count = 0 # 记录找到的符合条件的邮件数量 for email_id in recent_ids[::-1]: # 从最新的邮件开始处理 if found_count >= limit: # 如果已经找到足够数量的邮件,就退出循环 break status, msg_data = imap_server.fetch(email_id, "(RFC822)") for response in msg_data: if isinstance(response, tuple): msg = email.message_from_bytes(response[1]) # 兼容性发件人匹配:解析地址与显示名,大小写不敏感,支持子串匹配 from_field = msg.get("From", "") addresses = email.utils.getaddresses([from_field]) needle = (from_ or "").lower() candidates = [] for name, addr in addresses: if name: candidates.append(name.lower()) if addr: candidates.append(addr.lower()) if any(needle in c for c in candidates): # 标题解码,处理无标题或编码缺失的情况 raw_subject = msg.get("Subject") subject = "" if raw_subject is not None: dh = decode_header(raw_subject) if dh: s, enc = dh[0] if isinstance(s, bytes): try: subject = s.decode(enc or 'utf-8', errors='replace') except LookupError: subject = s.decode('utf-8', errors='replace') else: subject = s item = { "title": subject, "from": msg["From"], "content": self.extract_body(msg), "code": 200 } # 获取并转换邮件时间 date_str = msg["Date"] if date_str: item["date"] = self.convert_to_china_time(date_str) items.append(item) found_count += 1 if found_count >= limit: # 如果已经找到足够数量的邮件,就跳出内层循环 break # 读取完成不再对单封邮件做删除标记与 expunge except imaplib.IMAP4.error as e: # items.append({'title': 'error', 'from': 'error', 'content': f'连接邮箱失败: {e}', 'code': 500}) had_error = True except Exception as e: # items.append({'title': 'error', 'from': 'error', 'content': f'获取邮件异常: {e}', 'code': 500}) had_error = True finally: try: # 检查连接是否建立 if 'imap_server' in locals() and imap_server is not None: try: # 先检查是否处于已选择状态 if hasattr(imap_server, 'state') and imap_server.state == 'SELECTED': imap_server.close() except Exception as e: logger.error(f"关闭IMAP文件夹时发生错误: {e}") try: # 无论如何尝试登出 imap_server.logout() except Exception as e: logger.error(f"登出IMAP服务器时发生错误: {e}") # 在Windows上可能需要强制关闭socket try: if hasattr(imap_server, 'sock') and imap_server.sock is not None: imap_server.sock.close() except Exception as sock_err: logger.error(f"强制关闭socket时发生错误: {sock_err}") except Exception as outer_e: logger.error(f"处理IMAP连接关闭时发生错误: {outer_e}") finally: # 重置socket设置(如果使用了代理) if proxy_host is not None and original_socket is not None: socket.socket = original_socket # 若成功获取到至少一封匹配邮件且请求删除,则删除整个邮箱账号 if is_del and len(items) > 0: try: self.email_delete(user) except Exception as del_err: logger.error(f"删除邮箱账号失败: {del_err}") if had_error: return None if len(items) == 0: return None return items # 返回邮件列表 async def main(): """ 使用示例:展示新的域名管理系统的使用方法 """ mail = Mail() # mai = '0gz3vvd4@'+'qydgs.asia' # res = mail.email_create(mai) # print(f"创建的邮箱: {res}") random_email = mail.email_create_random() print(f"创建的随机邮箱: {random_email}") # 读取邮件 # res = mail.email_read('0gz3vvd4@qydgs.asia', '@', 1, is_del=True) # print(f'读取的邮件: {res}') # 删除邮箱 res = mail.email_delete(random_email) print(f"删除的邮箱: {res}") mail_ = Mail() # if __name__ == '__main__': # asyncio.run(main())