从mqtt读取kafka的输出

polhcujo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(506)

我正在使用kafka connect进行mqtt-kafka连接。我正在发布mqttlenses的示例数据,并用java编写了kafka的用户代码:

package test;

    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Base64;
    import java.util.Iterator;
    import java.util.Properties;

    import javax.crypto.Cipher;
    import javax.crypto.spec.SecretKeySpec;

    import org.json.*;

    import org.apache.kafka.common.serialization.StringDeserializer;

    import com.fasterxml.jackson.databind.util.JSONWrappedObject;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    @SuppressWarnings("unused")
    public class ConsumerTest {

      public static void main(String[] args) throws UnsupportedEncodingException {
        System.out.println("consumer123");
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group-1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        @SuppressWarnings("resource")
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList(MQTT TOPIC));
        while (true) {

             ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

          for (ConsumerRecord<String, String> record : records) {

              System.out.println(records);

              try
              {
                  String record_data = record.value().toString();

                  JSONObject obj = new JSONObject(record_data);
                  String payload = obj.getString("payload");

                  String s = new String(payload.getBytes(), StandardCharsets.UTF_8);

                  System.out.println(s);
                  System.out.println(record_data);
                  System.out.println(record.key());
                  //System.out.println(decryptData(payload));

              }
              catch(Exception je)
              {
                  System.out.println(je.toString());
              }
          }

        }

      }

对于输入“hello”,它以consumer=>{“schema”:{“type”:“bytes”,“optional”:false},“payload”:“sgvsbg8k”}打印输出
如何解码来自Kafka消费者的有效载荷?

4bbkushb

4bbkushb1#

您正在接收的有效负载是“base64”格式。只需解码此base64字符串:

byte[] plainText = Base64.getDecoder().decode(payload);
System.out.println(new String(plainTexts));

相关问题