How to match IP addresses against a large number of IP network objects in Ruby

ymdaylpp  posted on 2023-06-29 in Ruby

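For a fluentd environment, I am trying to enrich our logs (which contain IP addresses) with contextual information. To do this, I wrote a Ruby-based fluentd plugin.
I have a JSON file that contains subnet information together with meta information for each subnet (e.g. the country and an internal site_id). The idea is to write a filter plugin that matches the src and dst IP addresses against the networks in this file and adds the appropriate metadata to the record.
The current code looks like this (I have removed some comments and error handling to keep it as short as possible):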

require "fluent/plugin/filter"
require "ipaddr"
require "json" # needed for JSON.parse in configure

module Fluent
  module Plugin
    class IpaddressFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("ipaddress", self)

      config_param :ipaddress_file_path, :string
      config_param :source_address_field, :string, :default => "src_ip"
      config_param :destination_address_field, :string, :default => "dest_ip"

      def initialize
        super
        @parsed_subnets = []
      end

      def configure(conf)
        super
        @ipaddress_file = File.read(@ipaddress_file_path)
        @ipaddress_data = JSON.parse(@ipaddress_file)

        # For each entry, create IP address object
        @ipaddress_data.each do |entry|
          # Create IP address object
          new_entry = {
            "network" => IPAddr.new(entry["network"], Socket::AF_INET),
            "country" => entry["country"],
            "site_id" => entry["site_id"]
          }
          # Append hash to array
          @parsed_subnets << new_entry
        end
      end

      def filter(tag, time, record)
        src_ip_obj = IPAddr.new(record[@source_address_field], Socket::AF_INET)
        dest_ip_obj = IPAddr.new(record[@destination_address_field], Socket::AF_INET)

        src_found = false
        dst_found = false

        # Check if IP addresses are in any of the subnets
        @parsed_subnets.each do |entry|
          # SRC IP
          if entry["network"].include?(src_ip_obj)
            record["src_country"] = entry["country"]
            record["src_site_id"] = entry["site_id"]
            src_found = true
          end
          # DEST IP
          if entry["network"].include?(dest_ip_obj)
            record["dest_country"] = entry["country"]
            record["dest_site_id"] = entry["site_id"]
            dst_found = true
          end

          # Stop loop if both are found
          break if src_found && dst_found
        end

        # Return record
        record
      end
    end
  end
end

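The code itself works fine. The subnet list has more than 20,000 entries, and we process more than 2,000 log entries per second. The current solution scales linearly, O(n), with the number of subnet entries, which is far from optimal. The subnets have already been summarized (with Python's netaddr module) as far as possible without losing uniqueness.
The question now is how to speed this task up. I think a tree-based approach might work. Anything I can do up front (while the plugin loads the data) is a one-time cost, which I would strongly prefer over doing the work for every message.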

jaql4c8m  #1

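Most likely it is the @parsed_subnets.each block that slows your code down, because it iterates over all subnets just to find two matches.
Instead of iterating over all subnets, I suggest writing them to a database that supports IP address operators (such as PostgreSQL) and then querying only the two values you are interested in.
A possible solution for the filter plugin could look roughly like this: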
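The one-time step of getting the subnets into the database could be sketched as follows. This is only a sketch under assumptions: the table name ipam_networks and the column names network, country and site_id follow the query used in the plugin, while the column types, the index name and the helper load_networks are made up for illustration. The helper only relies on the connection responding to exec and exec_params, as a PG::Connection does.

```ruby
# Hypothetical one-time loader (not part of fluentd or the pg gem).
# `conn` is expected to behave like a PG::Connection; only #exec and
# #exec_params are used. Column types are assumptions.
def load_networks(conn, entries)
  conn.exec(<<~SQL)
    CREATE TABLE IF NOT EXISTS ipam_networks (
      network cidr PRIMARY KEY,
      country text,
      site_id text
    )
  SQL

  # GiST index with the inet_ops operator class, so that containment
  # operators such as >> and >>= do not need a sequential scan.
  conn.exec(
    "CREATE INDEX IF NOT EXISTS ipam_networks_network_gist " \
    "ON ipam_networks USING gist (network inet_ops)"
  )

  entries.each do |entry|
    conn.exec_params(
      "INSERT INTO ipam_networks (network, country, site_id) " \
      "VALUES ($1, $2, $3) ON CONFLICT (network) DO NOTHING",
      [entry["network"], entry["country"], entry["site_id"]]
    )
  end

  entries.size
end

# Usage (assumes the pg gem, a reachable database and a hypothetical
# file name):
#   require "pg"
#   require "json"
#   conn = PG.connect(dbname: "fluentd")
#   load_networks(conn, JSON.parse(File.read("subnets.json")))
```

Because the load happens once, outside the filter, its cost does not affect per-record processing.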

require "fluent/plugin/filter"
require "ipaddr"
require "pg"

module Fluent
  module Plugin
    class IpaddressFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("ipaddress", self)
      
      desc 'Defines source IP field name within the record'
      config_param :source_address_field, :string, :default => "src_ip"

      desc 'Defines destination IP field name within the record'
      config_param :destination_address_field, :string, :default => "dest_ip"

      config_param :psql_host, :string, :default => "localhost"
      config_param :psql_user, :string, :default => "postgres"
      config_param :psql_pass, :string, :default => "postgres"
      config_param :psql_schema, :string, :default => "fluentd"
      config_param :psql_port, :integer, :default => 5432

      def configure(conf)
        super
        @db_conn = PG.connect(
          :host => @psql_host,
          :user => @psql_user,
          :password => @psql_pass,
          :dbname => @psql_schema,
          :port => @psql_port
        )

        # >>= (contains or equals) also matches a /32 network entry against the same host address
        @db_conn.prepare('get_network', 'SELECT * FROM ipam_networks WHERE network >>= $1')
      end

      def filter(tag, time, record)

        # fetch infos from db - source
        @db_conn.exec_prepared('get_network', [record[@source_address_field]]) do |result|
          result.each do |row|
            record["src_country"] = row["country"]
            record["src_site_id"] = row["site_id"]
          end
        end

        # fetch infos from db - destination
        @db_conn.exec_prepared('get_network', [record[@destination_address_field]]) do |result|
          result.each do |row|
            record["dest_country"] = row["country"]
            record["dest_site_id"] = row["site_id"]
          end
        end

        record
      end
    end
  end
end
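If an external database is not wanted, the one-time preprocessing the question asks about can also be done in memory. The following is a minimal sketch, not the plugin's actual code: the class SubnetIndex is hypothetical, it handles IPv4 only, and it assumes Ruby 2.5 or later for IPAddr#prefix. Networks are grouped into one hash per prefix length, so a lookup needs at most one hash probe per distinct prefix length (33 at most for IPv4) instead of one include? check per subnet.

```ruby
require "ipaddr"

# Hypothetical helper (not part of fluentd or the original plugin).
# Built once, e.g. while the plugin loads its data.
class SubnetIndex
  IPV4_MASK = 0xFFFFFFFF

  def initialize(entries)
    # prefix length => { network address as Integer => metadata }
    @tables = Hash.new { |hash, prefix| hash[prefix] = {} }
    entries.each do |entry|
      network = IPAddr.new(entry["network"], Socket::AF_INET)
      @tables[network.prefix][network.to_i] = {
        "country" => entry["country"],
        "site_id" => entry["site_id"]
      }
    end
    # Longest prefix first, so the most specific network wins.
    @prefixes = @tables.keys.sort.reverse
  end

  # Returns the metadata hash of the most specific matching network,
  # or nil when no network contains the address.
  def lookup(address)
    ip = IPAddr.new(address, Socket::AF_INET).to_i
    @prefixes.each do |prefix|
      mask = (IPV4_MASK << (32 - prefix)) & IPV4_MASK
      meta = @tables[prefix][ip & mask]
      return meta if meta
    end
    nil
  end
end

index = SubnetIndex.new([
  { "network" => "10.0.0.0/8",  "country" => "DE", "site_id" => "a" },
  { "network" => "10.1.0.0/16", "country" => "FR", "site_id" => "b" }
])
p index.lookup("10.1.2.3")    # most specific match: the /16 entry
p index.lookup("192.168.0.1") # no match: nil
```

In the filter, the index would be built once in configure and lookup called for the source and destination fields. Error handling for missing or malformed addresses is left out here.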
