如何转换log4j消息以适合avro模式并发布到kafka

lrl1mhuk  于 2022-11-06  发布在  Kafka
关注(0)|答案(1)|浏览(152)

我正在开发一个系统,它将所有微服务的所有日志发送到一个单独的主题apache kafka。大多数服务都是用python编写的,但我们现在从一个Streams应用转发日志。所有其他服务都使用avro中定义的相同Schema,并由confluent的Schema Registry管理。我可以将数据作为字符串发送到kafka,但不知道如何上传一个链接到Schema Registry Schema的有效avro对象。我目前正尝试通过一个自定义的log4j插件来实现这一点。出于测试的目的,我将这些日志写入它们自己的主题,并使用kcat -b localhost:9092 -s value=avro -r localhost:8081 -t new_logs -f 'key: %k value: %s Partition: %p\n\n'阅读它们,但我得到了
ERROR: Failed to format message in new_logs [0] at offset 0: Avro/Schema-registry message deserialization: Invalid CP1 magic byte 115, expected 0: message not produced with Schema-Registry Avro framing: terminating
执行此操作时(这个kcat命令确实适用于我的实际服务日志主题和所有其他使用有效avro的主题)。最初我尝试使用org.apache.avro.generic.GenericData.Record类,但无法弄清楚如何使它在AbstractLayout接口所需的方法toSerializable和toByteArray中工作,因为该类没有实现Serializable接口。下面是插件类定义和log4j配置:

ServiceLogLayout.java

@Plugin(name="ServiceLogLayout", category= Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    DatumWriter<GenericData.Record> serviceLogDatumWriter;

    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");

        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        this.serviceLogDatumWriter = new GenericDatumWriter<>(this.record);
    }

    @Override
    public byte[] toByteArray(LogEvent event) {
        LOGGER.warn("toByteArray");

        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name().toLowerCase()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());

        // SERIALIZE
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder jsonEncoder = null;
        try {
            jsonEncoder = EncoderFactory.get().jsonEncoder(
                    this.record, stream);
            this.serviceLogDatumWriter.write(schemaBuilder.build(), jsonEncoder);
            jsonEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Serialization error:" + e.getMessage());
        }
        return data;
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        return null;
    }

    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }

    private static class PrivateObjectOutputStream extends ObjectOutputStream {

        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }

        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }

}

log4j2.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration wiring the custom ServiceLogLayout plugin into Console and Kafka appenders.
     The "packages" attribute must name the Java package containing the @Plugin class so log4j can
     discover it — presumably logging.log4j.custom.plugins here; verify it matches the plugin's package. -->
<Configuration status="WARN" packages="logging.log4j.custom.plugins">
    <Appenders>
        <!-- Console output uses the same Avro layout; note the bytes are binary Avro, not human-readable text. -->
        <Console name="Console" target="SYSTEM_OUT">
            <ServiceLogLayout />

        </Console>
        <!-- Publishes each layout-encoded event to the new_logs topic; broker list comes from the environment. -->
        <Kafka name="Kafka" topic="new_logs">
            <ServiceLogLayout />
            <Property name="bootstrap.servers">${env:BOOTSTRAP_SERVERS}</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
        <!-- Raise the Kafka client's own level: avoids its INFO chatter feeding back into the Kafka appender. -->
        <Logger name="org.apache.kafka" level="WARN"/>
    </Loggers>
</Configuration>
n6lpvg4x

n6lpvg4x1#

OneCricketeer有正确的想法,下面是实现:

public class ServiceLogLayout extends AbstractLayout<byte[]> {
    /** Avro schema describing one service log record. */
    private final Schema record;
    /** Registry client; kept as a field so the serializer can reuse its schema-id cache. */
    private final SchemaRegistryClient client;
    /**
     * BUG FIX: created once here instead of per log event — the original built a
     * new KafkaAvroSerializer inside toByteArray(), discarding the serializer's
     * internal schema-id cache on every call.
     */
    private final KafkaAvroSerializer kafkaAvroSerializer;

    public ServiceLogLayout() {
        // AbstractLayout(Configuration, header, footer): none are needed here.
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");
        Schema.Field data = new Schema.Field("data", SchemaBuilder.builder().nullable().stringType(), "Optional extra data, such as stack frames");
        Schema.Field timestamp = new Schema.Field("timestamp", SchemaBuilder.builder().type(timestampMilliType));

        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        fields.add(data);
        fields.add(timestamp);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);

        this.client = new CachedSchemaRegistryClient("http://schema-registry:8081", 10000);
        this.kafkaAvroSerializer = new KafkaAvroSerializer(this.client);
    }

    /**
     * Serializes one log event as a Schema-Registry-framed Avro record
     * (magic byte 0 + 4-byte schema id + binary Avro body).
     *
     * @param event the log4j event to encode
     * @return Confluent-framed Avro bytes suitable for the Kafka appender
     */
    @Override
    public byte[] toByteArray(LogEvent event) {
        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";

        // FILL IN RECORD
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.record);
        recordBuilder.set("service", "testService");
        recordBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        // BUG FIX: event.getLevel().name() yields uppercase Log4j names ("INFO",
        // "WARN"), but the enum symbols are lowercase syslog-style names, and
        // "WARN"/"FATAL"/"TRACE" have no direct symbol at all — without this
        // mapping, serialization fails with an AvroTypeException.
        recordBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), toSyslogLevel(event.getLevel().name())));
        recordBuilder.set("msg", event.getMessage().getFormattedMessage());
        recordBuilder.set("data", null);
        recordBuilder.set("timestamp", event.getTimeMillis());

        // SERIALIZE: the serializer registers the schema under the subject and
        // prepends the Confluent wire framing.
        return kafkaAvroSerializer.serialize("service_logs", recordBuilder.build());
    }

    /** Maps a Log4j level name onto one of the schema's lowercase level_type symbols. */
    private static String toSyslogLevel(String log4jLevel) {
        switch (log4jLevel) {
            case "TRACE":
            case "DEBUG":
                return "debug";
            case "INFO":
                return "info";
            case "WARN":
                return "warning";
            case "ERROR":
                return "error";
            case "FATAL":
                return "critical";
            default:
                // Custom levels — fall back to "info"; TODO confirm desired mapping.
                return "info";
        }
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        // No specific MIME type is advertised for this binary Avro payload.
        return null;
    }

    /** Factory used by log4j when the <ServiceLogLayout/> element appears in configuration. */
    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }
}

值得注意的是,使用logstash可能也是一个很好的解决方案

相关问题