kinesis spark qubole无法获得更新的记录

e5nqia27  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(257)

我正在尝试使用qubole kinesis spark library从我的流中获取记录:

val kinesis = sparkContextService.SQLC.sparkSession.readStream
  .format("kinesis")
  .option("streamName", "streamName")
  .option("region", "region")
  .option("endpointUrl", "endpointUrl")
  .option("startingposition", "TRIM_HORIZON")
  .option("awsAccessKeyId", "awsAccessKeyId")
  .option("awsSecretKey", "awsSecretKey")
  .option("avoidEmptyBatches", "true")
  .load()
and then:
   kinesis
  .selectExpr("CAST(data AS STRING)").as[(String)]
  .groupBy("data").count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()

为了让舒尔相信kinesis内的一切正常,我用其他非spark项目测试了数据的输入和获取。我使用putrecordrequest将数据与另一个非spark项目一起放入kinesis流。在将数据放入流之后,在这个非spark项目中,我将测试数据是否正确放入。我尝试查看数据是否与getrecordsrequest一起出现。

while (thereAreShards) {
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setShardIterator(shardIterator);
        try {
            GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest);
            records = result.getRecords();
            if(records.size()!=0){
                System.out.println(count);
                count = 0;
            }
            else
            {
                count++;
            }
            shardIterator = result.getNextShardIterator();
        } catch (Throwable t){
            System.out.println(t);
        }
        if (shardIterator.equals(null)) {
            thereAreShards = false;
        }

我注意到,如果获得结果所需的计数是100、112、130(例如,应用程序需要100 getNextSharedIterator()来获取数据),我就无法在spark应用程序中获取数据。但是,如果我的非spark项目返回的计数为5或10,那么我得到的数据没有问题:
批次:2
+----+-----+
|数据|计数|
+----+-----+
| | 400|
|环境足迹900|
|吉吉500|
|你好| 1000|
+-----+-----+
因此,如果数据被放在碎片中很远的地方,我就无法访问它,即使我让spark查询运行了24小时,6300个批通过了,我也没有得到任何数据。但是如果数据在第5个shard迭代器上,我已经在第2批中得到了数据。我放的较新的数据放在碎片中更远的地方,计数大于100,而较旧数据的计数越来越小,它也开始在100左右,但在20小时后,它的计数是例如5。任何人都知道什么可能是问题,为什么我不能得到更新的数据,在进一步的碎片,需要更多的getnextsharditerator()?我认为这需要更多的时间,但20多个小时就足够了。提前谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题