Using the Camus example with Kafka

Posted by bxfogqkk on 2021-05-30 in Hadoop

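My use case is pushing Avro data from Kafka to HDFS. Camus seems to be the right tool, but I can't get it to work. I'm new to Camus and I'm trying to get the Camus example running: https://github.com/linkedin/camus
So far I'm still running into problems with it.
Here is the snippet of DummyLogKafkaProducerClient: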

package com.linkedin.camus.example.schemaregistry;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder;
import com.linkedin.camus.example.records.DummyLog;

public class DummyLogKafkaProducerClient {

    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:6667");
        // props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "example.producer.SimplePartitioner");
        //props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

        KafkaAvroMessageEncoder encoder = get_DUMMY_LOG_Encoder();

        for (int i = 0; i < 500; i++) {
            KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>("DUMMY_LOG", encoder.toBytes(getDummyLog()));
            producer.send(data);

        }
    }

    public static DummyLog getDummyLog() {
        Random random = new Random();
        DummyLog dummyLog = DummyLog.newBuilder().build();
        dummyLog.setId(random.nextLong());
        dummyLog.setLogTime(new Date().getTime());
        Map<CharSequence, CharSequence> machoStuff = new HashMap<CharSequence, CharSequence>();
        machoStuff.put("macho1", "abcd");
        machoStuff.put("macho2", "xyz");
        dummyLog.setMuchoStuff(machoStuff);
        return dummyLog;
    }

    public static KafkaAvroMessageEncoder get_DUMMY_LOG_Encoder() {
        KafkaAvroMessageEncoder encoder = new KafkaAvroMessageEncoder("DUMMY_LOG", null);
        Properties props = new Properties();
        props.put(KafkaAvroMessageEncoder.KAFKA_MESSAGE_CODER_SCHEMA_REGISTRY_CLASS, "com.linkedin.camus.example.schemaregistry.DummySchemaRegistry");
        encoder.init(props, "DUMMY_LOG");
        return encoder;

    }
}

I also added a default no-arg constructor to DummySchemaRegistry, because without it I got an instantiation exception:

package com.linkedin.camus.example.schemaregistry;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;

import com.linkedin.camus.example.records.DummyLog;
import com.linkedin.camus.example.records.DummyLog2;
import com.linkedin.camus.schemaregistry.MemorySchemaRegistry;

/**
 * This is a little dummy registry that just uses a memory-backed schema registry to store two dummy Avro schemas. You
 * can use this with camus.properties
 */
public class DummySchemaRegistry extends MemorySchemaRegistry<Schema> {
    public DummySchemaRegistry(Configuration conf) {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build()
                .getSchema());
    }
    public DummySchemaRegistry() {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build().getSchema());
    }
}
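Why the no-arg constructor matters: the stack trace in this question shows `KafkaAvroMessageEncoder.init` creating the registry through `java.lang.Class.newInstance`, which only works with a nullary constructor and otherwise fails with `InstantiationException`. The sketch below reproduces that with plain JDK reflection, using the non-deprecated `getConstructor().newInstance()` equivalent (which throws `NoSuchMethodException` instead). The classes `OnlyArgRegistry` and `NoArgRegistry` are hypothetical stand-ins, not Camus classes.

```java
import java.lang.reflect.Constructor;

public class NoArgConstructorDemo {

    // Hypothetical stand-in for a registry that only has a constructor taking an
    // argument (like DummySchemaRegistry(Configuration) before the fix).
    public static class OnlyArgRegistry {
        public OnlyArgRegistry(String conf) { }
    }

    // Hypothetical stand-in for the registry after a public no-arg constructor was added.
    public static class NoArgRegistry {
        public NoArgRegistry() { }
    }

    // Instantiates a class by name the way a reflective loader does:
    // look up the public no-arg constructor and invoke it.
    public static Object instantiate(String className) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        Constructor<?> ctor = clazz.getConstructor(); // NoSuchMethodException if there is none
        return ctor.newInstance();
    }

    public static boolean canInstantiate(String className) {
        try {
            instantiate(className);
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String base = NoArgConstructorDemo.class.getName();
        System.out.println(canInstantiate(base + "$OnlyArgRegistry")); // false
        System.out.println(canInstantiate(base + "$NoArgRegistry"));   // true
    }
}
```

Keeping both constructors, as the question does, lets the same class be created reflectively and by code that passes a `Configuration`.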

Below is the exception trace I get after running the program:
Exception in thread "main" com.linkedin.camus.coders.MessageEncoderException: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Field id type:LONG pos:0 not set and has no default value
    at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder.init(KafkaAvroMessageEncoder.java:55)
    at com.linkedin.camus.example.schemaregistry.DummyLogKafkaProducerClient.get_DUMMY_LOG_Encoder(DummyLogKafkaProducerClient.java:57)
    at com.linkedin.camus.example.schemaregistry.DummyLogKafkaProducerClient.main(DummyLogKafkaProducerClient.java:32)
Caused by: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Field id type:LONG pos:0 not set and has no default value
    at com.linkedin.camus.example.records.DummyLog$Builder.build(DummyLog.java:214)
    at com.linkedin.camus.example.schemaregistry.DummySchemaRegistry.<init>(DummySchemaRegistry.java:16)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at java.lang.Class.newInstance(Class.java:438)
    at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder.init(KafkaAvroMessageEncoder.java:52)
    ... 2 more
Caused by: org.apache.avro.AvroRuntimeException: Field id type:LONG pos:0 not set and has no default value
    at org.apache.avro.data.RecordBuilderBase.defaultValue(RecordBuilderBase.java:151)
    at com.linkedin.camus.example.records.DummyLog$Builder.build(DummyLog.java:209)
    ... 9 more
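The innermost cause is `DummyLog$Builder.build()` calling `RecordBuilderBase.defaultValue`: the code calls `DummyLog.newBuilder().build()` before setting anything (both in `DummySchemaRegistry` and in `getDummyLog()`), so `build()` has to fill `id` from a schema default, and `id` has none. The sketch below is a simplified, stdlib-only model of that rule; `FieldSpec` and `SimpleRecordBuilder` are hypothetical names and this is not Avro's actual implementation (in particular, it treats a Java `null` default as "no default", whereas Avro can declare an explicit `null` default).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RequiredFieldBuilderDemo {

    // Hypothetical field description: a name plus an optional default (null means "no default").
    static final class FieldSpec {
        final String name;
        final Object defaultValue;

        FieldSpec(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    // Hypothetical, simplified builder. NOT Avro's RecordBuilderBase; it only
    // reproduces the rule behind the exception: every field must be set or defaulted.
    static final class SimpleRecordBuilder {
        private final List<FieldSpec> fields;
        private final Map<String, Object> values = new LinkedHashMap<>();

        SimpleRecordBuilder(List<FieldSpec> fields) {
            this.fields = fields;
        }

        SimpleRecordBuilder set(String name, Object value) {
            values.put(name, value);
            return this;
        }

        Map<String, Object> build() {
            Map<String, Object> record = new LinkedHashMap<>();
            for (int pos = 0; pos < fields.size(); pos++) {
                FieldSpec f = fields.get(pos);
                if (values.containsKey(f.name)) {
                    record.put(f.name, values.get(f.name));   // explicitly set on the builder
                } else if (f.defaultValue != null) {
                    record.put(f.name, f.defaultValue);       // filled from the schema default
                } else {
                    throw new IllegalStateException("Field " + f.name + " pos:" + pos
                            + " not set and has no default value");
                }
            }
            return record;
        }
    }

    public static void main(String[] args) {
        List<FieldSpec> noDefaults = Arrays.asList(new FieldSpec("id", null), new FieldSpec("logTime", null));

        // Like DummyLog.newBuilder().build(): nothing set, no defaults -> exception.
        try {
            new SimpleRecordBuilder(noDefaults).build();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Field id pos:0 not set and has no default value
        }

        // Fix A: set every field on the builder before calling build().
        System.out.println(new SimpleRecordBuilder(noDefaults).set("id", 42L).set("logTime", 0L).build()); // {id=42, logTime=0}

        // Fix B: give the fields defaults in the schema.
        List<FieldSpec> withDefaults = Arrays.asList(new FieldSpec("id", 0L), new FieldSpec("logTime", 0L));
        System.out.println(new SimpleRecordBuilder(withDefaults).build()); // {id=0, logTime=0}
    }
}
```

With real Avro-generated classes, fix A corresponds to calling the generated builder setters before `build()`, and fix B to adding `"default"` values in the `.avsc`; the answers below take variations of these routes.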


Answer #1 (fnatzsnv)

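Camus doesn't assume the schema will have default values. I recently ran into the same problem with Camus. In fact, the way the schema registry is used in the default example is incorrect. I made some changes to the Camus code that you can check out at https://github.com/chandanbansal/camus; a few small changes are enough to make it work. The project also had no decoder for Avro records, so I wrote one as well.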


Answer #2 (ogsagwnx)

You can make any string or long field defaultable as follows:

{"type":"record","name":"CounterData","namespace":"org.avro.usage.tutorial","fields":[{"name":"word","type":["string","null"]},{"name":"count","type":["long","null"]}]}
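One caveat on the union approach: a union type makes the field nullable, but per the Avro specification a field only has a default if it declares a `"default"` attribute, and for a union that default must match the first branch of the union. So for the builder to fill the field automatically you would write something like `{"name":"word","type":["null","string"],"default":null}`. The hypothetical helper below encodes just that first-branch rule for `null`, `long` and `string`; it is an illustration, not an Avro API.

```java
import java.util.Arrays;
import java.util.List;

public class UnionDefaultCheck {

    // Hypothetical helper encoding the Avro spec rule: a union field's default
    // must match the FIRST branch of the union. Only null/long/string are modeled.
    public static boolean defaultMatchesFirstBranch(List<String> unionBranches, Object defaultValue) {
        if (unionBranches.isEmpty()) {
            return false;
        }
        switch (unionBranches.get(0)) {
            case "null":
                return defaultValue == null;
            case "long":
                return defaultValue instanceof Long;
            case "string":
                return defaultValue instanceof String;
            default:
                return false; // other branch types are not modeled in this sketch
        }
    }

    public static void main(String[] args) {
        // ["string","null"] with "default": null -> invalid, null is not the first branch
        System.out.println(defaultMatchesFirstBranch(Arrays.asList("string", "null"), null)); // false
        // ["null","string"] with "default": null -> valid
        System.out.println(defaultMatchesFirstBranch(Arrays.asList("null", "string"), null)); // true
        // ["long","null"] with "default": 0 -> valid
        System.out.println(defaultMatchesFirstBranch(Arrays.asList("long", "null"), 0L)); // true
    }
}
```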

Answer #3 (sd2nnvve)

I think Camus expects the Avro schema to have default values. I changed DummyLog.avsc to the following and recompiled:
{"namespace": "com.linkedin.camus.example.records", "type": "record", "name": "DummyLog", "doc": "Logs for not so important stuff.", "fields": [{"name": "id", "type": "int", "default": 0}, {"name": "logTime", "type": "int", "default": 0}]}
Let me know if it works for you.
Thanks, Ambassador


Answer #4 (jvidinwx)

I ran into this problem because I was initializing the registry like this:

super.register("DUMMY_LOG_2", LogEvent.newBuilder().build().getSchema());

When I changed it to:

super.register("logEventAvro", LogEvent.SCHEMA$);

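that got me past the exception.
I also used Gary's com.linkedin.camus.etl.kafka.coders.AvroMessageDecoder.
I also found this blog (alvinjin's notebook) very useful. It points out every issue you can run into with the Camus example and how to solve it!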
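Registering `LogEvent.SCHEMA$` instead of `LogEvent.newBuilder().build().getSchema()` works because the generated class exposes its schema as a static field, so no record has to be built (and validated) just to read the schema. The sketch below models the two patterns with plain Java; `FakeLogEvent`, `buildEmpty` and `TopicSchemaRegistry` are hypothetical stand-ins (schemas are plain JSON strings here), not the Avro or Camus `MemorySchemaRegistry` APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class RegisterSchemaDemo {

    // Hypothetical stand-in for a generated record class. The schema is a static
    // constant (like the SCHEMA$ field Avro generates), so reading it needs no instance.
    static final class FakeLogEvent {
        static final String SCHEMA$ = "{\"type\":\"record\",\"name\":\"LogEvent\",\"fields\":[]}";

        // Models newBuilder().build() failing because a required field was never set.
        static FakeLogEvent buildEmpty() {
            throw new IllegalStateException("Field id not set and has no default value");
        }

        String getSchema() {
            return SCHEMA$;
        }
    }

    // Hypothetical minimal topic -> schema registry.
    static final class TopicSchemaRegistry {
        private final Map<String, String> schemas = new HashMap<>();

        void register(String topic, String schema) {
            schemas.put(topic, schema);
        }

        String latest(String topic) {
            return schemas.get(topic);
        }
    }

    public static void main(String[] args) {
        TopicSchemaRegistry registry = new TopicSchemaRegistry();

        // Problematic pattern: build an empty instance just to read its schema.
        try {
            registry.register("logEventAvro", FakeLogEvent.buildEmpty().getSchema());
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage()); // failed: Field id not set and has no default value
        }

        // Working pattern: register the static schema constant directly.
        registry.register("logEventAvro", FakeLogEvent.SCHEMA$);
        System.out.println(registry.latest("logEventAvro") != null); // true
    }
}
```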
