java—在没有第三方插件或库的情况下将avro消息从kafka写入hdfs

huwehgph  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(232)

我已经生成了一个apacheavro类,用于生成和使用客户端应用程序的avro消息以及服务器端代码。
我在互联网上搜索了几个用例。
将avro消息作为二进制数据写入hdfs。
通过使用avro生成的类将字节数组二进制数据转换为字符串形式的消息并将其写入hdfs。
在没有任何第三方库(如confluent或twitter双射api)的情况下,是否还有其他方法可以实现上述用例。
顺便说一句,我想写所有的avro消息到一个单一的hdfs文件。
avro生成类:

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package com.avrotohdfs.classes.avro;

import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class AvroSyslogMessage extends SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -793689732516755717L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroSyslogMessage\",\"namespace\":\"com.cisco.sso.ssodata.avro\",\"fields\":[{\"name\":\"partyID\",\"type\":\"string\"},{\"name\":\"partyName\",\"type\":[\"string\",\"null\"]},{\"name\":\"applianceID\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"},{\"name\":\"inventoryName\",\"type\":[\"string\",\"null\"]},{\"name\":\"senttime\",\"type\":[\"string\",\"null\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<AvroSyslogMessage> ENCODER =
      new BinaryMessageEncoder<AvroSyslogMessage>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<AvroSyslogMessage> DECODER =
      new BinaryMessageDecoder<AvroSyslogMessage>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<AvroSyslogMessage> getDecoder() {
    return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<AvroSyslogMessage> createDecoder(SchemaStore resolver) {
    return new BinaryMessageDecoder<AvroSyslogMessage>(MODEL$, SCHEMA$, resolver);
  }

  /**Serializes this AvroSyslogMessage to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
    return ENCODER.encode(this);
  }

  /**Deserializes a AvroSyslogMessage from a ByteBuffer. */
  public static AvroSyslogMessage fromByteBuffer(
      java.nio.ByteBuffer b) throws java.io.IOException {
    return DECODER.decode(b);
  }

  @Deprecated public java.lang.CharSequence partyID;
  @Deprecated public java.lang.CharSequence partyName;
  @Deprecated public java.lang.CharSequence applianceID;
  @Deprecated public java.lang.CharSequence message;
  @Deprecated public java.lang.CharSequence inventoryName;
  @Deprecated public java.lang.CharSequence senttime;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public AvroSyslogMessage() {}

  /**
   * All-args constructor.
   * @param partyID The new value for partyID
   * @param partyName The new value for partyName
   * @param applianceID The new value for applianceID
   * @param message The new value for message
   * @param inventoryName The new value for inventoryName
   * @param senttime The new value for senttime
   */
  public AvroSyslogMessage(java.lang.CharSequence partyID, java.lang.CharSequence partyName, java.lang.CharSequence applianceID, java.lang.CharSequence message, java.lang.CharSequence inventoryName, java.lang.CharSequence senttime) {
    this.partyID = partyID;
    this.partyName = partyName;
    this.applianceID = applianceID;
    this.message = message;
    this.inventoryName = inventoryName;
    this.senttime = senttime;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return partyID;
    case 1: return partyName;
    case 2: return applianceID;
    case 3: return message;
    case 4: return inventoryName;
    case 5: return senttime;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: partyID = (java.lang.CharSequence)value$; break;
    case 1: partyName = (java.lang.CharSequence)value$; break;
    case 2: applianceID = (java.lang.CharSequence)value$; break;
    case 3: message = (java.lang.CharSequence)value$; break;
    case 4: inventoryName = (java.lang.CharSequence)value$; break;
    case 5: senttime = (java.lang.CharSequence)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'partyID' field.
   * @return The value of the 'partyID' field.
   */
  public java.lang.CharSequence getPartyID() {
    return partyID;
  }

  /**
   * Sets the value of the 'partyID' field.
   * @param value the value to set.
   */
  public void setPartyID(java.lang.CharSequence value) {
    this.partyID = value;
  }

  /**
   * Gets the value of the 'partyName' field.
   * @return The value of the 'partyName' field.
   */
  public java.lang.CharSequence getPartyName() {
    return partyName;
  }

  /**
   * Sets the value of the 'partyName' field.
   * @param value the value to set.
   */
  public void setPartyName(java.lang.CharSequence value) {
    this.partyName = value;
  }

  /**
   * Gets the value of the 'applianceID' field.
   * @return The value of the 'applianceID' field.
   */
  public java.lang.CharSequence getApplianceID() {
    return applianceID;
  }

  /**
   * Sets the value of the 'applianceID' field.
   * @param value the value to set.
   */
  public void setApplianceID(java.lang.CharSequence value) {
    this.applianceID = value;
  }

  /**
   * Gets the value of the 'message' field.
   * @return The value of the 'message' field.
   */
  public java.lang.CharSequence getMessage() {
    return message;
  }

  /**
   * Sets the value of the 'message' field.
   * @param value the value to set.
   */
  public void setMessage(java.lang.CharSequence value) {
    this.message = value;
  }

  /**
   * Gets the value of the 'inventoryName' field.
   * @return The value of the 'inventoryName' field.
   */
  public java.lang.CharSequence getInventoryName() {
    return inventoryName;
  }

  /**
   * Sets the value of the 'inventoryName' field.
   * @param value the value to set.
   */
  public void setInventoryName(java.lang.CharSequence value) {
    this.inventoryName = value;
  }

  /**
   * Gets the value of the 'senttime' field.
   * @return The value of the 'senttime' field.
   */
  public java.lang.CharSequence getSenttime() {
    return senttime;
  }

  /**
   * Sets the value of the 'senttime' field.
   * @param value the value to set.
   */
  public void setSenttime(java.lang.CharSequence value) {
    this.senttime = value;
  }

  /**
   * Creates a new AvroSyslogMessage RecordBuilder.
   * @return A new AvroSyslogMessage RecordBuilder
   */
  public static com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder newBuilder() {
    return new com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder();
  }

  /**
   * Creates a new AvroSyslogMessage RecordBuilder by copying an existing Builder.
   * @param other The existing builder to copy.
   * @return A new AvroSyslogMessage RecordBuilder
   */
  public static com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder newBuilder(com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder other) {
    return new com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder(other);
  }

  /**
   * Creates a new AvroSyslogMessage RecordBuilder by copying an existing AvroSyslogMessage instance.
   * @param other The existing instance to copy.
   * @return A new AvroSyslogMessage RecordBuilder
   */
  public static com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder newBuilder(com.cisco.sso.ssodata.avro.AvroSyslogMessage other) {
    return new com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder(other);
  }

  /**
   * RecordBuilder for AvroSyslogMessage instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<AvroSyslogMessage>
    implements org.apache.avro.data.RecordBuilder<AvroSyslogMessage> {

    private java.lang.CharSequence partyID;
    private java.lang.CharSequence partyName;
    private java.lang.CharSequence applianceID;
    private java.lang.CharSequence message;
    private java.lang.CharSequence inventoryName;
    private java.lang.CharSequence senttime;

    /**Creates a new Builder */
    private Builder() {
      super(SCHEMA$);
    }

    /**
     * Creates a Builder by copying an existing Builder.
     * @param other The existing Builder to copy.
     */
    private Builder(com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.partyID)) {
        this.partyID = data().deepCopy(fields()[0].schema(), other.partyID);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.partyName)) {
        this.partyName = data().deepCopy(fields()[1].schema(), other.partyName);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.applianceID)) {
        this.applianceID = data().deepCopy(fields()[2].schema(), other.applianceID);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.message)) {
        this.message = data().deepCopy(fields()[3].schema(), other.message);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.inventoryName)) {
        this.inventoryName = data().deepCopy(fields()[4].schema(), other.inventoryName);
        fieldSetFlags()[4] = true;
      }
      if (isValidValue(fields()[5], other.senttime)) {
        this.senttime = data().deepCopy(fields()[5].schema(), other.senttime);
        fieldSetFlags()[5] = true;
      }
    }

    /**
     * Creates a Builder by copying an existing AvroSyslogMessage instance
     * @param other The existing instance to copy.
     */
    private Builder(com.cisco.sso.ssodata.avro.AvroSyslogMessage other) {
            super(SCHEMA$);
      if (isValidValue(fields()[0], other.partyID)) {
        this.partyID = data().deepCopy(fields()[0].schema(), other.partyID);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.partyName)) {
        this.partyName = data().deepCopy(fields()[1].schema(), other.partyName);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.applianceID)) {
        this.applianceID = data().deepCopy(fields()[2].schema(), other.applianceID);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.message)) {
        this.message = data().deepCopy(fields()[3].schema(), other.message);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.inventoryName)) {
        this.inventoryName = data().deepCopy(fields()[4].schema(), other.inventoryName);
        fieldSetFlags()[4] = true;
      }
      if (isValidValue(fields()[5], other.senttime)) {
        this.senttime = data().deepCopy(fields()[5].schema(), other.senttime);
        fieldSetFlags()[5] = true;
      }
    }

    /**
      * Gets the value of the 'partyID' field.
      * @return The value.
      */
    public java.lang.CharSequence getPartyID() {
      return partyID;
    }

    /**
      * Sets the value of the 'partyID' field.
      * @param value The value of 'partyID'.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder setPartyID(java.lang.CharSequence value) {
      validate(fields()[0], value);
      this.partyID = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /**
      * Checks whether the 'partyID' field has been set.
      * @return True if the 'partyID' field has been set, false otherwise.
      */
    public boolean hasPartyID() {
      return fieldSetFlags()[0];
    }

    /**
      * Clears the value of the 'partyID' field.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder clearPartyID() {
      partyID = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /**
      * Gets the value of the 'partyName' field.
      * @return The value.
      */
    public java.lang.CharSequence getPartyName() {
      return partyName;
    }

    /**
      * Sets the value of the 'partyName' field.
      * @param value The value of 'partyName'.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder setPartyName(java.lang.CharSequence value) {
      validate(fields()[1], value);
      this.partyName = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /**
      * Checks whether the 'partyName' field has been set.
      * @return True if the 'partyName' field has been set, false otherwise.
      */
    public boolean hasPartyName() {
      return fieldSetFlags()[1];
    }

    /**
      * Clears the value of the 'partyName' field.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder clearPartyName() {
      partyName = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /**
      * Gets the value of the 'applianceID' field.
      * @return The value.
      */
    public java.lang.CharSequence getApplianceID() {
      return applianceID;
    }

    /**
      * Sets the value of the 'applianceID' field.
      * @param value The value of 'applianceID'.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder setApplianceID(java.lang.CharSequence value) {
      validate(fields()[2], value);
      this.applianceID = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /**
      * Checks whether the 'applianceID' field has been set.
      * @return True if the 'applianceID' field has been set, false otherwise.
      */
    public boolean hasApplianceID() {
      return fieldSetFlags()[2];
    }

    /**
      * Clears the value of the 'applianceID' field.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder clearApplianceID() {
      applianceID = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    /**
      * Gets the value of the 'message' field.
      * @return The value.
      */
    public java.lang.CharSequence getMessage() {
      return message;
    }

    /**
      * Sets the value of the 'message' field.
      * @param value The value of 'message'.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder setMessage(java.lang.CharSequence value) {
      validate(fields()[3], value);
      this.message = value;
      fieldSetFlags()[3] = true;
      return this;
    }

    /**
      * Checks whether the 'message' field has been set.
      * @return True if the 'message' field has been set, false otherwise.
      */
    public boolean hasMessage() {
      return fieldSetFlags()[3];
    }

    /**
      * Clears the value of the 'message' field.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder clearMessage() {
      message = null;
      fieldSetFlags()[3] = false;
      return this;
    }

    /**
      * Gets the value of the 'inventoryName' field.
      * @return The value.
      */
    public java.lang.CharSequence getInventoryName() {
      return inventoryName;
    }

    /**
      * Sets the value of the 'inventoryName' field.
      * @param value The value of 'inventoryName'.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder setInventoryName(java.lang.CharSequence value) {
      validate(fields()[4], value);
      this.inventoryName = value;
      fieldSetFlags()[4] = true;
      return this;
    }

    /**
      * Checks whether the 'inventoryName' field has been set.
      * @return True if the 'inventoryName' field has been set, false otherwise.
      */
    public boolean hasInventoryName() {
      return fieldSetFlags()[4];
    }

    /**
      * Clears the value of the 'inventoryName' field.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder clearInventoryName() {
      inventoryName = null;
      fieldSetFlags()[4] = false;
      return this;
    }

    /**
      * Gets the value of the 'senttime' field.
      * @return The value.
      */
    public java.lang.CharSequence getSenttime() {
      return senttime;
    }

    /**
      * Sets the value of the 'senttime' field.
      * @param value The value of 'senttime'.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder setSenttime(java.lang.CharSequence value) {
      validate(fields()[5], value);
      this.senttime = value;
      fieldSetFlags()[5] = true;
      return this;
    }

    /**
      * Checks whether the 'senttime' field has been set.
      * @return True if the 'senttime' field has been set, false otherwise.
      */
    public boolean hasSenttime() {
      return fieldSetFlags()[5];
    }

    /**
      * Clears the value of the 'senttime' field.
      * @return This builder.
      */
    public com.cisco.sso.ssodata.avro.AvroSyslogMessage.Builder clearSenttime() {
      senttime = null;
      fieldSetFlags()[5] = false;
      return this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public AvroSyslogMessage build() {
      try {
        AvroSyslogMessage record = new AvroSyslogMessage();
        record.partyID = fieldSetFlags()[0] ? this.partyID : (java.lang.CharSequence) defaultValue(fields()[0]);
        record.partyName = fieldSetFlags()[1] ? this.partyName : (java.lang.CharSequence) defaultValue(fields()[1]);
        record.applianceID = fieldSetFlags()[2] ? this.applianceID : (java.lang.CharSequence) defaultValue(fields()[2]);
        record.message = fieldSetFlags()[3] ? this.message : (java.lang.CharSequence) defaultValue(fields()[3]);
        record.inventoryName = fieldSetFlags()[4] ? this.inventoryName : (java.lang.CharSequence) defaultValue(fields()[4]);
        record.senttime = fieldSetFlags()[5] ? this.senttime : (java.lang.CharSequence) defaultValue(fields()[5]);
        return record;
      } catch (java.lang.Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumWriter<AvroSyslogMessage>
    WRITER$ = (org.apache.avro.io.DatumWriter<AvroSyslogMessage>)MODEL$.createDatumWriter(SCHEMA$);

  @Override public void writeExternal(java.io.ObjectOutput out)
    throws java.io.IOException {
    WRITER$.write(this, SpecificData.getEncoder(out));
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumReader<AvroSyslogMessage>
    READER$ = (org.apache.avro.io.DatumReader<AvroSyslogMessage>)MODEL$.createDatumReader(SCHEMA$);

  @Override public void readExternal(java.io.ObjectInput in)
    throws java.io.IOException {
    READER$.read(this, SpecificData.getDecoder(in));
  }

}

将avro消息作为字节数组写入hdfs的代码:

@Autowired
private PropertyConfig config;

FSDataOutputStream out = null;

public void consume() throws IOException {

    String topic = config.getDedupServiceConsumerTopic();
    String consGroup = config.getDedupServiceConsGroup();

    KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>()
                .initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER);

    logger.debug("Dedupe Kafka Consumer Initialized......");

    try {
        while (true) {
            ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);

        for (ConsumerRecord<String, AvroSyslogMessage> record : records) {

                    logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

                    AvroSyslogMessage avroMessage = record.value();

                    logger.info("avro Message = " + avroMessage);

                    Configuration config = new Configuration();
                    FileSystem fs = FileSystem.get(config);
                    String s = fs.getHomeDirectory() + "/syslog";
                    Path path = new Path(s);
                    DistributedFileSystem dfs = new DistributedFileSystem();
                    if (!dfs.exists(path)) {
                        dfs.createNewFile(path);
                        out = fs.create(path);
                    }
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutput ooput = new ObjectOutputStream(bos);
                    ooput.writeObject(avroMessage);
                    logger.info("Writing avro message to hdfs");
                    out.write(bos.toByteArray());

                }
            }

        } catch (Exception e) {
            logger.error("Error occured while processing message", e);
        } finally {
            logger.debug("debupe kafka consume is closing");
            consumer.close();
            out.close();
        }

    }

我的问题是,这是否是将avro消息作为字节数组写入hdfs的正确方法。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题