我用了一个动觉生产者在一个组件测试。当我在调试模式下运行kinesis producer,并在将数据插入流之前中断,因此每次推送之间都有时间间隔,一切正常。但是当我在没有任何中断的情况下运行它(或者不在调试模式下)时,会发生一些事情,当我在kinesis consumer中读取数据时,我会看到一条消息更少,一条消息的大小是两条消息的两倍,就好像两条消息合并成一条一样,当我尝试反序列化它时,该消息会引发异常
private AmazonKinesis getClient() {
if(client != null) {
return client;
}
System.setProperty("com.amazonaws.sdk.disableCertChecking", "true");
AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(
configuration.getKinesisEndpoint(), awsCredentialsConf.getRegion());
try {
client = AmazonKinesisClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.withEndpointConfiguration(endpointConfiguration)
.build();
} catch (Exception ex) {
log.error("Error reading message from Kafka: ", ex);
}
return client;
}
public List<Record> readMessage() {
try {
AmazonKinesis client = getClient();
DescribeStreamResult data = client.describeStream(configuration.getConsumerStreamName());
List<Shard> shards = data.getStreamDescription().getShards();
Shard shId = shards.get(0);
GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
iteratorRequest.setStreamName(configuration.getConsumerStreamName());
iteratorRequest.setShardId(shId.getShardId());
iteratorRequest.setShardIteratorType("TRIM_HORIZON");
GetShardIteratorResult iteratorResponse = client.getShardIterator(iteratorRequest);
String iterator = iteratorResponse.getShardIterator();
GetRecordsRequest getRequest = new GetRecordsRequest();
getRequest.setLimit(1000);
getRequest.setShardIterator(iterator);
Thread.sleep(SECONDS.toMillis(3));
// call "get" operation and get everything in this shard range
GetRecordsResult getResponse = client.getRecords(getRequest);
return getResponse.getRecords();
} catch(Exception rnfe) {
log.error("Failed to communicate with stream " + configuration.getConsumerStreamName(), rnfe);
}
return new ArrayList<>();
}
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:102) ~[protobuf-java-3.8.0.jar:na]
at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:625) ~[protobuf-java-3.8.0.jar:na]
at com.google.protobuf.CodedInputStream$ArrayDecoder.skipMessage(CodedInputStream.java:723) ~[protobuf-java-3.8.0.jar:na]
at com.google.protobuf.CodedInputStream$ArrayDecoder.skipField(CodedInputStream.java:655) ~[protobuf-java-3.8.0.jar:na]
at com.panpwr.platform.protobuf.messages.rawstream.ProtobufStreamEventHeader$ProtobufStreamEvent.<init>(ProtobufStreamEventHeader.java:92) ~[protobuf-4.0.0-SNAPSHOT.jar:na]
at com.panpwr.platform.protobuf.messages.rawstream.ProtobufStreamEventHeader$ProtobufStreamEvent.<init>(ProtobufStreamEventHeader.java:57) ~[protobuf-4.0.0-SNAPSHOT.jar:na]
at com.panpwr.platform.protobuf.messages.rawstream.ProtobufStreamEventHeader$ProtobufStreamEvent$1.parsePartialFrom(ProtobufStreamEventHeader.java:811) ~[protobuf-4.0.0-SNAPSHOT.jar:na]
at com.panpwr.platform.protobuf.messages.rawstream.ProtobufStreamEventHeader$ProtobufStreamEvent$1.parsePartialFrom(ProtobufStreamEventHeader.java:806) ~[protobuf-4.0.0-SNAPSHOT.jar:na]
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158) ~[protobuf-java-3.8.0.jar:na]
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191) ~[protobuf-java-3.8.0.jar:na]
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203) ~[protobuf-java-3.8.0.jar:na]
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208) ~[protobuf-java-3.8.0.jar:na]
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) ~[protobuf-java-3.8.0.jar:na]
at com.panpwr.platform.protobuf.messages.rawstream.ProtobufStreamEventHeader$ProtobufStreamEvent.parseFrom(ProtobufStreamEventHeader.java:334) ~[protobuf-4.0.0-SNAPSHOT.jar:na]
at com.panpwr.platform.streaming.impl.ProtoStreamEventHeader.deserialize(ProtoStreamEventHeader.java:43) ~[streaming-common-4.0.0-SNAPSHOT.jar:na]
at com.panpwr.platform.streaming.impl.StreamProtobufSerializer.deserialize(StreamProtobufSerializer.java:41) ~[streaming-common-4.0.0-SNAPSHOT.jar:na]
at com.panpwr.collector.CollectorSteps.lambda$retrieveDataFromStream$2(CollectorSteps.java:414) [main/:na]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[na:1.8.0_221]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[na:1.8.0_221]
at com.panpwr.collector.CollectorSteps.retrieveDataFromStream(CollectorSteps.java:413) [main/:na]
at com.panpwr.collector.CollectorSteps.readMessages(CollectorSteps.java:366) [main/:na]
at com.panpwr.collector.CollectorSteps.checkErrorCorrection(CollectorSteps.java:330) [main/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_221]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_221]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_221]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_221]
at cucumber.runtime.Utils$1.call(Utils.java:26) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runtime.Timeout.timeout(Timeout.java:16) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runtime.Utils.invoke(Utils.java:20) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runtime.java.JavaStepDefinition.execute(JavaStepDefinition.java:49) ~[cucumber-java-4.2.0.jar:na]
at cucumber.runner.PickleStepDefinitionMatch.runStep(PickleStepDefinitionMatch.java:50) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runner.TestStep.executeStep(TestStep.java:63) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runner.TestStep.run(TestStep.java:49) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runner.PickleStepTestStep.run(PickleStepTestStep.java:43) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runner.TestCase.run(TestCase.java:44) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runner.Runner.runPickle(Runner.java:40) ~[cucumber-core-4.2.0.jar:na]
at cucumber.runtime.junit.PickleRunners$NoStepDescriptions.run(PickleRunners.java:146) ~[cucumber-junit-4.2.0.jar:na]
at cucumber.runtime.junit.FeatureRunner.runChild(FeatureRunner.java:68) ~[cucumber-junit-4.2.0.jar:na]
at cucumber.runtime.junit.FeatureRunner.runChild(FeatureRunner.java:23) ~[cucumber-junit-4.2.0.jar:na]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) ~[junit-4.12.jar:4.12]
at cucumber.runtime.junit.FeatureRunner.run(FeatureRunner.java:73) ~[cucumber-junit-4.2.0.jar:na]
at cucumber.api.junit.Cucumber.runChild(Cucumber.java:124) ~[cucumber-junit-4.2.0.jar:na]
at cucumber.api.junit.Cucumber.runChild(Cucumber.java:65) ~[cucumber-junit-4.2.0.jar:na]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) ~[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) ~[junit-4.12.jar:4.12]
at cucumber.api.junit.Cucumber$1.evaluate(Cucumber.java:133) ~[cucumber-junit-4.2.0.jar:na]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) ~[junit-4.12.jar:4.12]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137) ~[junit-4.12.jar:4.12]
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) ~[junit-rt.jar:na]
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) ~[junit-rt.jar:na]
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) ~[junit-rt.jar:na]
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) ~[junit-rt.jar:na]
暂无答案!
目前还没有任何答案,快来回答吧!