本文整理了 Java 中 org.apache.kafka.connect.data.Struct.get() 方法的一些代码示例,展示了 Struct.get() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Struct.get() 方法的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct
方法名:get
[英]Get the value of a field, returning the default value if no value has been set yet and a default value is specified in the field's schema. Because this handles fields of all types, the value is returned as an Object and must be cast to a more specific type.
[中]获取字段的值;如果尚未设置任何值,且该字段的模式(schema)中指定了默认值,则返回该默认值。由于此方法处理所有类型的字段,返回值的类型为 Object,必须强制转换为更具体的类型。
代码示例来源:origin: debezium/debezium
/**
 * Flags the "source" struct of the given record as belonging to the last snapshot record.
 * The flag is only written when reading the field currently yields a non-null value.
 *
 * @param currentRecord record whose value is an envelope struct holding a "source" struct
 */
private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
    final Struct sourceInfo = (Struct) ((Struct) currentRecord.value()).get("source");
    if (sourceInfo.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) == null) {
        // Nothing readable for this field, so leave the struct untouched.
        return;
    }
    sourceInfo.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
}
代码示例来源:origin: debezium/debezium
/**
 * Builds a predicate that matches records whose key struct carries the given primary key.
 *
 * @param pkValue the primary key value to match against the {@code PK_FIELD} of the record key
 * @return a predicate that is true when the key's {@code PK_FIELD} equals {@code pkValue}
 */
private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) {
    // The Integer is unboxed for the comparison, exactly as a numeric == would do.
    return record -> ((Integer) ((Struct) record.key()).get(PK_FIELD)) == pkValue;
}
代码示例来源:origin: confluentinc/ksql
@Test
public void shouldSetNullRecordToNull() {
  // When: the message is built without a record
  final SchemaAndValue msg = ProcessingLogMessageFactory
      .deserializationErrorMsg(error, Optional.empty())
      .get();

  // Then: the record field of the nested deserialization error struct is null
  final Struct errorStruct = ((Struct) msg.value()).getStruct(DESERIALIZATION_ERROR);
  final Object recordField = errorStruct.get(DESERIALIZATION_ERROR_FIELD_RECORD);
  assertThat(recordField, is(nullValue()));
}
代码示例来源:origin: debezium/debezium
/**
 * Asserts that the named field of the record's "after" struct is null when read
 * without falling back to the schema default.
 *
 * @param record    record whose value is an envelope struct holding an "after" struct
 * @param fieldName name of the field to check
 */
private void assertEmptyFieldValue(SourceRecord record, String fieldName) {
    final Struct afterState = (Struct) ((Struct) record.value()).get("after");
    final Object rawValue = afterState.getWithoutDefault(fieldName);
    assertThat(rawValue).isNull();
}
代码示例来源:origin: debezium/debezium
/**
 * Asserts that the record's "after" struct has no field with the given name,
 * i.e. that looking it up raises a {@link DataException}.
 *
 * @param record    record whose value is an envelope struct
 * @param fieldName name of the field that must not exist
 */
private void assertFieldAbsent(SourceRecord record, String fieldName) {
    final Struct envelope = (Struct) record.value();
    final Struct afterState = (Struct) envelope.get(Envelope.FieldName.AFTER);
    try {
        afterState.get(fieldName);
        fail("field should not be present");
    }
    catch (DataException expected) {
        // The lookup is supposed to fail for an absent field.
    }
}
代码示例来源:origin: confluentinc/ksql
/**
 * Converts Connect data into a KSQL row by translating it to a struct and
 * collecting the value of every field of this translator's schema, in schema order.
 *
 * @param connectSchema schema of the incoming Connect data
 * @param connectData   the incoming Connect data
 * @return the row, or {@code null} when the translated struct is {@code null}
 * @throws KsqlException if this translator's schema is not a struct
 */
@Override
public GenericRow toKsqlRow(final Schema connectSchema,
                            final Object connectData) {
  if (!schema.type().equals(Schema.Type.STRUCT)) {
    throw new KsqlException("Schema for a KSQL row should be a struct");
  }

  final Struct rowStruct = (Struct) toKsqlValue(schema, connectSchema, connectData, "");
  if (rowStruct == null) {
    return null;
  }

  // Streams are expensive, so serdes avoid them; a plain loop builds the row instead.
  final List<Object> columnValues = new ArrayList<>(schema.fields().size());
  for (final org.apache.kafka.connect.data.Field field : schema.fields()) {
    columnValues.add(rowStruct.get(field));
  }
  return new GenericRow(columnValues);
}
代码示例来源:origin: debezium/debezium
/**
 * Copies every field of {@code oldValue} into a fresh struct built on {@code newValueSchema}.
 *
 * @param newValueSchema schema for the struct to create
 * @param oldValue       struct whose field values are copied by field name
 * @return the newly populated struct
 */
private Struct updateValue(Schema newValueSchema, Struct oldValue) {
    final Struct copy = new Struct(newValueSchema);
    oldValue.schema().fields().forEach(field -> copy.put(field.name(), oldValue.get(field)));
    return copy;
}
代码示例来源:origin: debezium/debezium
/**
 * Rebuilds an envelope on a new envelope schema. Non-null BEFORE and AFTER values are
 * re-created via {@code updateValue} on the schema of the new envelope's BEFORE field;
 * every other field value is carried over unchanged.
 *
 * @param newEnvelopeSchema schema for the envelope to create
 * @param oldEnvelope       envelope whose fields are copied
 * @return the newly populated envelope
 */
private Struct updateEnvelope(Schema newEnvelopeSchema, Struct oldEnvelope) {
    final Schema rowSchema = newEnvelopeSchema.field(Envelope.FieldName.BEFORE).schema();
    final Struct rebuilt = new Struct(newEnvelopeSchema);

    for (org.apache.kafka.connect.data.Field field : oldEnvelope.schema().fields()) {
        final String name = field.name();
        final Object original = oldEnvelope.get(field);

        final boolean isRowState = Objects.equals(name, Envelope.FieldName.BEFORE)
                || Objects.equals(name, Envelope.FieldName.AFTER);

        if (original != null && isRowState) {
            rebuilt.put(name, updateValue(rowSchema, requireStruct(original, "Updating schema")));
        }
        else {
            rebuilt.put(name, original);
        }
    }
    return rebuilt;
}
代码示例来源:origin: debezium/debezium
/**
 * Extracts the values of all fields of a struct, in schema field order.
 *
 * @param struct the struct to read
 * @return an array with one entry per schema field
 */
private static Object[] valuesFor(Struct struct) {
    final Object[] values = new Object[struct.schema().fields().size()];
    int next = 0;
    for (Field field : struct.schema().fields()) {
        values[next++] = struct.get(field);
    }
    return values;
}
代码示例来源:origin: debezium/debezium
/**
 * Fetches the next record whose "op" field equals the code of the given operation,
 * skipping any records with a different operation.
 *
 * @param operation the operation the returned record must carry
 * @return the first record found with a matching "op" value
 * @throws InterruptedException declared by the method; presumably raised while fetching records
 */
SourceRecord getRecordByOperation(Envelope.Operation operation) throws InterruptedException {
    final SourceRecord candidate = getNextRecord();
    final Object actualOp = ((Struct) candidate.value()).get("op");
    if (actualOp.equals(operation.code())) {
        return candidate;
    }
    // MongoDB does not provide a really consistent snapshot, so the initial insert
    // can arrive both in the initial sync snapshot and in the oplog; try the next record.
    return getRecordByOperation(operation);
}
代码示例来源:origin: debezium/debezium
/**
 * Check that the key of the given {@link SourceRecord} holds the expected integer value
 * in its primary key field.
 *
 * @param record the source record; may not be null
 * @param pkField the single field defining the primary key of the struct; may not be null
 * @param pk the expected integer value of the primary key in the struct
 */
public static void hasValidKey(SourceRecord record, String pkField, int pk) {
    final Object actualPk = ((Struct) record.key()).get(pkField);
    assertThat(actualPk).isEqualTo(pk);
}
代码示例来源:origin: debezium/debezium
/**
 * Asserts that the "after" struct of the record's value equals the expected struct.
 *
 * @param expectedValue the struct the "after" field must equal
 * @return this assertion object, for chaining
 */
public SourceRecordAssert valueAfterFieldIsEqualTo(Struct expectedValue) {
    final Struct actualAfter = (Struct) ((Struct) record.value()).get("after");
    Assertions.assertThat(actualAfter).isEqualTo(expectedValue);
    return this;
}
代码示例来源:origin: debezium/debezium
@Test
public void intTypes() throws Exception {
    Testing.debug("Inserted");

    final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
    final List<SourceRecord> tableRecords = records.recordsForTopic("server1.dbo.type_int");
    assertThat(tableRecords).hasSize(1);

    // The single record is the insert; validate it and compare its "after" state.
    final SourceRecord insert = tableRecords.get(0);
    VerifyRecord.isValidRead(insert);
    final Struct envelope = (Struct) insert.value();
    final Struct after = (Struct) envelope.get("after");
    assertRecord(after, EXPECTED_INT);
}
代码示例来源:origin: debezium/debezium
@Test
public void stringTypes() throws Exception {
    Testing.debug("Inserted");

    final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
    final List<SourceRecord> tableRecords = records.recordsForTopic("server1.dbo.type_string");
    assertThat(tableRecords).hasSize(1);

    // The single record is the insert; validate it and compare its "after" state.
    final SourceRecord insert = tableRecords.get(0);
    VerifyRecord.isValidRead(insert);
    final Struct envelope = (Struct) insert.value();
    final Struct after = (Struct) envelope.get("after");
    assertRecord(after, EXPECTED_STRING);
}
代码示例来源:origin: debezium/debezium
@Test
public void otherTypes() throws Exception {
    Testing.debug("Inserted");

    final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
    final List<SourceRecord> tableRecords = records.recordsForTopic("server1.dbo.type_xml");
    assertThat(tableRecords).hasSize(1);

    // The single record is the insert; validate it and compare its "after" state.
    final SourceRecord insert = tableRecords.get(0);
    VerifyRecord.isValidRead(insert);
    final Struct envelope = (Struct) insert.value();
    final Struct after = (Struct) envelope.get("after");
    assertRecord(after, EXPECTED_XML);
}
代码示例来源:origin: debezium/debezium
/**
 * Removes the next record from the consumer and asserts that it is a heartbeat:
 * its topic is the heartbeat topic of the test server and its key names that server.
 */
private void assertHeartBeatRecordInserted() {
    assertFalse("records not generated", consumer.isEmpty());

    final SourceRecord heartbeatRecord = consumer.remove();
    assertEquals("__debezium-heartbeat." + TestHelper.TEST_SERVER, heartbeatRecord.topic());

    final Object serverName = ((Struct) heartbeatRecord.key()).get("serverName");
    assertThat(serverName).isEqualTo(TestHelper.TEST_SERVER);
}
代码示例来源:origin: debezium/debezium
@Test
public void fpTypes() throws Exception {
    Testing.debug("Inserted");

    final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
    final List<SourceRecord> tableRecords = records.recordsForTopic("server1.dbo.type_fp");
    assertThat(tableRecords).hasSize(1);

    // The single record is the insert; validate it and compare its "after" state.
    final SourceRecord insert = tableRecords.get(0);
    VerifyRecord.isValidRead(insert);
    final Struct envelope = (Struct) insert.value();
    final Struct after = (Struct) envelope.get("after");
    assertRecord(after, EXPECTED_FP);
}
代码示例来源:origin: debezium/debezium
@Test
public void dateTimeTypes() throws Exception {
    Testing.debug("Inserted");

    final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
    final List<SourceRecord> tableRecords = records.recordsForTopic("server1.dbo.type_time");
    assertThat(tableRecords).hasSize(1);

    // The single record is the insert; validate it and compare its "after" state.
    final SourceRecord insert = tableRecords.get(0);
    VerifyRecord.isValidRead(insert);
    final Struct envelope = (Struct) insert.value();
    final Struct after = (Struct) envelope.get("after");
    assertRecord(after, EXPECTED_DATE_TIME);
}
代码示例来源:origin: confluentinc/ksql
@Test
public void shoudlReplacePrimitivesCorrectly() {
  // Given: an optional struct schema with a single optional INT64 column
  final Schema schema = SchemaBuilder.struct()
      .field("COLUMN_NAME", Schema.OPTIONAL_INT64_SCHEMA)
      .optional()
      .build();
  final AvroDataTranslator dataTranslator =
      new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
  final GenericRow ksqlRow = new GenericRow(Collections.singletonList(123L));

  // When: the row is translated to Connect
  final Struct struct = dataTranslator.toConnectRow(ksqlRow);

  // Then: the value is carried through and the row round-trips unchanged
  assertThat(struct.get("COLUMN_NAME"), equalTo(123L));
  final GenericRow roundTripped = dataTranslator.toKsqlRow(struct.schema(), struct);
  assertThat(roundTripped, equalTo(ksqlRow));
}
代码示例来源:origin: confluentinc/ksql
@Test
public void shouldReplaceNullWithNull() {
  // Given: an optional struct schema with a single optional array-of-INT64 column
  final Schema columnSchema =
      SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build();
  final Schema schema = SchemaBuilder.struct()
      .field("COLUMN_NAME", columnSchema)
      .optional()
      .build();
  final AvroDataTranslator dataTranslator =
      new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
  final GenericRow ksqlRow = new GenericRow(Collections.singletonList(null));

  // When: the row is translated to Connect
  final Struct struct = dataTranslator.toConnectRow(ksqlRow);

  // Then: the null stays null and the row round-trips unchanged
  assertThat(struct.get("COLUMN_NAME"), nullValue());
  final GenericRow roundTripped = dataTranslator.toKsqlRow(struct.schema(), struct);
  assertThat(roundTripped, equalTo(ksqlRow));
}
内容来源于网络,如有侵权,请联系作者删除!