Files
us_youtube_auto/spider/main.py
2025-12-12 16:32:49 +08:00

789 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import time
from datetime import datetime
from DrissionPage import Chromium
from loguru import logger
from work import generate_child_parent_names
from mail_ import mail_
from bit_browser import bit_browser
from api import api
from proxys import proxy_list
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from auto_challenge import ReCaptchaHandler
class Auto:
def __init__(self, http: str = None):
self.browser = Chromium(http)
# self.browser = Chromium()
self.tab = self.browser.latest_tab
pass
# cf打码
def solve_cloudflare(self, is_ok: bool = False):
tab = self.browser.latest_tab
for _ in range(5):
tab.wait(1)
res = tab.ele(
't:h1@text()=Sorry, you have been blocked', timeout=1)
if res:
logger.error("Cloudflare验证失败")
return False
try:
shadow1 = tab.ele(
'x://*[@name="cf-turnstile-response"]').parent().shadow_root
iframe = shadow1.get_frame(1)
if iframe:
logger.debug("找到Cloudflare iframe")
shadow2 = iframe.ele('x:/html/body').shadow_root
if shadow2:
logger.debug("找到Cloudflare iframe body shadow root")
status = shadow2.ele(
'x://span[text()="Verifying..."]', timeout=1.5)
if status:
tab.wait(3)
status = shadow2.ele(
'x://span[text()="Success!"]', timeout=1.5)
if status:
logger.debug("Cloudflare验证成功")
return True
checkbox = shadow2.ele(
'x://input[@type="checkbox"]', timeout=1.5)
if checkbox:
checkbox.click()
logger.debug("点击Cloudflare复选框")
tab.wait(3)
logger.debug("重新获取状态")
# return False
except Exception as e:
# logger.error(f"处理Cloudflare异常: {e}")
if is_ok:
logger.debug(f"cloudflare处理通过: {e}")
return True
return self.solve_cloudflare(is_ok=True)
tab.wait(1)
return False
# 谷歌验证码
def solve_recaptcha(self):
logger.debug("开始解决谷歌验证码")
recaptcha_handler = ReCaptchaHandler(self.tab)
res = recaptcha_handler.challenge()
if res.get("status"):
logger.debug("谷歌验证码成功")
iframe = self.tab.ele('t:iframe@title=reCAPTCHA')
# print(iframe)
res = iframe.ele('t:div@class=recaptcha-checkbox-border')
if res:
logger.debug(f"html: {res.html}")
if 'display: none;' in res.html:
logger.debug("谷歌验证码成功")
return True
else:
print("No element found")
return False
logger.error("谷歌验证码失败")
return False
# 打开URL
def open_url(self, url: str):
self.tab.get(url)
def get_tab(self):
return self.tab
# 等待进入首页
def wait_home(self):
logger.debug("等待进入首页")
jc = 0
while True:
if jc > 3:
logger.error("等待进入首页超过5次未成功")
return False
self.tab.wait(1)
bol = self.tab.ele(
't:div@text():YOUTUBE PRIVACY SETTLEMENT', timeout=5)
if bol:
logger.debug("成功进入首页")
return True
jc += 1
# 随机取城市
def get_random_city(self, province: str | None = None):
cities = {
"Alberta": ["Calgary", "Edmonton"],
"British Columbia": ["Vancouver"],
# "Manitoba": ["Winnipeg", "Rochester"],
# "New Brunswick": ["Fredericton", "Moncton"],
# "Newfoundland and Labrador": ["St. John's", "Halifax"],
"Nova Scotia": ["Halifax"],
"Ontario": ["Toronto"],
# "Prince Edward Island": ["Charlottetown", "St. John's"],
# "Quebec": ["Quebec City", "Montreal"],
# "Saskatchewan": ["Saskatoon", "Regina"],
}
if province is None:
province = random.choice(list(cities.keys()))
return province, random.choice(cities.get(province, []))
def get_province_by_city(self) -> str | None:
"""
根据城市名称解析对应省份
参数:
city (str): 城市名称,例如 `Calgary`、`Edmonton` 等
返回值:
str | None: 对应的省份名称;未匹配返回 None
"""
mapping = {
"Calgary": "Alberta",
"Edmonton": "Alberta",
"Vancouver": "British Columbia",
"Halifax": "Nova Scotia",
"Toronto": "Ontario",
"Ottawa": "Ontario",
"Mississauga": "Ontario",
"Brampton": "Ontario",
"Hamilton": "Ontario",
"Kitchener": "Ontario",
"London": "Ontario",
"Markham": "Ontario",
"Vaughan": "Ontario",
"Windsor": "Ontario",
"Oshawa": "Ontario",
"Brantford": "Ontario",
"Barrie": "Ontario",
"Sudbury": "Ontario",
"Kingston": "Ontario",
"Guelph": "Ontario",
"Cambridge": "Ontario",
"Sarnia": "Ontario",
"Peterborough": "Ontario",
"Waterloo": "Ontario",
"Belleville": "Ontario",
"Brockville": "Ontario",
"Burlington": "Ontario",
"Cornwall": "Ontario",
"Kawartha Lakes": "Ontario",
"North Bay": "Ontario",
"Orillia": "Ontario",
"Pickering": "Ontario",
"Sault Ste. Marie": "Ontario",
"Stratford": "Ontario",
"Durham": "Ontario",
"Norfolk County": "Ontario",
"Prince Edward County": "Ontario",
"Quinte West": "Ontario",
"St. Catharines": "Ontario",
"Welland": "Ontario",
"Thorold": "Ontario",
"Niagara Falls": "Ontario",
"Pelham": "Ontario",
"Port Colborne": "Ontario",
}
# 随机返回一条 key 和 value
return random.choice(list(mapping.items()))
# 随机实物
def get_random_food(self, city: str, shop: str) -> list[str]:
"""
随机选择 1~2 种食物类别,并为每个类别至少选择 1 个具体产品
参数:
shop (str): 商店名称(当前未使用,占位参数)
返回值:
list[str]: 随机选取的产品名称列表
"""
categories = [
[
'Wonder Bread White',
'Villaggio White Bread',
'No Name Sliced White Bread',
"President's Choice White Sliced Bread",
],
[
"Ben's Original Whole Wheat Bread",
"POM Whole Wheat Bread",
"Silver Hills Bakery Whole Wheat Sliced Bread",
"Country Harvest Whole Wheat Bread",
],
[
"Wonder Bread Hot Dog Buns",
"Villaggio Hamburger Buns",
"Dempster's Dinner Rolls",
"No Frills Hot Dog Buns",
],
[
"Stonemill Bakehouse Bagels",
"Wonder Bagels",
"Montreal Bagels (pre-packaged, e.g., St. Lawrence brand)",
"President's Choice Bagels",
],
[
"Silver Hills Multi-Grain Sliced Bread",
"POM Multi-Grain Bread",
"Country Harvest Multi-Grain Loaf",
],
[
"President's Choice French Stick",
"Dempster's Italian Style Bread",
"Wonder Italian Bread",
"Villaggio Country Style Loaf",
],
]
# 随机选择 1~2 个类别(不重复)
category_count = random.randint(1, 2)
chosen_categories = random.sample(categories, k=category_count)
# 每个类别至少选择 1 个产品,最多选择 3 个以避免过多
selected_products: list[str] = []
for cat in chosen_categories:
max_pick = min(3, len(cat))
pick_count = random.randint(1, max_pick)
selected_products.extend(random.sample(cat, k=pick_count))
logger.debug(f"随机选择的产品: {selected_products}")
text = f'{shop}, {city} buy: '
for p in selected_products:
text += f'{p} * {random.randint(1, 3)}, '
text = text[:-2]
text = text + '.'
logger.debug(f'随机选择的产品文本: {text}')
return text
# 填写问卷
def fill_questionnaire(self):
"""
完成问卷填写
参数:
city (str): 线程启动时传入的城市名称,用于匹配省份并填写数据
"""
try:
info = generate_child_parent_names()
child_full_name = info['child_full_name']
parent_full_name = info['parent_full_name']
child_birthday = info['child_birthday']
# 2023-04-01转为MM/DD/YYYY
child_birthday = datetime.strptime(child_birthday, '%Y-%m-%d').strftime('%m/%d/%Y')
address_str = info['child_address_str']
city_name = info['child_city_name']
postcode = info['child_postcode']
parent_phone = info['parent_phone']
province = info['parent_state']
email = mail_.email_create_random()
# email = 'zhiyu@qq.com'
logger.debug(f"child_full_name --> {child_full_name}")
logger.debug(f"parent_full_name --> {parent_full_name}")
logger.debug(f"child_birthday --> {child_birthday}")
logger.debug(f"address_str --> {address_str}")
logger.debug(f"city_name --> {city_name}")
logger.debug(f"postcode --> {postcode}")
logger.debug(f"parent_phone --> {parent_phone}")
logger.debug(f"province --> {province}")
logger.debug(f"email --> {email}")
self.tab.wait(0.1)
self.tab.ele('t:input@id=name1').input(child_full_name)
self.tab.wait(0.1)
self.tab.ele('t:input@id=name2').input(parent_full_name)
self.tab.wait(0.1)
self.tab.ele('t:input@id=dateOfBirth').input(child_birthday)
self.tab.wait(0.1)
self.tab.ele('t:input@id=street1').input(address_str)
self.tab.wait(0.1)
self.tab.ele('t:input@id=city').input(city_name)
self.tab.wait(0.1)
self.tab.ele(
't:select@formcontrolname=state').ele(f't:option@text():{province}').click()
self.tab.wait(0.1)
self.tab.ele('t:input@id=zip').input(postcode)
self.tab.wait(0.1)
self.tab.ele('t:input@id=phone1').input(parent_phone)
self.tab.wait(0.1)
self.tab.ele('t:input@id=emailAddress').input(email)
self.tab.wait(0.1)
self.tab.ele('t:input@id=confirmEmailemail').input(email)
self.tab.wait(0.1)
self.tab.ele('t:input@@formcontrolname=resideInUS@@id=Yes').click()
self.tab.wait(0.1)
self.tab.ele('t:input@@formcontrolname=watchedDuringPeriod@@id=Yes').click()
self.tab.wait(0.1)
self.tab.ele('t:input@id=signatureMinor').input(child_full_name)
self.tab.wait(0.1)
self.tab.ele('t:input@id=signatureParentGuardian').input(parent_full_name)
return self.submit_file(
child_full_name=child_full_name,
parent_full_name=parent_full_name,
child_birthday=child_birthday,
address_str=address_str,
city_name=city_name,
parent_phone=parent_phone,
postcode=postcode,
province=province,
email=email,
text=""
)
except Exception as e:
logger.error(f"填写问卷失败: {e}")
return False
# 提交问卷
def submit_file(self, child_full_name: str, parent_full_name: str, child_birthday: str, address_str: str, city_name: str, parent_phone: str, postcode: str, province: str, email: str, text: str):
"""
提交问卷后的数据保存到后端服务(孩子与家长字段)
参数:
child_full_name (str): 孩子全名
parent_full_name (str): 家长全名
child_birthday (str): 孩子生日(字符串,已为 MM/DD/YYYY
address_str (str): 街道地址
city_name (str): 城市
parent_phone (str): 家长电话
postcode (str): 邮编
province (str): 省/州全称
email (str): 邮箱
text (str): 文本内容(如反馈地址)
"""
jc = 0
while True:
if jc >= 3:
logger.error("提交问卷失败")
return False
res = self.solve_recaptcha()
if not res:
jc += 1
continue
res = self.tab.ele('t:button@text():SUBMIT')
if res:
logger.debug(f"点击Submit按钮")
res.click()
self.tab.wait(3)
res = self.tab.ele(
't:h2@text()=THANK YOU FOR SUBMITTING YOUR INFORMATION', timeout=1)
if res:
logger.info("提交问卷成功")
logger.info(f"反馈地址: {text}")
res = self.tab.ele('t:b')
if res:
logger.info(f"反馈地址: {res.text}")
text = res.text
status = True
else:
status=False
api.create_info(
child_full_name=child_full_name,
parent_full_name=parent_full_name,
child_birthday=child_birthday,
address_str=address_str,
city_name=city_name,
parent_phone=parent_phone,
postcode=postcode,
province=province,
email=email,
text=text,
status=status
)
return True
bol = self.tab.ele(
't:div@text():ERR_TIMED_OUT', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SSL_PROTOCOL_ERROR', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SOCKS_CONNECTION_FAILED', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
jc += 1
def parse_proxy(proxy: str) -> tuple[str, int, str, str] | None:
"""
解析代理字符串为四元组 `(host, port, user, pwd)`
参数:
proxy: 形如 `host:port:user:pwd`
返回值:
(host, port, user, pwd) 或 None格式错误
"""
try:
host, port, user, pwd = proxy.split(":", 3)
return host, int(port), user, pwd
except Exception:
logger.error(f"代理格式错误: {proxy}")
return None
def create_fingerprint_browser(proxy: str) -> tuple[str, str] | None:
    """Create a fingerprint-browser profile for ``proxy`` and open its window.

    If the profile is created but opening it fails or raises, the profile is
    closed and deleted before returning. The caller never receives the id in
    that case, so it would otherwise be left behind.

    Args:
        proxy: Proxy string understood by ``parse_proxy``.

    Returns:
        ``(browser_id, http)``, where ``http`` is whatever
        ``bit_browser.bit_browser_open`` returned, or None on any failure.
    """
    info = parse_proxy(proxy)
    if info is None:
        return None
    host, port, user, pwd = info
    browser_id = None
    try:
        browser_id = bit_browser.bit_browser_create(
            remark=f"{user}",
            proxy_type="socks5",
            host=host,
            port=str(port),
            proxy_user=user,
            proxy_pwd=pwd,
        )
        if not browser_id:
            return None
        logger.info(f"创建指纹浏览器成功: {browser_id}")
        time.sleep(1)
        http = bit_browser.bit_browser_open(browser_id)
        if http:
            logger.info(f"打开指纹浏览器成功: {browser_id}")
            return browser_id, http
        logger.error(f"打开指纹浏览器失败: {browser_id}")
    except Exception as e:
        logger.error(f"创建指纹浏览器失败: {e}")
    if browser_id:
        # The profile exists but is unusable; remove it so it does not leak.
        close_and_delete_browser(browser_id)
    return None
def close_and_delete_browser(browser_id: str) -> None:
    """Close a fingerprint browser and then delete it, best effort.

    The two steps run one second apart. A failure in either step is logged
    as a warning and does not prevent the other step from running.

    Args:
        browser_id: Id of the fingerprint browser to remove.
    """
    steps = (
        ("bit_browser_close", "关闭浏览器失败或已关闭"),
        ("bit_browser_delete", "删除浏览器失败或已删除"),
    )
    for position, (method_name, failure_label) in enumerate(steps):
        if position:
            # Pause between closing and deleting.
            time.sleep(1)
        try:
            getattr(bit_browser, method_name)(browser_id)
        except Exception as exc:
            logger.warning(f"{failure_label}: {browser_id} - {exc}")
def timeout_guard(browser_id: str, done_event: threading.Event, timeout_sec: float = 300.0) -> None:
    """Remove a browser if its task does not finish in time.

    Blocks until ``done_event`` is set or ``timeout_sec`` elapses. Only in
    the timeout case is the browser closed and deleted.

    Args:
        browser_id: Id of the fingerprint browser being guarded.
        done_event: Set by the task when it has finished.
        timeout_sec: Seconds to wait before cleaning up (default 300).
    """
    timed_out = not done_event.wait(timeout=timeout_sec)
    if timed_out:
        logger.warning(f"任务超过 {timeout_sec} 秒未完成,开始清理浏览器: {browser_id}")
        try:
            close_and_delete_browser(browser_id)
        except Exception as exc:
            logger.warning(f"超时清理浏览器失败: {browser_id} - {exc}")
def run_task_with_proxy(proxy: str, stop_event: threading.Event) -> None:
    """Run one claim-form task in a fresh fingerprint browser.

    Creates the browser, starts a 300-second timeout guard thread, drives
    the form through ``Auto`` and always closes and deletes the browser at
    the end.

    Args:
        proxy: Proxy string used to create the browser.
        stop_event: Checked between steps; once set, the task ends early.
    """
    browser_id: str | None = None
    done_event = threading.Event()
    try:
        created = create_fingerprint_browser(proxy)
        if not created:
            return
        browser_id, http = created
        logger.info(f"browser_id: {browser_id} http: {http}")
        # Guard thread: removes the browser if the task runs past 5 minutes.
        threading.Thread(
            target=timeout_guard,
            args=(browser_id, done_event, 300.0),
            daemon=True,
        ).start()
        if stop_event.is_set():
            return
        auto = Auto(http=http)
        auto.open_url('https://www.claimform.youtubeprivacysettlement.com')
        # Short-circuit order matters: stop check, wait for home, stop check.
        if stop_event.is_set() or not auto.wait_home() or stop_event.is_set():
            return
        auto.fill_questionnaire()
    except Exception as e:
        logger.error(f"执行任务异常: {e}")
    finally:
        # Tell the guard thread the task is over.
        try:
            done_event.set()
        except Exception:
            pass
        if browser_id:
            try:
                close_and_delete_browser(browser_id)
            except Exception:
                pass
def proxy_loop(proxy: str, stop_event: threading.Event) -> None:
    """Keep running tasks for one proxy until ``stop_event`` is set.

    Outside the forbidden window a task is run, followed by a 0.1-second
    pause. Inside it the loop waits 60 seconds, cleans up all browsers and
    then waits for the number of seconds ``seconds_until(20, 0)`` returns.

    Args:
        proxy: Proxy string handed to ``run_task_with_proxy``.
        stop_event: Setting it makes the loop exit at the next check.
    """
    while not stop_event.is_set():
        try:
            if not is_forbidden_time():
                run_task_with_proxy(proxy, stop_event)
            else:
                if stop_event.wait(timeout=60):
                    break
                cleanup_all_browsers()
                remaining = seconds_until(20, 0)
                if stop_event.wait(timeout=remaining):
                    break
                continue
        except Exception as exc:
            logger.error(f"代理循环异常: {proxy} - {exc}")
        if stop_event.is_set() or stop_event.wait(timeout=0.1):
            break
def is_forbidden_time() -> bool:
"""
判断当前是否处于禁跑时段(每日 18:30 ~ 20:00本地时间
返回值:
bool: True 表示处于禁跑时段
"""
# 去除晚上停止功能
return False
# 禁跑时段为 18:30 ~ 20:00
now = datetime.now()
start = now.replace(hour=18, minute=30, second=0, microsecond=0)
end = now.replace(hour=20, minute=0, second=0, microsecond=0)
return start <= now < end
def wait_until_out_of_forbidden(interval_sec: float = 5.0, stop_event: threading.Event | None = None) -> None:
    """Block while ``is_forbidden_time()`` keeps returning True.

    Each poll waits exactly ``interval_sec``. With a ``stop_event`` the wait
    happens on the event, so it can be cut short; without one it is a plain
    sleep. The earlier version did both in the same iteration, which doubled
    the interval whenever an event was supplied.

    Args:
        interval_sec: Seconds between two checks.
        stop_event: Optional event; once set, the function returns early.
    """
    while is_forbidden_time():
        if stop_event is None:
            time.sleep(interval_sec)
        elif stop_event.wait(timeout=interval_sec):
            break
def seconds_until(hour: int, minute: int) -> float:
    """Return how many seconds remain until ``hour:minute`` today.

    Args:
        hour: Target hour, 24-hour clock.
        minute: Target minute.

    Returns:
        Seconds from now to today's target time, or ``0.0`` when that time
        has already been reached or passed.
    """
    current = datetime.now()
    target = current.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # A target in the past gives a non-positive difference, clamped to 0.0.
    return max(0.0, (target - current).total_seconds())
def count_fingerprint_browsers() -> int:
    """Return the current number of fingerprint browsers.

    Uses the ``totalNum`` field of the listing response when it is a
    non-negative integer; otherwise falls back to the length of the returned
    list.

    Returns:
        The browser count, or 0 (after logging a warning) if the query or
        its interpretation raised.
    """
    try:
        response = bit_browser.bit_browser_get(0, 100)
        payload = response.get("data", {}) if isinstance(response, dict) else {}
        reported = payload.get("totalNum")
        listed = payload.get("list", [])
        has_valid_total = isinstance(reported, int) and reported >= 0
        return reported if has_valid_total else len(listed)
    except Exception as exc:
        logger.warning(f"统计指纹浏览器数量失败: {exc}")
        return 0
def cleanup_all_browsers() -> None:
    """Close and delete every fingerprint browser in the listing.

    The listing is requested with ``bit_browser_get(0, 100)``. Entries
    without a truthy ``id`` are skipped. Any exception is logged as a
    warning and ends the cleanup.
    """
    try:
        response = bit_browser.bit_browser_get(0, 100)
        payload = response.get("data", {}) if isinstance(response, dict) else {}
        entries = payload.get("list", [])
        # Collect every id first so a malformed entry aborts before any
        # browser is touched.
        browser_ids = [bid for entry in entries if (bid := entry.get("id"))]
        for bid in browser_ids:
            close_and_delete_browser(bid)
    except Exception as exc:
        logger.warning(f"清理所有指纹浏览器失败: {exc}")
def delete_excess_browsers(limit: int) -> None:
    """Delete fingerprint browsers beyond ``limit``, starting from the end.

    Reads the listing from ``bit_browser_get(0, 100)``. When it holds more
    ids than ``limit``, the surplus at the tail of the list is closed and
    deleted, last entry first.

    Args:
        limit: Maximum number of browsers to keep.
    """
    try:
        response = bit_browser.bit_browser_get(0, 100)
        payload = response.get("data", {}) if isinstance(response, dict) else {}
        entries = payload.get("list", [])
        browser_ids = [bid for entry in entries if (bid := entry.get("id"))]
        surplus = len(browser_ids) - limit
        if surplus <= 0:
            return
        for bid in reversed(browser_ids[-surplus:]):
            close_and_delete_browser(bid)
        logger.info(f"已删除超出数量 {surplus},当前限制为 {limit}")
    except Exception as exc:
        logger.warning(f"删除超额浏览器失败: {exc}")
def monitor_browsers_and_restart(limit: int, stop_event: threading.Event, restart_event: threading.Event) -> None:
    """Every 3 seconds, trim the number of fingerprint browsers to ``limit``.

    The pause between checks is a wait on ``stop_event`` rather than a plain
    sleep, so the loop ends as soon as the event is set instead of running
    one more count-and-delete pass afterwards.

    Args:
        limit: Maximum number of browsers allowed (the callers in this file
            pass the number of proxies).
        stop_event: Setting it ends the monitor.
        restart_event: Accepted for interface compatibility; not used here.
    """
    while not stop_event.wait(timeout=3):
        count = count_fingerprint_browsers()
        if count > limit:
            logger.warning(f"指纹浏览器数量 {count} 超过限制 {limit},开始删除超出部分")
            delete_excess_browsers(limit)
def main():
    """Run one worker per proxy, plus a monitor thread, in an endless loop.

    Each batch starts one ``proxy_loop`` per proxy in a thread pool and a
    daemon thread that trims surplus browsers. Finished workers are
    resubmitted. A batch ends when ``restart_event`` is set or the forbidden
    window begins, after which the outer loop starts a new batch.

    Changes from the earlier version:
    * An empty proxy list is reported and the function returns, instead of
      failing inside ``ThreadPoolExecutor`` with ``max_workers=0``.
    * ``stop_event`` is always set before the final pool shutdown, so an
      exception or Ctrl+C in the supervising loop no longer leaves
      ``shutdown(wait=True)`` waiting on workers that were never told to stop.
    * Failures that used to be swallowed silently are logged as warnings.
    """
    proxies = list(proxy_list)
    if not proxies:
        logger.error("代理列表为空, 无法启动任务")
        return

    def shut_down(pool: ThreadPoolExecutor) -> None:
        # Best-effort shutdown; a failure is logged, not raised.
        try:
            pool.shutdown(wait=True)
        except Exception as exc:
            logger.warning(f"关闭线程池失败: {exc}")

    while True:
        stop_event = threading.Event()
        restart_event = threading.Event()
        if is_forbidden_time():
            # The event was created just above and nothing else holds it yet,
            # so this wait normally runs the full 60 seconds.
            if stop_event.wait(timeout=60):
                continue
            cleanup_all_browsers()
            logger.info("处于禁跑时段,等待至禁跑结束")
            wait_until_out_of_forbidden()
            continue
        executor = ThreadPoolExecutor(max_workers=len(proxies))
        try:
            futures_map = {executor.submit(proxy_loop, p, stop_event): p for p in proxies}
            monitor_thread = threading.Thread(
                target=monitor_browsers_and_restart,
                args=(len(proxies), stop_event, restart_event),
                daemon=True,
            )
            monitor_thread.start()
            while True:
                if restart_event.is_set():
                    stop_event.set()
                    shut_down(executor)
                    break
                if is_forbidden_time():
                    logger.info("进入禁跑时段停止当前批次等待1分钟后清理指纹浏览器")
                    stop_event.set()
                    shut_down(executor)
                    time.sleep(60)
                    cleanup_all_browsers()
                    wait_until_out_of_forbidden()
                    break
                # Iterate over a snapshot because the map is modified below.
                for f, proxy in list(futures_map.items()):
                    if f.done() and not stop_event.is_set() and not restart_event.is_set():
                        try:
                            worker_error = f.exception()
                        except Exception as exc:
                            worker_error = exc
                        if worker_error is not None:
                            logger.warning(f"代理线程异常结束: {proxy} - {worker_error}")
                        try:
                            new_future = executor.submit(proxy_loop, proxy, stop_event)
                            del futures_map[f]
                            futures_map[new_future] = proxy
                        except Exception as e:
                            logger.error(f"重启代理线程失败: {proxy} - {e}")
                time.sleep(0.2)
            try:
                monitor_thread.join(timeout=5)
            except Exception as exc:
                logger.warning(f"等待监控线程结束失败: {exc}")
        finally:
            # Workers only exit once stop_event is set; set it on every path
            # so the blocking shutdown below cannot wait forever.
            stop_event.set()
            shut_down(executor)
def main2():
    """Run the claim flow once in a default ``Auto`` browser.

    Opens the claim-form site and fills in the form only if the landing
    page appears; otherwise returns without doing anything else.
    """
    auto = Auto()
    auto.open_url('https://www.claimform.youtubeprivacysettlement.com')
    if auto.wait_home():
        auto.fill_questionnaire()
# Script entry point: start the multi-proxy supervisor only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    main()