org.apache.kafka.connect.data.Struct.get()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.1k)|赞(0)|评价(0)|浏览(235)

本文整理了Java中org.apache.kafka.connect.data.Struct.get()方法的一些代码示例,展示了Struct.get()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Struct.get()方法的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct
方法名:get

Struct.get介绍

[英]Get the value of a field, returning the default value if no value has been set yet and a default value is specified in the field's schema. Because this handles fields of all types, the value is returned as an Object and must be cast to a more specific type.
[中]获取字段的值,如果尚未设置任何值,并且在字段的架构中指定了默认值,则返回默认值。因为它处理所有类型的字段,所以该值作为对象返回,并且必须转换为更具体的类型。

代码示例

代码示例来源:origin: debezium/debezium

private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
  final Struct envelope = (Struct)currentRecord.value();
  final Struct source = (Struct)envelope.get("source");
  if (source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
    source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
  }
}

代码示例来源:origin: debezium/debezium

private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) {
  return record -> {
    Struct key = (Struct) record.key();
    return ((Integer) key.get(PK_FIELD)) == pkValue;
  };
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldSetNullRecordToNull() {
 // When:
 final SchemaAndValue msg = ProcessingLogMessageFactory.deserializationErrorMsg(
   error,
   Optional.empty()
 ).get();
 // Then:
 final Struct struct = (Struct) msg.value();
 final Struct deserializationError = struct.getStruct(DESERIALIZATION_ERROR);
 assertThat(deserializationError.get(DESERIALIZATION_ERROR_FIELD_RECORD), is(nullValue()));
}

代码示例来源:origin: debezium/debezium

private void assertEmptyFieldValue(SourceRecord record, String fieldName) {
  final Struct envelope = (Struct)record.value();
  final Struct after = (Struct)envelope.get("after");
  assertThat(after.getWithoutDefault(fieldName)).isNull();
}

代码示例来源:origin: debezium/debezium

private void assertFieldAbsent(SourceRecord record, String fieldName) {
  Struct value = (Struct) ((Struct) record.value()).get(Envelope.FieldName.AFTER);
  try {
    value.get(fieldName);
    fail("field should not be present");
  } catch (DataException e) {
    //expected
  }
}

代码示例来源:origin: confluentinc/ksql

@Override
public GenericRow toKsqlRow(final Schema connectSchema,
              final Object connectData) {
 if (!schema.type().equals(Schema.Type.STRUCT)) {
  throw new KsqlException("Schema for a KSQL row should be a struct");
 }
 final Struct rowStruct = (Struct) toKsqlValue(schema, connectSchema, connectData, "");
 if (rowStruct == null) {
  return null;
 }
 // streams are expensive, so we don't use them from serdes. build the row using forEach
 final List<Object> fields = new ArrayList<>(schema.fields().size());
 schema.fields().forEach(field -> fields.add(rowStruct.get(field)));
 return new GenericRow(fields);
}

代码示例来源:origin: debezium/debezium

private Struct updateValue(Schema newValueSchema, Struct oldValue) {
  final Struct newValue = new Struct(newValueSchema);
  for (org.apache.kafka.connect.data.Field field : oldValue.schema().fields()) {
    newValue.put(field.name(), oldValue.get(field));
  }
  return newValue;
}

代码示例来源:origin: debezium/debezium

private Struct updateEnvelope(Schema newEnvelopeSchema, Struct oldEnvelope) {
  final Struct newEnvelope = new Struct(newEnvelopeSchema);
  final Schema newValueSchema = newEnvelopeSchema.field(Envelope.FieldName.BEFORE).schema();
  for (org.apache.kafka.connect.data.Field field : oldEnvelope.schema().fields()) {
    final String fieldName = field.name();
    Object fieldValue = oldEnvelope.get(field);
    if ((Objects.equals(fieldName, Envelope.FieldName.BEFORE) || Objects.equals(fieldName, Envelope.FieldName.AFTER))
        && fieldValue != null) {
      fieldValue = updateValue(newValueSchema, requireStruct(fieldValue, "Updating schema"));
    }
    newEnvelope.put(fieldName, fieldValue);
  }
  return newEnvelope;
}

代码示例来源:origin: debezium/debezium

private static Object[] valuesFor(Struct struct) {
  Object[] array = new Object[struct.schema().fields().size()];
  int index = 0;
  for (Field field : struct.schema().fields()) {
    array[index] = struct.get(field);
    ++index;
  }
  return array;
}

代码示例来源:origin: debezium/debezium

SourceRecord getRecordByOperation(Envelope.Operation operation) throws InterruptedException {
  final SourceRecord candidateRecord = getNextRecord();
  if (!((Struct) candidateRecord.value()).get("op").equals(operation.code())) {
    // MongoDB is not providing really consistent snapshot, so the initial insert
    // can arrive both in initial sync snapshot and in oplog
    return getRecordByOperation(operation);
  }
  return candidateRecord;
}

代码示例来源:origin: debezium/debezium

/**
 * Verify that the given {@link SourceRecord} has a valid non-null integer key that matches the expected integer value.
 *
 * @param record the source record; may not be null
 * @param pkField the single field defining the primary key of the struct; may not be null
 * @param pk the expected integer value of the primary key in the struct
 */
public static void hasValidKey(SourceRecord record, String pkField, int pk) {
  Struct key = (Struct) record.key();
  assertThat(key.get(pkField)).isEqualTo(pk);
}

代码示例来源:origin: debezium/debezium

public SourceRecordAssert valueAfterFieldIsEqualTo(Struct expectedValue) {
  Struct value = (Struct) record.value();
  Struct afterValue = (Struct) value.get("after");
  Assertions.assertThat(afterValue).isEqualTo(expectedValue);
  return this;
}

代码示例来源:origin: debezium/debezium

@Test
public void intTypes() throws Exception {
  Testing.debug("Inserted");
  final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
  List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_int");
  assertThat(testTableRecords).hasSize(1);
  // insert
  VerifyRecord.isValidRead(testTableRecords.get(0));
  Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
  assertRecord(after, EXPECTED_INT);
}

代码示例来源:origin: debezium/debezium

@Test
public void stringTypes() throws Exception {
  Testing.debug("Inserted");
  final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
  List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_string");
  assertThat(testTableRecords).hasSize(1);
  // insert
  VerifyRecord.isValidRead(testTableRecords.get(0));
  Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
  assertRecord(after, EXPECTED_STRING);
}

代码示例来源:origin: debezium/debezium

@Test
public void otherTypes() throws Exception {
  Testing.debug("Inserted");
  final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
  List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_xml");
  assertThat(testTableRecords).hasSize(1);
  // insert
  VerifyRecord.isValidRead(testTableRecords.get(0));
  Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
  assertRecord(after, EXPECTED_XML);
}

代码示例来源:origin: debezium/debezium

private void assertHeartBeatRecordInserted() {
  assertFalse("records not generated", consumer.isEmpty());
  SourceRecord heartbeat = consumer.remove();
  assertEquals("__debezium-heartbeat." + TestHelper.TEST_SERVER, heartbeat.topic());
  Struct key = (Struct) heartbeat.key();
  assertThat(key.get("serverName")).isEqualTo(TestHelper.TEST_SERVER);
}

代码示例来源:origin: debezium/debezium

@Test
public void fpTypes() throws Exception {
  Testing.debug("Inserted");
  final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
  List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_fp");
  assertThat(testTableRecords).hasSize(1);
  // insert
  VerifyRecord.isValidRead(testTableRecords.get(0));
  Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
  assertRecord(after, EXPECTED_FP);
}

代码示例来源:origin: debezium/debezium

@Test
public void dateTimeTypes() throws Exception {
  Testing.debug("Inserted");
  final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
  List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_time");
  assertThat(testTableRecords).hasSize(1);
  // insert
  VerifyRecord.isValidRead(testTableRecords.get(0));
  Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
  assertRecord(after, EXPECTED_DATE_TIME);
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shoudlReplacePrimitivesCorrectly() {
 final Schema schema = SchemaBuilder.struct()
   .field("COLUMN_NAME", Schema.OPTIONAL_INT64_SCHEMA)
   .optional()
   .build();
 final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
 final GenericRow ksqlRow = new GenericRow(Collections.singletonList(123L));
 final Struct struct = dataTranslator.toConnectRow(ksqlRow);
 assertThat(struct.get("COLUMN_NAME"), equalTo(123L));
 final GenericRow translatedRow = dataTranslator.toKsqlRow(struct.schema(), struct);
 assertThat(translatedRow, equalTo(ksqlRow));
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldReplaceNullWithNull() {
 final Schema schema = SchemaBuilder.struct()
   .field(
     "COLUMN_NAME",
     SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build())
   .optional()
   .build();
 final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
 final GenericRow ksqlRow = new GenericRow(Collections.singletonList(null));
 final Struct struct = dataTranslator.toConnectRow(ksqlRow);
 assertThat(struct.get("COLUMN_NAME"), nullValue());
 final GenericRow translatedRow = dataTranslator.toKsqlRow(struct.schema(), struct);
 assertThat(translatedRow, equalTo(ksqlRow));
}

相关文章