org.apache.kafka.connect.data.Struct.put()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(296)

本文整理了Java中org.apache.kafka.connect.data.Struct.put()方法的一些代码示例,展示了Struct.put()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Struct.put()方法的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct
方法名:put

Struct.put介绍

[英]Set the value of a field. Validates the value, throwing a DataException if it does not match the field's Schema.
[中]设置字段的值。验证该值,如果该值与字段的架构不匹配,则引发DataException。

代码示例

代码示例来源:origin: debezium/debezium

protected Struct schemaChangeRecordValue(String databaseName, String ddlStatements) {
  Struct result = new Struct(schemaChangeValueSchema);
  result.put(Fields.SOURCE, source.struct());
  result.put(Fields.DATABASE_NAME, databaseName);
  result.put(Fields.DDL_STATEMENTS, ddlStatements);
  return result;
}

代码示例来源:origin: debezium/debezium

/**
 * Produce a key struct based on the server name and KEY_SCHEMA
 *
 */
private Struct serverNameKey(String serverName){
  Struct result = new Struct(KEY_SCHEMA);
  result.put(SERVER_NAME_KEY, serverName);
  return result;
}

代码示例来源:origin: debezium/debezium

protected Struct schemaChangeRecordKey(String databaseName) {
  Struct result = new Struct(schemaChangeKeySchema);
  result.put(Fields.DATABASE_NAME, databaseName);
  return result;
}

代码示例来源:origin: debezium/debezium

protected Struct keyFor(String objId) {
    return new Struct(keySchema).put("id", objId);
  }
}

代码示例来源:origin: debezium/debezium

protected Struct struct() {
    return new Struct(schema())
        .put(DEBEZIUM_VERSION_KEY, version)
        .put(DEBEZIUM_CONNECTOR_KEY, connector());
  }
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldSerializeStructCorrectly() throws JsonProcessingException {
 final Struct address = new Struct(addressSchema);
 address.put("NUMBER", 101L);
 address.put("STREET", "University Ave.");
 address.put("CITY", "Palo Alto");
 address.put("STATE", "CA");
 address.put("ZIPCODE", 94301L);
 final byte[] serializedBytes = objectMapper.writeValueAsBytes(address);
 final String jsonString = new String(serializedBytes, StandardCharsets.UTF_8);
 assertThat(jsonString, equalTo("{\"NUMBER\":101,\"STREET\":\"University Ave.\",\"CITY\":\"Palo Alto\",\"STATE\":\"CA\",\"ZIPCODE\":94301}"));
}

代码示例来源:origin: confluentinc/ksql

public Struct toConnectRow(final GenericRow row) {
  final Struct struct = new Struct(schema);
  schema.fields().forEach(
    field -> struct.put(field, row.getColumns().get(field.index()))
  );
  return struct;
 }
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldSerializeStructWithNestedStructAndNullFieldsCorrectly() throws JsonProcessingException {
 final Struct category = new Struct(categorySchema);
 category.put("ID", 1L);
 category.put("NAME", "Food");
 final Struct item = new Struct(itemInfoSchema);
 item.put("ITEMID", 1L);
 item.put("NAME", "ICE CREAM");
 item.put("CATEGORY", null);
 final byte[] serializedBytes = objectMapper.writeValueAsBytes(item);
 final String jsonString = new String(serializedBytes, StandardCharsets.UTF_8);
 assertThat(jsonString, equalTo("{\"ITEMID\":1,\"NAME\":\"ICE CREAM\",\"CATEGORY\":null}"));
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldSerializeStructWithNestedStructCorrectly() throws JsonProcessingException {
 final Struct category = new Struct(categorySchema);
 category.put("ID", 1L);
 category.put("NAME", "Food");
 final Struct item = new Struct(itemInfoSchema);
 item.put("ITEMID", 1L);
 item.put("NAME", "ICE CREAM");
 item.put("CATEGORY", category);
 final byte[] serializedBytes = objectMapper.writeValueAsBytes(item);
 final String jsonString = new String(serializedBytes, StandardCharsets.UTF_8);
 assertThat(jsonString, equalTo("{\"ITEMID\":1,\"NAME\":\"ICE CREAM\",\"CATEGORY\":{\"ID\":1,\"NAME\":\"Food\"}}"));
}

代码示例来源:origin: confluentinc/ksql

@SuppressWarnings("unchecked")
@Test
public void shouldPrintStructRowCorrectly() {
 final Struct address = new Struct(addressSchema);
 address.put("NUMBER", 101L);
 address.put("STREET", "University Ave.");
 address.put("CITY", "Palo Alto");
 address.put("STATE", "CA");
 address.put("ZIPCODE", 94301L);
 final GenericRow genericRow = new GenericRow(ImmutableList.of(
   address));
 final String rowString = genericRow.toString();
 assertThat(rowString, equalTo(
   "[ Struct{NUMBER=101,STREET=University Ave.,CITY=Palo Alto,STATE=CA,ZIPCODE=94301} ]"));
}

代码示例来源:origin: debezium/debezium

private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
  final Struct envelope = (Struct)currentRecord.value();
  final Struct source = (Struct)envelope.get("source");
  if (source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
    source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
  }
}

代码示例来源:origin: confluentinc/ksql

private Struct enforceFieldTypeForStruct(
  final Schema fieldSchema,
  final Map<String, ?> structMap) {
 final Struct columnStruct = new Struct(fieldSchema);
 final Map<String, String> caseInsensitiveStructFieldNameMap =
   getCaseInsensitiveFieldNameMap(structMap, false);
 fieldSchema.fields()
   .forEach(
     field -> columnStruct.put(field.name(),
       enforceFieldType(
         field.schema(), structMap.get(
           caseInsensitiveStructFieldNameMap.get(field.name().toUpperCase())
         ))));
 return columnStruct;
}

代码示例来源:origin: debezium/debezium

private Struct updateValue(Schema newValueSchema, Struct oldValue) {
  final Struct newValue = new Struct(newValueSchema);
  for (org.apache.kafka.connect.data.Field field : oldValue.schema().fields()) {
    newValue.put(field.name(), oldValue.get(field));
  }
  return newValue;
}

代码示例来源:origin: debezium/debezium

private Struct updateEnvelope(Schema newEnvelopeSchema, Struct oldEnvelope) {
  final Struct newEnvelope = new Struct(newEnvelopeSchema);
  final Schema newValueSchema = newEnvelopeSchema.field(Envelope.FieldName.BEFORE).schema();
  for (org.apache.kafka.connect.data.Field field : oldEnvelope.schema().fields()) {
    final String fieldName = field.name();
    Object fieldValue = oldEnvelope.get(field);
    if ((Objects.equals(fieldName, Envelope.FieldName.BEFORE) || Objects.equals(fieldName, Envelope.FieldName.AFTER))
        && fieldValue != null) {
      fieldValue = updateValue(newValueSchema, requireStruct(fieldValue, "Updating schema"));
    }
    newEnvelope.put(fieldName, fieldValue);
  }
  return newEnvelope;
}

代码示例来源:origin: debezium/debezium

private SourceRecord createUnknownUnnamedSchemaRecord() {
  final Schema recordSchema = SchemaBuilder.struct()
      .field("id", SchemaBuilder.int8())
      .build();
  final Struct before = new Struct(recordSchema);
  before.put("id", (byte)1);
  return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", recordSchema, before);
}

代码示例来源:origin: debezium/debezium

private SourceRecord createUnknownRecord() {
  final Schema recordSchema = SchemaBuilder.struct().name("unknown")
      .field("id", SchemaBuilder.int8())
      .build();
  final Struct before = new Struct(recordSchema);
  before.put("id", (byte)1);
  return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", recordSchema, before);
}

代码示例来源:origin: debezium/debezium

private SourceRecord createCreateRecord() {
  final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
  Envelope envelope = Envelope.defineSchema()
      .withName("dummy.Envelope")
      .withRecord(recordSchema)
      .withSource(SchemaBuilder.struct().build())
      .build();
  final Struct before = new Struct(recordSchema);
  before.put("id", (byte)1);
  final Struct payload = envelope.create(before, null, System.nanoTime());
  return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", envelope.schema(), payload);
}

代码示例来源:origin: debezium/debezium

private SourceRecord createDeleteRecord() {
  final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
  Envelope envelope = Envelope.defineSchema()
      .withName("dummy.Envelope")
      .withRecord(recordSchema)
      .withSource(SchemaBuilder.struct().build())
      .build();
  final Struct before = new Struct(recordSchema);
  before.put("id", (byte)1);
  final Struct payload = envelope.delete(before, null, System.nanoTime());
  return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", envelope.schema(), payload);
}

代码示例来源:origin: debezium/debezium

@Test
  @FixFor("DBZ-759")
  public void correctlySerializesStructWithByteArray() {
    Schema schema = SchemaBuilder.struct()
          .field("some_field", SchemaBuilder.bytes().build())
          .build();

    Struct struct = new Struct(schema).put("some_field", new byte[]{1, 3, 5, 7});
    assertThat(SchemaUtil.asString(struct)).isEqualTo("{\"some_field\" : [1, 3, 5, 7]}");
  }
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldDeserializeUnionToStruct() {
 final org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.unionOf()
   .intType().and()
   .stringType()
   .endUnion();
 final Schema ksqlSchema = SchemaBuilder.struct()
   .field("int", Schema.OPTIONAL_INT32_SCHEMA)
   .field("string", Schema.OPTIONAL_STRING_SCHEMA)
   .optional()
   .build();
 final Struct ksqlValue = new Struct(ksqlSchema).put("string", "foobar");
 shouldDeserializeTypeCorrectly(avroSchema, "foobar", ksqlSchema, ksqlValue);
}

相关文章