apache-kafka 使用Spring Apache Kafka生成多部分文件

axzmvihb  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(137)

我有一个包含String和MultipartFile的模型类,我想将该类作为REST服务发送到Apache Kafka,并从那里使用它,但我在序列化和反序列化模型类时遇到问题,当没有MultipartFile时,我可以序列化和反序列化该类,但不能使用MultipartFile

package com.example.demo.multipartfile.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.web.multipart.MultipartFile;

import java.io.Serializable;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Email implements Serializable {
    private String name;
    private MultipartFile file;

}
toiithl6

toiithl61#

这是一个实现MultipartFile接口的类,当我们想反序列化MultipartFile时,可以使用传输类将字节转换为MultipartFile。

package com.example.demo.multipartfile.config;

import org.springframework.core.io.Resource;
import org.springframework.web.multipart.MultipartFile;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;

public class DecodedMultipartFile implements MultipartFile {
    private final byte [] imgContent;
    private final String originalFileName;

    public DecodedMultipartFile(byte[] imgContent , String originalFileName) {
        this.imgContent = imgContent;
        this.originalFileName = originalFileName;
    }

    @Override
    public String getName() {
        return null;
    }

    @Override
    public String getOriginalFilename() {
        return originalFileName;
    }

    @Override
    public String getContentType() {
        return null;
    }

    @Override
    public boolean isEmpty() {
        return false;
    }

    @Override
    public long getSize() {
        return 0;
    }

    @Override
    public byte[] getBytes() throws IOException {
        return imgContent;
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return null;
    }

    @Override
    public Resource getResource() {
        return MultipartFile.super.getResource();
    }

    @Override
    public void transferTo(File dest) throws IOException, IllegalStateException {
        new FileOutputStream(dest).write(imgContent);

    }

    @Override
    public void transferTo(Path dest) throws IOException, IllegalStateException {
        MultipartFile.super.transferTo(dest);
    }
}

反序列化程序类

package com.example.demo.multipartfile.serialization;

import com.example.demo.multipartfile.config.DecodedMultipartFile;
import com.example.demo.multipartfile.model.Email;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.web.multipart.MultipartFile;

import java.nio.ByteBuffer;
import java.util.Map;

@Slf4j
@Data
@Getter
@Setter
public class EmailDeserializer implements Deserializer<Email> {

    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public Email deserialize(String s, byte[] data) {
        int nameSize;
        int fileSize;
        int originalFileNameSize;

        if (data == null)
            return null;

        ByteBuffer buffer = ByteBuffer.wrap(data);

        nameSize = buffer.getInt();
        byte[] nameBytes = new byte[nameSize];
        buffer.get(nameBytes);

        fileSize = buffer.getInt();
        byte[] fileByte = new byte[fileSize];
        buffer.get(fileByte);

        originalFileNameSize = buffer.getInt();
        byte[] originalFileNameByte = new byte[originalFileNameSize];
        buffer.get(originalFileNameByte);

        try {
            String deserializedName = new String(nameBytes, encoding);
            String deserializedOriginalFileName = new String(originalFileNameByte, encoding);
            MultipartFile file = new DecodedMultipartFile(fileByte, deserializedOriginalFileName);

            return new Email(deserializedName, file);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Email deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}

序列化程式类别

package com.example.demo.multipartfile.serialization;

import com.example.demo.multipartfile.model.Email;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

@Slf4j
@Data
@Getter
@Setter
public class EmailSerializer implements Serializer<Email> {
    @Override
    public byte[] serialize(String s, Email email) {

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);

        try {
            byte[] nameByte = email.getName().getBytes();
            byte[] fileByte = email.getFile().getBytes();
            byte[] originalFileNameByte =   email.getFile().getOriginalFilename().getBytes();

            dos.writeInt(nameByte.length);
            dos.write(nameByte);
            dos.writeInt(fileByte.length);
            dos.write(fileByte);
            dos.writeInt(originalFileNameByte.length);
            dos.write(originalFileNameByte);

            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

现在我可以发送每一件事,我想与Kafka,序列化是奇怪的,但不要担心它会工作.完整项目https://github.com/ehsanv8/KafkaProject.git

相关问题