如何使用groovy或python来解码NiFi中的动态json数组值?

dauxcl2d  于 2022-09-21  发布在  Python
关注(0)|答案(2)|浏览(224)

我想解码NiFi json内容中动态大小数组的Base64编码字符串值。

我尝试使用EvaluateJsonPathUpdateAttributes,但数组大小是动态的,我无法像for loop一样获取所有元素的索引变量。

然后我也尝试使用更新记录。这对我来说是非常完美的。但在我的NIFI版本中,并没有包含NIFI录制路径的Base64Decode功能......

所以我想尝试使用脚本处理器。

如何使用流数据编写groovy/python脚本。

下面是输入和输出JSON示例。

输入JSON:

  1. {
  2. "A": [
  3. {
  4. "CC" : "Encoded string",
  5. "DD" : "any string"
  6. },
  7. {
  8. "CC" : "Encoded string",
  9. "DD" : "any string"
  10. }
  11. ]
  12. "B": "any string"
  13. }

输出JSON:

  1. {
  2. "A": [
  3. {
  4. "CC" : "Decoded string",
  5. "DD" : "any string"
  6. },
  7. {
  8. "CC" : "Decoded string",
  9. "DD" : "any string"
  10. }
  11. ]
  12. "B": "any string"
  13. }
wh6knrhe

wh6knrhe1#

在NiFi中,您可以通过以下代码使用ExecuteGroovyScrip

  1. def flowFile = session.get()
  2. if (flowFile != null) {
  3. def inputFlow = flowFile.read().getText("UTF-8").readLines()
  4. def search = inputFlow.findAll { it.contains('"CC" :') }
  5. def searchStr = search.toString()
  6. flowFile = session.putAttribute(flowFile, 'search', searchStr)
  7. session.transfer(flowFile, REL_SUCCESS)
  8. }

现在你有了一个属性搜索,行为CC,或者你也可以在这个代码中添加Base64解码

享受

iyfamqjs

iyfamqjs2#

我参考他们的答案来解决我的问题,如下所示。

我必须解析文本,因为解码后的字符串也是Json字符串。

如果我不解析已解码的字符串,结果将包含“\”字符,因为它只是字符串,而不是Json字符串

谢谢大家!

  1. import groovy.json.JsonBuilder
  2. import groovy.json.JsonSlurper
  3. import groovy.json.JsonOutput;
  4. import org.apache.nifi.processor.io.StreamCallback
  5. import java.nio.charset.StandardCharsets
  6. def flowFile = session.get();
  7. if (flowFile == null) {
  8. return;
  9. }
  10. flowFile = session.write(flowFile, { inputStream, outputStream ->
  11. def content = inputStream.getText("UTF-8")
  12. def data = new JsonSlurper().parseText(content)
  13. data.A.each{obj ->
  14. def decodedString = new String(obj["CC"].decodeBase64())
  15. def decodedJsonObject = new JsonSlurper().parseText(decodedString)
  16. obj["CC"] = decodedJsonObject
  17. }
  18. outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
  19. } as StreamCallback)
  20. session.transfer(flowFile, REL_SUCCESS)
展开查看全部

相关问题