我不太会流口水。我试着用口水和Spark。当我在当地跑步的时候,规则被炒了。但对于胖jar,我正试图在星火团上运行相同的程序,但它一点也不启动。有人能帮我找出什么原因,为什么它不能在集群模式下发射。请找到我尝试过的代码fireallrules和rule。
//Rule - 1
rule "Rule1"
when
$prep : DroolsInput (testgroupname == "Test", ACCOUNT_NUMBER.startsWith("5"));
then
DroolsFunctions.insertResponse(kcontext, $prep, "FAIL");
end
========
def loadDrl(drlFilePath: String): InternalKnowledgeBase = {
val resource = ResourceFactory.newClassPathResource(drlFilePath)
val kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder()
kbuilder.add(resource, ResourceType.DRL)
if (kbuilder.hasErrors()) {
throw new RuntimeException(kbuilder.getErrors().toString())
}
val kbase = KnowledgeBaseFactory.newKnowledgeBase()
kbase.addPackages(kbuilder.getKnowledgePackages())
kbase
}
def fireAllRules(kbase: InternalKnowledgeBase, prepInput: Row): CommonResponse = {
println(prepInput.toSeq) //these statement are not getting printed when I run in cluster
val prep = DroolsInput().createFromSeq(prepInput.toSeq)
val session = kbase.newKieSession()
println("ACCOUNT_NUMBER-> " + prep.ACCOUNT_NUMBER) //these statement are not getting printed when I run in cluster
session.insert(prep)
session.fireAllRules()
val Response = getResults(session, "CommonResponse") match {
case Some(x) => x.asInstanceOf[CommonResponse]
case None => null
}
session.dispose()
Response
}
main中的代码用法:
val df = spark.read.option("header","true").csv("path")//
val kbase = loadDrl("drools/test.drl")
def fireRules = udf(fireAllRules(kbase, _: Row))
val outDF = df.withColumn("Response", fireRules((struct(df.columns.toSeq.map(col): _*))))
outDF.show()
请告诉我问题出在哪里,以及为什么它在集群中触发规则。注意:。drl文件已成功加载。但规则并没有启动。谢谢你的帮助。!
暂无答案!
目前还没有任何答案,快来回答吧!