Java: NullPointerException in BigQueryIO write

ejk8hzay posted on 2023-01-11 in Java

I'm hitting a NullPointerException in my Dataflow pipeline, even though every value at the exception site appears to be defined correctly. In this code I read from a database, apply some transformations to the result set, and then try to create a table in an existing dataset from the table rows built from that result set. I have confirmed that the values passed to the BigQueryIO.writeTableRows() call are all valid, yet I still get an exception on the line that performs the write to Big Query; the stack trace below shows where the NullPointerException occurs.

// Gather First query results
WriteResult results = pipeline
        .apply("Connect", JdbcIO.<TableRow>read()
                .withDataSourceConfiguration(buildDataSourceConfig(options, URL))
                .withQuery(query)
                .withRowMapper(new JdbcIO.RowMapper<TableRow>() {
                  // Convert ResultSet to PCollection
                  public TableRow mapRow(ResultSet rs) throws Exception {

                    ResultSetMetaData md = rs.getMetaData();
                    int columnCount = md.getColumnCount();
                    TableRow tr = new TableRow();
                    for (int i = 1; i <= columnCount; i++ ) {
                      String name = md.getColumnName(i);
                      tr.set(name, rs.getString(name));
                    }
                    return tr;
                  }
                }))
        .setCoder(TableRowJsonCoder.of())
        .apply("Write to BQ",
                BigQueryIO.writeTableRows()
                        .withSchema(schema)
                        .to(dataset)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
2023-01-10T20:33:22.4214526Z WARNING: Unable to infer a schema for type com.google.api.services.bigquery.model.TableRow. Attempting to infer a coder without a schema.
2023-01-10T20:33:22.4216783Z Exception in thread "main" java.lang.NullPointerException
2023-01-10T20:33:22.4218945Z    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.validateNoJsonTypeInSchema(BigQueryIO.java:3035)
2023-01-10T20:33:22.4221029Z    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.continueExpandTyped(BigQueryIO.java:2949)
2023-01-10T20:33:22.4222727Z    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:2880)
2023-01-10T20:33:22.4226464Z    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2776)
2023-01-10T20:33:22.4228072Z    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1786)
2023-01-10T20:33:22.4234778Z    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
2023-01-10T20:33:22.4237961Z    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
2023-01-10T20:33:22.4240010Z    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:376)
2023-01-10T20:33:22.4242466Z    at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.buildPipeline(SqaTransfer.java:133)
2023-01-10T20:33:22.4244722Z    at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.main(SqaTransfer.java:99)
2023-01-10T20:33:22.4246444Z . exit status 1

9o685dep1#

The current error is caused by passing a malformed schema to the BigQueryIO withSchema(schema) method.
You can build the schema with a TableSchema object:

import java.util.Arrays;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;

TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema()
                        .setName("string_field")
                        .setType("STRING")
                        .setMode("REQUIRED"),
                    new TableFieldSchema()
                        .setName("int64_field")
                        .setType("INT64")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("float64_field")
                        .setType("FLOAT64"), // default mode is "NULLABLE"
                    new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                    new TableFieldSchema().setName("bool_field").setType("BOOL"),
                    new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                    new TableFieldSchema().setName("date_field").setType("DATE"),
                    new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                    new TableFieldSchema().setName("time_field").setType("TIME"),
                    new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                    new TableFieldSchema()
                        .setName("array_field")
                        .setType("INT64")
                        .setMode("REPEATED")
                        .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                    new TableFieldSchema()
                        .setName("struct_field")
                        .setType("STRUCT")
                        .setDescription(
                            "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("string_value").setType("STRING"),
                                new TableFieldSchema().setName("int64_value").setType("INT64")))));

// In the IO.
rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withSchema(schema)
            ...
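Note that the `to(...)` target here is a fully-qualified table spec (`project:dataset.table`), whereas the original pipeline passed only `dataset`. A minimal sketch of building that spec (the project, dataset, and table names below are placeholders):

```java
public class TableSpec {

    // Builds a "project:dataset.table" string in the form
    // BigQueryIO.writeTableRows().to(...) accepts.
    static String tableSpec(String project, String dataset, String table) {
        return String.format("%s:%s.%s", project, dataset, table);
    }

    public static void main(String[] args) {
        // prints my-project:my_dataset.my_table
        System.out.println(tableSpec("my-project", "my_dataset", "my_table"));
    }
}
```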

You can also use a JSON schema:

String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";

// In the IO.
rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withJsonSchema(tableSchemaJson)
            ...

You can check the documentation for more details.
