我正在尝试使用qubole kinesis spark library从我的流中获取记录:
val kinesis = sparkContextService.SQLC.sparkSession.readStream
.format("kinesis")
.option("streamName", "streamName")
.option("region", "region")
.option("endpointUrl", "endpointUrl")
.option("startingposition", "TRIM_HORIZON")
.option("awsAccessKeyId", "awsAccessKeyId")
.option("awsSecretKey", "awsSecretKey")
.option("avoidEmptyBatches", "true")
.load()
and then:
kinesis
.selectExpr("CAST(data AS STRING)").as[(String)]
.groupBy("data").count()
.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
为了让舒尔相信kinesis内的一切正常,我用其他非spark项目测试了数据的输入和获取。我使用putrecordrequest将数据与另一个非spark项目一起放入kinesis流。在将数据放入流之后,在这个非spark项目中,我将测试数据是否正确放入。我尝试查看数据是否与getrecordsrequest一起出现。
while (thereAreShards) {
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
try {
GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest);
records = result.getRecords();
if(records.size()!=0){
System.out.println(count);
count = 0;
}
else
{
count++;
}
shardIterator = result.getNextShardIterator();
} catch (Throwable t){
System.out.println(t);
}
if (shardIterator.equals(null)) {
thereAreShards = false;
}
我注意到,如果获得结果所需的计数是100、112、130(例如,应用程序需要100 getNextSharedIterator()来获取数据),我就无法在spark应用程序中获取数据。但是,如果我的非spark项目返回的计数为5或10,那么我得到的数据没有问题:
批次:2
+----+-----+
|数据|计数|
+----+-----+
| | 400|
|环境足迹900|
|吉吉500|
|你好| 1000|
+-----+-----+
因此,如果数据被放在碎片中很远的地方,我就无法访问它,即使我让spark查询运行了24小时,6300个批通过了,我也没有得到任何数据。但是如果数据在第5个shard迭代器上,我已经在第2批中得到了数据。我放的较新的数据放在碎片中更远的地方,计数大于100,而较旧数据的计数越来越小,它也开始在100左右,但在20小时后,它的计数是例如5。任何人都知道什么可能是问题,为什么我不能得到更新的数据,在进一步的碎片,需要更多的getnextsharditerator()?我认为这需要更多的时间,但20多个小时就足够了。提前谢谢
暂无答案!
目前还没有任何答案,快来回答吧!