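Hey, I'm working through the ksql quickstart example. The problem is that I want to generate data in Avro format, and doing so throws the list of errors shown at the bottom.
The tutorial is at https://docs.ksqldb.io/en/latest/tutorials/basics-docker/
Steps to reproduce the problem: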
git clone https://github.com/confluentinc/ksql.git
cd ksql
git checkout 5.5.0-post
cd docs/tutorials/
docker-compose up -d
If you run docker ps, you should see the following containers running:
confluentinc/ksqldb-examples:5.5.0
confluentinc/cp-ksql-server:5.4.0
confluentinc/cp-schema-registry:5.4.0
confluentinc/cp-enterprise-kafka:5.4.0
confluentinc/cp-zookeeper:5.4.0
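To confirm that all of the images listed above are actually running, something like the following could be used. This is only a minimal sketch: the helper name missing_images is made up for illustration, and it assumes the image tags match the list above exactly.

```shell
# Hypothetical helper: reads running image names from stdin (one per line)
# and prints a "missing: <image>" line for every expected image not found.
missing_images() {
  local running image
  running="$(cat)"
  for image in "$@"; do
    # -x: match the whole line, -F: treat the image name as a literal string
    printf '%s\n' "$running" | grep -qxF "$image" || printf 'missing: %s\n' "$image"
  done
}

# Usage (requires Docker; prints nothing if every image is running):
# docker ps --format '{{.Image}}' | missing_images \
#   confluentinc/cp-ksql-server:5.4.0 \
#   confluentinc/cp-schema-registry:5.4.0 \
#   confluentinc/cp-enterprise-kafka:5.4.0 \
#   confluentinc/cp-zookeeper:5.4.0
```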
If I run the example with the delimited format like this, it works:
docker run --network tutorials_default --rm --name datagen-users \
confluentinc/ksqldb-examples:5.5.0 \
ksql-datagen \
bootstrap-server=kafka:39092 \
quickstart=users \
format=delimited \
topic=users \
msgRate=1
The next example uses the Avro format, which is the one I want to use. It looks like this:
docker run --network tutorials_default --rm --name datagen-users \
confluentinc/ksqldb-examples:5.5.0 \
ksql-datagen \
bootstrap-server=kafka:39092 \
quickstart=users \
format=avro \
topic=users \
msgRate=1
When I use the Avro format, I get the following errors:
[2020-06-06 15:46:47,632] INFO AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 1000
(io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,650] INFO JsonSchemaDataConfig values:
decimal.format = BASE64
schemas.cache.size = 1000
(io.confluent.connect.json.JsonSchemaDataConfig:179)
[2020-06-06 15:46:47,651] INFO JsonSchemaDataConfig values:
decimal.format = BASE64
schemas.cache.size = 1000
(io.confluent.connect.json.JsonSchemaDataConfig:179)
[2020-06-06 15:46:47,654] INFO ProtobufDataConfig values:
schemas.cache.config = 1000
(io.confluent.connect.protobuf.ProtobufDataConfig:179)
[2020-06-06 15:46:47,672] INFO KsqlConfig values:
ksql.access.validator.enable = auto
ksql.authorization.cache.expiry.time.secs = 30
ksql.authorization.cache.max.entries = 10000
ksql.connect.url = http://localhost:8083
ksql.connect.worker.config =
ksql.extension.dir = ext
ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
ksql.insert.into.values.enabled = true
ksql.internal.topic.min.insync.replicas = 1
ksql.internal.topic.replicas = 1
ksql.metric.reporters = []
ksql.metrics.extension = null
ksql.metrics.tags.custom =
ksql.new.api.enabled = false
ksql.output.topic.name.prefix =
ksql.persistence.wrap.single.values = true
ksql.persistent.prefix = query_
ksql.pull.queries.enable = true
ksql.query.persistent.active.limit = 2147483647
ksql.query.pull.enable.standby.reads = false
ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
ksql.schema.registry.url = http://localhost:8081
ksql.security.extension.class = null
ksql.service.id = default_
ksql.sink.window.change.log.additional.retention = 1000000
ksql.streams.shutdown.timeout.ms = 300000
ksql.transient.prefix = transient_
ksql.udf.collect.metrics = false
ksql.udf.enable.security.manager = true
ksql.udfs.enabled = true
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
(io.confluent.ksql.util.KsqlConfig:347)
[2020-06-06 15:46:47,720] INFO AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 1
(io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,752] INFO ProcessingLogConfig values:
ksql.logging.processing.rows.include = false
ksql.logging.processing.stream.auto.create = false
ksql.logging.processing.stream.name = KSQL_PROCESSING_LOG
ksql.logging.processing.topic.auto.create = false
ksql.logging.processing.topic.name =
ksql.logging.processing.topic.partitions = 1
ksql.logging.processing.topic.replication.factor = 1
(io.confluent.ksql.logging.processing.ProcessingLogConfig:347)
[2020-06-06 15:46:47,767] INFO AvroConverterConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,770] INFO KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,771] INFO KafkaAvroDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
specific.avro.reader = false
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,771] INFO AvroDataConfig values:
connect.meta.data = false
enhanced.avro.schema.support = false
schemas.cache.config = 1000
(io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,772] INFO AvroConverterConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,772] INFO KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,772] INFO KafkaAvroDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
specific.avro.reader = false
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,773] INFO AvroDataConfig values:
connect.meta.data = false
enhanced.avro.schema.support = false
schemas.cache.config = 1000
(io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,774] INFO AvroConverterConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,775] INFO KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,775] INFO KafkaAvroDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
specific.avro.reader = false
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,776] INFO AvroDataConfig values:
connect.meta.data = false
enhanced.avro.schema.support = false
schemas.cache.config = 1000
(io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,776] INFO AvroConverterConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,776] INFO KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,777] INFO KafkaAvroDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
specific.avro.reader = false
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,777] INFO AvroDataConfig values:
connect.meta.data = false
enhanced.avro.schema.support = false
schemas.cache.config = 1000
(io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,817] WARN The configuration 'ksql.schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig:355)
[2020-06-06 15:46:47,817] WARN The configuration 'ksql.schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig:355)
[2020-06-06 15:46:48,038] ERROR Failed to send HTTP request to endpoint: http://localhost:8081/subjects/users-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService:268)
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:264)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:138)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:281)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at io.confluent.ksql.datagen.DataGenProducer.produceOne(DataGenProducer.java:122)
at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:91)
at io.confluent.ksql.datagen.DataGen.lambda$getProducerTask$1(DataGen.java:111)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: users
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic users :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:281)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at io.confluent.ksql.datagen.DataGenProducer.produceOne(DataGenProducer.java:122)
at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:91)
at io.confluent.ksql.datagen.DataGen.lambda$getProducerTask$1(DataGen.java:111)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:264)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:138)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:281)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at io.confluent.ksql.datagen.DataGenProducer.produceOne(DataGenProducer.java:122)
at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:91)
at io.confluent.ksql.datagen.DataGen.lambda$getProducerTask$1(DataGen.java:111)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
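Most of the output above is INFO-level configuration dumps. The line that matters is the ERROR about the failed HTTP request, followed by "Connection refused". As a rough sketch, assuming the output was saved to a file (datagen.log here is a hypothetical name, and failed_endpoints is a made-up helper), the failing endpoint can be pulled out like this:

```shell
# Hypothetical helper: print each distinct endpoint that the Schema Registry
# client reported it could not reach, given a saved log file as $1.
failed_endpoints() {
  grep -o 'Failed to send HTTP request to endpoint: [^ ]*' "$1" \
    | awk '{print $NF}' \
    | sort -u
}

# Usage (datagen.log is an assumed file holding the output shown above):
# failed_endpoints datagen.log
```

For the log above this points at http://localhost:8081, which is the default value shown for schema.registry.url in the config dumps. Inside the datagen container, localhost refers to that container itself rather than to the Schema Registry container. ksql-datagen also accepts a schemaRegistryUrl argument, so passing a URL that uses the Schema Registry's hostname on the tutorials_default network may be worth trying; the exact hostname depends on the service name in the docker-compose file.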
1 Answer
It looks like the Docker file does not correctly expose the Schema Registry's port. Try adding a ports mapping as follows: