我正在使用kafka connect进行mqtt-kafka连接。我正在发布mqttlenses的示例数据,并用java编写了kafka的用户代码:
package test;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.Properties;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.json.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.fasterxml.jackson.databind.util.JSONWrappedObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@SuppressWarnings("unused")
public class ConsumerTest {
public static void main(String[] args) throws UnsupportedEncodingException {
System.out.println("consumer123");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList(MQTT TOPIC));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(records);
try
{
String record_data = record.value().toString();
JSONObject obj = new JSONObject(record_data);
String payload = obj.getString("payload");
String s = new String(payload.getBytes(), StandardCharsets.UTF_8);
System.out.println(s);
System.out.println(record_data);
System.out.println(record.key());
//System.out.println(decryptData(payload));
}
catch(Exception je)
{
System.out.println(je.toString());
}
}
}
}
对于输入“hello”,它以consumer=>{“schema”:{“type”:“bytes”,“optional”:false},“payload”:“sgvsbg8k”}打印输出
如何解码来自Kafka消费者的有效载荷?
1条答案
按热度按时间4bbkushb1#
您正在接收的有效负载是“base64”格式。只需解码此base64字符串: