drools规则在分布式系统(spark cluster)上运行时不会触发

ergxz8rk  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(253)

我不太会流口水。我试着用口水和Spark。当我在当地跑步的时候,规则被炒了。但对于胖jar,我正试图在星火团上运行相同的程序,但它一点也不启动。有人能帮我找出什么原因,为什么它不能在集群模式下发射。请找到我尝试过的代码fireallrules和rule。

//Rule - 1
rule "Rule1"
when
  $prep : DroolsInput (testgroupname == "Test", ACCOUNT_NUMBER.startsWith("5"));
then
  DroolsFunctions.insertResponse(kcontext, $prep, "FAIL");
end

========

def loadDrl(drlFilePath: String): InternalKnowledgeBase = {
val resource = ResourceFactory.newClassPathResource(drlFilePath)
val kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder()
kbuilder.add(resource, ResourceType.DRL)

if (kbuilder.hasErrors()) {
  throw new RuntimeException(kbuilder.getErrors().toString())
}
val kbase = KnowledgeBaseFactory.newKnowledgeBase()
kbase.addPackages(kbuilder.getKnowledgePackages())
kbase
}

def fireAllRules(kbase: InternalKnowledgeBase, prepInput: Row): CommonResponse = {
println(prepInput.toSeq) //these statement are not getting printed when I run in cluster
val prep = DroolsInput().createFromSeq(prepInput.toSeq)
val session = kbase.newKieSession()
println("ACCOUNT_NUMBER-> " + prep.ACCOUNT_NUMBER) //these statement are not getting printed when I run in cluster
session.insert(prep)
session.fireAllRules()

val Response = getResults(session, "CommonResponse") match {
  case Some(x) => x.asInstanceOf[CommonResponse]
  case None    => null
}
session.dispose()
Response 
}

main中的代码用法:

val df = spark.read.option("header","true").csv("path")//
val kbase = loadDrl("drools/test.drl")
def fireRules = udf(fireAllRules(kbase, _: Row))
val outDF = df.withColumn("Response", fireRules((struct(df.columns.toSeq.map(col): _*))))
outDF.show()

请告诉我问题出在哪里,以及为什么它在集群中触发规则。注意:。drl文件已成功加载。但规则并没有启动。谢谢你的帮助。!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题