How do I scrape a page's internal links with Scrapy?

nszi6y05 · asked 12 months ago

On the page below there are three lists of links, for victims, malware, and threat_source. While scraping this page, I want to follow those links and scrape their content as well when I reach those items.
https://icsstrive.com/incident/lockbit-ransomware-attack-significantly-impacts-owens-group-operations/
I wrote the following code for this and have tried different approaches. The problem is that when it scrapes one victim and moves on to another, the victim is repeated in the output file, creating many duplicates, and sometimes the malware or threat source is skipped.

# importing the scrapy module
import random
import scrapy
import logging
from scrapy.utils.log import configure_logging
from pycti import OpenCTIApiClient
import stix2
from pycti import (
    Identity,
    ThreatActor,
    Malware,
    Location,
    StixCoreRelationship,
    Report,
)
import json

class icsstriveSpider(scrapy.Spider):

    stix_objects=[]

    name = "icsstrive"
    start_urls = ['https://icsstrive.com/']
    baseUrl="https://icsstrive.com"
    pages = None

    def parse(self, response, **kwargs):
        links = response.css('div.search-r-title a::attr(href)').getall()
        yield from response.follow_all(links, self.parse_icsstrive)
      
        if self.pages is None:
            self.pages=response.xpath('//a[@class="wpv-filter-pagination-link js-wpv-pagination-link page-link"]/@href').getall()
        if len(self.pages) >0:
            url=self.pages[0]
            self.pages.remove(url)
            yield response.follow(self.baseUrl+url, self.parse,dont_filter=True)
       
        
    def parse_icsstrive(self, response, **kwargs):
        title = ""
        published = ""
        type = ""
        summary = ""
        incident_Date = ""
        location = ""
        estimated_Cost = ""
        victims_url = ""
        victim_title = ""
        malwares_urls = ""
        threat_source_urls = ""
        references_name = ""
        references_url = ""
        industries = ""
        impacts = ""
        title=response.xpath('//h1[@class="entry-title"]/text()').get()
        published=response.xpath('//p[@class="et_pb_title_meta_container"]/span/text()').get()
        type=response.xpath('//div[@class="et_pb_text_inner"]/text()').get()
        summary=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        incident_Date = response.xpath('//h3[text()="Incident Date"]/following-sibling::*//text()').get()
        location = response.xpath('//h3[text()="Location"]/following-sibling::p/a/text()').get()
        estimated_Cost = response.xpath('//h3[text()="Estimated Cost"]/following-sibling::p/text()').get()
        victims_url = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Victims"]/following-sibling::div/ul/li/a/@href').getall()
        malwares_urls = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Type of Malware"]/following-sibling::div/ul/li/a/@href').getall()
        threat_source_urls = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Threat Source"]/following-sibling::div/ul/li/a/@href').getall()
        references_name = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="References"]/following-sibling::div/ul/li/a/text()').getall()
        references_url = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="References"]/following-sibling::div/ul/li/a/@href').getall()
        industries = response.xpath('//h3[text()="Industries"]/following-sibling::p/a/text()').get()
        impacts = response.xpath('//h3[text()="Impacts"]/following-sibling::*//text()').get()

        item = {
            "title": title,
            "published": published,
            "type": type,
            "summary": summary,
            "incident_Date": incident_Date,
            "estimated_Cost": estimated_Cost,
            "references": ",".join(references_name),
            "industries": industries,
        }
        if location is not None:
            item["location"]= location.replace("'", '"')
        if impacts is not None:
            item["impacts"]= impacts.replace("'", '"')
       
        # Follow each victim link, carrying the item and the remaining URL lists in meta
        if len(victims_url) > 0: 
            for url in victims_url:
                request= scrapy.Request(url + "?dummy=" + str(random.random()),callback=self.parse_victims,dont_filter=True,meta={'item': item, 'malwares_urls': malwares_urls, 'threat_source_urls':threat_source_urls})
                request.meta['dont_cache'] = True
                yield request
        else:
            yield item

    def parse_victims(self, response, **kwargs):
        victim_title = ""
        victim_published = ""
        victim_des = ""
        victim_title=response.xpath('//h1[@class="entry-title"]/text()').get()
        victim_des=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        victim_published = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Incidents"]/following-sibling::div/ul/li/strong/text()').getall()
    
        item = response.meta['item']
       
        malwares_urls = response.meta['malwares_urls']
        threat_source_urls = response.meta['threat_source_urls']
        item["victim_title"] = victim_title
        item["victim_des"] = victim_des
        item["victim_url"] = response.url
        if victim_published:
            item["victim_published"] = victim_published[0]
        if item["title"]=="Chinese Identified Hackers Targeting Hawaii Water Utilities and unidentified Oil & Gas Pipeline in US":
             print(item)
       
        if len(malwares_urls) > 0:
            for malware_url in malwares_urls:
                request= scrapy.Request(malware_url+ "?dummy=" + str(random.random()), callback=self.parse_malware,dont_filter=True, meta={'item': item, 'threat_source_urls':threat_source_urls})
                request.meta['dont_cache'] = True
                yield request
        elif len(threat_source_urls) > 0:
            for threat_source_url in threat_source_urls:
                request= scrapy.Request(threat_source_url+ "?dummy=" + str(random.random()), callback=self.parse_threat_source,dont_filter=True,meta={'item': item})
                request.meta['dont_cache'] = True
                yield request
        else:
            yield item

    def parse_malware(self, response, **kwargs):
        malware_title = ""
        malware_published = ""
        malware_des = ""
        malware_title=response.xpath('//h1[@class="entry-title"]/text()').get()
        malware_des=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        malware_published = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Incidents"]/following-sibling::div/ul/li/strong/text()').getall()
        item = response.meta['item']
        threat_source_urls = response.meta['threat_source_urls']
        item["malware_title"] = malware_title
        item["malware_des"] = malware_des
        
        if malware_published:
            item["malware_published"] = malware_published[0]
        if len(threat_source_urls) > 0:
            for threat_source_url in threat_source_urls:
                request= scrapy.Request(threat_source_url+ "?dummy=" + str(random.random()), callback=self.parse_threat_source,dont_filter=True,meta={'item': item})
                request.meta['dont_cache'] = True
                yield request
        else:
            yield item   

    def parse_threat_source(self, response, **kwargs):
        threat_source_title = ""
        threat_source_published = ""
        threat_source_des = ""
        threat_source_title=response.xpath('//h1[@class="entry-title"]/text()').get()
        threat_source_des=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        threat_source_published = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Incidents"]/following-sibling::div/ul/li/strong/text()').getall()
        item = response.meta['item']
        item["threat_source_title"] = threat_source_title
        item["threat_source_des"] = threat_source_des
        if item["title"]=="Chinese Identified Hackers Targeting Hawaii Water Utilities and unidentified Oil & Gas Pipeline in US":
             print(item)
        if threat_source_published:
            item["threat_source_published"] = threat_source_published[0]
        yield item

m3eecexj

There are several things in the code that cause items to appear more than once in the output.

Causes

1. You use dont_filter=True on almost every request, which disables Scrapy's built-in duplicate filter and lets the same page be parsed multiple times.
2. At the end of the parse method you collect all of the pagination links and then send a request for each of them in a loop, all routed back to parse. So every time any page is parsed, it fires the same requests for the other pages all over again.
3. You yield a dict from almost every parse method and also pass that same dict on to the next parse method via the request's meta. This produces duplicated and incomplete items, because you yield them before they are finished; the snippet after this list shows the underlying pitfall.
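
To see why point 3 corrupts the output, here is a plain-Python illustration (no Scrapy involved, the names are made up) of what happens when one shared dict is mutated and handed around instead of being copied:

item = {"title": "incident"}
results = []
for victim in ["victim-a", "victim-b"]:
    item["victim_title"] = victim  # mutates the single shared dict
    results.append(item)           # stores a reference, not a copy
print(results)
# [{'title': 'incident', 'victim_title': 'victim-b'},
#  {'title': 'incident', 'victim_title': 'victim-b'}]

The same thing happens in the spider through response.meta: the one dict created in parse_icsstrive is mutated and yielded by every victim, malware, and threat-source callback, so the branches overwrite each other's fields and each yield emits another partially filled copy of the same incident.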

Possible solutions

1. Remove the dont_filter argument from the requests.
2. Instead of sending requests for every page each time the parse method runs, only request the next page.
3. Don't yield incomplete items: if you pass a dict on to another parse method, wait until the dict is complete before yielding it (a sketch of this follows the example below).

Here is an example of 1 and 2:

import scrapy

class icsstriveSpider(scrapy.Spider):

    name = "icsstrive"
    start_urls = ['https://icsstrive.com/']

    def parse(self, response):
        for link in response.css('div.search-r-title a::attr(href)').getall():
            yield response.follow(link, self.parse_icsstrive)
        current_page = response.css('li.wpv_page_current')
        if next_page := current_page.xpath("./following-sibling::li/a/@href").get():
            yield scrapy.Request(response.urljoin(next_page))

    def parse_icsstrive(self, response):
        victims_links = response.xpath("//div[h3[text()='Victims']]//li/a/@href").getall()
        victims = response.xpath("//div[h3[text()='Victims']]//li//text()").getall()
        malware_links = response.xpath("//div[h3[text()='Type of Malware']]//li/a/@href").getall()
        malware = response.xpath("//div[h3[text()='Type of Malware']]//li//text()").getall()
        threat_source_links = response.xpath("//div[h3[text()='Threat Source']]//li/a/@href").getall()
        threat_source = response.xpath("//div[h3[text()='Threat Source']]//li/a/text()").getall()
        title = response.xpath('//h1[@class="entry-title"]/text()').get()
        yield {
            "title": title,
            "victims": victims,
            "victims_links": victims_links,
            "malware": malware,
            "malware_links": malware_links,
            "threat_source_links": threat_source_links,
            "threat_source": threat_source
        }
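
The example above only covers points 1 and 2. For point 3, below is a minimal sketch of one way to chain the detail requests so that each incident's dict is yielded exactly once, after the last detail page has been visited. The pending queue and the next_or_finish/parse_detail helpers are my own illustration, not part of the original code, and it assumes Scrapy 1.7+ for cb_kwargs:

import scrapy

class IcsstriveChainSpider(scrapy.Spider):
    name = "icsstrive_chain"
    start_urls = ["https://icsstrive.com/"]

    def parse(self, response):
        for link in response.css("div.search-r-title a::attr(href)").getall():
            yield response.follow(link, self.parse_icsstrive)

    def parse_icsstrive(self, response):
        item = {
            "title": response.xpath('//h1[@class="entry-title"]/text()').get(),
            "victims": [],
            "malware": [],
            "threat_sources": [],
        }
        # One queue of (url, target field) pairs covering all three link lists.
        pending = (
            [(u, "victims") for u in response.xpath(
                "//div[h3[text()='Victims']]//li/a/@href").getall()]
            + [(u, "malware") for u in response.xpath(
                "//div[h3[text()='Type of Malware']]//li/a/@href").getall()]
            + [(u, "threat_sources") for u in response.xpath(
                "//div[h3[text()='Threat Source']]//li/a/@href").getall()]
        )
        yield from self.next_or_finish(response, item, pending)

    def next_or_finish(self, response, item, pending):
        # Yield the finished item only when no detail pages are left to visit.
        if not pending:
            yield item
            return
        url, field = pending[0]
        yield response.follow(
            url,
            callback=self.parse_detail,
            cb_kwargs={"item": item, "pending": pending[1:], "field": field},
        )

    def parse_detail(self, response, item, pending, field):
        # Record this detail page, then move on to the next one in the queue.
        item[field].append({
            "title": response.xpath('//h1[@class="entry-title"]/text()').get(),
            "url": response.url,
        })
        yield from self.next_or_finish(response, item, pending)

One caveat: if two incidents link to the same malware or threat-source page, Scrapy's duplicate filter will drop the second request and that incident's chain will silently stall. If that happens, pass dont_filter=True on these chained detail requests only; they cannot loop, because pending shrinks on every step.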
