我需要使用apachenifi将数据从db2加载到cassandra。我的db2表有大约40k条记录,完成到cassandra的数据转储大约需要15分钟。我为这个用例附上了两张当前nifi流的图片。观察到每秒仅读取100多条记录。谁能让我知道-如何调整流/处理器,以便我们可以提高速度(减少时间)的数据转储。
db2到cassandra nifi流-在执行脚本开始之前
执行脚本启动后
我附加了执行脚本,我们正在为cassandra转储准备insert语句。
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import csv
import io
import datetime
class TransformCallback(StreamCallback):
def _init_(self):
pass
def process(self,inputStream,outputStream):
inputdata = IOUtils.toString(inputStream,StandardCharsets.UTF_8)
text = csv.reader(io.StringIO(inputdata))
l = []
for row in text:
mon = row[0].strip()
modified_date = str(datetime.datetime.strptime(str(mon), "%d%b%Y").strftime("%Y-%m-%d"))
row[0] = modified_date
row[1] = row[1].strip()
row[2] = row[2].strip()
l.append(row)
values_str = json.dumps(l)
leng = len(l)
for i in range(leng):
obj = json.loads(values_str)[i] ## obj = dict
newObj = {
"date": obj[0],
"max": obj[1],
"city": obj[2]
}
insert_query = ("INSERT INTO model.test_data JSON '"+json.dumps(newObj , indent=4)+"';").encode('utf-8')
outputStream.write(bytearray(insert_query))
flowFile = session.get()
if flowFile != None:
flowFile = session.write(flowFile,TransformCallback())
flowFile = session.putAttribute(flowFile, "filename",flowFile.getAttribute('filename').split('.')[0]+'_result.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
1条答案
按热度按时间iyr7buue1#
我不得不说,所需的转换可能与两个标准处理器有关:
ConvertRecord
将csv记录转换为jsonReplaceText
添加插入到。。。如果您还想使用脚本,我可以帮助您使用groovy。以下脚本用于
ExecuteGroovyScript
处理器。每次调用处理一个流文件
它转换流文件中的所有行,因此,不需要在此处理器之前按行拆分文件。
每次调用处理多个流文件
与前面的类似,但具有流文件列表处理
fflist.each{ff-> ...}
```import groovy.json.JsonOutput
def fflist=session.get(1000)
if(!fflist)return
fflist.each{ff->
ff.write{rawIn, rawOut->
rawOut.withWriter("UTF-8"){w->
rawIn.withReader("UTF-8"){r->
//iterate lines from input reader and split each with coma
r.splitEachLine( ',' ){row->
//build object (map)
def obj = [
"date": row[0],
"max" : row[1],
"city": row[2]
]
//convert obj to json string
def json = JsonOutput.toJson(obj)
//write data to output
w << "INSERT INTO model.test_data JSON '" << json << "';" << '\n'
}
}
}
}
}