Files
ca_auto_table/spider/main.py
2025-11-20 11:42:18 +08:00

323 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from math import log
import random
from re import S
import time
from tkinter import N
from DrissionPage import Chromium
from loguru import logger
from work import get_random_canada_info
from mail_ import mail_
from bit_browser import bit_browser
class Auto:
def __init__(self,http:str):
self.browser = Chromium(http)
self.tab = self.browser.latest_tab
pass
# cf打码
def solve_cloudflare(self):
tab = self.browser.latest_tab
for _ in range(8):
self.tab.wait(1)
try:
shadow1 = tab.ele(
'x://*[@name="cf-turnstile-response"]').parent().shadow_root
iframe = shadow1.get_frame(1)
if iframe:
logger.debug("找到Cloudflare iframe")
shadow2 = iframe.ele('x:/html/body').shadow_root
if shadow2:
logger.debug("找到Cloudflare iframe body shadow root")
status = shadow2.ele(
'x://span[text()="Success!"]', timeout=1)
if status:
logger.debug("Cloudflare验证成功")
return True
checkbox = shadow2.ele(
'x://input[@type="checkbox"]', timeout=1)
if checkbox:
checkbox.click()
logger.debug("点击Cloudflare复选框")
tab.wait(2)
logger.debug("重新获取状态")
# return False
except Exception as e:
# logger.error(f"处理Cloudflare异常: {e}")
logger.debug(f"cloudflare处理通过: {e}")
return True
tab.wait(1)
return False
# 打开URL
def open_url(self, url: str):
self.tab.get(url)
# 等待进入首页
def wait_home(self):
logger.debug("等待进入首页")
jc = 0
while True:
if jc > 5:
logger.error("等待进入首页超过5次未成功")
return False
self.tab.wait(1)
# 判断cf是否通过
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败.")
continue
else:
logger.debug("Cloudflare验证成功.")
self.tab.wait(1.5)
html = self.tab.url
logger.debug(f"当前URL: {html}")
if 'https://veritaconnect.ca/canadianbreadsettlement/en-us' == html:
logger.debug("成功进入首页")
return True
jc += 1
# 点击continue按钮
def click_continue(self, bl: bool = False):
logger.debug("点击Continue按钮")
jc = 0
while True:
if jc > 5:
logger.error("点击Continue按钮超过5次未成功")
return False
try:
continue_button = self.tab.ele(
't:button@text():Continue', timeout=1)
if continue_button:
# 判断cf是否通过
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败..")
continue
else:
logger.debug("Cloudflare验证成功..")
continue_button.click()
logger.debug("点击Continue按钮成功")
self.tab.wait(1.5)
bol = self.tab.ele(
't:li@text():There was a problem, please try again.', timeout=1)
if bol:
if bl:
logger.debug("多次异常界面, 结束继续点击")
return False
logger.debug("异常界面")
self.tab.wait(1)
return self.click_continue(bl=True)
# bol = self.tab.ele('t:h2@text()=Claim Form', timeout=1)
# if bol:
# logger.debug("成功进入问卷界面")
# return True
html = self.tab.url
logger.debug(f"当前URL: {html}")
if 'https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm' in html:
logger.debug("成功进入问卷界面")
return True
except Exception as e:
logger.error(f"点击Continue按钮异常: {e}")
self.tab.wait(1)
return False
# 随机取城市
def get_random_city(self, province: str|None=None):
cities = {
"Alberta": ["Calgary", "Edmonton"],
"British Columbia": ["Vancouver"],
# "Manitoba": ["Winnipeg", "Rochester"],
# "New Brunswick": ["Fredericton", "Moncton"],
# "Newfoundland and Labrador": ["St. John's", "Halifax"],
"Nova Scotia": ["Halifax"],
"Ontario": ["Toronto"],
# "Prince Edward Island": ["Charlottetown", "St. John's"],
# "Quebec": ["Quebec City", "Montreal"],
# "Saskatchewan": ["Saskatoon", "Regina"],
}
if province is None:
province = random.choice(list(cities.keys()))
return province,random.choice(cities.get(province, []))
# 填写问卷
def fill_questionnaire(self):
province, city = self.get_random_city()
info = get_random_canada_info(province, city)
first_name = info["firstname"]
last_name = info["lastname"]
# 将生日格式从 '8/28/1995' 转为 'yyyy-mm-dd'日月不足两位补0
birthday = info["birthday"]
current_address = info["address_str"]
city = info["city_name"]
province = info["province"]
postal_code = info["postcode"]
email = 'sfsf@qq.com'
phone = info["phone"]
text = '3333'
# 人数
person_count = str(random.randint(3, 5))
logger.debug("填写问卷")
self.tab.wait(0.1)
logger.debug(f"填写first_name: {first_name}")
self.tab.ele('t:input@id=FirstName').set.value(first_name)
self.tab.wait(0.1)
logger.debug(f"填写last_name: {last_name}")
self.tab.ele('t:input@id=LastName').set.value(last_name)
self.tab.wait(0.1)
logger.debug(f"填写birthday: {birthday}")
self.tab.ele('t:input@id=DateOfBirth').set.value(birthday)
self.tab.wait(0.1)
logger.debug(f"填写current_address: {current_address}")
self.tab.ele('t:input@id=AddressLine1').set.value(current_address)
self.tab.wait(0.1)
logger.debug(f"填写city: {city}")
self.tab.ele('t:input@id=City').set.value(city)
self.tab.wait(0.1)
logger.debug(f"填写province: {province}")
self.tab.ele(
't:select@id=CanProv').ele(f't:option@text()={province}').click()
self.tab.wait(0.1)
logger.debug(f"填写postal_code: {postal_code}")
self.tab.ele('t:input@id=CanPostal').set.value(postal_code)
self.tab.wait(0.1)
logger.debug(f"填写NumberOfAdults: {person_count}")
self.tab.ele(
't:select@id=NumberOfAdults').ele(f't:option@text()={person_count}').click()
self.tab.wait(0.1)
logger.debug(f"选择地址没变")
self.tab.eles('t:input@id=IsDifferentAddress')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写email: {email}")
self.tab.ele('t:input@id=EmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写ConfirmEmailAddress: {email}")
self.tab.ele('t:input@id=ConfirmEmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写phone: {phone}")
self.tab.ele('t:input@id=PhoneNumber').set.value(phone)
self.tab.wait(0.1)
logger.debug(f"选择同意条款")
self.tab.ele('t:input@id=IVerify').click()
self.tab.wait(0.1)
logger.debug(f"选择没有申请过")
self.tab.eles('t:input@id=IsCompensated')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写text: {text}")
self.tab.ele('t:textarea@id=MetaAnswerA').set.value(text)
self.tab.wait(0.1)
logger.debug(f"勾选同意我的名字")
self.tab.ele('t:input@id=IDeclare').click()
self.tab.wait(0.1)
logger.debug(f"填写PrintName: {last_name+' '+first_name}")
self.tab.ele(
't:input@id=PrintName').set.value(last_name+' '+first_name)
self.tab.wait(0.1)
# logger.debug(f"点击Submit按钮")
# self.tab.ele('t:button@text():Submit').click()
# 取对应城市的代理
def get_proxy( city: str):
if city == "Calgary":
return "us.novproxy.io:1000:uwqr8065-region-CA-st-Alberta-city-Calgary:d6vqwerx".split(':')
elif city =='Edmonton':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-Alberta-city-Edmonton:d6vqwerx'.split(':')
elif city =='Vancouver':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-British Columbia-city-Vancouver:d6vqwerx'.split(':')
elif city =='Halifax':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-Nova Scotia-city-Halifax:d6vqwerx'.split(':')
elif city == 'Toronto':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-Ontario-city-Toronto:d6vqwerx'.split(':')
else:
return None
"""指纹浏览器操作"""
# 创建指纹浏览器
def create_fingerprint_browser(city: str):
    """Create a fingerprint browser for ``city`` and run the claim flow once.

    Creates a browser profile through ``bit_browser`` using the city's proxy,
    opens it, drives the claim form with ``Auto`` and finally closes and
    deletes the profile, whether or not the flow succeeded.

    Args:
        city: City name, e.g. ``Calgary`` or ``Edmonton``.

    Raises:
        ValueError: If ``get_proxy`` has no proxy for ``city``. (Previously
            this surfaced as a ``TypeError`` from indexing ``None``.)
    """
    browser_id = None
    try:
        proxy = get_proxy(city)
        if proxy is None:
            # Fail with a readable message instead of "'NoneType' object is
            # not subscriptable" a few lines further down.
            raise ValueError(f"{city} 没有对应的代理配置")
        host, port, proxy_user, proxy_pwd = proxy[0], proxy[1], proxy[2], proxy[3]
        logger.info(f"{city} 准备创建指纹浏览器")
        browser_id = bit_browser.bit_browser_create(
            remark=city,
            host=host,
            port=port,
            proxy_user=proxy_user,
            proxy_pwd=proxy_pwd,
            proxy_type='socks5'
        )
        logger.debug(browser_id)
        # Open the fingerprint browser; the returned value is what Auto
        # connects to.
        http = bit_browser.bit_browser_open(browser_id)
        logger.debug(http)
        auto = Auto(http)
        auto.open_url(
            "https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm")
        if not auto.wait_home():
            logger.error(f"{city} 进入首页失败,结束该线程")
            return
        if not auto.click_continue():
            logger.error(f"{city} 点击 Continue 失败,结束该线程")
            return
        auto.fill_questionnaire()
        time.sleep(5)
    finally:
        # Clean up only if a profile was actually created. Close and delete
        # are attempted independently so one failure does not skip the other.
        if browser_id:
            # Close the fingerprint browser
            try:
                bit_browser.bit_browser_close(browser_id)
            except Exception as e:
                logger.error(f"{city} 关闭浏览器异常: {e}")
            # Delete the fingerprint browser
            try:
                bit_browser.bit_browser_delete(browser_id)
            except Exception as e:
                logger.error(f"{city} 删除浏览器异常: {e}")
def run_city_forever(city: str):
    """Run the flow for ``city`` over and over, never returning.

    Each round creates a browser, runs the flow and tears the browser down
    again (all inside ``create_fingerprint_browser``); a failed round is
    logged and the next one starts after a 2-second pause.

    Args:
        city: City name.
    """
    def run_once() -> None:
        # One round; errors are logged rather than propagated so that the
        # loop below keeps going.
        try:
            create_fingerprint_browser(city)
        except Exception as err:
            logger.error(f"{city} 流程异常: {err}")

    while True:
        run_once()
        time.sleep(2)
def run_all_cities_concurrently(cities: list[str] | None = None):
    """Run the per-city flow for several cities, one thread per city.

    Threads are started 2 seconds apart and then joined.

    Args:
        cities: City names to run. Defaults to Calgary, Edmonton, Vancouver,
            Halifax and Toronto when omitted.
    """
    import threading

    if cities is None:
        cities = ['Calgary', 'Edmonton', 'Vancouver', 'Halifax', 'Toronto']

    threads = []
    for city in cities:
        t = threading.Thread(target=run_city_forever, args=(city,), name=f"{city}-thread")
        t.start()
        threads.append(t)
        logger.info(f"{city} 线程已启动")
        # Stagger the start-ups instead of launching every browser at once.
        time.sleep(2)

    # Blocks until every worker thread has ended.
    for t in threads:
        t.join()
    logger.info("所有城市流程执行完成")
# Script entry point: start the multi-city runner only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    run_all_cities_concurrently()