import asyncio
import os
import re
import uuid
from typing import List
from urllib.parse import urlparse

import aiohttp
from crawlee import Glob, Request
from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext
from crawlee.router import Router
from fastapi import APIRouter, BackgroundTasks, Body, HTTPException

router = APIRouter()
crawl_router = Router[PlaywrightCrawlingContext]()

# Base directory where crawled artifacts are saved (hard-coded for this proof of concept)
SAVE_ROOT = r'C:\Users\Anonymous\Documents\Anonymous\Repo\Anonymous-crawler-poc'


# Turn a URL into a safe file name with the given extension
def sanitize_file_name(url: str, extension: str) -> str:
    parsed_url = urlparse(url)
    path = parsed_url.path
    if path == '/' or path == '/index.html':
        return f'index.{extension}'
    path = path.rstrip('/')
    file_name = os.path.basename(path)
    sanitized_file_name = re.sub(r'[^A-Za-z0-9_.]', '', file_name)
    if not sanitized_file_name.lower().endswith(f'.{extension}'):
        sanitized_file_name += f'.{extension}'
    return sanitized_file_name


# Download a file of any type (PDF, PNG, JPG, etc.) to the given path
async def download_file(url: str, save_path: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                os.makedirs(os.path.dirname(save_path), exist_ok=True)  # Ensure the directory exists
                with open(save_path, 'wb') as f:
                    f.write(await response.read())
            else:
                # NOTE: this runs inside the crawler, not inside a FastAPI request, so the
                # exception is logged/retried by Crawlee rather than returned to an HTTP client
                raise HTTPException(status_code=400, detail=f"Failed to download file from {url}")


# Default handler for the Crawlee router
@crawl_router.default_handler
async def default_handler(context: PlaywrightCrawlingContext) -> None:
    context.log.info(f'Processing {context.request.url}')

    # Fetch targets from the request's user_data
    targets = context.request.user_data.get("targets", [])

    # Save the page HTML if "html" is in targets
    if "html" in targets:
        html_content = await context.page.content()
        html_file_name = sanitize_file_name(context.request.url, "html")
        context.log.info(f'HTML Filename: {html_file_name}')
        html_file_path = os.path.join(SAVE_ROOT, 'saved_html', html_file_name)
        os.makedirs(os.path.dirname(html_file_path), exist_ok=True)  # Ensure the directory exists
        context.log.info(f'HTML File path: {html_file_path}')
        with open(html_file_path, 'w', encoding='utf-8') as file:
            file.write(html_content)

    # Download every other file type specified in the targets list
    for target in targets:
        if target != "html":
            # Find links to files with the specified extension
            file_links = await context.page.locator(f'a[href$=".{target}"]').evaluate_all(
                "elements => elements.map(element => element.href)")
            context.log.info(f'{target.upper()} Links: {file_links}')

            download_tasks = []
            for link in file_links:
                file_name = sanitize_file_name(link, target)
                file_path = os.path.join(SAVE_ROOT, f'saved_{target}', file_name)
                context.log.info(f'Downloading {target.upper()} from {link} to {file_path}')
                download_tasks.append(download_file(link, file_path))
            await asyncio.gather(*download_tasks)

    # Continue to enqueue other links if necessary
    # NOTE: enqueued requests do not automatically inherit user_data, so pages discovered
    # here will see an empty "targets" list unless it is passed along explicitly
    await context.enqueue_links(exclude=[Glob('https://www.bir.gov.ph/')])


# Background task that runs the crawl
async def start_crawling(start_url: str, targets: List[str]):
    if not start_url or not targets:
        raise HTTPException(status_code=400, detail="URL and targets are required")

    crawler = PlaywrightCrawler(request_handler=crawl_router)

    # Generate a unique ID for the request
    request_id = str(uuid.uuid4())
    print(request_id)

    # Create the initial request with both 'id' and 'unique_key'
    requests = [Request(
        url=start_url,
        user_data={"targets": targets},
        id=request_id,          # Explicitly set the 'id'
        unique_key=request_id,  # Optionally use 'unique_key' for deduplication
    )]

    await crawler.run(requests)


# API endpoint to start crawling using a POST request and a JSON body
@router.post("/crawl")
async def trigger_crawl(
    background_tasks: BackgroundTasks,
    payload: dict = Body(..., description="The JSON body with the URL and targets"),
):
    url = payload.get("url")
    targets = payload.get("targets", [])

    if not url:
        raise HTTPException(status_code=400, detail="URL is required")
    if not isinstance(targets, list) or not all(isinstance(t, str) for t in targets):
        raise HTTPException(status_code=400, detail="Targets must be a list of strings")

    background_tasks.add_task(start_crawling, url, targets)
    return {"message": "Crawling started", "url": url, "targets": targets}