This commit is contained in:
2025-11-20 21:42:20 +08:00
parent e2d2b0b75b
commit 150d45e3eb
10 changed files with 491 additions and 126 deletions

94
spider/api.py Normal file
View File

@@ -0,0 +1,94 @@
from tkinter import N
import requests
from loguru import logger
import csv
import os
import random
class Api:
def __init__(self) -> None:
self.base_url = 'http://127.0.0.1:6060'
# 创建店铺
def create_shop(self, city: str, street: str, shop_name: str) -> dict:
url = f'{self.base_url}/country/shop'
item = {
'city': city,
'street': street,
'shop_name': shop_name,
}
response = requests.post(url, json=item).json()
logger.info(response)
return response
# 查询店铺
def get_shop(self, city: str) -> dict:
url = f'{self.base_url}/country/shop'
response = requests.get(url).json()
# logger.info(response)
return response
# 创建信息
def create_info(self, first_name: str, last_name: str, birthday: str, current_address: str, city: str, phone: str, postal_code: str, province: str, email: str, text: str,status: bool=False, email_content: str|None=None) -> dict:
url = f'{self.base_url}/country/info'
item = {
"first_name": first_name,
"last_name": last_name,
"birthday": birthday,
"current_address": current_address,
"city": city,
"phone": phone,
"postal_code": postal_code,
"province": province,
"status": status,
"email": email,
"email_content": email_content,
"text": text
}
response = requests.post(url, json=item).json()
logger.info(response)
return response
# 根据城市 随机获取一个店铺
def get_random_shop(self) -> dict:
url = f'{self.base_url}/country/shop/random'
response = requests.get(url).json()
# logger.info(response)
if not response.get('street'):
logger.error(f'没有店铺')
return None
return response
# def main():
# """
# 从同目录的 `bakeries.csv` 读取面包店数据,按列映射输出或创建店铺
# 列顺序:`Name,Address,City`
# """
# api = Api()
# csv_path = os.path.join(os.path.dirname(__file__), 'bakeries.csv')
# if not os.path.exists(csv_path):
# logger.error(f'CSV 文件不存在: {csv_path}')
# return
# with open(csv_path, 'r', encoding='utf-8') as file:
# reader = csv.reader(file)
# header = next(reader, None)
# for row in reader:
# if len(row) < 3:
# logger.warning(f'行列数不足,跳过: {row}')
# continue
# shop_name, street, city = row[0], row[1], row[2]
# logger.info(f'city: {city}, street: {street}, shop_name: {shop_name}')
# api.create_shop(city, street, shop_name)
# def main2():
# api = Api()
# city = 'Toronto'
# shop = api.get_random_shop()
# if shop:
# logger.info(shop)
# if __name__ == '__main__':
# main2()
api = Api()

View File

@@ -225,6 +225,18 @@ class DomainManager:
return self._domains[1] # 返回qianyouduo.com作为默认
return domain
def get_random_creatable_domain(self) -> str:
"""
随机获取一个可创建邮箱的域名(排除 gmail.com
返回值:
str: 随机选取的域名
"""
creatable = self.get_creatable_domains()
if not creatable:
raise ValueError("无可用域名用于创建邮箱")
return random.choice(creatable)
# 邮箱模块
class Mail:
@@ -298,12 +310,12 @@ class Mail:
# 创建随机邮箱
@retry(max_retries=3, delay=1.0, backoff=1.0)
def email_create_random(self, count: int = 8, pwd: str = 'Zpaily88', mail_type: int = 1) -> str:
def email_create_random(self, count: int = 8, pwd: str = 'Zpaily88', mail_type: int | None = None) -> str:
"""
创建邮箱
创建随机邮箱(随机域名,排除 gmail.com
:param count: 邮箱长度(默认8位)
:param pwd: 邮箱密码(默认Zpaily88)
:param mail_type: 邮箱类型(1表示qianyouduo.com 2表示rxybb.com 3表示cqrxy.vip 4表示0n.lv 默认1)
:param mail_type: 指定邮箱类型编号;为 None 时随机选择可创建域名
:return: 邮箱账号
"""
headers = {
@@ -327,8 +339,12 @@ class Mail:
url = "https://mail.qianyouduo.com/admin/api/v1/boxes"
name = ''.join(random.choices(string.ascii_letters + string.digits, k=count)).lower()
# 使用域名管理器获取可创建域名排除gmail.com
mail_end = self.domain_manager.get_creatable_domain_by_type(mail_type)
# 随机选择可创建域名(排除 gmail.com;如指定类型则按类型选择
mail_end = (
self.domain_manager.get_creatable_domain_by_type(mail_type)
if mail_type is not None
else self.domain_manager.get_random_creatable_domain()
)
data = {
"name": name,
"email": f"{name}@{mail_end}",
@@ -343,12 +359,12 @@ class Mail:
# 异步创建随机邮箱
@async_retry(max_retries=3, delay=1.0, backoff=1.0)
async def _email_create_random(self, count: int = 8, pwd: str = 'Zpaily88', mail_type: int = 1) -> str:
async def _email_create_random(self, count: int = 8, pwd: str = 'Zpaily88', mail_type: int | None = None) -> str:
"""
创建邮箱
创建随机邮箱(随机域名,排除 gmail.com
:param count: 邮箱长度(默认8位)
:param pwd: 邮箱密码(默认Zpaily88)
:param mail_type: 邮箱类型(1表示qianyouduo.com 2表示rxybb.com 3表示cqrxy.vip 4表示0n.lv 默认1)
:param mail_type: 指定邮箱类型编号;为 None 时随机选择可创建域名
:return:邮箱账号
"""
headers = {
@@ -372,8 +388,12 @@ class Mail:
url = "https://mail.qianyouduo.com/admin/api/v1/boxes"
name = ''.join(random.choices(string.ascii_letters + string.digits, k=count)).lower()
# 使用域名管理器获取可创建域名排除gmail.com
mail_end = self.domain_manager.get_creatable_domain_by_type(mail_type)
# 随机选择可创建域名(排除 gmail.com;如指定类型则按类型选择
mail_end = (
self.domain_manager.get_creatable_domain_by_type(mail_type)
if mail_type is not None
else self.domain_manager.get_random_creatable_domain()
)
data = {
"name": name,
"email": f"{name}@{mail_end}",
@@ -815,21 +835,21 @@ async def main():
使用示例:展示新的域名管理系统的使用方法
"""
mail = Mail()
mai = '0gz3vvd4@'+'qydgs.asia'
res = mail.email_create(mai)
print(f"创建的邮箱: {res}")
# random_email = mail.email_create_random(count=8, mail_type=1)
# print(f"创建的随机邮箱: {random_email}")
# mai = '0gz3vvd4@'+'qydgs.asia'
# res = mail.email_create(mai)
# print(f"创建的邮箱: {res}")
random_email = mail.email_create_random()
print(f"创建的随机邮箱: {random_email}")
# 读取邮件
# res = mail.email_read('0gz3vvd4@qydgs.asia', '@', 1, is_del=True)
# print(f'读取的邮件: {res}')
# 删除邮箱
res = mail.email_delete(mai)
res = mail.email_delete(random_email)
print(f"删除的邮箱: {res}")
mail_ = Mail()
# if __name__ == '__main__':
# asyncio.run(main())
# asyncio.run(main())

View File

@@ -8,19 +8,19 @@ from loguru import logger
from work import get_random_canada_info
from mail_ import mail_
from bit_browser import bit_browser
from api import api
class Auto:
def __init__(self,http:str):
def __init__(self,http:str=None):
self.browser = Chromium(http)
self.tab = self.browser.latest_tab
pass
# cf打码
def solve_cloudflare(self):
tab = self.browser.latest_tab
for _ in range(8):
self.tab.wait(1)
for _ in range(5):
self.tab.wait(0.5)
try:
shadow1 = tab.ele(
'x://*[@name="cf-turnstile-response"]').parent().shadow_root
@@ -31,16 +31,16 @@ class Auto:
if shadow2:
logger.debug("找到Cloudflare iframe body shadow root")
status = shadow2.ele(
'x://span[text()="Success!"]', timeout=1)
'x://span[text()="Success!"]', timeout=0.5)
if status:
logger.debug("Cloudflare验证成功")
return True
checkbox = shadow2.ele(
'x://input[@type="checkbox"]', timeout=1)
'x://input[@type="checkbox"]', timeout=0.5)
if checkbox:
checkbox.click()
logger.debug("点击Cloudflare复选框")
tab.wait(2)
tab.wait(3)
logger.debug("重新获取状态")
# return False
except Exception as e:
@@ -59,7 +59,7 @@ class Auto:
logger.debug("等待进入首页")
jc = 0
while True:
if jc > 5:
if jc > 3:
logger.error("等待进入首页超过5次未成功")
return False
self.tab.wait(1)
@@ -83,7 +83,7 @@ class Auto:
logger.debug("点击Continue按钮")
jc = 0
while True:
if jc > 5:
if jc > 3:
logger.error("点击Continue按钮超过5次未成功")
return False
try:
@@ -100,7 +100,19 @@ class Auto:
continue_button.click()
logger.debug("点击Continue按钮成功")
self.tab.wait(1.5)
bol = self.tab.ele('t:div@text():Loading...',timeout=1)
if bol:
logger.debug("Loading...")
if bl:
logger.debug("多次异常界面, 结束继续点击")
return False
logger.debug("异常界面")
self.tab.wait(1)
return self.click_continue(bl=True)
bol = self.tab.ele('t:h2@text()=You are being rate limited', timeout=1)
if bol:
logger.debug("被限流, 退出")
return False
bol = self.tab.ele(
't:li@text():There was a problem, please try again.', timeout=1)
if bol:
@@ -110,10 +122,6 @@ class Auto:
logger.debug("异常界面")
self.tab.wait(1)
return self.click_continue(bl=True)
# bol = self.tab.ele('t:h2@text()=Claim Form', timeout=1)
# if bol:
# logger.debug("成功进入问卷界面")
# return True
html = self.tab.url
logger.debug(f"当前URL: {html}")
if 'https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm' in html:
@@ -142,81 +150,215 @@ class Auto:
province = random.choice(list(cities.keys()))
return province,random.choice(cities.get(province, []))
def get_province_by_city(self, city: str) -> str | None:
"""
根据城市名称解析对应省份
参数:
city (str): 城市名称,例如 `Calgary`、`Edmonton` 等
返回值:
str | None: 对应的省份名称;未匹配返回 None
"""
mapping = {
"Calgary": "Alberta",
"Edmonton": "Alberta",
"Vancouver": "British Columbia",
"Halifax": "Nova Scotia",
"Toronto": "Ontario",
}
return mapping.get(city)
# 随机实物
def get_random_food(self, shop: str) -> list[str]:
"""
随机选择 1~2 种食物类别,并为每个类别至少选择 1 个具体产品
参数:
shop (str): 商店名称(当前未使用,占位参数)
返回值:
list[str]: 随机选取的产品名称列表
"""
categories = [
[
'Wonder Bread White',
'Villaggio White Bread',
'No Name Sliced White Bread',
"President's Choice White Sliced Bread",
],
[
"Ben's Original Whole Wheat Bread",
"POM Whole Wheat Bread",
"Silver Hills Bakery Whole Wheat Sliced Bread",
"Country Harvest Whole Wheat Bread",
],
[
"Wonder Bread Hot Dog Buns",
"Villaggio Hamburger Buns",
"Dempster's Dinner Rolls",
"No Frills Hot Dog Buns",
],
[
"Stonemill Bakehouse Bagels",
"Wonder Bagels",
"Montreal Bagels (pre-packaged, e.g., St. Lawrence brand)",
"President's Choice Bagels",
],
[
"Silver Hills Multi-Grain Sliced Bread",
"POM Multi-Grain Bread",
"Country Harvest Multi-Grain Loaf",
],
[
"President's Choice French Stick",
"Dempster's Italian Style Bread",
"Wonder Italian Bread",
"Villaggio Country Style Loaf",
],
]
# 随机选择 1~2 个类别(不重复)
category_count = random.randint(1, 2)
chosen_categories = random.sample(categories, k=category_count)
# 每个类别至少选择 1 个产品,最多选择 3 个以避免过多
selected_products: list[str] = []
for cat in chosen_categories:
max_pick = min(3, len(cat))
pick_count = random.randint(1, max_pick)
selected_products.extend(random.sample(cat, k=pick_count))
logger.debug(f"随机选择的产品: {selected_products}")
text = f'{shop} buy: '
for p in selected_products:
text += f'{p} * {random.randint(1, 3)}, '
text = text[:-2]
text = text + '.'
logger.debug(f'随机选择的产品文本: {text}')
return text
# 填写问卷
def fill_questionnaire(self):
province, city = self.get_random_city()
info = get_random_canada_info(province, city)
first_name = info["firstname"]
last_name = info["lastname"]
# 将生日格式从 '8/28/1995' 转为 'yyyy-mm-dd'日月不足两位补0
birthday = info["birthday"]
current_address = info["address_str"]
city = info["city_name"]
province = info["province"]
postal_code = info["postcode"]
email = 'sfsf@qq.com'
phone = info["phone"]
text = '3333'
# 人数
person_count = str(random.randint(3, 5))
logger.debug("填写问卷")
self.tab.wait(0.1)
logger.debug(f"填写first_name: {first_name}")
self.tab.ele('t:input@id=FirstName').set.value(first_name)
self.tab.wait(0.1)
logger.debug(f"填写last_name: {last_name}")
self.tab.ele('t:input@id=LastName').set.value(last_name)
self.tab.wait(0.1)
logger.debug(f"填写birthday: {birthday}")
self.tab.ele('t:input@id=DateOfBirth').set.value(birthday)
self.tab.wait(0.1)
logger.debug(f"填写current_address: {current_address}")
self.tab.ele('t:input@id=AddressLine1').set.value(current_address)
self.tab.wait(0.1)
logger.debug(f"填写city: {city}")
self.tab.ele('t:input@id=City').set.value(city)
self.tab.wait(0.1)
logger.debug(f"填写province: {province}")
self.tab.ele(
't:select@id=CanProv').ele(f't:option@text()={province}').click()
self.tab.wait(0.1)
logger.debug(f"填写postal_code: {postal_code}")
self.tab.ele('t:input@id=CanPostal').set.value(postal_code)
self.tab.wait(0.1)
logger.debug(f"填写NumberOfAdults: {person_count}")
self.tab.ele(
't:select@id=NumberOfAdults').ele(f't:option@text()={person_count}').click()
self.tab.wait(0.1)
logger.debug(f"选择地址没变")
self.tab.eles('t:input@id=IsDifferentAddress')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写email: {email}")
self.tab.ele('t:input@id=EmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写ConfirmEmailAddress: {email}")
self.tab.ele('t:input@id=ConfirmEmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写phone: {phone}")
self.tab.ele('t:input@id=PhoneNumber').set.value(phone)
self.tab.wait(0.1)
logger.debug(f"选择同意条款")
self.tab.ele('t:input@id=IVerify').click()
self.tab.wait(0.1)
logger.debug(f"选择没有申请过")
self.tab.eles('t:input@id=IsCompensated')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写text: {text}")
self.tab.ele('t:textarea@id=MetaAnswerA').set.value(text)
self.tab.wait(0.1)
logger.debug(f"勾选同意我的名字")
self.tab.ele('t:input@id=IDeclare').click()
self.tab.wait(0.1)
logger.debug(f"填写PrintName: {last_name+' '+first_name}")
self.tab.ele(
't:input@id=PrintName').set.value(last_name+' '+first_name)
self.tab.wait(0.1)
# logger.debug(f"点击Submit按钮")
# self.tab.ele('t:button@text():Submit').click()
def fill_questionnaire(self, city: str):
"""
根据传入的城市解析省份并完成问卷填写
参数:
city (str): 线程启动时传入的城市名称,用于匹配省份并填写数据
"""
try:
province = self.get_province_by_city(city)
if province is None:
logger.error(f"未找到城市对应省份: {city}")
return
j = 0
while True:
if j >3:
return False
info = get_random_canada_info(province, city)
if len(info.get('postcode')) > 5:
break
j += 1
first_name = info["firstname"]
last_name = info["lastname"]
# 将生日格式从 '8/28/1995' 转为 'yyyy-mm-dd'日月不足两位补0
birthday = info["birthday"]
current_address = info["address_str"]
# 保持使用线程传入的城市与解析出的省份
postal_code = info["postcode"]
email = mail_.email_create_random()
phone = info["phone"]
shop = api.get_random_shop()
if shop is None:
return None
street = shop.get('street')
if street is None:
return None
text = self.get_random_food(street)
# 人数
person_count = str(random.randint(3, 5))
logger.debug("填写问卷")
self.tab.wait(0.1)
logger.debug(f"填写first_name: {first_name}")
self.tab.ele('t:input@id=FirstName').set.value(first_name)
self.tab.wait(0.1)
logger.debug(f"填写last_name: {last_name}")
self.tab.ele('t:input@id=LastName').set.value(last_name)
self.tab.wait(0.1)
logger.debug(f"填写birthday: {birthday}")
self.tab.ele('t:input@id=DateOfBirth').set.value(birthday)
self.tab.wait(0.1)
logger.debug(f"填写current_address: {current_address}")
self.tab.ele('t:input@id=AddressLine1').set.value(current_address)
self.tab.wait(0.1)
logger.debug(f"填写city: {city}")
self.tab.ele('t:input@id=City').set.value(city)
self.tab.wait(0.1)
logger.debug(f"填写province: {province}")
self.tab.ele(
't:select@id=CanProv').ele(f't:option@text()={province}').click()
self.tab.wait(0.1)
logger.debug(f"填写postal_code: {postal_code}")
self.tab.ele('t:input@id=CanPostal').set.value(postal_code)
self.tab.wait(0.1)
logger.debug(f"填写NumberOfAdults: {person_count}")
self.tab.ele(
't:select@id=NumberOfAdults').ele(f't:option@text()={person_count}').click()
self.tab.wait(0.1)
logger.debug(f"选择地址没变")
self.tab.eles('t:input@id=IsDifferentAddress')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写email: {email}")
self.tab.ele('t:input@id=EmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写ConfirmEmailAddress: {email}")
self.tab.ele('t:input@id=ConfirmEmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写phone: {phone}")
self.tab.ele('t:input@id=PhoneNumber').set.value(phone)
self.tab.wait(0.1)
logger.debug(f"选择同意条款")
self.tab.ele('t:input@id=IVerify').click()
self.tab.wait(0.1)
logger.debug(f"选择没有申请过")
self.tab.eles('t:input@id=IsCompensated')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写text: {text}")
self.tab.ele('t:textarea@id=MetaAnswerA').set.value(text)
self.tab.wait(0.1)
logger.debug(f"勾选同意我的名字")
self.tab.ele('t:input@id=IDeclare').click()
self.tab.wait(0.1)
logger.debug(f"填写PrintName: {last_name+' '+first_name}")
self.tab.ele(
't:input@id=PrintName').set.value(last_name+' '+first_name)
self.tab.wait(0.1)
for i in range(3):
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败.")
self.tab.wait(0.1)
else:
logger.debug("Cloudflare验证成功.")
logger.debug(f"点击Submit按钮")
self.tab.ele('t:button@text():Submit').click()
break
api.create_info(
first_name=first_name,
last_name=last_name,
birthday=birthday,
current_address=current_address,
city=city,
phone=phone,
postal_code=postal_code,
province=province,
email=email,
text=text
)
self.tab.wait(2)
except Exception as e:
logger.error(f"填写问卷失败: {e}")
# 取对应城市的代理
@@ -247,6 +389,9 @@ def create_fingerprint_browser(city: str):
browser_id = None
try:
proxy = get_proxy(city)
if proxy is None:
logger.error(f"{city} 未配置对应代理,结束该线程")
return
logger.info(f"{city} 准备创建指纹浏览器")
browser_id = bit_browser.bit_browser_create(
remark=city,
@@ -272,7 +417,7 @@ def create_fingerprint_browser(city: str):
if not bol:
logger.error(f"{city} 点击 Continue 失败,结束该线程")
return
auto.fill_questionnaire()
auto.fill_questionnaire(city)
time.sleep(5)
finally:
if browser_id:
@@ -307,17 +452,19 @@ def run_all_cities_concurrently():
"""
import threading
cities = ['Calgary', 'Edmonton', 'Vancouver', 'Halifax', 'Toronto']
# cities = ['Calgary']
threads = []
for city in cities:
t = threading.Thread(target=run_city_forever, args=(city,), name=f"{city}-thread")
t.start()
threads.append(t)
logger.info(f"{city} 线程已启动")
time.sleep(2)
# time.sleep(2)
for t in threads:
t.join()
logger.info("所有城市流程执行完成")
if __name__ == "__main__":
# auto = Auto()
# auto.get_random_food('a')
run_all_cities_concurrently()

View File

@@ -57,6 +57,34 @@ CA_AREA_CODES = {
}
# 主要城市的区号(更精确的城市级约束)
CITY_AREA_CODES = {
"Calgary": ["403", "587", "825"],
"Edmonton": ["780", "587", "825"],
"Vancouver": ["604", "778", "236", "672"],
"Halifax": ["902", "782"],
"Toronto": ["416", "647", "437"],
}
# 邮编首字母合法性映射(按省份缩写)
POSTAL_PREFIXES = {
"AB": {"T"},
"BC": {"V"},
"MB": {"R"},
"NB": {"E"},
"NL": {"A"},
"NS": {"B"},
"ON": {"K", "L", "M"},
"PE": {"C"},
"QC": {"G", "H", "J"},
"SK": {"S"},
"NT": {"X"},
"NU": {"X"},
"YT": {"Y"},
}
REMOTE_PROVINCES = {"NL", "NT", "NU", "YT"}
@@ -261,6 +289,46 @@ def _random_phone(province_abbr: str) -> str:
return f"({area}) {exchange}-{line}"
def _random_phone_city(province_abbr: str, city: Optional[str]) -> str:
"""
按城市优先选择区号,若城市未配置则回退到省份区号
参数:
province_abbr (str): 省份缩写
city (Optional[str]): 城市名
返回值:
str: 电话,例如 "(403) 555-1234"
"""
codes = None
if city:
codes = CITY_AREA_CODES.get(city)
codes = codes or CA_AREA_CODES.get(province_abbr, ["000"])
area = random.choice(codes)
exchange = str(random.randint(200, 899)).zfill(3)
line = str(random.randint(1000, 9999)).zfill(4)
return f"(#{area}) {exchange}-{line}".replace("#", "")
def _postal_valid_for_province(province_abbr: str, postcode: str) -> bool:
"""
校验邮编首字母是否符合省份规范
参数:
province_abbr (str): 省份缩写
postcode (str): 邮编字符串
返回值:
bool: 合法返回 True否则 False
"""
if not postcode:
return False
prefixes = POSTAL_PREFIXES.get(province_abbr)
if not prefixes:
return True
return postcode[0].upper() in prefixes
def generate_canada_info(province: str, city: Optional[str] = None, max_attempts: int = 15, sleep_sec: float = 0.6) -> Dict[str, str]:
"""
随机生成加拿大个人与地址信息,可指定省份(全称或缩写)与可选城市
@@ -289,14 +357,14 @@ def generate_canada_info(province: str, city: Optional[str] = None, max_attempts
address_str = _format_address(addr, prov_abbr)
if prov_abbr in REMOTE_PROVINCES:
break
if addr.get("house_number") and (addr.get("road") or addr.get("residential") or addr.get("footway")) and city_name:
if addr.get("house_number") and (addr.get("road") or addr.get("residential") or addr.get("footway")) and city_name and _postal_valid_for_province(prov_abbr, postcode):
break
time.sleep(sleep_sec)
firstname, lastname = _random_name()
full_name = f"{firstname} {lastname}"
birthday = _random_birthday()
phone = _random_phone(prov_abbr)
phone = _random_phone_city(prov_abbr, city or chosen_city)
return {
"firstname": firstname,