Array type in Apache Beam's ClickHouseIO (Dataflow)

Posted by vdgimpew on 2021-07-15 in ClickHouse

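I am using Apache Beam to consume JSON and insert it into ClickHouse.
I currently have a problem with the Array data type.
Everything worked fine until I added a field of array type: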

    Schema.Field.of("inputs.value", Schema.FieldType.array(Schema.FieldType.INT64).withNullable(true))

Transformation code:

    p.apply(transformNameSuffix + "ReadFromPubSub",
            PubsubIO.readStrings()
                .fromSubscription(chainConfig.getPubSubSubscriptionPrefix() + "transactions")
                .withIdAttribute(PUBSUB_ID_ATTRIBUTE))
        .apply(transformNameSuffix + "ReadFromPubSub", ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String item = c.element();
                //System.out.print(item);
                Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
                c.output(Row.withSchema(Schemas.TRANSACTIONS)
                    .addValues(*****,
                        *****
                        .......
                        transaction.getInputValues()).build());
            }
        })).setRowSchema(Schemas.TRANSACTIONS)
        .apply(
            ClickHouseIO.<Row>write(
                    chainConfig.getClickhouseJDBCURI(),
                    chainConfig.getTransactionsTable())
                .withMaxRetries(3)
                .withMaxInsertBlockSize(1)
                .withInitialBackoff(Duration.standardSeconds(5))
                .withInsertDeduplicate(true)
                .withInsertDistributedSync(false));

The method that generates the input values:

    public List<Long> getInputValues() {
        List<Long> values = Lists.newArrayList();
        for (TransactionInput eachInput : inputs) {
            System.out.print(eachInput.getValue());
            values.add(eachInput.getValue());
        }
        return values;
    }

The error I get is:

    ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 33, host: 35.202.46.77, port: 8123; Code: 33, e.displayText() = DB::Exception: Cannot read all data. Bytes read: 6. Bytes expected: 15. (version 19.17.4.11 (official build))
        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:58)
        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:28)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:875)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:851)
        at ru.yandex.clickhouse.Writer.send(Writer.java:106)
        at ru.yandex.clickhouse.Writer.send(Writer.java:141)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.sendRowBinaryStream(ClickHouseStatementImpl.java:764)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.sendRowBinaryStream(ClickHouseStatementImpl.java:758)
        at org.apache.beam.sdk.io.clickhouse.ClickHouseIO$WriteFn.flush(ClickHouseIO.java:427)
        at org.apache.beam.sdk.io.clickhouse.ClickHouseIO$WriteFn.processElement(ClickHouseIO.java:411)
        at org.apache.beam.sdk.io.clickhouse.AutoValue_ClickHouseIO_WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:222)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
        at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:216)
        at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
        at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.Throwable: Code: 33, e.displayText() = DB::Exception: Cannot read all data. Bytes read: 6. Bytes expected: 15. (version 19.17.4.11 (official build))
        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:53)
        ... 22 more
    Feb 06, 2020 9:04:38 PM org.apache.beam.sdk.io.clickhouse.ClickHouseIO$WriteFn flush
    WARNING: Error writing to ClickHouse. Retry attempt[1]
    ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 33, host: 35.202.46.77, port: 8123; Code: 33, e.displayText() = DB::Exception: Cannot read all data. Bytes read: 6. Bytes expected: 93. (version 19.17.4.11 (official build))
        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:58)
        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:28)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:875)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:851)
        at ru.yandex.clickhouse.Writer.send(Writer.java:106)
        at ru.yandex.clickhouse.Writer.send(Writer.java:141)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.sendRowBinaryStream(ClickHouseStatementImpl.java:764)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.sendRowBinaryStream(ClickHouseStatementImpl.java:758)
        at org.apache.beam.sdk.io.clickhouse.ClickHouseIO$WriteFn.flush(ClickHouseIO.java:427)
        at org.apache.beam.sdk.io.clickhouse.ClickHouseIO$WriteFn.processElement(ClickHouseIO.java:411)
        at org.apache.beam.sdk.io.clickhouse.AutoValue_ClickHouseIO_WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:222)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
        at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:216)
        at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
        at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.Throwable: Code: 33, e.displayText() = DB::Exception: Cannot read all data. Bytes read: 6. Bytes expected: 93. (version 19.17.4.11 (official build))
        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:53)
        ... 22 more
    Feb 06, 2020 9:04:39 PM org.apache.beam.sdk.io.clickhouse.ClickHouseIO$WriteFn flush
    WARNING: Error writing to ClickHouse. Retry attempt[1]
    ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 33, host: 35.202.46.77, port: 8123; Code: 33, e.displayText() = DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 2641. (version 19.17.4.11 (official build)

ClickHouse schema:

    CREATE TABLE IF NOT EXISTS transactions_streaming_small (
        *****,
        *****,
        inputs Nested ( value Nullable(UInt64) ) )
    ENGINE = MergeTree() PARTITION BY toYYYYMM(block_date_time)

What is wrong here?
[ClickHouse version 19.17.4.11 (official build)]
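
One possible explanation, which nothing in this thread confirms: the stack trace shows the insert going through `sendRowBinaryStream`, and a `Nested ( value Nullable(UInt64) )` column is stored by ClickHouse as `inputs.value Array(Nullable(UInt64))`. In the RowBinary format an array is a varint length followed by its elements, and each `Nullable` element carries a one-byte null marker before its value. The Beam field, by contrast, marks the array itself as nullable, not its elements. If the writer emits elements without the marker, the server would fall out of step with the byte stream, which would be consistent with a "Cannot read all data" error. The sketch below is a hand-written illustration of the two byte layouts; `RowBinaryArraySketch` and its methods are hypothetical names, not part of Beam or the ClickHouse JDBC driver.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that hand-encodes arrays in the RowBinary layout. */
public class RowBinaryArraySketch {

    /** Writes an unsigned LEB128 varint, which RowBinary uses for array lengths. */
    public static void writeVarUInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    /** Writes a 64-bit integer as 8 little-endian bytes. */
    public static void writeUInt64(ByteArrayOutputStream out, long value) {
        byte[] bytes = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array();
        out.write(bytes, 0, bytes.length);
    }

    /**
     * Encodes values as Array(UInt64) when nullableElements is false,
     * or as Array(Nullable(UInt64)) when it is true.
     * Null elements are only supported in the nullable layout.
     */
    public static byte[] encodeArray(List<Long> values, boolean nullableElements) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarUInt(out, values.size());
        for (Long v : values) {
            if (nullableElements) {
                // Null marker: 1 means NULL (no value follows), 0 means a value follows.
                out.write(v == null ? 1 : 0);
                if (v == null) {
                    continue;
                }
            }
            writeUInt64(out, v);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<Long> inputs = Arrays.asList(10L, 20L);
        System.out.println("Array(UInt64):           " + encodeArray(inputs, false).length + " bytes");
        System.out.println("Array(Nullable(UInt64)): " + encodeArray(inputs, true).length + " bytes");
    }
}
```

For two non-null values the first layout is 17 bytes (1 length byte plus 2 × 8) and the second is 19 bytes (1 length byte plus 2 × 9). If this mismatch is indeed the cause, two things may be worth trying: declaring the element type as nullable in the Beam schema, `Schema.FieldType.array(Schema.FieldType.INT64.withNullable(true))`, or changing the column to `value UInt64`. Whether the ClickHouseIO version in use writes per-element null markers is something to verify against its source.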
