How do I use an HDFS sink in Flink?

fykwrbwg posted on 2021-06-02 in Hadoop

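I am using the Twitter connector in Apache Flink and now want to save some of the streaming data to a local HDFS instance. The Flink documentation has a small BucketingSink example, but my program always exits with the following error: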

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ExceptionInInitializerError
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:92)
at org.apache.hadoop.security.Groups.<init>(Groups.java:76)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:239)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2473)
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2465)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2331)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:418)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:352)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:177)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:159)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:678)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:666)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.StringIndexOutOfBoundsException: begin 0, end 3, length 1
at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3116)
at java.base/java.lang.String.substring(String.java:1885)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:49)
... 25 more
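The innermost cause in the trace is a `String.substring` call with begin 0 and end 3 on a string of length 1, made from the static initializer of `org.apache.hadoop.util.Shell`. The `java.base/` frames indicate the job runs on Java 9 or later. One plausible reading, which is an assumption and not something the trace confirms, is that the string being cut is a short version string such as `"9"`. The following minimal sketch only reproduces that failure shape; the class and method names are hypothetical and are not Hadoop code.

```java
public class SubstringRepro {
    // Same call shape as in the trace: take the first three characters.
    public static String firstThreeChars(String version) {
        return version.substring(0, 3);
    }

    // Hypothetical defensive variant that never reads past the end of the string.
    public static String safePrefix(String version) {
        return version.substring(0, Math.min(3, version.length()));
    }

    public static void main(String[] args) {
        // A Java 8 style version string is long enough, so this works.
        System.out.println(firstThreeChars("1.8.0_292"));

        // A one-character string (assumed here to stand for a "9" version) fails.
        try {
            firstThreeChars("9");
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("failed: " + e.getMessage());
        }

        System.out.println(safePrefix("9"));

        // Shows which version string the current JVM actually reports.
        System.out.println("java.version = " + System.getProperty("java.version"));
    }
}
```

If the last line prints a value shorter than three characters on the machine that runs the job, it may be worth trying the job on a Java 8 runtime, or with a Hadoop client version that supports the installed JDK, before changing the Flink code.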

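Do you know what is wrong with my code? I am using the initial Twitter connector example for testing, and my environment is built with a Docker container running HDFS. The ports are correctly mapped from Docker to my local machine, and I can also check the status of HDFS in the web UI.
Here is my code: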

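import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

    // Twitter API credentials (placeholders)
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "KEY");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "SECRET");
    props.setProperty(TwitterSource.TOKEN, "TOKEN");
    props.setProperty(TwitterSource.TOKEN_SECRET, "TOKENSECRET");
    DataStream<String> streamSource = env.addSource(new TwitterSource(props));

    DataStream<Tuple2<String, Integer>> tweets = streamSource
            // select tweets (German, going by the class name) and split them into (word, 1)
            .flatMap(new SelectGermanAndTokenizeFlatMap())
            // group by word and sum the occurrences
            .keyBy(0)
            // sliding window: 1 minute long, evaluated every 30 seconds
            .timeWindow(Time.minutes(1), Time.seconds(30))
            .sum(1);

    // write the windowed counts to HDFS, one bucket directory per minute
    BucketingSink<Tuple2<String, Integer>> sink = new BucketingSink<>("hdfs://localhost:8020/flink/twitter-test");
    sink.setBucketer(new DateTimeBucketer<Tuple2<String, Integer>>("yyyy-MM-dd--HHmm"));
    // start a new part file once the current one reaches 400 MiB
    sink.setBatchSize(1024 * 1024 * 400);

    tweets.addSink(sink);
    //tweets.print();

    env.execute("Twitter Streaming Example");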
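To make the sink configuration above easier to read, here is a small sketch of the directory names that the pattern `yyyy-MM-dd--HHmm` would produce. It assumes the bucketer simply appends the formatted time to the base path; the class `BucketPathSketch` and its method are hypothetical helpers for illustration and do not use Flink itself.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class BucketPathSketch {
    // Assumption: bucket directory = base path + "/" + timestamp formatted with the pattern.
    public static String bucketPath(String basePath, String pattern, LocalDateTime time) {
        return basePath + "/" + time.format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        String base = "hdfs://localhost:8020/flink/twitter-test";
        LocalDateTime time = LocalDateTime.of(2021, 6, 2, 14, 5);

        // prints hdfs://localhost:8020/flink/twitter-test/2021-06-02--1405
        System.out.println(bucketPath(base, "yyyy-MM-dd--HHmm", time));

        // The batch size in the question: 1024 * 1024 * 400 bytes = 419430400 bytes (400 MiB).
        System.out.println(1024L * 1024L * 400L);
    }
}
```

Because the pattern goes down to the minute, each minute of data would land in its own directory under this assumption.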
