import {
  HttpException,
  HttpStatus,
  Injectable,
  OnModuleInit,
} from '@nestjs/common';
import {
  AmqpConnection,
  RabbitRPC,
  RabbitSubscribe,
} from '@golevelup/nestjs-rabbitmq';
import { ConsumeMessage } from 'amqplib';
import * as amqp from 'amqp-connection-manager';
import {
  PuppeteerCrawler,
  RequestQueue,
  Request,
  purgeDefaultStorages,
  Dataset,
  createPuppeteerRouter,
} from 'crawlee';
import puppeteer from 'puppeteer';

/** Handle returned by `amqp.connect`; derived so no extra type import is needed. */
type AmqpManager = ReturnType<typeof amqp.connect>;

/**
 * Long-lived crawl worker.
 *
 * Consumes messages from the `CRAWLER_QUEUE` AMQP queue, turns each one into a
 * Crawlee request, crawls it with a single-concurrency `PuppeteerCrawler`, and
 * publishes matching HTTP response bodies to the `CRAWLER_REQUEST` exchange.
 */
@Injectable()
export class WorkerService implements OnModuleInit {
  // Restricted to letters, digits and dashes so the name is safe to use as a
  // storage/directory name on every platform.
  private readonly queueInstanceName = `worker-${Date.now()}-${Math.random()
    .toString(36)
    .slice(2)}`;
  private requestQueue: RequestQueue | null = null;
  private crawlerInstance: PuppeteerCrawler | null = null;
  /** Every AMQP connection manager opened by `queue()`, closed on shutdown. */
  private readonly amqpManagers: AmqpManager[] = [];

  constructor(private readonly amqpConnection: AmqpConnection) {}

  /**
   * Builds the crawler and starts it in the background.
   *
   * The crawler is created with `keepAlive: true`, so `run()` does not settle
   * while the worker is alive. Awaiting it here would block this hook and
   * therefore application bootstrap, so only the setup is awaited.
   */
  async onModuleInit(): Promise<void> {
    console.log(`The module has been initialized 1.`);
    const crawler = await this.setupCrawler();
    void crawler.run().catch((error: unknown) => {
      console.error('Crawler stopped unexpectedly', error);
    });
  }

  /**
   * Stops consuming AMQP messages, then tears the crawler down.
   * Safe to call when initialization never happened or did not finish.
   */
  async beforeApplicationShutdown(): Promise<void> {
    // Close the consumers first so no new work arrives during teardown.
    for (const manager of this.amqpManagers.splice(0)) {
      try {
        await manager.close();
      } catch (error: unknown) {
        console.error('Failed to close AMQP connection', error);
      }
    }

    const crawler = this.crawlerInstance;
    this.crawlerInstance = null;
    if (crawler !== null) {
      await crawler.teardown();
    }
  }

  /**
   * Builds the crawler, attaches the AMQP consumer and runs the crawler.
   *
   * The returned promise settles only when the crawler run ends.
   *
   * @param url Currently unused; kept for interface compatibility.
   */
  async crawlerInit(url?: string): Promise<void> {
    const crawler = await this.setupCrawler();
    await crawler.run();
  }

  /**
   * Connects to RabbitMQ and enqueues one crawl request per message received
   * on the durable `CRAWLER_QUEUE` queue (prefetch 1).
   *
   * Returns without waiting for the broker connection to be established.
   *
   * @param requestQueue Queue that receives the crawl requests.
   */
  public async queue(
    requestQueue: Pick<RequestQueue, 'addRequest'>,
  ): Promise<void> {
    // NOTE(review): broker URL and credentials are hard-coded; consider
    // moving them to configuration.
    const manager = amqp.connect(`amqp://guest:guest@rabbitmq:5672`);
    this.amqpManagers.push(manager);
    manager.on('connect', () => console.log('Connected!'));

    const onMessage = async (data: ConsumeMessage | null): Promise<void> => {
      // amqplib delivers `null` when the broker cancels the consumer.
      if (data === null) {
        console.log('receiver: consumer cancelled');
        return;
      }

      let message: unknown;
      try {
        message = JSON.parse(data.content.toString());
      } catch (error: unknown) {
        // A malformed payload can never succeed; reject it without requeue so
        // it does not block the prefetch(1) consumer.
        console.error('receiver: dropping malformed message', error);
        channelWrapper.nack(data, false, false);
        return;
      }
      console.log('receiver: got message', message);

      try {
        // NOTE(review): the target URL is hard-coded and the message content
        // is only logged - confirm whether the URL should come from `message`.
        await requestQueue.addRequest(
          new Request({
            url: 'https://test.net/',
            uniqueKey: `${Math.random() + Date.now()}`,
          }),
        );
      } catch (error: unknown) {
        // Enqueue failures are treated as transient: hand the message back.
        console.error('receiver: failed to enqueue request', error);
        channelWrapper.nack(data, false, true);
        return;
      }

      channelWrapper.ack(data);
    };

    const channelWrapper = manager.createChannel({
      setup: function (channel) {
        // `channel` here is a regular amqplib `ConfirmChannel`.
        return Promise.all([
          channel.assertQueue('CRAWLER_QUEUE', { durable: true }),
          channel.prefetch(1),
          channel.consume('CRAWLER_QUEUE', onMessage),
        ]);
      },
    });

    void channelWrapper
      .waitForConnect()
      .then(() => {
        console.log('Listening for messages');
      })
      .catch((error: unknown) => {
        console.error('Failed to start listening for messages', error);
      });
  }

  /** Opens the request queue, builds the crawler and attaches the AMQP consumer. */
  private async setupCrawler(): Promise<PuppeteerCrawler> {
    const requestQueue = await RequestQueue.open(this.queueInstanceName);

    const crawler = new PuppeteerCrawler({
      requestHandlerTimeoutSecs: 50,
      maxSessionRotations: 1,
      retryOnBlocked: true,
      maxRequestRetries: 3,
      keepAlive: true,
      sessionPoolOptions: {
        // intentionally empty since all http response would go to requestHandler
        blockedStatusCodes: [],
      },
      browserPoolOptions: {
        useFingerprints: false,
        maxOpenPagesPerBrowser: 1,
        closeInactiveBrowserAfterSecs: 50,
        retireBrowserAfterPageCount: 50,
      },
      preNavigationHooks: [
        async ({ page }) => {
          // Attached before navigation so responses produced while the page
          // loads are seen too, not only those arriving afterwards.
          page.on('response', (response) => {
            if (response.request().method() === 'OPTIONS') {
              return;
            }
            if (response.url() !== 'link') {
              return;
            }
            response
              .text()
              .then(async (textBody) => {
                console.log(textBody);
                // NOTE(review): `url: 1` looks like a placeholder - confirm
                // what the consumer of this message expects.
                await this.amqpConnection.publish(
                  'CRAWLER_REQUEST',
                  'RECEIVED_REQUEST',
                  JSON.stringify({
                    url: 1,
                    data: textBody,
                  }),
                  { persistent: true },
                );
              })
              .catch((error: unknown) => {
                // Reading the body or publishing may fail (e.g. the page was
                // closed); log instead of raising an unhandled rejection.
                console.error('Failed to forward response body', error);
              });
          });
        },
      ],
      launchContext: {
        launchOptions: {
          executablePath: process.env.PUPPETEER_EXECUTABLE_PATH || undefined,
          headless: true,
          args: [
            '--no-sandbox',
            '--disable-infobars',
            '--window-position=0,0',
            '--ignore-certificate-errors',
            '--ignore-certificate-errors-spki-list',
            '--disable-dev-shm-usage',
            '--disable-translate',
            '--autoplay-policy=no-user-gesture-required',
            '--use-fake-device-for-media-stream',
            '--disable-blink-features',
            '--disable-blink-features=AutomationControlled',
          ],
          ignoreDefaultArgs: ['--disable-extensions', '--enable-automation'],
          ignoreHTTPSErrors: true,
        },
      },
      minConcurrency: 1,
      maxConcurrency: 1,
      requestQueue,
      autoscaledPoolOptions: {
        isFinishedFunction: async () => {
          return false;
        },
      },
      requestHandler: async ({ page, request, log, pushData }) => {
        const title = await page.title();
        log.info(`${title}`, { url: request.loadedUrl });

        // The page has already been navigated when this handler runs, so wait
        // for the network to go quiet rather than for another navigation.
        await page.waitForNetworkIdle();

        await pushData({
          url: request.loadedUrl,
          title,
        });
      },
      // NOTE(review): this caps the crawler at 100 requests - confirm that is
      // intended for a keep-alive worker.
      maxRequestsPerCrawl: 100,
      async failedRequestHandler({ request, log }) {
        const lastError =
          request.errorMessages[request.errorMessages.length - 1];
        console.log('lastError', lastError, log);
      },
    });

    this.requestQueue = requestQueue;
    this.crawlerInstance = crawler;

    await this.queue(requestQueue);
    return crawler;
  }
}