Unable to receive data from AWS Kinesis

wmtdaxz3  posted on 2021-06-24  in Flink

I am using the Flink Kinesis connector with the following code:

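    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class DemoKinesisCA {
        public static void main(String[] args) throws Exception {
            // Command-line arguments are parsed here but never read afterwards.
            ParameterTool pt = ParameterTool.fromArgs(args);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
            see.setParallelism(1);
            Properties consumerConfig = new Properties();
            consumerConfig.put(AWSConfigConstants.AWS_REGION, "cn-north-1");
            consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "${my_key}");
            consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "${my_secret_key}");
            // Start from the oldest record still retained in the stream.
            consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
            DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
                    "${my_stream}",
                    new SimpleStringSchema(),
                    consumerConfig));
            kinesis.print();
            see.execute();
        }
    }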

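The code runs without errors, and I see no exceptions in the log. I set the initial position to TRIM_HORIZON, which means consuming from the earliest available data, yet I receive 0 bytes from Kinesis. I have been told the stream is not empty, so something must be wrong...
Thanks for any help.
The log looks like this: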

    2019-12-20 09:21:22,814 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request 00b96cfdd8c98b09635926e3643210f3 for job c5449594afa59fde3826b82d6034a71b from resource manager with leader id 00000000000000000000000000000000.
    2019-12-20 09:21:22,815 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for 00b96cfdd8c98b09635926e3643210f3.
    2019-12-20 09:21:22,815 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job c5449594afa59fde3826b82d6034a71b for job leader monitoring.
    2019-12-20 09:21:22,815 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 with leader id 00000000-0000-0000-0000-000000000000.
    2019-12-20 09:21:22,819 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
    2019-12-20 09:21:22,819 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms)
    2019-12-20 09:21:22,826 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Successful registration at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 for job c5449594afa59fde3826b82d6034a71b.
    2019-12-20 09:21:22,826 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager connection for job c5449594afa59fde3826b82d6034a71b.
    2019-12-20 09:21:22,826 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job c5449594afa59fde3826b82d6034a71b.
    2019-12-20 09:21:22,830 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Activate slot 00b96cfdd8c98b09635926e3643210f3.
    2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source -> Sink: Print to Std. Out (1/1).
    2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from CREATED to DEPLOYING.
    2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING]
    2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
    2019-12-20 09:21:22,838 INFO org.apache.flink.runtime.blob.BlobClient - Downloading c5449594afa59fde3826b82d6034a71b/p-c6e596cf391265b05e5e166e9448a1b36bc8fb74-756c152250698cb8c1c63a809f05d4b3 from localhost/127.0.0.1:44568
    2019-12-20 09:21:23,011 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
    2019-12-20 09:21:23,012 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from DEPLOYING to RUNNING.
    2019-12-20 09:21:23,013 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
    2019-12-20 09:21:23,037 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber does not contain a setter for field sequenceNumber
    2019-12-20 09:21:23,037 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    2019-12-20 09:21:23,038 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - No restore state for FlinkKinesisConsumer.
    2019-12-20 09:21:23,779 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
    2019-12-20 09:21:23,780 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
ylamdve6  #1

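Welcome to the Java world: Java does not interpolate string literals, so "${my_stream}" is never evaluated and is passed as-is, which probably points to a stream that does not exist.
If my_stream is a parameter, you need to use pt.get("my_stream").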
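
To illustrate the point without pulling in Flink, here is a minimal, dependency-free sketch. The class `StreamNameDemo`, its helpers `parseArgs` and `streamName`, and the fallback value `example-stream` are hypothetical names made up for this example; `parseArgs` is only a simplified stand-in for what `ParameterTool.fromArgs(args)` does with `--key value` pairs, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamNameDemo {

    // Simplified stand-in for ParameterTool.fromArgs: collects "--key value" pairs.
    // Assumes arguments come strictly in pairs; anything else is ignored.
    public static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                params.put(args[i].substring(2), args[i + 1]);
            }
        }
        return params;
    }

    // Returns the value of --my_stream, failing fast if it is missing.
    public static String streamName(String[] args) {
        String name = parseArgs(args).get("my_stream");
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Missing required argument --my_stream");
        }
        return name;
    }

    public static void main(String[] args) {
        // Hypothetical fallback so the demo also runs without arguments.
        String[] effective = args.length > 0
                ? args
                : new String[] {"--my_stream", "example-stream"};

        // A Java string literal is taken verbatim; nothing substitutes the placeholder.
        String literal = "${my_stream}";
        System.out.println("literal   = " + literal);

        // The value actually supplied on the command line.
        System.out.println("from args = " + streamName(effective));
    }
}
```

In the question's job, the corresponding change would be to pass `pt.get("my_stream")` to the `FlinkKinesisConsumer` constructor instead of the quoted placeholder, and to supply the access key and secret key the same way rather than as `"${my_key}"` and `"${my_secret_key}"`.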
