Spring Boot: why doesn't KinesisClient.putRecord() accept a valid PutRecordRequest?

wribegjk, posted 2023-04-30 in Spring

Here is my code:

KinesisClient kinesisClient = KinesisClient.builder().build();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName("stream-name");
putRecordRequest.setPartitionKey("SomeString-" + UUID.randomUUID());
putRecordRequest.setData(ByteBuffer.wrap(data));

PutRecordResponse putRecordResponse = kinesisClient.putRecord(putRecordRequest);

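For some strange reason, it says it cannot resolve the method putRecord(PutRecordRequest), even though the method is clearly there when I look at the decompiled jar file. It even autocompletes the method when I start typing it.

What could be going wrong here?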

gg58donl (answer #1)

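To work with AWS services from Java, the best practice is to use the AWS SDK for Java V2. The package **software.amazon.awssdk.services.kinesis.model** belongs to V2. Your request is built the V1 way (a constructor plus setters), and V1 is not recommended, as stated on this AWS page:
AWS SDK Code Examples
The full Java V2 code example for this operation is: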

package com.example.kinesis;

//snippet-start:[kinesis.java2.putrecord.import]
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
//snippet-end:[kinesis.java2.putrecord.import]

/**
 * Before running this Java V2 code example, set up your development environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */
public class StockTradesWriter {

    public static void main(String[] args) {

        final String usage = "\n" +
            "Usage:\n" +
            "    <streamName>\n\n" +
            "Where:\n" +
            "    streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream)\n\n";

        if (args.length != 1) {
            System.out.println(usage);
            System.exit(1);
        }

        String streamName = args[0];
        Region region = Region.US_EAST_1;
        KinesisClient kinesisClient = KinesisClient.builder()
            .region(region)
            .credentialsProvider(ProfileCredentialsProvider.create())
            .build();

        // Ensure that the Kinesis Stream is valid.
        validateStream(kinesisClient, streamName);
        setStockData(kinesisClient, streamName);
        kinesisClient.close();
    }

    // snippet-start:[kinesis.java2.putrecord.main]
    public static void setStockData( KinesisClient kinesisClient, String streamName) {

        try {
            // Repeatedly send stock trades with a 100 milliseconds wait in between.
            StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();

            // Put in 50 Records for this example.
            int index = 50;
            for (int x=0; x<index; x++){
                StockTrade trade = stockTradeGenerator.getRandomTrade();
                sendStockTrade(trade, kinesisClient, streamName);
                Thread.sleep(100);
            }

        } catch (KinesisException | InterruptedException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        System.out.println("Done");
    }

    private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient,
                                       String streamName) {
        byte[] bytes = trade.toJsonAsBytes();

        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            System.out.println("Could not get JSON bytes for stock trade");
            return;
        }

        System.out.println("Putting trade: " + trade);
        PutRecordRequest request = PutRecordRequest.builder()
            .partitionKey(trade.getTickerSymbol()) // Use the ticker symbol as the partition key.
            .streamName(streamName)
            .data(SdkBytes.fromByteArray(bytes))
            .build();

        try {
            kinesisClient.putRecord(request);
        } catch (KinesisException e) {
            System.err.println(e.getMessage());
        }
    }

    private static void validateStream(KinesisClient kinesisClient, String streamName) {
        try {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                .streamName(streamName)
                .build();

            DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);

            if(!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) {
                System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
                System.exit(1);
            }

        }catch (KinesisException e) {
            System.err.println("Error found while describing the stream " + streamName);
            System.err.println(e);
            System.exit(1);
        }
    }
    // snippet-end:[kinesis.java2.putrecord.main]
}

You can find this AWS SDK for Java V2 example in the AWS Code Library:
Kinesis examples using SDK for Java 2.x
If you prefer GitHub, all AWS Java V2 code examples are located here:
https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2
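
A quick way to spot a mix of the two SDK generations is to look at the package prefix of every import. The helper below is only an illustrative sketch: the class name SdkGeneration and the method of are made up for this example and are not part of any AWS library. It relies solely on the two package roots mentioned in this thread, com.amazonaws for V1 and software.amazon.awssdk for V2.

```java
public class SdkGeneration {

    /**
     * Classifies a fully qualified class name by its package root.
     * Hypothetical helper for illustration, not an AWS API.
     */
    public static String of(String fullyQualifiedName) {
        if (fullyQualifiedName.startsWith("software.amazon.awssdk.")) {
            return "V2";
        }
        if (fullyQualifiedName.startsWith("com.amazonaws.")) {
            return "V1";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // The two imports that must come from the same SDK generation.
        String[] imports = {
            "software.amazon.awssdk.services.kinesis.KinesisClient",
            "com.amazonaws.services.kinesis.model.PutRecordRequest"
        };
        for (String name : imports) {
            System.out.println(of(name) + "  " + name);
        }
    }
}
```

If the client and the request class report different generations, the request cannot be passed to that client's putRecord method.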

w8f9ii69 (answer #2)

I figured it out. For some reason, I had both com.amazonaws.services.kinesis.model and software.amazon.awssdk.services.kinesis.model in my project. I was using KinesisClient from one package and PutRecordRequest from the other.
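
The mismatch described here can be reproduced without the AWS SDK at all. The sketch below does not use the real SDK: V1.PutRecordRequest, V2.PutRecordRequest, and V2.KinesisClient are hypothetical stand-ins that only mimic two classes sharing one simple name (setter style on one side, builder style on the other). It shows that a method declared for one of them does not accept the other, which would explain why the compiler cannot resolve putRecord(PutRecordRequest) even though the method exists.

```java
import java.lang.reflect.Method;

public class SameNameDemo {

    // Stand-in for the V1 model package (constructor plus setters). Not the real SDK.
    public static class V1 {
        public static class PutRecordRequest {
            private String streamName;
            public void setStreamName(String streamName) { this.streamName = streamName; }
            public String getStreamName() { return streamName; }
        }
    }

    // Stand-in for the V2 packages (immutable request plus builder). Not the real SDK.
    public static class V2 {
        public static class PutRecordRequest {
            private final String streamName;
            private PutRecordRequest(String streamName) { this.streamName = streamName; }
            public String streamName() { return streamName; }
            public static Builder builder() { return new Builder(); }

            public static class Builder {
                private String streamName;
                public Builder streamName(String streamName) {
                    this.streamName = streamName;
                    return this;
                }
                public PutRecordRequest build() { return new PutRecordRequest(streamName); }
            }
        }

        public static class KinesisClient {
            // Only the V2 request type is accepted here.
            public String putRecord(PutRecordRequest request) {
                return "accepted:" + request.streamName();
            }
        }
    }

    /** Returns true if clientClass declares putRecord taking exactly requestClass. */
    public static boolean accepts(Class<?> clientClass, Class<?> requestClass) {
        for (Method m : clientClass.getDeclaredMethods()) {
            if (m.getName().equals("putRecord")
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == requestClass) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        V2.KinesisClient client = new V2.KinesisClient();
        V2.PutRecordRequest request =
            V2.PutRecordRequest.builder().streamName("stream-name").build();
        System.out.println(client.putRecord(request));

        // The next line would not compile: same simple name, different class.
        // client.putRecord(new V1.PutRecordRequest());
        System.out.println(accepts(V2.KinesisClient.class, V1.PutRecordRequest.class));
    }
}
```

In a real project, the corresponding fix is to take both the client and the request class from the same SDK generation.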
