I'm building a system that sends the logs of all of our microservices to a single Apache Kafka topic. Most of the services are written in Python, but we are now forwarding logs from a Kafka Streams application as well. All of the other services use the same schema, defined in Avro and managed by Confluent's Schema Registry. I can send the data to Kafka as a string, but I can't figure out how to produce a valid Avro object that is linked to a schema in the Schema Registry. I'm currently trying to do this through a custom Log4j plugin. For testing purposes I write these logs to their own topic and read them with

kcat -b localhost:9092 -s value=avro -r localhost:8081 -t new_logs -f 'key: %k value: %s Partition: %p\n\n'

but when I do, I get

ERROR: Failed to format message in new_logs [0] at offset 0: Avro/Schema-registry message deserialization: Invalid magic byte 115, expected 0: message not produced with Schema-Registry Avro framing: terminating

(this kcat command works fine against my actual service-log topic and every other topic that uses valid Avro). Initially I tried to use the org.apache.avro.generic.GenericData.Record class, but I couldn't figure out how to make it work in the toSerializable and toByteArray methods required by the AbstractLayout interface, since that class does not implement Serializable. Below are the plugin class definition and the Log4j configuration.
ServiceLogLayout.java
@Plugin(name = "ServiceLogLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {

    Schema record;
    DatumWriter<GenericData.Record> serviceLogDatumWriter;

    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");
        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        this.serviceLogDatumWriter = new GenericDatumWriter<>(this.record);
    }

    @Override
    public byte[] toByteArray(LogEvent event) {
        LOGGER.warn("toByteArray");
        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name().toLowerCase()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        // SERIALIZE
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder jsonEncoder = null;
        try {
            jsonEncoder = EncoderFactory.get().jsonEncoder(this.record, stream);
            this.serviceLogDatumWriter.write(schemaBuilder.build(), jsonEncoder);
            jsonEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Serialization error:" + e.getMessage());
        }
        return data;
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        return null;
    }

    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }

    private static class PrivateObjectOutputStream extends ObjectOutputStream {

        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }

        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }
}
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="logging.log4j.custom.plugins">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <ServiceLogLayout />
        </Console>
        <Kafka name="Kafka" topic="new_logs">
            <ServiceLogLayout />
            <Property name="bootstrap.servers">${env:BOOTSTRAP_SERVERS}</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
        <Logger name="org.apache.kafka" level="WARN"/>
    </Loggers>
</Configuration>
1 Answer
OneCricketeer has the right idea, here is the implementation:
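In outline, the fix is to stop writing the record with the Avro jsonEncoder and instead emit Confluent wire-format bytes: a 0x00 magic byte, the 4-byte big-endian schema id assigned by the registry, then the record's binary Avro encoding. A minimal pure-Java sketch of just that framing step (the schema id and payload bytes here are hypothetical stand-ins for a registered schema and a BinaryEncoder's output):

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent Schema Registry wire format: magic byte 0,
// then the 4-byte big-endian schema id, then the Avro binary payload.
public class ConfluentFraming {
    private static final byte MAGIC_BYTE = 0x0;

    public static byte[] frame(int schemaId, byte[] avroBinary) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroBinary.length);
        buf.put(MAGIC_BYTE);  // the framing marker kcat checks for
        buf.putInt(schemaId); // id assigned when the schema was registered
        buf.put(avroBinary);  // record encoded with a BinaryEncoder
        return buf.array();
    }

    public static void main(String[] args) {
        // Hypothetical schema id 7 and a 2-byte Avro payload.
        byte[] framed = frame(7, new byte[]{0x02, 0x61});
        System.out.println("first byte: " + framed[0]); // 0, as kcat expects
    }
}
```

In the layout above, that means encoding the record with EncoderFactory.get().binaryEncoder(...) instead of jsonEncoder and prepending this header with the id the registry returned — or, more simply, delegating the whole step to KafkaAvroSerializer.serialize("new_logs", record) from io.confluent:kafka-avro-serializer, which registers the schema and produces these framed bytes for you.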
It is also worth noting that using Logstash might be a good solution here as well.