上下文:我想把发送到micronaut grpc端点的请求体转发到kafka主题。该grpc端点是对 io.grpc:protoc-gen-grpc-kotlin 自动生成的抽象类的实现。
当前方法:为了把grpc端点收到的消息发送到kafka主题,我逐个属性地将其转换为DTO。我也尝试过直接使用proto生成的消息对象,或自动生成的存根(stub)中的某些属性,但没有成功。把收到的消息转换成Kafka可以接受的对象,我现在的做法真的合理吗?在我看来,这是在重复劳动、重复代码。
整个代码可以在我的git中找到
proto文件及其消息声明(我可以重用它们而不是在新的kotlin类中复制它们吗)
交易服务
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionsProto";
option objc_class_prefix = "HLW";
package com.mybank.endpoint;
import "google/protobuf/wrappers.proto";
// gRPC service exposing a single RPC for submitting transactions.
service TransactionsService {
// Unary call (no `stream` on either side): one TransactionsRequest in, one TransactionsReply out.
rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}
// Request payload of PostTransactions: one description plus a list of transactions.
message TransactionsRequest {
// Description attached to the request as a whole (exact meaning not shown here — confirm with the author).
string transactionDesc = 1;
// Transactions carried by the request; being `repeated`, the list may be empty.
repeated Transaction transactions = 2;
}
// A single transaction entry inside TransactionsRequest; all fields are plain strings.
message Transaction {
// Identifier of the transaction.
string id = 1;
// Name of the transaction.
string name = 2;
// Description of the transaction.
string description = 3;
}
// Response payload of PostTransactions.
message TransactionsReply {
// Text message returned to the caller.
string message = 1;
}
自动生成的存根(stub)
package com.mybank.endpoint
import com.mybank.endpoint.TransactionsServiceGrpc.getServiceDescriptor
import io.grpc.CallOptions
import io.grpc.CallOptions.DEFAULT
import io.grpc.Channel
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.ServerServiceDefinition
import io.grpc.ServerServiceDefinition.builder
import io.grpc.ServiceDescriptor
import io.grpc.Status.UNIMPLEMENTED
import io.grpc.StatusException
import io.grpc.kotlin.AbstractCoroutineServerImpl
import io.grpc.kotlin.AbstractCoroutineStub
import io.grpc.kotlin.ClientCalls.unaryRpc
import io.grpc.kotlin.ServerCalls.unaryServerMethodDefinition
import io.grpc.kotlin.StubFor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmOverloads
import kotlin.jvm.JvmStatic
/**
* Holder for Kotlin coroutine-based client and server APIs for
* com.mybank.endpoint.TransactionsService.
*
* NOTE(review): the surrounding text describes this file as auto-generated by
* protoc-gen-grpc-kotlin, so hand edits here would presumably be lost on regeneration.
*/
object TransactionsServiceGrpcKt {
// Both accessors below simply delegate to TransactionsServiceGrpc.
@JvmStatic
val serviceDescriptor: ServiceDescriptor
get() = TransactionsServiceGrpc.getServiceDescriptor()
val postTransactionsMethod: MethodDescriptor<TransactionsRequest, TransactionsReply>
@JvmStatic
get() = TransactionsServiceGrpc.getPostTransactionsMethod()
/**
* A stub for issuing RPCs to a(n) com.mybank.endpoint.TransactionsService service as suspending
* coroutines.
*/
@StubFor(TransactionsServiceGrpc::class)
class TransactionsServiceCoroutineStub @JvmOverloads constructor(
channel: Channel,
callOptions: CallOptions = DEFAULT
) : AbstractCoroutineStub<TransactionsServiceCoroutineStub>(channel, callOptions) {
// Creates a new stub of the same type for the given channel and call options.
override fun build(channel: Channel, callOptions: CallOptions): TransactionsServiceCoroutineStub
= TransactionsServiceCoroutineStub(channel, callOptions)
/**
* Executes this RPC and returns the response message, suspending until the RPC completes
* with [`Status.OK`][io.grpc.Status]. If the RPC completes with another status, a
* corresponding
* [StatusException] is thrown. If this coroutine is cancelled, the RPC is also cancelled
* with the corresponding exception as a cause.
*
* @param request The request message to send to the server.
*
* @return The single response from the server.
*/
suspend fun postTransactions(request: TransactionsRequest): TransactionsReply = unaryRpc(
channel,
TransactionsServiceGrpc.getPostTransactionsMethod(),
request,
callOptions,
Metadata()
)}
/**
* Skeletal implementation of the com.mybank.endpoint.TransactionsService service based on Kotlin
* coroutines.
*/
abstract class TransactionsServiceCoroutineImplBase(
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : AbstractCoroutineServerImpl(coroutineContext) {
/**
* Returns the response to an RPC for com.mybank.endpoint.TransactionsService.PostTransactions.
*
* If this method fails with a [StatusException], the RPC will fail with the corresponding
* [io.grpc.Status]. If this method fails with a [java.util.concurrent.CancellationException],
* the RPC will fail
* with status `Status.CANCELLED`. If this method fails for any other reason, the RPC will
* fail with `Status.UNKNOWN` with the exception as a cause.
*
* The default body below throws UNIMPLEMENTED; concrete services override it.
*
* @param request The request from the client.
*/
open suspend fun postTransactions(request: TransactionsRequest): TransactionsReply = throw
StatusException(UNIMPLEMENTED.withDescription("Method com.mybank.endpoint.TransactionsService.PostTransactions is unimplemented"))
// Registers postTransactions as the handler of the PostTransactions unary method.
// Declared final, so subclasses change behaviour only by overriding postTransactions.
final override fun bindService(): ServerServiceDefinition = builder(getServiceDescriptor())
.addMethod(unaryServerMethodDefinition(
context = this.context,
descriptor = TransactionsServiceGrpc.getPostTransactionsMethod(),
implementation = ::postTransactions
)).build()
}
}
实现自动生成存根的类
package com.mybank.endpoint
import com.mybank.dto.Transactions
import com.mybank.dto.Transaction
import com.mybank.service.TransactionService
import javax.inject.Singleton
/**
 * gRPC endpoint that converts an incoming [TransactionsRequest] into DTOs and hands them to
 * [TransactionService] for publishing.
 *
 * Within this file the unqualified name `Transaction` refers to the imported DTO
 * (`com.mybank.dto.Transaction`); the protobuf message of the same simple name is therefore
 * referenced by its fully qualified name.
 */
@Singleton
class TransactionsEndpoint(val transactionService: TransactionService) :
    TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase() {

    /**
     * Converts every transaction of [request] into a DTO and forwards the result.
     *
     * The first transaction (when there is one) is posted on its own through
     * `postDebitTransaction`, then the whole batch is posted through `postDebitTransactions`,
     * in that order. A request without transactions no longer fails with an
     * `IndexOutOfBoundsException`: the single-transaction call is skipped and a batch with an
     * empty list is posted.
     *
     * @param request the request received from the gRPC client.
     * @return a reply carrying the fixed message `"testReply"`.
     */
    override suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
        // Convert ALL received transactions, not just the first one duplicated.
        val converted = request.transactionsList.map { toDto(it) }
        val dtoTransactions = Transactions().apply {
            transactionDesc = request.transactionDesc
            transactions = converted
        }

        converted.firstOrNull()?.let { transactionService.postDebitTransaction(it) }
        transactionService.postDebitTransactions(dtoTransactions)

        return TransactionsReply.newBuilder().setMessage("testReply").build()
    }

    /** Copies the fields of a protobuf transaction message into a new DTO. */
    private fun toDto(source: com.mybank.endpoint.Transaction): Transaction =
        Transaction().apply {
            id = source.id
            name = source.name
            description = source.description
        }
}
只不过是从proto复制消息的dto
package com.mybank.dto
/**
 * DTO mirroring the `Transaction` protobuf message.
 *
 * All properties default to the empty string (the same default proto3 uses for string fields),
 * so reading a property that was never assigned no longer throws
 * `UninitializedPropertyAccessException`. The no-argument constructor and the mutable
 * properties are kept so that existing `Transaction().apply { ... }` / setter-based call sites
 * continue to compile. Being a data class, it also gets value-based `equals`/`hashCode` and a
 * readable `toString`.
 *
 * NOTE(review): the properties stay `var` purely for source compatibility; avoid mutating an
 * instance after it has been used as a key in a hash-based collection.
 */
data class Transaction(
    var id: String = "",
    var name: String = "",
    var description: String = ""
)
和
package com.mybank.dto
/**
 * DTO mirroring the `TransactionsRequest` protobuf message: a description plus the list of
 * transactions it covers.
 *
 * `transactionDesc` defaults to the empty string instead of being `lateinit`, so reading it
 * before assignment no longer throws `UninitializedPropertyAccessException`. The
 * no-argument constructor and mutable properties are kept for existing call sites.
 *
 * NOTE(review): `transactions` stays nullable only to remain source-compatible with existing
 * callers; a non-null list defaulting to `emptyList()` would be the cleaner model if all
 * call sites can be migrated.
 */
data class Transactions(
    var transactionDesc: String = "",
    var transactions: List<Transaction>? = null
)
与本问题关系不大,但下面是使用这些DTO的Kafka生产者(producer)
import com.mybank.dto.Transaction
import com.mybank.dto.Transactions
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
/**
 * Declarative Kafka producer for debit messages.
 *
 * The methods are intentionally abstract (no bodies): a `@KafkaClient` interface is meant to be
 * implemented by the framework, and an empty default body risks turning a send into a silent
 * no-op depending on how interface default methods are compiled.
 */
@KafkaClient
interface DebitProducer {

    /**
     * Sends a plain-text [message] to the `debit` topic.
     *
     * @param key record key, may be null.
     * @param message record value, may be null.
     */
    @Topic("debit")
    fun sendRequestMessage(@KafkaKey key: String?, message: String?)

    /**
     * Sends a single [transaction] to the `debit` topic (no record key is declared).
     */
    @Topic("debit")
    fun sendRequestMessage(transaction: Transaction)

    /**
     * Sends a batch of [transactions] as one record to the `debit` topic (no record key is
     * declared).
     */
    @Topic("debit")
    fun sendRequestMessage(transactions: Transactions)

    /**
     * Sends a plain-text [message] to a topic chosen at call time.
     *
     * @param topic name of the destination topic.
     * @param day record key (presumably a day identifier, judging by the name — confirm).
     * @param message record value, may be null.
     */
    fun sendRequestMessage(@Topic topic: String?, @KafkaKey day: String?, message: String?)
}
build.gradle
// Gradle plugins used by the build; the three Kotlin plugins are pinned to the same version.
plugins {
// Kotlin/JVM compilation.
id "org.jetbrains.kotlin.jvm" version "1.3.72"
// Kotlin annotation processing (used by the kapt(...) dependencies below).
id "org.jetbrains.kotlin.kapt" version "1.3.72"
// Opens classes carrying the annotations listed in the allOpen { } block.
id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
id "application"
// Runs protoc and its plugins; configured in the protobuf { } block.
id 'com.google.protobuf' version '0.8.13'
}
// Project coordinates.
version "0.2"
group "account-control"
// Artifact repositories, consulted in declaration order.
repositories {
    mavenLocal()
    // Maven Central is listed explicitly because JCenter has been sunset and should not be the
    // only remote repository; jcenter() is kept so existing resolution keeps working.
    mavenCentral()
    jcenter()
}
// Custom dependency configurations.
configurations {
// for dependencies that are needed for development only
// (added to the test and JavaExec classpaths further down in this script)
developmentOnly
}
// Project dependencies. Micronaut artifact versions come from the enforced micronaut-bom
// platform unless a version is given explicitly.
dependencies {
    // Annotation processing
    kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    kapt("io.micronaut:micronaut-inject-java")
    kapt("io.micronaut:micronaut-validation")

    // Kotlin + Micronaut runtime
    implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
    implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
    implementation("io.micronaut:micronaut-runtime")

    // gRPC
    // implementation("io.micronaut.grpc:micronaut-grpc-runtime")
    implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
    implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
    implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")

    //Kafka
    implementation("io.micronaut.kafka:micronaut-kafka")

    runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
    runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")

    // Tests
    kaptTest("io.micronaut:micronaut-inject-java")
    testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
    testImplementation("io.micronaut.test:micronaut-test-junit5")
    testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")
    // testRuntimeOnly replaces the deprecated testRuntime configuration (removed in Gradle 7).
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.3.0")
    testRuntimeOnly("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}
// Make development-only dependencies visible to tests as well.
test.classpath += configurations.developmentOnly
// Entry point used by the application plugin.
// NOTE(review): "account-control" contains a hyphen, which is not valid in a Kotlin/Java package
// name, and the sources shown use the com.mybank.* packages — confirm the real main class.
mainClassName = "account-control.Application"
// Run tests on the JUnit Platform (JUnit 5).
test {
useJUnitPlatform()
}
// all-open plugin: classes annotated (or meta-annotated) with io.micronaut.aop.Around are made open.
allOpen {
annotation("io.micronaut.aop.Around")
}
// Kotlin compiler options for production sources.
compileKotlin {
kotlinOptions {
jvmTarget = '11'
//Will retain parameter names for Java reflection
javaParameters = true
}
}
//compileKotlin.dependsOn(generateProto)
// Same compiler options for test sources; keep in sync with compileKotlin above.
compileTestKotlin {
kotlinOptions {
jvmTarget = '11'
javaParameters = true
}
}
// Applies to every JavaExec task (e.g. `run`): adds development-only dependencies to the
// classpath and passes extra JVM flags (tiered compilation stops at level 1; JMX remote enabled).
tasks.withType(JavaExec) {
classpath += configurations.developmentOnly
jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}
// Adds the protoc output directories to the main source set so the generated
// grpc (Java), grpckt (Kotlin) and message (Java) sources are compiled with the project.
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/grpckt'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
// Protobuf code generation: protoc itself plus the gRPC Java and gRPC Kotlin plugins.
protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
// Declares the protoc plugins that are available to the generate tasks.
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
}
// Enables both plugins (with default options) on every generateProto task.
generateProtoTasks {
all()*.plugins {
grpc {}
grpckt {}
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!