如果有任何引用项目或代码使用开放源码将kafka消息发送到jms队列,这将非常有用。我试着在junit中调用下面的代码,
private TextMessage getTextMessage(SinkRecord sinkRecord) {
try {
System.out.println("Processing Record: key={} value={}"+sinkRecord.key()+""+sinkRecord.value());
final String payload = sinkRecord.value().toString();
final TextMessage textMessage = webLogicJmsSession.session().createTextMessage(payload);
textMessage.setStringProperty("KafkaTopic", sinkRecord.topic());
textMessage.setIntProperty("KafkaPartition", sinkRecord.kafkaPartition());
textMessage.setLongProperty("KafkaOffset", sinkRecord.kafkaOffset());
Object key = sinkRecord.key();
try {
Schema keySchema = sinkRecord.keySchema();
Struct keyStruct = (Struct) key;
//Struct jmsStruct = keyStruct.getStruct("jms");
Struct jmsProperties = keyStruct.getStruct("properties");
keySchema
.field("properties")
.schema()
.fields()
.forEach(field -> {
String value = jmsProperties.getString(field.name());
try {
textMessage.setStringProperty(field.name(), value);
} catch (JMSException e) {
e.printStackTrace();
}
});
Struct jmsHeaders = keyStruct.getStruct("headers");
textMessage.setJMSCorrelationID(jmsHeaders.getString("JMSCorrelationID"));
final String jmsReplyTo = jmsHeaders.getString("JMSReplyTo");
if (jmsReplyTo != null)
textMessage.setJMSReplyTo(webLogicJmsSession.session().createQueue(jmsReplyTo));
textMessage.setJMSCorrelationID(jmsHeaders.getString("JMSCorrelationID"));
textMessage.setJMSType(jmsHeaders.getString("JMSType"));
} catch (Exception e) {
e.printStackTrace();
}
return textMessage;
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
下面是junit代码
Schema dowSchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
Schema airingToItemSchema = SchemaBuilder.struct()
.field("dow", dowSchema)
.build();
Schema airingToSchema = SchemaBuilder.array(airingToItemSchema).build();
Schema rootSchema = SchemaBuilder.struct()
.field("airingTo", airingToSchema).build();
Struct item = new Struct(airingToItemSchema)
.put("dow", Collections.singletonList("SATURDAY"));
Struct rootStruct = new Struct(rootSchema)
.put("airingTo", Collections.singletonList(item));
SinkRecord sinkRecord4 = new SinkRecord("test",0,dowSchema,rootStruct,schema,struct,1234L);
webLogicJmsSinkTask.put(Arrays.asList(sinkRecord4));
但它会抛出错误,
处理记录:key={}value={}struct{airingto=[struct{dow=[saturday]}}struct{first\u name=,last\u name=x,电子邮件地址=}org.apache.kafka.connect.errors.dataexception:属性不是有效的字段名
暂无答案!
目前还没有任何答案,快来回答吧!