apache架构问题

ttvkxqim  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(431)

我正在尝试执行etl,它涉及从hdfs加载文件、应用转换并将它们写入hive。在使用sqltransforms通过遵循此文档执行转换时,我遇到了以下问题。你能帮忙吗?

  1. java.lang.IllegalStateException: Cannot call getSchema when there is no schema
  2. at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
  3. at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
  4. at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:105)
  5. at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:90)
  6. at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:77)
  7. at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
  8. at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
  9. at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:339)
  10. at org.apache.beam.examples.SqlTest.runSqlTest(SqlTest.java:107)
  11. at org.apache.beam.examples.SqlTest.main(SqlTest.java:167)
  12. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  13. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  14. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  15. at java.lang.reflect.Method.invoke(Method.java:498)
  16. at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
  17. at java.lang.Thread.run(Thread.java:748)

代码段:

  1. PCollection<String> data = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
  2. if(options.getOutput().equals("hive")){
  3. Schema hiveTableSchema = Schema.builder()
  4. .addStringField("eid")
  5. .addStringField("name")
  6. .addStringField("salary")
  7. .addStringField("destination")
  8. .build();
  9. data.apply(ParDo.of(new DoFn<String, Row>() {
  10. @ProcessElement
  11. public void processElement(@Element String input, OutputReceiver<Row> out){
  12. String[] values = input.split(",");
  13. System.out.println(values);
  14. Row row = Row.withSchema(hiveTableSchema)
  15. .addValues(values)
  16. .build();
  17. out.output(row);
  18. }
  19. })).apply(SqlTransform.query("select eid, destination from PCOLLECTION"))
  20. .apply(ParDo.of(new DoFn<Row, HCatRecord>() {
  21. @ProcessElement
  22. public void processElement(@Element Row input, OutputReceiver<HCatRecord> out){
  23. HCatRecord record = new DefaultHCatRecord(input.getFieldCount());
  24. for(int i=0; i < input.getFieldCount(); i++){
  25. record.set(i, input.getString(i));
  26. }
  27. out.output(record);
  28. }
  29. }))
  30. .apply("WriteData", HCatalogIO.write()
  31. .withConfigProperties(configProperties)
  32. .withDatabase("wmrpoc")
  33. .withTable(options.getOutputTableName()));
ftf50wuq

ftf50wuq1#

看起来您需要在 PCollection . 在你链接的演练中 Create...withCoder() 处理好了。在您的情况下,架构不能从您的 ParDo ,beam可能看到的唯一信息是它输出类型的元素 Row 但目前还没有任何信息表明 ParDo 甚至对所有输出都遵循单一模式。所以你需要打电话 pcollection.setRowSchema() 在你申请之前 SqlTransform 告诉beam你打算从转换中得到什么样的模式 ParDo .
更新
看起来你以前做的大部分事情 HCatalog 最终很可能会简化很多,例如,假设您只需要指定以下内容 pipeline.apply(TextIO.readCsvRows(schema)).apply(sqlTransform)... . 事实上,beamsql支持读取csv文件而不需要额外的转换 ParDos (通过 TextTableProvider )但它并没有连接到 SqlTransform 但只能通过beamsqlcli访问

相关问题