from math import log import random from re import S import time from tkinter import N from DrissionPage import Chromium from loguru import logger from work import get_random_canada_info from mail_ import mail_ from bit_browser import bit_browser from api import api class Auto: def __init__(self, http: str = None): self.browser = Chromium(http) self.tab = self.browser.latest_tab pass # cf打码 def solve_cloudflare(self, is_ok: bool = False): tab = self.browser.latest_tab for _ in range(3): self.tab.wait(1) try: shadow1 = tab.ele( 'x://*[@name="cf-turnstile-response"]').parent().shadow_root iframe = shadow1.get_frame(1) if iframe: logger.debug("找到Cloudflare iframe") shadow2 = iframe.ele('x:/html/body').shadow_root if shadow2: logger.debug("找到Cloudflare iframe body shadow root") status = shadow2.ele( 'x://span[text()="Verifying..."]', timeout=1.5) if status: tab.wait(3) status = shadow2.ele( 'x://span[text()="Success!"]', timeout=1.5) if status: logger.debug("Cloudflare验证成功") return True checkbox = shadow2.ele( 'x://input[@type="checkbox"]', timeout=1.5) if checkbox: checkbox.click() logger.debug("点击Cloudflare复选框") tab.wait(3) logger.debug("重新获取状态") # return False except Exception as e: # logger.error(f"处理Cloudflare异常: {e}") if is_ok: logger.debug(f"cloudflare处理通过: {e}") return True return self.solve_cloudflare(is_ok=True) tab.wait(1) return False # 打开URL def open_url(self, url: str): self.tab.get(url) # 等待进入首页 def wait_home(self): logger.debug("等待进入首页") jc = 0 while True: if jc > 3: logger.error("等待进入首页超过5次,未成功") return False self.tab.wait(1) # 判断cf是否通过 bol = self.solve_cloudflare() if not bol: logger.debug("Cloudflare验证失败.") continue else: logger.debug("Cloudflare验证成功.") self.tab.wait(1.5) bol = self.tab.ele( 't:h1@text():Sorry, you have been blocked', timeout=1) if bol: logger.debug("ip被ban秒") return False bol = self.tab.ele( 't:div@text():ERR_SSL_PROTOCOL_ERROR', timeout=1) if bol: logger.debug("刷新网页") self.tab.refresh() self.tab.wait(1.5) html = self.tab.url logger.debug(f"当前URL: {html}") if 'https://veritaconnect.ca/canadianbreadsettlement/en-us' == html: logger.debug("成功进入首页") return True jc += 1 # 点击continue按钮 def click_continue(self, bl: bool = False): logger.debug("点击Continue按钮") jc = 0 while True: if jc > 3: logger.error("点击Continue按钮超过5次,未成功") return False try: continue_button = self.tab.ele( 't:button@text():Continue', timeout=1) if continue_button: # 滚动到最底部 self.tab.scroll.to_bottom() self.tab.wait(1) # 判断cf是否通过 bol = self.solve_cloudflare() if not bol: logger.debug("Cloudflare验证失败..") continue else: logger.debug("Cloudflare验证成功..") continue_button.click() logger.debug("点击Continue按钮成功") self.tab.wait(1.5) bol = self.tab.ele('t:div@text():Loading...', timeout=1) if bol: logger.debug("Loading...") if bl: logger.debug("多次异常界面, 结束继续点击") return False logger.debug("异常界面") self.tab.wait(1) return self.click_continue(bl=True) bol = self.tab.ele( 't:h2@text()=You are being rate limited', timeout=1) if bol: logger.debug("被限流, 退出") return False bol = self.tab.ele( 't:li@text():There was a problem, please try again.', timeout=1) if bol: if bl: logger.debug("多次异常界面, 结束继续点击") return False logger.debug("异常界面") self.tab.wait(1) return self.click_continue(bl=True) html = self.tab.url logger.debug(f"当前URL: {html}") if 'https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm' in html: logger.debug("成功进入问卷界面") return True except Exception as e: logger.error(f"点击Continue按钮异常: {e}") self.tab.wait(1) return False # 随机取城市 def get_random_city(self, province: str | None = None): cities = { "Alberta": ["Calgary", "Edmonton"], "British Columbia": ["Vancouver"], # "Manitoba": ["Winnipeg", "Rochester"], # "New Brunswick": ["Fredericton", "Moncton"], # "Newfoundland and Labrador": ["St. John's", "Halifax"], "Nova Scotia": ["Halifax"], "Ontario": ["Toronto"], # "Prince Edward Island": ["Charlottetown", "St. John's"], # "Quebec": ["Quebec City", "Montreal"], # "Saskatchewan": ["Saskatoon", "Regina"], } if province is None: province = random.choice(list(cities.keys())) return province, random.choice(cities.get(province, [])) def get_province_by_city(self, city: str) -> str | None: """ 根据城市名称解析对应省份 参数: city (str): 城市名称,例如 `Calgary`、`Edmonton` 等 返回值: str | None: 对应的省份名称;未匹配返回 None """ mapping = { "Calgary": "Alberta", "Edmonton": "Alberta", "Vancouver": "British Columbia", "Halifax": "Nova Scotia", "Toronto": "Ontario", } return mapping.get(city) # 随机实物 def get_random_food(self, city: str, shop: str) -> list[str]: """ 随机选择 1~2 种食物类别,并为每个类别至少选择 1 个具体产品 参数: shop (str): 商店名称(当前未使用,占位参数) 返回值: list[str]: 随机选取的产品名称列表 """ categories = [ [ 'Wonder Bread White', 'Villaggio White Bread', 'No Name Sliced White Bread', "President's Choice White Sliced Bread", ], [ "Ben's Original Whole Wheat Bread", "POM Whole Wheat Bread", "Silver Hills Bakery Whole Wheat Sliced Bread", "Country Harvest Whole Wheat Bread", ], [ "Wonder Bread Hot Dog Buns", "Villaggio Hamburger Buns", "Dempster's Dinner Rolls", "No Frills Hot Dog Buns", ], [ "Stonemill Bakehouse Bagels", "Wonder Bagels", "Montreal Bagels (pre-packaged, e.g., St. Lawrence brand)", "President's Choice Bagels", ], [ "Silver Hills Multi-Grain Sliced Bread", "POM Multi-Grain Bread", "Country Harvest Multi-Grain Loaf", ], [ "President's Choice French Stick", "Dempster's Italian Style Bread", "Wonder Italian Bread", "Villaggio Country Style Loaf", ], ] # 随机选择 1~2 个类别(不重复) category_count = random.randint(1, 2) chosen_categories = random.sample(categories, k=category_count) # 每个类别至少选择 1 个产品,最多选择 3 个以避免过多 selected_products: list[str] = [] for cat in chosen_categories: max_pick = min(3, len(cat)) pick_count = random.randint(1, max_pick) selected_products.extend(random.sample(cat, k=pick_count)) logger.debug(f"随机选择的产品: {selected_products}") text = f'{shop}, {city} buy: ' for p in selected_products: text += f'{p} * {random.randint(1, 3)}, ' text = text[:-2] text = text + '.' logger.debug(f'随机选择的产品文本: {text}') return text # 填写问卷 def fill_questionnaire(self, city: str): """ 根据传入的城市解析省份并完成问卷填写 参数: city (str): 线程启动时传入的城市名称,用于匹配省份并填写数据 """ try: province = self.get_province_by_city(city) if province is None: logger.error(f"未找到城市对应省份: {city}") return j = 0 while True: if j > 3: return False info = get_random_canada_info(province, city) if len(info.get('postcode')) > 5: break j += 1 first_name = info["firstname"] last_name = info["lastname"] # 将生日格式从 '8/28/1995' 转为 'yyyy-mm-dd',日月不足两位补0 birthday = info["birthday"] current_address = info["address_str"] # 保持使用线程传入的城市与解析出的省份 postal_code = info["postcode"] email = mail_.email_create_random() phone = info["phone"] shop = api.get_random_shop() if shop is None: return None street = shop.get('street') if street is None: return None text = self.get_random_food(shop.get('city'), street) # 人数 person_count = str(random.randint(3, 5)) logger.debug("填写问卷") self.tab.wait(0.1) logger.debug(f"填写first_name: {first_name}") self.tab.ele('t:input@id=FirstName').set.value(first_name) self.tab.wait(0.1) logger.debug(f"填写last_name: {last_name}") self.tab.ele('t:input@id=LastName').set.value(last_name) self.tab.wait(0.1) logger.debug(f"填写birthday: {birthday}") self.tab.ele('t:input@id=DateOfBirth').set.value(birthday) self.tab.wait(0.1) logger.debug(f"填写current_address: {current_address}") self.tab.ele('t:input@id=AddressLine1').set.value(current_address) self.tab.wait(0.1) logger.debug(f"填写city: {city}") self.tab.ele('t:input@id=City').set.value(city) self.tab.wait(0.1) logger.debug(f"填写province: {province}") self.tab.ele( 't:select@id=CanProv').ele(f't:option@text()={province}').click() self.tab.wait(0.1) logger.debug(f"填写postal_code: {postal_code}") self.tab.ele('t:input@id=CanPostal').set.value(postal_code) self.tab.wait(0.1) logger.debug(f"填写NumberOfAdults: {person_count}") self.tab.ele( 't:select@id=NumberOfAdults').ele(f't:option@text()={person_count}').click() self.tab.wait(0.1) logger.debug(f"选择地址没变") self.tab.eles('t:input@id=IsDifferentAddress')[1].click() self.tab.wait(0.1) logger.debug(f"填写email: {email}") self.tab.ele('t:input@id=EmailAddress').set.value(email) self.tab.wait(0.1) logger.debug(f"填写ConfirmEmailAddress: {email}") self.tab.ele('t:input@id=ConfirmEmailAddress').set.value(email) self.tab.wait(0.1) logger.debug(f"填写phone: {phone}") self.tab.ele('t:input@id=PhoneNumber').set.value(phone) self.tab.wait(0.1) logger.debug(f"选择同意条款") self.tab.ele('t:input@id=IVerify').click() self.tab.wait(0.1) logger.debug(f"选择没有申请过") self.tab.eles('t:input@id=IsCompensated')[1].click() self.tab.wait(0.1) logger.debug(f"填写text: {text}") self.tab.ele('t:textarea@id=MetaAnswerA').set.value(text) self.tab.wait(0.1) logger.debug(f"勾选同意我的名字") self.tab.ele('t:input@id=IDeclare').click() self.tab.wait(0.1) logger.debug(f"填写PrintName: {last_name+' '+first_name}") self.tab.ele( 't:input@id=PrintName').set.value(last_name+' '+first_name) self.tab.wait(0.1) for i in range(3): bol = self.solve_cloudflare() if not bol: logger.debug("Cloudflare验证失败.") self.tab.wait(0.1) else: logger.debug("Cloudflare验证成功.") logger.debug(f"点击Submit按钮") self.tab.ele('t:button@text():Submit').click() break api.create_info( first_name=first_name, last_name=last_name, birthday=birthday, current_address=current_address, city=city, phone=phone, postal_code=postal_code, province=province, email=email, text=text ) self.tab.wait(2) except Exception as e: logger.error(f"填写问卷失败: {e}") # 取对应城市的代理 def get_proxy(city: str): if city == "Calgary": return "us.novproxy.io:1000:ozua8623-region-CA-st-Alberta-city-Calgary:6wdcv4gq".split(':') elif city == 'Edmonton': return 'us.novproxy.io:1000:ozua8623-region-CA-st-Alberta-city-Edmonton:6wdcv4gq'.split(':') elif city == 'Vancouver': return 'us.novproxy.io:1000:ozua8623-region-CA-st-British Columbia-city-Vancouver:6wdcv4gq'.split(':') elif city == 'Halifax': return 'us.novproxy.io:1000:ozua8623-region-CA-st-Nova Scotia-city-Halifax:6wdcv4gq'.split(':') elif city == 'Toronto': return 'us.novproxy.io:1000:ozua8623-region-CA-st-Ontario-city-Toronto:6wdcv4gq'.split(':') else: return None """指纹浏览器操作""" # 创建指纹浏览器 def create_fingerprint_browser(city: str): """ 根据城市创建指纹浏览器并执行问卷流程 参数: city (str): 城市名称,例如 `Calgary`、`Edmonton` 等 """ browser_id = None try: proxy = get_proxy(city) if proxy is None: logger.error(f"{city} 未配置对应代理,结束该线程") return logger.info(f"{city} 准备创建指纹浏览器") browser_id = bit_browser.bit_browser_create( remark=city, host=proxy[0], port=proxy[1], proxy_user=proxy[2], proxy_pwd=proxy[3], proxy_type='socks5' ) logger.debug(browser_id) # 打开指纹浏览器 http = bit_browser.bit_browser_open(browser_id) logger.debug(http) auto = Auto(http) auto.open_url( "https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm") bol = auto.wait_home() if not bol: logger.error(f"{city} 进入首页失败,结束该线程") return bol = auto.click_continue() if not bol: logger.error(f"{city} 点击 Continue 失败,结束该线程") return auto.fill_questionnaire(city) time.sleep(5) finally: if browser_id: # 关闭指纹浏览器 try: bit_browser.bit_browser_close(browser_id) except Exception as e: logger.error(f"{city} 关闭浏览器异常: {e}") # 删除指纹浏览器 try: bit_browser.bit_browser_delete(browser_id) except Exception as e: logger.error(f"{city} 删除浏览器异常: {e}") def run_city_forever(city: str): """ 持续循环运行指定城市流程:完成一次即关闭并删除浏览器,然后重新创建继续运行 参数: city (str): 城市名称 """ while True: try: create_fingerprint_browser(city) except Exception as e: logger.error(f"{city} 流程异常: {e}") time.sleep(2) def run_all_cities_concurrently(): """ 多线程并发运行所有城市流程 """ import threading cities = ['Calgary', 'Edmonton', 'Vancouver', 'Halifax', 'Toronto'] # cities = ['Calgary'] threads = [] for city in cities: t = threading.Thread(target=run_city_forever, args=(city,), name=f"{city}-thread") t.start() threads.append(t) logger.info(f"{city} 线程已启动") # time.sleep(2) for t in threads: t.join() logger.info("所有城市流程执行完成") if __name__ == "__main__": # auto = Auto() # auto.get_random_food('a') run_all_cities_concurrently()