|
import asyncio |
|
import logging |
|
import re |
|
from contextlib import AsyncExitStack |
|
from pathlib import Path |
|
from typing import Literal, Optional, Self |
|
|
|
import platform |
|
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright |
|
from playwright.async_api import TimeoutError as PlaywrightTimeoutError |
|
from playwright_stealth import stealth_async |
|
from pydantic import Field |
|
from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential |
|
|
|
from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes |
|
from proxy_lite.logger import logger |
|
|
|
# Tags that element_as_text() renders in the self-closing ``<tag .../>`` form
# when the element carries no text.
SELF_CONTAINED_TAGS = [
    "area", "base", "br", "col", "embed", "hr", "img",
    "input", "link", "meta", "param", "source", "track", "wbr",
]
|
|
|
|
|
def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Render a single marked element as one line of HTML-like text.

    Args:
        mark_id: Number shown in square brackets at the start of the line.
        tag: Element tag name; it is lower-cased before rendering. ``None``
            is treated as an empty tag name instead of raising.
        text: Text content of the element; ``None`` is treated as ``""``.
        **raw_attributes: Element attributes. ``None`` values are skipped,
            ``True`` is rendered as a bare attribute name, ``False`` is
            dropped, and any other value is rendered as ``name="value"``.

    Returns:
        ``- [mark_id] <tag attrs/>`` for a tag listed in
        ``SELF_CONTAINED_TAGS`` that has no text, otherwise
        ``- [mark_id] <tag attrs>text</tag>``. Attribute values and text
        longer than 2500 characters are cut and end in "…"; line breaks are
        replaced by "⏎" so the result always stays on one line.
    """
    max_length = 2500

    def truncate(value: str) -> str:
        # Keep at most max_length characters, the last one being an ellipsis.
        if len(value) > max_length:
            return value[: max_length - 1] + "…"
        return value

    rendered = []
    for name, value in raw_attributes.items():
        if value is None or value is False:
            # Missing attributes and False boolean attributes are not shown.
            continue
        if value is True:
            rendered.append(name)
        else:
            rendered.append(f'{name}="{truncate(str(value))}"')

    attributes = " ".join(rendered)
    # Leading space separates the attributes from the tag name; rstrip()
    # collapses the result to "" when there are no attributes at all.
    attributes = (" " + attributes).rstrip()

    # The signature allows tag=None, so do not call .lower() on it blindly.
    tag = (tag or "").lower()
    text = truncate(text if text is not None else "")

    # Keep every element on a single output line.
    attributes = re.sub(r"\r\n|\r|\n", "⏎", attributes)
    text = re.sub(r"\r\n|\r|\n", "⏎", text)

    if tag in SELF_CONTAINED_TAGS:
        if text:
            # Unexpected, but the text is still rendered below rather than lost.
            logger.warning(
                f"Got self-contained element '{tag}' which contained text '{text}'.",
            )
        else:
            return f"- [{mark_id}] <{tag}{attributes}/>"
    return f"- [{mark_id}] <{tag}{attributes}>{text}</{tag}>"
|
|
|
|
|
class BrowserSession:
    """Async context manager around one Playwright Chromium browser.

    Entering the session launches Chromium, creates a browser context with the
    configured viewport, opens a first page and registers the
    ``add_custom_select.js`` and ``find_pois.js`` init scripts located next to
    this module. ``update_poi()`` collects the "points of interest" (POIs) of
    the current page; the position of a POI in the ``poi_*`` lists is the
    ``mark_id`` accepted by the interaction methods (``click``, ``enter_text``,
    ``scroll``, ...).
    """

    # Timeout, in milliseconds, applied to the context, the first page and the
    # waits performed by update_poi().
    _DEFAULT_TIMEOUT_MS = 60_000
    # At most this many iframes are inspected per update_poi() call.
    _MAX_IFRAMES = 10
    # Iframes narrower or shorter than this many pixels are ignored.
    _MIN_IFRAME_SIDE = 50
    # Runs in the page/frame. Both functions are presumably defined by the
    # init scripts registered in __aenter__ -- confirm against the .js files.
    _FIND_POIS_SCRIPT = """() => {
        overwriteDefaultSelectConvergence();
        return findPOIsConvergence();
    }"""

    def __init__(
        self,
        viewport_width: int = 1280,
        viewport_height: int = 720,
        headless: bool = True,
    ):
        """Store the configuration; nothing is launched until ``__aenter__``.

        Args:
            viewport_width: Viewport width passed to the browser context.
            viewport_height: Viewport height passed to the browser context.
            headless: Whether Chromium is launched headless.
        """
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self.headless = headless
        self.playwright: Playwright | None = None
        self.browser: Browser | None = None
        self.context: BrowserContext | None = None
        self._exit_stack: AsyncExitStack | None = None

        # Real empty lists (not pydantic Field descriptors, which only work
        # inside a model class body) so that poi_text and the mark_id lookups
        # behave sensibly before the first update_poi().
        self.poi_elements: list = []
        self.poi_centroids: list[Point] = []
        self.bounding_boxes: list[BoundingBox] = []
        self.pois: list[POI] = []

    async def __aenter__(self) -> Self:
        """Launch Chromium, open the first page and install the init scripts.

        If any start-up step fails, everything started so far is shut down
        before the exception propagates, because ``__aexit__`` is not invoked
        when ``__aenter__`` itself raises.
        """
        self._exit_stack = AsyncExitStack()
        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=self.headless)
            self.context = await self.browser.new_context(
                viewport={"width": self.viewport_width, "height": self.viewport_height},
            )
            await self.context.new_page()
            self.context.set_default_timeout(self._DEFAULT_TIMEOUT_MS)
            self.current_page.set_default_timeout(self._DEFAULT_TIMEOUT_MS)
            await stealth_async(self.current_page)
            await self.context.add_init_script(
                path=Path(__file__).with_name("add_custom_select.js"),
            )
            await self.context.add_init_script(
                path=Path(__file__).with_name("find_pois.js"),
            )
        except BaseException:
            # Includes cancellation: never leave a browser process behind.
            await self._shutdown()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the browser and stop Playwright; exceptions are not suppressed."""
        await self._shutdown()

    async def _shutdown(self) -> None:
        """Release the browser, the Playwright driver and the exit stack.

        Each resource is released even if releasing an earlier one raises, and
        the attributes are cleared first so a second call is a no-op.
        """
        browser, playwright, exit_stack = self.browser, self.playwright, self._exit_stack
        self.browser = None
        self.context = None
        self.playwright = None
        self._exit_stack = None
        try:
            if browser:
                await browser.close()
        finally:
            try:
                if playwright:
                    await playwright.stop()
            finally:
                if exit_stack:
                    await exit_stack.aclose()

    @property
    def current_page(self) -> Optional[Page]:
        """Last page in the context's page list, or ``None`` when there is none.

        Also returns ``None`` while the session has no context (before
        ``__aenter__`` or after shutdown).
        """
        if self.context is None:
            return None
        pages = self.context.pages
        return pages[-1] if pages else None

    @property
    def current_url(self) -> Optional[str]:
        """URL of ``current_page``, or ``None`` when there is no page."""
        page = self.current_page
        return page.url if page else None

    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]:
        """Collect the POIs inside one iframe element.

        Args:
            iframe: Element handle of an ``<iframe>`` on the current page.

        Returns:
            ``(poi, offset)`` where ``poi`` is the value returned by the POI
            script evaluated in the iframe's frame and ``offset`` is the
            rounded ``{"x", "y"}`` position of the iframe's bounding box, or
            ``None`` when the iframe has no bounding box, is smaller than 50px
            in either dimension, has no content frame, yields no POIs, or
            raises while being processed.

        Note:
            Every ``Exception`` is caught and logged here, so the retry
            decorator never observes an ordinary failure from this body.
        """
        try:
            bounding_box = await iframe.bounding_box()
            if not bounding_box:
                return None

            too_narrow = bounding_box["width"] < self._MIN_IFRAME_SIDE
            too_short = bounding_box["height"] < self._MIN_IFRAME_SIDE
            if too_narrow or too_short:
                return None

            frame = await iframe.content_frame()
            if not frame:
                return None

            poi = await frame.evaluate(self._FIND_POIS_SCRIPT)
            if not poi:
                return None

            iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])}
            return poi, iframe_offset
        except Exception as e:
            logger.error(f"Error processing iframe: {e}")
            return None

    @staticmethod
    def _shift_centroid(centroid: dict, offset: dict) -> dict:
        """Return a copy of ``centroid`` translated by the iframe ``offset``."""
        dx, dy = offset["x"], offset["y"]
        return {
            **centroid,
            "x": centroid["x"] + dx,
            "y": centroid["y"] + dy,
            "left": centroid["left"] + dx,
            "top": centroid["top"] + dy,
            "right": centroid["right"] + dx,
            "bottom": centroid["bottom"] + dy,
        }

    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def update_poi(self) -> None:
        """Refresh ``poi_elements``, ``poi_centroids``, ``bounding_boxes`` and ``pois``.

        Waits for the page to load and its body to be visible, evaluates the
        POI script in the main page, then merges the POIs of up to 10 iframes,
        translating their coordinates by each iframe's position. Failures
        while handling iframes are logged and do not abort the update.
        """
        try:
            await self.current_page.wait_for_load_state(timeout=self._DEFAULT_TIMEOUT_MS)
        except PlaywrightTimeoutError:
            # A slow load is not fatal; carry on with whatever has rendered.
            logger.error(f"Timeout waiting for website load state: {self.current_url}")
        await self.current_page.wait_for_selector(
            "body",
            timeout=self._DEFAULT_TIMEOUT_MS,
            state="visible",
        )

        page_info = await self.current_page.evaluate(self._FIND_POIS_SCRIPT)

        self.poi_elements = page_info["element_descriptions"]
        element_centroids = page_info["element_centroids"]
        try:
            iframes = await self.current_page.query_selector_all("iframe")
            results = await asyncio.gather(
                *(self.process_iframe(iframe) for iframe in iframes[: self._MAX_IFRAMES]),
            )
            for result in results:
                if result is None:
                    continue
                iframe_poi, iframe_offset = result
                # Build both pieces before touching the shared lists so that a
                # malformed iframe result cannot leave descriptions without
                # matching centroids (which would misalign the mark ids).
                descriptions = iframe_poi["element_descriptions"]
                shifted = [
                    self._shift_centroid(centroid, iframe_offset)
                    for centroid in iframe_poi["element_centroids"]
                ]
                self.poi_elements.extend(descriptions)
                element_centroids.extend(shifted)
        except Exception as e:
            logger.error(f"Error in finding iframes: {e}")

        self.poi_centroids = [Point(x=xy["x"], y=xy["y"]) for xy in element_centroids]
        self.bounding_boxes = [BoundingBox(**xy, label=str(i)) for i, xy in enumerate(element_centroids)]
        self.pois = [
            POI(info=info, element_centroid=centroid, bounding_box=bbox)
            for info, centroid, bbox in zip(
                self.poi_elements,
                self.poi_centroids,
                self.bounding_boxes,
                strict=False,
            )
        ]

    @property
    def poi_text(self) -> str:
        """One ``element_as_text`` line per POI, joined with newlines.

        The line index of each element is its ``mark_id``.
        """
        texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements)]
        return "\n".join([txt for txt in texts if txt])

    async def screenshot(
        self,
        delay: float = 0.0,
        quality: int = 70,
        type: str = "jpeg",
        scale: str = "css",
    ) -> tuple[bytes, bytes]:
        """Capture the current page plus a copy annotated with POI boxes.

        The POIs are collected before and after the capture; if their
        positions differ, the page is captured once more and annotated with
        freshly collected POIs.

        Args:
            delay: Seconds to sleep before doing anything (skipped when <= 0).
            quality: JPEG quality; only forwarded when ``type`` is ``"jpeg"``
                because Playwright rejects ``quality`` for PNG screenshots.
            type: Image type forwarded to Playwright's ``Page.screenshot``.
            scale: Scale mode forwarded to Playwright's ``Page.screenshot``.

        Returns:
            ``(image, annotated_image)`` as bytes.
        """
        if delay > 0.0:
            await asyncio.sleep(delay)

        shot_options = {"type": type, "scale": scale}
        if type == "jpeg":
            shot_options["quality"] = quality

        await self.update_poi()
        old_poi_positions = [tuple(point) for point in self.poi_centroids]
        img = await self.current_page.screenshot(**shot_options)
        annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)

        # Detect layout movement that happened while the capture was taken.
        await self.update_poi()
        new_poi_positions = [tuple(point) for point in self.poi_centroids]
        if new_poi_positions != old_poi_positions:
            img = await self.current_page.screenshot(**shot_options)
            await self.update_poi()
            annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
        return img, annotated_img

    async def goto(self, url: str) -> None:
        """Navigate the current page to ``url``, waiting for DOMContentLoaded."""
        await self.current_page.goto(url, wait_until="domcontentloaded")

    async def reload(self) -> None:
        """Reload the current page, waiting for DOMContentLoaded."""
        await self.current_page.reload(wait_until="domcontentloaded")

    async def click_tab(self, mark_id: int) -> None:
        """Hover over the POI ``mark_id`` and click it with the middle button."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point, button="middle")

    async def click(self, mark_id: int) -> None:
        """Hover over the POI ``mark_id`` and click it."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point)

    async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None:
        """Replace the content of the POI ``mark_id`` with ``text``.

        Args:
            mark_id: Index of the target POI.
            text: Text typed through the keyboard after clearing the field.
            submit: When true, press Enter after typing.
        """
        await self.clear_text_field(mark_id)
        await self.click(mark_id)
        await self.current_page.keyboard.type(text)

        if submit:
            await self.current_page.keyboard.press("Enter")

    async def scroll(
        self,
        direction: Literal["up", "down", "left", "right"],
        mark_id: Optional[int] = None,
    ) -> None:
        """Scroll by 80% of the viewport, or of the POI's bounding box.

        Args:
            direction: Scroll direction.
            mark_id: POI to hover and scroll within; when ``None`` the mouse
                is moved to ``(-1, -1)`` and the viewport size is used.
        """
        if mark_id is None:
            point = Point(x=-1, y=-1)
            max_scroll_x = self.viewport_width
            max_scroll_y = self.viewport_height
        else:
            point: Point = self.poi_centroids[mark_id]
            bbox: BoundingBox = self.bounding_boxes[mark_id]
            max_scroll_x = bbox.right - bbox.left
            max_scroll_y = bbox.bottom - bbox.top

        await self.hover(point=point)

        sign = -1 if direction in ("up", "left") else 1
        if direction in ("up", "down"):
            delta_x, delta_y = 0, sign * int(max_scroll_y * 0.8)
        else:
            delta_x, delta_y = sign * int(max_scroll_x * 0.8), 0
        await self.current_page.mouse.wheel(delta_x, delta_y)

    async def go_back(self) -> None:
        """Go back in history, closing the tab when it has no history.

        Does nothing when there is no page. If going back lands on
        ``about:blank`` and other pages exist, the current page is closed; if
        it is the only page, the navigation is undone and an error is raised.

        Raises:
            Exception: When the only open page has no previous entry.
        """
        if not self.current_page:
            return

        await self.current_page.go_back(wait_until="domcontentloaded")
        if self.current_page.url == "about:blank":
            if len(self.context.pages) <= 1:
                await self.current_page.go_forward(wait_until="domcontentloaded")
                raise Exception("There is no previous page to go back to.")
            await self.current_page.close()

    async def hover(self, point: Point) -> None:
        """Move the mouse to ``point`` on the current page."""
        await self.current_page.mouse.move(*point)

    async def focus(self, point: Point) -> None:
        """Focus the element found at ``point`` via ``document.elementFromPoint``."""
        await self.current_page.evaluate(
            """
            ([x, y]) => {
                const element = document.elementFromPoint(x, y);
                if (element && element.focus) {
                    element.focus();
                }
            }""",
            tuple(point),
        )

    async def get_text(self, mark_id: int) -> str:
        """Return the ``value`` or ``textContent`` of the POI ``mark_id``.

        Returns:
            The element's ``value`` if truthy, else its ``textContent``; an
            empty string when the page has no element for ``mark_id``. The
            result is always a ``str``.
        """
        value = await self.current_page.evaluate(
            """
            (mark_id) => {
                const element = marked_elements_convergence[mark_id];
                if (element && (element.value !== undefined || element.textContent !== undefined)) {
                    return element.value || element.textContent;
                }
                return '';
            }
            """,
            # Pass the index itself: the script expects a scalar, not an array.
            mark_id,
        )
        return "" if value is None else str(value)

    async def clear_text_field(self, mark_id: int) -> None:
        """Delete the existing text of the POI ``mark_id``, if it has any.

        The field is clicked, its whole content selected with the
        platform-specific shortcut (``Meta+a`` on macOS, ``Control+Home`` then
        ``Control+Shift+End`` elsewhere) and removed with Backspace.
        """
        existing_text = await self.get_text(mark_id)
        if not existing_text.strip():
            return

        await self.click(mark_id)
        keyboard = self.current_page.keyboard
        if platform.system() == "Darwin":
            await self.click(mark_id)
            await keyboard.press("Meta+a")
            await keyboard.press("Backspace")
        else:
            await keyboard.press("Control+Home")
            await keyboard.press("Control+Shift+End")
            await keyboard.press("Backspace")
|
|
|
|
|
if __name__ == "__main__":

    async def dummy_test():
        """Manual smoke test: open a page in a visible browser and save screenshots.

        Writes ``example.png`` (plain Playwright screenshot) and ``output.png``
        (annotated image returned by ``BrowserSession.screenshot``) to the
        current working directory.
        """
        async with BrowserSession(headless=False) as s:
            page = await s.context.new_page()
            await page.goto("http://google.co.uk")
            # Give the page a moment to settle before capturing it.
            await asyncio.sleep(5)
            await page.screenshot(path="example.png")
            await s.update_poi()
            _, annotated_image = await s.screenshot()
            with open("output.png", "wb") as f:
                f.write(annotated_image)

    asyncio.run(dummy_test())
|
|