I wrote a Kafka producer that sends Avro data, but it fails while serializing the record with the following error:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.UnknownHostException: sandbox-hdf.hortonworks.com
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    ...
Here is my producer code:
package com.perfaware.kafka01;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class producerAvro {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();
        // setting producer properties
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");
        // Serialization(avro part)
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://sandbox-hdf.hortonworks.com:7788/api/v1");
        Producer<String, Customer> producer = new KafkaProducer<String, Customer>(properties);
        String topic = "topic1";
        Customer customer = Customer.newBuilder()
                .setAge(21)
                .setAutomatedEmail(false)
                .setFirstName("Manish")
                .setLastName("B.")
                .setHeight(176f)
                .setWeight(62f)
                .build();
        ProducerRecord<String, Customer> producerRecord = new ProducerRecord<String, Customer>("topic1", customer);
        System.out.println(customer);
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }
        }).get();
        producer.flush();
        producer.close();
    }
}
In case it helps, here is my pom.xml as well:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Kafka_Avro</groupId>
<artifactId>Kafka_Avro_Practise</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<avro.verion>1.7.4</avro.verion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<confluent.version>3.1.1</confluent.version>
</properties>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-tools</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-compiler -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-mapred -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.8.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>1.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.11</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
I also tried changing the value serializer to:
com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
but that did not solve the problem.
1 Answer
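UnknownHostException: sandbox-hdf.hortonworks.com
If you are using the sandbox, you should edit your /etc/hosts file so that sandbox-hdf.hortonworks.com becomes a known host.
However, if you are using Hortonworks's registry, you will definitely want to use the Hortonworks serializer. It is not clear what error you got when you tried it, but if it is the same error, then this is a network problem and has nothing to do with Avro.
"value.serializer", "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer"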
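To confirm that the failure is a name-resolution problem rather than anything in the serializer, it can help to test the host name outside of Kafka. The following is only a minimal sketch: the class name `HostCheck` and the method `canResolve` are made up for illustration, and the only real API used is `java.net.InetAddress.getByName`.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper (not part of the question's project): checks whether
// the JVM can resolve a host name, which is the step that fails in the trace.
public class HostCheck {

    /** Returns true if the given host name resolves to an IP address. */
    public static boolean canResolve(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            // Same exception type as the "Caused by" in the stack trace.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the registry host from the question; pass another name to test it.
        String host = args.length > 0 ? args[0] : "sandbox-hdf.hortonworks.com";
        System.out.println(host + " resolvable: " + canResolve(host));
    }
}
```

If this reports that the host is not resolvable on the machine running the producer, the /etc/hosts entry (or DNS) is what needs fixing, whichever serializer is configured.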
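Also, bootstrap.servers probably needs to resolve to the sandbox's Kafka instance as well, not just localhost.
If you really do want to use the Confluent serializer, and I am not sure it will work, you will need consistent Kafka version numbers. For example, you have declared Kafka 1.1.1 and 2.0, while Confluent 3.1.1 is based on Kafka 0.10.x. The same goes for Avro: everything should be set to one version, 1.8.1 for example, although your code does not need the ipc or mapred Avro libraries. Probably not the compiler either.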
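Putting the serializer and bootstrap.servers suggestions together, the producer properties might look roughly like the sketch below. It uses only `java.util.Properties`, so it runs without Kafka on the classpath. The class name `SandboxProducerConfig` is made up for illustration, the broker port in `main` is an assumed placeholder to replace with whatever your sandbox's Kafka actually listens on, and the `schema.registry.url` key is simply carried over from the question rather than verified against the Hortonworks client.

```java
import java.util.Properties;

// Hypothetical helper that only assembles configuration strings.
public class SandboxProducerConfig {

    /** Builds producer properties that point at the sandbox instead of localhost. */
    public static Properties build(String bootstrapServers, String registryUrl) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("acks", "1");
        p.setProperty("retries", "10");
        p.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Hortonworks serializer, as named in the answer above.
        p.setProperty("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        // Key name taken from the question's code (assumption for the Hortonworks client).
        p.setProperty("schema.registry.url", registryUrl);
        return p;
    }

    public static void main(String[] args) {
        // Assumption: 6667 is a placeholder port; check your sandbox's broker settings.
        Properties p = build("sandbox-hdf.hortonworks.com:6667",
                "http://sandbox-hdf.hortonworks.com:7788/api/v1");
        p.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting `Properties` object can be passed to `new KafkaProducer<>(properties)` as in the question. Note that the Hortonworks serializer class has to be on the classpath, and the pom.xml in the question does not declare any Hortonworks registry dependency.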