Kafka 生产者/消费者无法生成或消费 Avro 数据

j0pj023g  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(535)

我编写了一个kafka生产者代码来生成avro数据,但它在序列化数据时显示了以下错误:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.UnknownHostException: sandbox-hdf.hortonworks.com
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    ......
以下是我的生产者代码:

package com.perfaware.kafka01;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Minimal Kafka producer example that publishes a single Avro-encoded
 * {@code Customer} record to {@code topic1}.
 *
 * <p>Keys are serialized with {@code StringSerializer}; values are serialized
 * with {@code KafkaAvroSerializer}, which is configured through the
 * {@code schema.registry.url} property set below.
 */
public class producerAvro {

    /**
     * Builds the producer configuration, sends one {@code Customer} record and
     * waits on the returned future before shutting the producer down.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while waiting on the send future
     * @throws ExecutionException   if the send future completes exceptionally
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();
        // setting producer properties

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        // Serialization(avro part)
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://sandbox-hdf.hortonworks.com:7788/api/v1");

        String topic = "topic1";

        // try-with-resources guarantees the producer is closed even when
        // serialization or the blocking get() below throws.
        try (Producer<String, Customer> producer = new KafkaProducer<>(properties)) {

            Customer customer = Customer.newBuilder()
                    .setAge(21)
                    .setAutomatedEmail(false)
                    .setFirstName("Manish")
                    .setLastName("B.")
                    .setHeight(176f)
                    .setWeight(62f)
                    .build();

            // No key is supplied; only topic and value are set on the record.
            ProducerRecord<String, Customer> producerRecord = new ProducerRecord<>(topic, customer);

            System.out.println(customer);

            // The callback reports the outcome; get() blocks until the send future completes.
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }).get();

            producer.flush();
        }
    }
}

如果有帮助的话,我还附上pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- Maven build for the Kafka/Avro producer example; the assembly plugin below is bound to the package phase. -->
  <modelVersion>4.0.0</modelVersion>
  <groupId>Kafka_Avro</groupId>
  <artifactId>Kafka_Avro_Practise</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <!-- NOTE(review): property name is spelled "verion" and is not referenced anywhere in this POM;
         its value (1.7.4) also differs from the Avro versions declared in the dependencies below. -->
    <avro.verion>1.7.4</avro.verion>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- NOTE(review): defined here, but the kafka-avro-serializer dependency hard-codes 3.1.1 instead of using ${confluent.version}. -->
        <confluent.version>3.1.1</confluent.version>
  </properties>

 <!-- Extra repository declared for the io.confluent artifact used below. -->
 <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
 </repositories>

  <dependencies>

  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
  <!-- NOTE(review): kafka-tools is 2.0.0 while kafka-clients below is 1.1.1; the two Kafka versions are not aligned. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-tools</artifactId>
    <version>2.0.0</version>
</dependency>

  <!-- Provides io.confluent.kafka.serializers.KafkaAvroSerializer, which the producer code imports. -->
  <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>3.1.1</version>
        </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-compiler -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-compiler</artifactId>
    <version>1.8.2</version>
</dependency>

<dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3.1</version>
            <scope>provided</scope>
        </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
<!-- NOTE(review): maven-compiler-plugin is declared as a dependency here rather than under build/plugins; confirm this is intended. -->
<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-mapred -->
<!-- NOTE(review): 1.8.1 here versus 1.8.2 for the other org.apache.avro artifacts in this POM. -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.8.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-ipc</artifactId>
    <version>1.8.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.11</version>
</dependency>

  </dependencies>

   <build>
        <plugins>
            <!-- Runs the assembly plugin's "single" goal during package, using the jar-with-dependencies descriptor. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

我还尝试把值序列化器(value.serializer)改为:
com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
但这并没有解决问题。

yzuktlbb

yzuktlbb1#

UnknownHostException: sandbox-hdf.hortonworks.com
如果您使用的是沙盒,您应该编辑 /etc/hosts 文件以使其成为已知主机
不过,如果使用 Hortonworks 的 Schema Registry,您肯定应该使用 Hortonworks 的序列化器。目前还不清楚您使用它时遇到了什么错误,但如果是同样的错误,那就是网络问题,与 Avro 无关。配置为:"value.serializer" = "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer"。另外,bootstrap.servers 可能也需要解析到沙盒中的 Kafka 实例,而不仅仅是 localhost。
如果您确实想使用 Confluent 的序列化器(我不确定它能否与 Hortonworks 的注册表配合工作),就需要使用一致的 Kafka 版本号:例如,您同时引入了 Kafka 1.1.1 和 2.0,而 Confluent 3.1.1 是基于 Kafka 0.10.x 的。
Avro 也类似——所有 Avro 依赖都应设置为同一个版本(例如 1.8.1),不过您的代码并不需要 avro-ipc 或 avro-mapred 库,可能也不需要 avro-compiler。

相关问题