堆栈:micronaut kafka、proto3、grpc、bloomrpc
目标:接收来自bloomrpc的grpc请求并将其发布到kafka主题
上下文:我编写了一个grpc端点,它成功地接收了一个简单的调用,但是当我试图将这样的对象发布到kafka主题时,我得到了
12:38:18.775 [DefaultDispatcher-worker-1] ERROR i.m.r.intercept.RecoveryInterceptor - Type [com.mybank.producer.DebitProducer$Intercepted] executed with error: Exception sending producer record for method [void sendRequestMessage(String key,DebitRequest message)]: Error serializing object to JSON: Direct self-reference leading to cycle (through reference chain: com.mybank.endpoint.DebitRequest["unknownFields"]->com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
io.micronaut.messaging.exceptions.MessagingClientException: Exception sending producer record for method [void sendRequestMessage(String key,DebitRequest message)]: Error serializing object to JSON: Direct self-reference leading to cycle (through reference chain: com.mybank.endpoint.DebitRequest["unknownFields"]->com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.wrapException(KafkaClientIntroductionAdvice.java:564)
我想我的问题与“…将对象序列化为json时出错”有关。我真的要把这样的对象转换成json吗?我能不能把它贴到Kafka的主题上?我错过了什么?好吧,我可以通过创建一个新对象并根据请求对象设置每个属性来轻松解决这个问题,如下所示:
端点(控制器)方法
override suspend fun sendDebit(request: DebitRequest): DebitReply {
var dtoDebiter: Debiter = Debiter()
dtoDebiter.id = request.id.toString()
dtoDebiter.name = request.name
transactionService.postDebitTransaction(dtoDebiter)
return DebitReply.newBuilder().setMessage(postStatus).build()
}
仅为转换而创建的模型(这个解决方案看起来不奇怪吗??)
class Debiter {
lateinit var id: String
lateinit var name: String
}
所以,我的直接问题是:我是否可以重用从proto文件中自动生成的相同存根作为模型,用于接收请求并将其发布到kafka主题?如果是这样,下面的代码有什么问题?如果没有,建议采用什么方法?
完整代码:
事务处理.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionProto";
option objc_class_prefix = "HLW";
package com.mybank.endpoint;
service Account {
rpc SendDebit (DebitRequest) returns (DebitReply) {}
}
message DebitRequest {
int64 id = 1;
string name = 2;
}
message DebitReply {
string message = 1;
}
事务终结点(控制器)
package com.mybank.endpoint
import com.mybank.dto.Debiter
import com.mybank.service.TransactionService
import javax.inject.Singleton
@Singleton
@Suppress("unused")
class TransactionEndpoint(val transactionService: TransactionService) : AccountGrpcKt.AccountCoroutineImplBase(){
override suspend fun sendDebit(request: DebitRequest): DebitReply {
var postStatus: String = transactionService.postDebitTransaction(request)
return DebitReply.newBuilder().setMessage(postStatus).build()
}
}
事务处理服务(与此无关)
package com.mybank.service
import com.mybank.dto.Debiter
import com.mybank.dto.Transaction
import com.mybank.dto.Transactions
import com.mybank.endpoint.DebitRequest
import com.mybank.producer.DebitProducer
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
@Singleton
class TransactionService(){
@Inject
@Named("debitProducer")
lateinit var debitProducer : DebitProducer
fun postDebitTransaction(debit: DebitRequest) : String{
debitProducer.sendRequestMessage ("1", debit)
return "posted"
}
}
Kafka制作人
package com.mybank.producer
import com.mybank.dto.Transaction
import com.mybank.dto.Transactions
import com.mybank.endpoint.DebitRequest
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
@KafkaClient
public interface DebitProducer {
@Topic("debit")
fun sendRequestMessage(@KafkaKey key: String?, message: DebitRequest?) {
}
}
构建.gradle
plugins {
id "org.jetbrains.kotlin.jvm" version "1.3.72"
id "org.jetbrains.kotlin.kapt" version "1.3.72"
id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
id "application"
id 'com.google.protobuf' version '0.8.13'
}
version "0.2"
group "account-control"
repositories {
mavenLocal()
jcenter()
}
configurations {
// for dependencies that are needed for development only
developmentOnly
}
dependencies {
kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
kapt("io.micronaut:micronaut-inject-java")
kapt("io.micronaut:micronaut-validation")
implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
implementation("io.micronaut:micronaut-runtime")
// implementation("io.micronaut.grpc:micronaut-grpc-runtime")
implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")
//Kafka
implementation("io.micronaut.kafka:micronaut-kafka")
//vertx
implementation("io.micronaut.sql:micronaut-vertx-mysql-client")
//implementation("io.micronaut.configuration:micronaut-vertx-mysql-client")
compile 'io.vertx:vertx-lang-kotlin:3.9.4'
//mongodb
implementation("org.mongodb:mongodb-driver-reactivestreams:4.1.1")
runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")
kaptTest("io.micronaut:micronaut-inject-java")
testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
testImplementation("io.micronaut.test:micronaut-test-junit5")
testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")
testRuntime("org.junit.jupiter:junit-jupiter-engine:5.3.0")
testRuntime("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}
test.classpath += configurations.developmentOnly
mainClassName = "account-control.Application"
test {
useJUnitPlatform()
}
allOpen {
annotation("io.micronaut.aop.Around")
}
compileKotlin {
kotlinOptions {
jvmTarget = '11'
//Will retain parameter names for Java reflection
javaParameters = true
}
}
//compileKotlin.dependsOn(generateProto)
compileTestKotlin {
kotlinOptions {
jvmTarget = '11'
javaParameters = true
}
}
tasks.withType(JavaExec) {
classpath += configurations.developmentOnly
jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/grpckt'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
}
generateProtoTasks {
all()*.plugins {
grpc {}
grpckt {}
}
}
}
如果是相关的,这里是proto自动生成的存根的一部分
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: transaction.proto
package com.mybank.endpoint;
/**
* Protobuf type {@code com.mybank.endpoint.DebitRequest}
*/
public final class DebitRequest extends
com.google.protobuf.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:com.mybank.endpoint.DebitRequest)
DebitRequestOrBuilder {
private static final long serialVersionUID = 0L;
// Use DebitRequest.newBuilder() to construct.
private DebitRequest(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private DebitRequest() {
name_ = "";
}
@java.lang.Override
@SuppressWarnings({"unused"})
protected java.lang.Object newInstance(
UnusedPrivateParameter unused) {
return new DebitRequest();
}
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private DebitRequest(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
this();
...
***第一版
@KafkaClient(
id="debit-client",
acks = KafkaClient.Acknowledge.ALL,
properties = [Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")]
//How add these two properties in order to use Protobuf Serializer
//kafka.producers.*.key-serializer
//kafka.producers.*.value-serializer
)
public interface DebitProducer {
@Topic("debit")
fun sendRequestMessage(@KafkaKey key: String?, message: DebitRequest?) {
}
***第二版
我尝试通过application.yaml添加序列化程序
micronaut:
application:
name: account-control
grpc:
server:
port: 8082
kafka:
producers:
product-client:
value:
serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
gradle.build公司
//kafka-protobuf-serializer
implementation("io.confluent:kafka-protobuf-serializer:6.0.0")
现在我在gradle建筑期间
Execution failed for task ':extractIncludeProto'.
> Could not resolve all files for configuration ':compileProtoPath'.
> Could not resolve com.squareup.wire:wire-schema:3.2.2.
Required by:
project : > io.confluent:kafka-protobuf-serializer:6.0.0 > io.confluent:kafka-protobuf-provider:6.0.0
> The consumer was configured to find a component, preferably only the resources files. However we cannot choose between the following variants of com.squareup.wire:wire-schema:3.2.2:
- jvm-api
- jvm-runtime
- metadata-api
All of them match the consumer attributes:
- Variant 'jvm-api' capability com.squareup.wire:wire-schema:3.2.2 declares a component, packaged as a jar:
- Unmatched attributes:
- Provides release status but the consumer didn't ask for it
- Provides an API but the consumer didn't ask for it
- Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but the consumer didn't ask for it
- Variant 'jvm-runtime' capability com.squareup.wire:wire-schema:3.2.2 declares a component, packaged as a jar:
- Unmatched attributes:
- Provides release status but the consumer didn't ask for it
- Provides a runtime but the consumer didn't ask for it
- Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but the consumer didn't ask for it
- Variant 'metadata-api' capability com.squareup.wire:wire-schema:3.2.2:
- Unmatched attributes:
- Doesn't say anything about its elements (required them preferably only the resources files)
- Provides release status but the consumer didn't ask for it
- Provides a usage of 'kotlin-api' but the consumer didn't ask for it
- Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'common' but the consumer didn't ask for it
* Try:
Run with --info or --debug option to get more log output. Run with --scan to get full insights.
* Exception is:
org.gradle.api.tasks.TaskExecutionException: Execution failed for task ':extractIncludeProto'.
at org.gradle.api.internal.tasks.execution.CatchExceptionTaskExecuter.execute(CatchExceptionTaskExecuter.java:38)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.executeTask(EventFiringTaskExecuter.java:77)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.call(EventFiringTaskExecuter.java:55)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.call(EventFiringTaskExecuter.java:52)
at org.gradle.internal.operations.DefaultBuildOperationExecutor$CallableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:416)
at org.gradle.internal.operations.DefaultBuildOperationExecutor$CallableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:406)
at org.gradle.internal.operations.DefaultBuildOperationExecutor$1.execute(DefaultBuildOperationExecutor.java:165)
at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:250)
at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:158)
at org.gradle.internal.operations.DefaultBuildOperationExecutor.call(DefaultBuildOperationExecutor.java:102)
at org.gradle.internal.operations.DelegatingBuildOperationExecutor.call(DelegatingBuildOperationExecutor.java:36)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter.execute(EventFiringTaskExecuter.java:52)
at org.gradle.execution.plan.LocalTaskNodeExecutor.execute(LocalTaskNodeExecutor.java:41)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$InvokeNodeExecutorsAction.execute(DefaultTaskExecutionGraph.java:370)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$InvokeNodeExecutorsAction.execute(DefaultTaskExecutionGraph.java:357)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$BuildOperationAwareExecutionAction.execute(DefaultTaskExecutionGraph.java:350)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$BuildOperationAwareExecutionAction.execute(DefaultTaskExecutionGraph.java:336)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.lambda$run$0(DefaultPlanExecutor.java:127)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.execute(DefaultPlanExecutor.java:191)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.executeNextNode(DefaultPlanExecutor.java:182)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.run(DefaultPlanExecutor.java:124)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
Caused by: org.gradle.api.internal.artifacts.ivyservice.DefaultLenientConfiguration$ArtifactResolveException: Could not resolve all files for configuration ':compileProtoPath'.
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.rethrowFailure(DefaultConfiguration.java:1265)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.access$1800(DefaultConfiguration.java:141)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration$ConfigurationFileCollection.visitContents(DefaultConfiguration.java:1242)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration$ConfigurationFileCollection.visitContents(DefaultConfiguration.java:1235)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.visitContents(DefaultConfiguration.java:489)
at org.gradle.api.internal.file.AbstractFileCollection.visitStructure(AbstractFileCollection.java:265)
at org.gradle.api.internal.file.CompositeFileCollection.visitContents(CompositeFileCollection.java:152)
at org.gradle.api.internal.file.AbstractFileCollection.visitStructure(AbstractFileCollection.java:265)
at org.gradle.internal.fingerprint.impl.DefaultFileCollectionSnapshotter.snapshot(DefaultFileCollectionSnapshotter.java:50)
at org.gradle.internal.fingerprint.impl.AbstractFileCollectionFingerprinter.fingerprint(AbstractFileCollectionFingerprinter.java:47)
at ...
ultPlanExecutor.java:182)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.run(DefaultPlanExecutor.java:124)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
暂无答案!
目前还没有任何答案,快来回答吧!