ksql:udf不接受参数(string,string)

pu82cl6c  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(582)

我在尝试使用自定义项设置etl管道时遇到了ksql的一个问题。在etl过程中的某个时刻,我需要从数据中的描述字段(varchar)中隔离特定的信息。上下文的虚构示例:
description=“物种=狗。性别=男性。颜色=金发。年龄=10。”(实际数据的格式相同)
我已经编写了一个简单的自定义项来隔离任何需要的信息。看起来是这样的:

  1. package com.my.package;
  2. /**IMPORTS**/
  3. import io.confluent.ksql.function.udf.Udf;
  4. import io.confluent.ksql.function.udf.UdfDescription;
  5. /**ClASS DEFINITION**/
  6. @UdfDescription(name = "extract_from_description",
  7. author = "Me",
  8. version = "0.0.1",
  9. description = "Given a description and a request for information, isolates and returns the requested information. Pass requested tag as 'tag='".)
  10. public class Extract_From_Description {
  11. @Udf(description = "Given a description and a request for information, isolates and returns the requested information. Pass requested tag as 'tag='.)
  12. public String extract_from_description(final String description, final String request) {
  13. return description.split(request)[1].split("\\.")[0];
  14. }
  15. }

我可以很好的上传和注册这个函数,当我运行的时候它被正确的列出和描述了:

  1. ksql> list functions;
  2. ksql> describe function EXTRACT_FROM_DESCRIPTION;

我这样调用函数来创建一个新流:

  1. CREATE STREAM result AS
  2. SELECT recordId,
  3. OtherVariables,
  4. EXTRACT_FROM_DESCRIPTION(description, 'species=') AS species
  5. FROM parent_stream
  6. EMIT CHANGES;

有个错误我搞不懂:
函数“extract \u from \u description”不接受参数(string,string)。有效的替代方案包括:
显然ksql不能正确解释函数的输入应该是什么(看起来它不需要输入?),我也不知道为什么。我已经通读了文档,看看我是否以一种奇怪的方式定义了我的函数,但是没有发现示例和我的函数之间的任何差异。我注意到应该有几种方法来定义函数的输入,并尝试了所有方法,但结果总是一样的。
我使用maven为这个函数创建jar文件(jdk1.8.0\u201)。有人能帮我弄清楚发生了什么事吗?
热释光;dr:my ksql udf不接受类型(string,string)的输入,即使函数指定输入应为类型(string,string)

2ul0zpep

2ul0zpep1#

找到了问题,在这里回答任何可能遇到同样问题的人。您需要使用@udfparameter指定参数,如下所示:

  1. import io.confluent.ksql.function.udf.UdfParameter; // add this to the list of imports
  2. // add @UdfParameter(name) to each input variable
  3. public String extract_from_description(@UdfParameter(value = "description") final String description, @UdfParameter(value = "request") final String request){
  4. function body
  5. }

相关问题