java — 即使在 Apache Flink 中实现了 AppendStreamTableSink 接口,仍然出现以下错误

arknldoa  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(339)

我编写了一个简单的示例,把一个表写入 sink,但即使实现了 AppendStreamTableSink 接口,在 Apache Flink 中仍然出现了下面的异常。

  1. package com.cc.flink.functionUtils;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.Collection;
  5. import java.util.Iterator;
  6. import org.apache.flink.api.common.functions.IterationRuntimeContext;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.api.common.functions.RichFunction;
  9. import org.apache.flink.api.common.io.OutputFormat;
  10. import org.apache.flink.api.common.typeinfo.TypeInformation;
  11. import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
  12. import org.apache.flink.api.java.tuple.Tuple2;
  13. import org.apache.flink.api.java.typeutils.TupleTypeInfo;
  14. import org.apache.flink.configuration.Configuration;
  15. import org.apache.flink.contrib.streaming.DataStreamUtils;
  16. import org.apache.flink.streaming.api.datastream.DataStream;
  17. import org.apache.flink.streaming.api.datastream.DataStreamSink;
  18. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  19. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  20. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  21. import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
  22. import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
  23. import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
  24. import org.apache.flink.table.api.Table;
  25. import org.apache.flink.table.api.TableEnvironment;
  26. import org.apache.flink.table.api.java.StreamTableEnvironment;
  27. import org.apache.flink.table.sinks.AppendStreamTableSink;
  28. import org.apache.flink.table.sinks.RetractStreamTableSink;
  29. import org.apache.flink.table.sinks.TableSink;
  30. import org.apache.flink.types.Row;
  31. public class MyTable implements AppendStreamTableSink<Row>{
  32. @Override
  33. public TableSink<Row> configure(String[] arg0, TypeInformation<?>[] arg1) {
  34. // TODO Auto-generated method stub
  35. return null;
  36. }
  37. @Override
  38. public String[] getFieldNames() {
  39. // TODO Auto-generated method stub
  40. return null;
  41. }
  42. @Override
  43. public TypeInformation<?>[] getFieldTypes() {
  44. // TODO Auto-generated method stub
  45. return null;
  46. }
  47. @Override
  48. public TypeInformation<Row> getOutputType() {
  49. // TODO Auto-generated method stub
  50. return null;
  51. }
  52. @Override
  53. public void emitDataStream(DataStream<Row> arg0) {
  54. // TODO Auto-generated method stub
  55. arg0.print();
  56. }
  57. public static void main(String[] args) throws Exception {
  58. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  59. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  60. .setHost("localhost")
  61. .setVirtualHost("/")
  62. .setUserName("guest")
  63. .setPassword("guest")
  64. .setPort(5672)
  65. .build();
  66. final DataStream<String> stream = env
  67. .addSource(new RMQSource<String>(
  68. connectionConfig, // config for the RabbitMQ connection
  69. "test", // name of the RabbitMQ queue to consume
  70. true, // use correlation ids; can be false if only at-least-once is required
  71. new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
  72. .setParallelism(1);
  73. final ArrayList<String> values = new ArrayList<>();
  74. StreamTableEnvironment StreamTableEnv = TableEnvironment.getTableEnvironment(env);
  75. Table fromDataStream = StreamTableEnv.fromDataStream(stream,
  76. "member_id");
  77. StreamTableEnv.registerTable("emp1",fromDataStream);
  78. Table output =StreamTableEnv.sql("select count(*) from emp1 where member_id Like '%test%'");
  79. fromDataStream.writeToSink(new MyTable() );
  80. env.execute();
  81. }
  82. }

log4j:WARN No appenders could be found for logger (org.apache.calcite.sql.parser).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:219)
    at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
    at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
    at com.cc.flink.functionUtils.MyTable.main(MyTable.java:103)

xqkwcwgp

xqkwcwgp1#

您示例中的问题在于:您试图使用 AppendStreamTableSink,但您的查询会产生撤回(retraction)。这是由语句中的 COUNT(*) 引起的:每当有新行到达时,之前发出的计数就不再有效,需要被撤回。
如果只是一个 SELECT * ,则每个传入行将只生成一个不影响前一行的输出行。

相关问题