Java: using Flink's MapFunction with Cassandra "Insert" throws a RuntimeException, but works with "Statement"

Asked by vmjh9lq9 on 2021-06-25 in Flink

While experimenting with Flink streaming together with Cassandra, I ran into an interesting problem when generating insert statements inside a MapFunction. If I use DataStream<Insert>, a confusing RuntimeException is thrown at me. It works fine if I use DataStream<Statement> instead, even though I still return Insert instances from the executed code.
I have a workaround (use DataStream<Statement>), but I'm still not clear on what causes this. Is it intentional, or is it a bug? I haven't been able to find any explanation by googling, so I might as well ask here in case someone knows what is going on.
Expected output (using DataStream<Statement>):

  log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-638132790]
  01/17/2017 15:57:42 Job execution switched to status RUNNING.
  01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
  01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
  01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to RUNNING
  INSERT INTO tablename (name,age) VALUES ('Test Nameson',27);
  01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to FINISHED
  01/17/2017 15:57:42 Job execution switched to status FINISHED.

Error output (using DataStream<Insert>):

  Exception in thread "main" java.lang.RuntimeException: The field private java.util.List com.datastax.driver.core.querybuilder.BuiltStatement.values is already contained in the hierarchy of the class com.datastax.driver.core.querybuilder.BuiltStatement.Please use unique field names through your classes hierarchy
      at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1762)
      at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1683)
      at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1580)
      at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1479)
      at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:737)
      at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:565)
      at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:366)
      at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:305)
      at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:120)
      at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:506)
      at se.hiq.bjornper.testenv.cassandra.SOCassandraQueryTest.main(SOCassandraQueryTest.java:51)

Code sample (toggle the commented code for the two different cases):

  import java.util.HashMap;
  import java.util.Map;
  import java.util.Map.Entry;

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

  import com.datastax.driver.core.Statement;
  import com.datastax.driver.core.querybuilder.Insert;
  import com.datastax.driver.core.querybuilder.QueryBuilder;

  public class SOCassandraQueryTest {

      public static void main(String[] args) throws Exception {

          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(1);

          DataStream<Map<String, Object>> myDataStream = env.addSource(new RichSourceFunction<Map<String, Object>>() {

              @Override
              public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
                  Map<String, Object> map = new HashMap<String, Object>();
                  map.put("name", "Test Nameson");
                  map.put("age", 27);
                  ctx.collect(map);
              }

              @Override
              public void cancel() {
              }
          });

          /* Works just fine */
          DataStream<Statement> debugDatastream = myDataStream.map(new MapFunction<Map<String, Object>, Statement>() {

              @Override
              public Statement map(Map<String, Object> datarow) throws Exception {
                  Insert insert = QueryBuilder.insertInto("tablename");
                  for (Entry<String, Object> e : datarow.entrySet()) {
                      insert.value(e.getKey(), e.getValue());
                  }
                  return insert;
              }
          });

          /* Throws RuntimeException if using "Insert" instead of "Statement" */
          // DataStream<Insert> debugDatastream = myDataStream.map(new MapFunction<Map<String, Object>, Insert>() {
          //
          //     @Override
          //     public Insert map(Map<String, Object> datarow) throws Exception {
          //         Insert insert = QueryBuilder.insertInto("tablename");
          //
          //         for (Entry<String, Object> e : datarow.entrySet()) {
          //             insert.value(e.getKey(), e.getValue());
          //         }
          //         return insert;
          //     }
          // });

          debugDatastream.print();
          env.execute("CassandraQueryTest");
      }
  }

Environment:
Java 8
Flink 1.1.3 (Cassandra driver from the Maven package)
All other required tooling in place

byqmnocz 1#

Flink is analyzing the types you are sending over the network in order to generate fast serializers, and to allow access to your keys when building windows or shuffling data over the network.
The problem here is likely the following:
- When using Insert as the user type, Flink tries to generate a PojoSerializer for it but fails with the RuntimeException. I think that behavior is incorrect, and I have filed a bug report for the issue in the Flink project.
- For Statement, Flink finds that it cannot serialize the type as a POJO, so it falls back to the generic serializer (in Flink's case, Kryo).
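
For illustration, the duplicate-field check can be reproduced without the Cassandra driver. The following is a minimal, hypothetical hierarchy (the Parent/Child names are made up for this sketch) that shadows a field the same way the BuiltStatement hierarchy does; on Flink 1.1.x it should abort POJO analysis with the same RuntimeException:

  import org.apache.flink.api.java.typeutils.TypeExtractor;

  public class DuplicateFieldDemo {

      // A non-static, non-transient field named 'values' is declared here...
      public static class Parent {
          private java.util.List<Object> values;
      }

      // ...and shadowed here, so the hierarchy contains two fields with
      // the same name, which Flink's POJO field analysis rejects.
      public static class Child extends Parent {
          private java.util.List<Object> values;
      }

      public static void main(String[] args) {
          // Expected on Flink 1.1.x: java.lang.RuntimeException: "The field
          // ... values is already contained in the hierarchy of the class ..."
          TypeExtractor.createTypeInfo(Child.class);
      }
  }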
I think this documentation page is the closest thing we have to a description of how Flink's serialization stack works: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html
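
To see the two code paths side by side, you can also feed both classes directly to the TypeExtractor. This is only a sketch, assuming Flink 1.1.x and the Cassandra driver on the classpath:

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.java.typeutils.TypeExtractor;

  import com.datastax.driver.core.Statement;
  import com.datastax.driver.core.querybuilder.Insert;

  public class SerializerProbe {

      public static void main(String[] args) {
          // Statement fails the POJO checks gracefully (it is abstract and
          // its fields are not accessible), so Flink falls back to a
          // GenericTypeInfo, i.e. Kryo serialization.
          TypeInformation<Statement> stmtInfo =
                  TypeExtractor.createTypeInfo(Statement.class);
          System.out.println(stmtInfo);

          // Insert's hierarchy contains the shadowed 'values' field, so POJO
          // analysis aborts with the RuntimeException from the question
          // instead of falling back to Kryo.
          TypeExtractor.createTypeInfo(Insert.class);
      }
  }

The first call prints a generic type (the Kryo fallback), while the second aborts during POJO analysis, which is exactly the difference between the DataStream<Statement> and DataStream<Insert> variants above.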
