Files
us_youtube_auto/spider/auto_challenge.py
2025-12-12 14:40:04 +08:00

314 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
import time
import uuid
from typing import Optional, List
import requests
from PIL import Image
import base64
from loguru import logger
# Pillow Lanczos resampling filter, kept as a module-level constant.
# NOTE(review): nothing in the visible part of this file references it — confirm whether it is still needed.
RESAMPLE_FILTER = Image.Resampling.LANCZOS
class ReCaptchaHandler:
    """Drive a reCAPTCHA image challenge through a browser ``driver``.

    The handler clicks the reCAPTCHA checkbox, waits for the image challenge,
    sends the challenge images to the recognition service at ``api_host`` and
    clicks the grid cells the service reports.

    The ``driver`` is expected to expose ``ele``, ``url`` and ``run_js`` (the
    calls used below); the locator syntax and the original comments indicate a
    DrissionPage page object.
    """

    # Grid-cell index (row-major, 0-based) -> XPath of the matching <td>.
    path_map_44 = {
        i: f"//table/tbody/tr[{i // 4 + 1}]/td[{i % 4 + 1}]" for i in range(16)
    }
    path_map_33 = {
        i: f"//table/tbody/tr[{i // 3 + 1}]/td[{i % 3 + 1}]" for i in range(9)
    }

    # Endpoint of the image recognition service.
    api_host = "http://192.168.11.13:7070/analyze_batch/"
    # Seconds to wait for the recognition service before giving up.
    api_timeout = 30

    # Script that reads the computed visibility of the challenge iframe.
    # The explicit ``return`` is required: run_js executes the snippet as a
    # function body, so without it the call always yields None.
    _VISIBILITY_JS = (
        'var f = document.querySelector("iframe[title=\\"recaptcha challenge expires in two minutes\\"]") || document.querySelector("iframe[title=\\"reCAPTCHA 验证任务将于 2 分钟后过期\\"]");'
        'return f ? getComputedStyle(f).visibility : null;'
    )

    def __init__(self, driver):
        """Store the browser ``driver`` and reset all challenge state."""
        self.driver = driver
        self.checkbox_iframe = None
        self.challenge_iframe = None
        self.challenge_type = None
        self.challenge_question = None
        # True while the current 3x3 challenge is still on its first pass.
        self.challenge_i33_first = True
        # Per-tile history: idx -> {'srcs': [base64, ...], 'new': bool}.
        self.i11s = {}
        self.challenge_44_img = None

    @staticmethod
    def split_image(image_bytes: bytes) -> Optional[List[str]]:
        """Cut an image into a 3x3 grid and return the tiles as base64 PNGs.

        Tiles are returned in row-major order. The last row/column absorbs
        any remainder pixels. Returns ``None`` when the bytes cannot be
        decoded as an image.
        """
        try:
            img = Image.open(io.BytesIO(image_bytes))
            # Image.open is lazy; force decoding here so corrupt data is
            # reported as None instead of failing later inside crop().
            img.load()
        except Exception:
            return None

        width, height = img.size
        tile_width = width // 3
        tile_height = height // 3

        base64_tiles = []
        for row in range(3):
            for col in range(3):
                left = col * tile_width
                upper = row * tile_height
                right = (col + 1) * tile_width if col < 2 else width
                lower = (row + 1) * tile_height if row < 2 else height
                buf = io.BytesIO()
                img.crop((left, upper, right, lower)).save(buf, format="PNG")
                base64_tiles.append(base64.b64encode(buf.getvalue()).decode())
        return base64_tiles

    @staticmethod
    def _visibility_from_style(style_str):
        """Return 'hidden'/'visible' from an inline style, or None if unset.

        Only a real ``visibility`` declaration counts; whitespace around the
        colon is ignored.
        """
        for declaration in (style_str or '').split(';'):
            prop, _, value = declaration.partition(':')
            if prop.strip().lower() == 'visibility':
                hidden = value.strip().lower().startswith('hidden')
                return 'hidden' if hidden else 'visible'
        return None

    def _challenge_visibility(self):
        """Return the challenge iframe visibility, or None when unknown.

        The inline ``style`` attribute is tried first; when it carries no
        visibility declaration the computed style is read through JS.
        Both lookups are best-effort and never raise.
        """
        vis = None
        try:
            vis = self._visibility_from_style(self.challenge_iframe.attr('style') or '')
        except Exception:
            vis = None
        if vis is None:
            try:
                vis = self.driver.run_js(self._VISIBILITY_JS)
            except Exception:
                vis = None
        return vis

    def _checkbox_checked(self):
        """Return True when the reCAPTCHA checkbox reports itself as checked."""
        anchor = self.checkbox_iframe.ele("#recaptcha-anchor")
        return anchor.attr('aria-checked') == 'true'

    def _grid_cell(self, xpath):
        """Return the challenge grid cell located by ``xpath``."""
        return self.challenge_iframe.ele('#rc-imageselect-target').ele(f"xpath:{xpath}")

    def _submit_answer(self):
        """Click the verify button, drop the tile history and return True."""
        self.challenge_iframe.ele('#recaptcha-verify-button').click()
        self.i11s.clear()
        return True

    def find_checkbox_iframe(self):
        """Locate the reCAPTCHA checkbox iframe and click the checkbox.

        Returns True when the iframe was found and the click went through,
        False otherwise.
        """
        time.sleep(1)
        try:
            iframe = self.driver.ele('css: iframe[title="reCAPTCHA"]')
            if iframe:
                self.checkbox_iframe = iframe
                self.checkbox_iframe.ele("#recaptcha-anchor").click()
                return True
        except Exception as exc:
            logger.debug("reCAPTCHA checkbox lookup failed: {}", exc)
        return False

    def find_challenge_iframe(self):
        """Locate the image-challenge iframe; return True when found."""
        try:
            iframe = self.driver.ele("@|title=recaptcha challenge expires in two minutes@|title=reCAPTCHA 验证任务将于 2 分钟后过期")
            if iframe:
                self.challenge_iframe = iframe
                return True
        except Exception as exc:
            logger.debug("reCAPTCHA challenge iframe lookup failed: {}", exc)
        return False

    def check_11_refresh(self, check_ele, timeout=10.0):
        """Wait for clicked tiles to be replaced and record their new images.

        Args:
            check_ele: iterable of ``(idx, xpath)`` pairs for the clicked tiles.
            timeout: maximum seconds to wait per tile for a fresh image. On
                expiry the tile is left with ``new == False`` instead of
                blocking forever.
        """
        for state in self.i11s.values():
            state['new'] = False

        for idx in [item[0] for item in check_ele]:
            state = self.i11s.setdefault(idx, {'srcs': [], 'new': False})
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                img_ele = self._grid_cell(self.path_map_33[idx]).ele(
                    '.rc-image-tile-11', timeout=0.1)
                byte_data = img_ele.src() if img_ele else None
                if byte_data:
                    b64_str = base64.b64encode(byte_data).decode()
                    if b64_str not in state['srcs']:
                        state['srcs'].append(b64_str)
                        state['new'] = True
                        break
                # Tile missing, still loading, or unchanged: poll again shortly.
                time.sleep(0.1)
            else:
                logger.warning("tile {} did not refresh within {}s", idx, timeout)

    def click_answer(self, result, challenge_type):
        """Click the cells named in ``result`` and submit when appropriate.

        Args:
            result: JSON response of the recognition service.
            challenge_type: 4 for a 4x4 grid, 3 for a 3x3 grid.

        Returns:
            True when the verify button was clicked, False otherwise.
        """
        if challenge_type == 4:
            for cell in result["results"][0]['result']:
                self._grid_cell(self.path_map_44[cell]).click()
                time.sleep(0.1)
            return self._submit_answer()

        if challenge_type == 3:
            found_ele = []
            for res in result["results"]:
                if res["result"].get('target_found'):
                    idx = int(res["image_id"])
                    self._grid_cell(self.path_map_33[idx]).click()
                    found_ele.append((idx, self.path_map_33[idx]))
                    time.sleep(0.1)

            if not found_ele:
                return self._submit_answer()

            # Too few hits on the first pass: request a different challenge.
            if len(found_ele) <= 2 and self.challenge_i33_first:
                self.challenge_iframe.ele('#recaptcha-reload-button').click()
                return False

            # A tile that stays marked as selected is submitted right away;
            # otherwise the clicked tiles are being replaced and are re-read.
            cls = self._grid_cell(found_ele[0][1]).attr('class') or ''
            if 'rc-imageselect-tileselected' in cls:
                return self._submit_answer()
            self.check_11_refresh(found_ele)
            return False

        return False

    def challenge_i33(self):
        """Solve one round of a 3x3 challenge.

        With nine ``.rc-image-tile-33`` tiles present the whole image is split
        and analysed; otherwise only the tiles flagged as new are analysed.
        """
        tiles_33 = self.challenge_iframe.eles('.rc-image-tile-33', timeout=1)
        if len(tiles_33) == 9:
            self.challenge_i33_first = True
            self.i11s.clear()
            # The first tile's image source is split into the 3x3 grid.
            tiles = self.split_image(tiles_33[0].src())
            if tiles:
                if res := self.identify_verification_code(dict(enumerate(tiles))):
                    self.click_answer(res, 3)
        else:
            self.challenge_i33_first = False
            data = {k: v['srcs'][-1] for k, v in self.i11s.items() if v['new']}
            if res := self.identify_verification_code(data):
                self.click_answer(res, 3)

    def challenge_i44(self):
        """Solve a 4x4 challenge by analysing its single full image."""
        ele = self.challenge_iframe.eles('.rc-image-tile-44')[0]
        b64_str = base64.b64encode(ele.src()).decode()
        self.challenge_44_img = b64_str
        if res := self.identify_verification_code({0: b64_str}):
            self.click_answer(res, 4)

    def identify_verification_code(self, images):
        """Send ``images`` to the recognition service and return its JSON.

        Args:
            images: mapping of image id -> base64 image; empty values are skipped.

        Returns:
            The decoded JSON response, or ``None`` when there is nothing to send.

        Raises:
            requests.RequestException: on network failure, timeout or an
                HTTP error status.
        """
        payload = {
            "images": [
                {
                    "image_id": str(k),
                    "image_base64": img,
                    "target_class": self.challenge_question,
                }
                for k, img in images.items()
                if img
            ]
        }
        if not payload["images"]:
            return None
        # Bounded wait so a stalled service cannot hang the solver forever.
        res = requests.post(self.api_host, json=payload, timeout=self.api_timeout)
        res.raise_for_status()
        return res.json()

    def challenge(self):
        """Run the full checkbox + image-challenge flow.

        Returns:
            A dict with ``status`` (bool) and ``message`` (str).
        """
        if not self.find_checkbox_iframe():
            return {"status": False, "message": "no verification code found"}
        url_before = self.driver.url
        self.find_challenge_iframe()
        if not self.challenge_iframe:
            return {"status": False, "message": "no verification code found"}

        # Phase 1: wait until the checkbox passes on its own or the image
        # challenge is no longer hidden.
        while True:
            time.sleep(1)
            if self.driver.url != url_before:
                return {"status": True, "message": "验证码自动通过1"}
            if self._checkbox_checked():
                return {"status": True, "message": "验证码自动通过2"}
            if self._challenge_visibility() != 'hidden':
                break

        # Phase 2: keep answering rounds until the challenge is hidden again,
        # the page navigates, or the checkbox becomes checked.
        try:
            while True:
                if self._challenge_visibility() == 'hidden':
                    break
                time.sleep(1)
                if self.driver.url != url_before or self._checkbox_checked():
                    return {"status": True, "message": "captcha successfully resolved"}
                # Read the challenge question (the target class).
                self.challenge_question = self.challenge_iframe.ele("tag:strong").text
                if self.challenge_iframe.ele('.rc-image-tile-44', timeout=0.1):
                    # 4x4 grid.
                    self.challenge_i44()
                elif self.challenge_iframe.ele('.rc-image-tile-33', timeout=0.1) or \
                        self.challenge_iframe.ele('.rc-image-tile-11', timeout=0.1):
                    # 3x3 grid, or single replaced tiles.
                    self.challenge_i33()
        except Exception:
            # NOTE(review): success is still reported after an error to keep
            # the existing contract — confirm whether callers rely on this.
            logger.exception("reCAPTCHA challenge loop aborted")
        return {"status": True, "message": "captcha successfully resolved"}