org.apache.kafka.connect.data.Struct类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(13.3k)|赞(0)|评价(0)|浏览(810)

本文整理了Java中org.apache.kafka.connect.data.Struct类的一些代码示例,展示了Struct类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Struct类的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct

Struct介绍

[英]A structured record containing a set of named fields with values, each field using an independent Schema. Struct objects must specify a complete Schema up front, and only fields specified in the Schema may be set.

The Struct's #put(String,Object) method returns the Struct itself to provide a fluent API for constructing complete objects:

Schema schema = SchemaBuilder.struct().name("com.example.Person") 
.field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build() 
Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21)

[中]一种结构化记录,包含一组带值的命名字段,每个字段使用独立的模式。Struct对象必须预先指定完整的模式,并且只能设置模式中指定的字段。
Struct的#put(String,Object)方法返回Struct本身,为构建完整对象提供流畅的API:

Schema schema = SchemaBuilder.struct().name("com.example.Person") 
.field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build() 
Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21)

代码示例

代码示例来源:origin: debezium/debezium

protected Struct schemaChangeRecordKey(String databaseName) {
  Struct result = new Struct(schemaChangeKeySchema);
  result.put(Fields.DATABASE_NAME, databaseName);
  return result;
}

代码示例来源:origin: debezium/debezium

private Struct updateValue(Schema newValueSchema, Struct oldValue) {
  final Struct newValue = new Struct(newValueSchema);
  for (org.apache.kafka.connect.data.Field field : oldValue.schema().fields()) {
    newValue.put(field.name(), oldValue.get(field));
  }
  return newValue;
}

代码示例来源:origin: debezium/debezium

/**
   * Obtain the operation for the given source record.
   *
   * @param record the source record; may not be null
   * @return the operation, or null if no valid operation was found in the record
   */
  public static Operation operationFor(SourceRecord record) {
    Struct value = (Struct) record.value();
    Field opField = value.schema().field(FieldName.OPERATION);
    if (opField != null) {
      return Operation.forCode(value.getString(opField.name()));
    }
    return null;
  }
}

代码示例来源:origin: debezium/debezium

private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
  final Struct envelope = (Struct)currentRecord.value();
  final Struct source = (Struct)envelope.get("source");
  if (source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
    source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
  }
}

代码示例来源:origin: debezium/debezium

@Test
public void shouldGenerateRecordForInsertEvent() throws InterruptedException {
  CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
  BsonTimestamp ts = new BsonTimestamp(1000, 1);
  ObjectId objId = new ObjectId();
  Document obj = new Document().append("_id", objId).append("name", "Sally");
  Document event = new Document().append("o", obj)
                  .append("ns", "dbA.c1")
                  .append("ts", ts)
                  .append("h", Long.valueOf(12345678))
                  .append("op", "i");
  RecordsForCollection records = recordMakers.forCollection(collectionId);
  records.recordEvent(event, 1002);
  assertThat(produced.size()).isEqualTo(1);
  SourceRecord record = produced.get(0);
  Struct key = (Struct) record.key();
  Struct value = (Struct) record.value();
  assertThat(key.schema()).isSameAs(record.keySchema());
  assertThat(key.get("id")).isEqualTo("{ \"$oid\" : \"" + objId + "\"}");
  assertThat(value.schema()).isSameAs(record.valueSchema());
  // assertThat(value.getString(FieldName.BEFORE)).isNull();
  assertThat(value.getString(FieldName.AFTER)).isEqualTo(obj.toJson(WRITER_SETTINGS));
  assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.CREATE.code());
  assertThat(value.getInt64(FieldName.TIMESTAMP)).isEqualTo(1002L);
  Struct actualSource = value.getStruct(FieldName.SOURCE);
  Struct expectedSource = source.lastOffsetStruct("rs0", collectionId);
  assertThat(actualSource).isEqualTo(expectedSource);
}

代码示例来源:origin: debezium/debezium

private static Object[] valuesFor(Struct struct) {
  Object[] array = new Object[struct.schema().fields().size()];
  int index = 0;
  for (Field field : struct.schema().fields()) {
    array[index] = struct.get(field);
    ++index;
  }
  return array;
}

代码示例来源:origin: debezium/debezium

private void assertFieldAbsent(SourceRecord record, String fieldName) {
  Struct value = (Struct) ((Struct) record.value()).get(Envelope.FieldName.AFTER);
  try {
    value.get(fieldName);
    fail("field should not be present");
  } catch (DataException e) {
    //expected
  }
}

代码示例来源:origin: debezium/debezium

private void assertBigintUnsignedPrecise(Struct value) {
  Struct after = value.getStruct(Envelope.FieldName.AFTER);
  Integer i = after.getInt32("id");
  assertThat(i).isNotNull();
  //Validate the schema first, we are expecting org.apache.kafka.connect.data.Decimal:Byte  since we are dealing with unsignd-bigint
  //So Unsigned BIGINY would be an int32 type
  assertThat(after.schema().field("c1").schema()).isEqualTo(Decimal.builder(0).schema());
  assertThat(after.schema().field("c2").schema()).isEqualTo(Decimal.builder(0).schema());
  //Validate the schema first, we are expecting int-64 since we are dealing with signed-bigint.
  //So Signed BIGINT would be an INT64 type
  assertThat(after.schema().field("c3").schema()).isEqualTo(Schema.INT64_SCHEMA);
  //Validate candidates values
  switch (i) {
  case 1:
    assertThat(after.get("c1")).isEqualTo(new BigDecimal("18446744073709551615"));
    assertThat(after.get("c2")).isEqualTo(new BigDecimal("18446744073709551615"));
    assertThat(after.getInt64("c3")).isEqualTo(9223372036854775807L);
    break;
  case 2:
    assertThat(after.get("c1")).isEqualTo(new BigDecimal("14446744073709551615"));
    assertThat(after.get("c2")).isEqualTo(new BigDecimal("14446744073709551615"));
    assertThat(after.getInt64("c3")).isEqualTo(-1223372036854775807L);
    break;
  case 3:
    assertThat(after.get("c1")).isEqualTo(new BigDecimal("0"));
    assertThat(after.get("c2")).isEqualTo(new BigDecimal("0"));
    assertThat(after.getInt64("c3")).isEqualTo(-9223372036854775808L);
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Verify that the given {@link SourceRecord} is a {@link Operation#READ READ} record.
 *
 * @param record the source record; may not be null
 */
public static void isValidRead(SourceRecord record) {
  assertThat(record.key()).isNotNull();
  assertThat(record.keySchema()).isNotNull();
  assertThat(record.valueSchema()).isNotNull();
  Struct value = (Struct) record.value();
  assertThat(value).isNotNull();
  assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.READ.code());
  assertThat(value.get(FieldName.AFTER)).isNotNull();
  assertThat(value.get(FieldName.BEFORE)).isNull();
}

代码示例来源:origin: debezium/debezium

private void assertSmallUnsigned(Struct value) {
  Struct after = value.getStruct(Envelope.FieldName.AFTER);
  Integer i = after.getInt32("id");
  assertThat(i).isNotNull();
  //Validate the schema first, we are expecting int-32 since we are dealing with unsignd-smallint
  //So Unsigned SMALLINT would be an int32 type
  assertThat(after.schema().field("c1").schema()).isEqualTo(Schema.INT32_SCHEMA);
  assertThat(after.schema().field("c2").schema()).isEqualTo(Schema.INT32_SCHEMA);
  //Validate the schema first, we are expecting int-16 since we are dealing with signed-smallint.
  //So Signed SMALLINT would be an INT16 type
  assertThat(after.schema().field("c3").schema()).isEqualTo(Schema.INT16_SCHEMA);
  //Validate candidates values
  switch (i) {
  case 1:
    assertThat(after.getInt32("c1")).isEqualTo(65535);
    assertThat(after.getInt32("c2")).isEqualTo(65535);
    assertThat(after.getInt16("c3")).isEqualTo((short)32767);
    break;
  case 2:
    assertThat(after.getInt32("c1")).isEqualTo(45535);
    assertThat(after.getInt32("c2")).isEqualTo(45535);
    assertThat(after.getInt16("c3")).isEqualTo((short)-12767);
    break;
  case 3:
    assertThat(after.getInt32("c1")).isEqualTo(0);
    assertThat(after.getInt32("c2")).isEqualTo(0);
    assertThat(after.getInt16("c3")).isEqualTo((short)-32768);
  }
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldSetNullRecordToNull() {
 // When:
 final SchemaAndValue msg = ProcessingLogMessageFactory.deserializationErrorMsg(
   error,
   Optional.empty()
 ).get();
 // Then:
 final Struct struct = (Struct) msg.value();
 final Struct deserializationError = struct.getStruct(DESERIALIZATION_ERROR);
 assertThat(deserializationError.get(DESERIALIZATION_ERROR_FIELD_RECORD), is(nullValue()));
}

代码示例来源:origin: debezium/debezium

private void assertPoint(Struct value) {
  Struct after = value.getStruct(Envelope.FieldName.AFTER);
  Integer i = after.getInt32("id");
  Testing.debug(after);
  assertThat(i).isNotNull();
  Double expectedX = after.getFloat64("expected_x");
  Double expectedY = after.getFloat64("expected_y");
  Integer expectedSrid = after.getInt32("expected_srid");
  if (after.getStruct("point") != null) {
    Double actualX = after.getStruct("point").getFloat64("x");
    Double actualY = after.getStruct("point").getFloat64("y");
    Integer actualSrid = after.getStruct("point").getInt32("srid");
    //Validate the values
    databaseDifferences.geometryAssertPoints(expectedX, expectedY, actualX, actualY);
    assertThat(actualSrid).isEqualTo(expectedSrid);
    //Test WKB
    Point point = (Point) WkbGeometryReader.readGeometry(new ByteReader((byte[]) after.getStruct("point")
        .get("wkb")));
    databaseDifferences.geometryAssertPoints(expectedX, expectedY, point.getX(), point.getY());
  } else if (expectedX != null) {
    Assert.fail("Got a null geometry but didn't expect to");
  }
}

代码示例来源:origin: confluentinc/ksql

@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
 if (genericRow == null) {
  return null;
 }
 final Struct struct = translator.toConnectRow(genericRow);
 try {
  return converter.fromConnectData(topic, struct.schema(), struct);
 } catch (final Exception e) {
  throw new SerializationException(
    "Error serializing row to topic " + topic + " using Converter API", e);
 }
}

代码示例来源:origin: debezium/debezium

private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync) {
  if (position == null) position = INITIAL_POSITION;
  Struct result = super.struct();
  result.put(SERVER_NAME, serverName);
  result.put(REPLICA_SET_NAME, replicaSetName);
  result.put(NAMESPACE, namespace);
  result.put(TIMESTAMP, position.getTime());
  result.put(ORDER, position.getInc());
  result.put(OPERATION_ID, position.getOperationId());
  if (isInitialSync) {
    result.put(INITIAL_SYNC, true);
  }
  return result;
}

代码示例来源:origin: debezium/debezium

protected static Struct valueFor(SourceRecord record) {
  Struct envelope = (Struct) record.value();
  Field afterField = envelope.schema().field(Envelope.FieldName.AFTER);
  if (afterField != null) {
    return envelope.getStruct(afterField.name());
  }
  return null;
}

代码示例来源:origin: debezium/debezium

private void assertMediumUnsigned(Struct value) {
  Struct after = value.getStruct(Envelope.FieldName.AFTER);
  Integer i = after.getInt32("id");
  assertThat(i).isNotNull();
  //Validate the schema first, we are expecting int-32 since we are dealing with unsignd-mediumint
  //So Unsigned MEDIUMINT would be an int32 type
  assertThat(after.schema().field("c1").schema()).isEqualTo(Schema.INT32_SCHEMA);
  assertThat(after.schema().field("c2").schema()).isEqualTo(Schema.INT32_SCHEMA);
  //Validate the schema first, we are expecting int-32 since we are dealing with signed-mediumint.
  //So Signed MEDIUMINT would be an INT32 type
  assertThat(after.schema().field("c3").schema()).isEqualTo(Schema.INT32_SCHEMA);
  //Validate candidates values
  switch (i) {
  case 1:
    assertThat(after.getInt32("c1")).isEqualTo(16777215);
    assertThat(after.getInt32("c2")).isEqualTo(16777215);
    assertThat(after.getInt32("c3")).isEqualTo(8388607);
    break;
  case 2:
    assertThat(after.getInt32("c1")).isEqualTo(10777215);
    assertThat(after.getInt32("c2")).isEqualTo(10777215);
    assertThat(after.getInt32("c3")).isEqualTo(-6388607);
    break;
  case 3:
    assertThat(after.getInt32("c1")).isEqualTo(0);
    assertThat(after.getInt32("c2")).isEqualTo(0);
    assertThat(after.getInt32("c3")).isEqualTo(-8388608);
  }
}

代码示例来源:origin: debezium/debezium

protected void assertSourceInfo(SourceRecord record) {
  assertTrue(record.value() instanceof Struct);
  Struct source = ((Struct) record.value()).getStruct("source");
  assertNotNull(source.getString("db"));
  assertNotNull(source.getString("schema"));
  assertNotNull(source.getString("table"));
}

代码示例来源:origin: confluentinc/ksql

@Override
 public void serialize(
   final Struct struct,
   final JsonGenerator jsonGenerator,
   final SerializerProvider serializerProvider
 ) throws IOException {
  struct.validate();
  jsonGenerator.writeObject(
    objectMapper.readTree(jsonConverter.fromConnectData("", struct.schema(), struct)));
 }
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldTranslateNullValueCorrectly() {
 final Schema rowSchema = SchemaBuilder.struct()
   .field("INT", SchemaBuilder.OPTIONAL_INT32_SCHEMA)
   .optional()
   .build();
 final Struct connectStruct = new Struct(rowSchema);
 final ConnectDataTranslator connectToKsqlTranslator = new ConnectDataTranslator(rowSchema);
 final GenericRow row = connectToKsqlTranslator.toKsqlRow(rowSchema, connectStruct);
 assertThat(row.getColumns().size(), equalTo(1));
 assertThat(row.getColumnValue(0), is(nullValue()));
}

代码示例来源:origin: debezium/debezium

protected void verifyOperation(SourceRecord record, Operation expected) {
  Struct value = (Struct) record.value();
  assertThat(value.getString(Envelope.FieldName.OPERATION)).isEqualTo(expected.code());
}

相关文章