spark作为查找实现

q5lcpyga  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(278)

我想用本机spark函数替换as lookup的udf实现。
因为查找表有大约500000条记录,如下所示:

+-----------+------------------------+------------------------------+
|    network|autonomous_system_number|autonomous_system_organization|
+-----------+------------------------+------------------------------+
| 1.0.0.0/24|                   13335|                 CLOUDFLARENET|
| 1.0.4.0/22|                   38803|          Wirefreebroadband...|
|1.0.16.0/24|                    2519|          ARTERIA Networks ...|
|1.0.64.0/18|                   18144|          Energia Communica...|
+-----------+------------------------+------------------------------+

传入数据包含有关源、目标ip地址的信息:

+-----------------+----------------------+
|sourceIPv4Address|destinationIPv4Address|
+-----------------+----------------------+
|     5.34.180.207|        213.226.224.51|
|    13.230.21.159|            81.95.96.2|
|      93.188.0.21|          34.223.46.15|
|   165.22.170.210|        89.187.156.222|
+-----------------+----------------------+

因为查找表只有关于ip子网的信息,所以我可能需要使用最长前缀匹配算法来连接这两个dfs,但是我不知道如何在不使用另一个udf的情况下将其实现到spark。有人能给我指出正确的方向吗?
此时,我使用maxmind作为查找库:

val asnLookup = udf(Udf.getASN)
val dfAS = df
    .withColumn("sourceAS", asnLookup(col(IpfixFields.sourceIPv4Address)))
    .withColumn("destinationAS", asnLookup(col(IpfixFields.destinationIPv4Address)))
def getASN: String => String = (ipAddress: String) => {
    val ia = InetAddress.getByName(ipAddress)
    val result = GeoIPWrapper.reader.tryAsn(ia)

    if (!result.isPresent) {
      "na"
    } else {
      result.get().getAutonomousSystemNumber.toString
    }
  }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题