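I'm exploring Spring Data with R2DBC, and I'm trying to use a custom repository method in my interface, which extends ReactiveCrudRepository. The method takes a Flux of objects as its parameter and returns a Flux of the same objects.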
@Query("SELECT id, name FROM Person WHERE id = ?")
Flux<Person> myMethod(Flux<Person> person);
I would like it to behave the same way as the Flux<T> findAllById(Publisher<ID> idStream) method.
This is my repository interface definition:
public interface PersonRepository extends ReactiveCrudRepository<Person, Person> {
@Query("SELECT id, name FROM Person WHERE id = ?")
Flux<Person> myMethod(Flux<Person> person);
}
This is my entity:
@Getter
@Setter
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table(value = "Person", schema = "mySchema")
public class Person {
@Id
@Column("id")
Long id;
@Column("name")
String name;
}
This is my database configuration class:
@Configuration
@EnableR2dbcRepositories
@EnableR2dbcAuditing
@EnableTransactionManagement
public class DatabaseConfiguration extends AbstractR2dbcConfiguration {
......
}
This is my main application:
@SpringBootApplication
@EnableR2dbcAuditing
@EnableConfigurationProperties({ApplicationProperties.class})
public class MyApplication {
private static final Logger logger = LoggerFactory.getLogger(MyApplication.class);
@Bean
public CommandLineRunner consume(PersonRepository personRepository) {
LongSupplier randomLong = () -> {
return RandomUtils.nextLong(10L,20L);
};
Flux<Long> personIds = Flux.fromStream(LongStream.generate(randomLong).boxed());
Flux<Person> persons = personIds.map( p -> {
Person person = new Person();
person.setId(p);
return person;
});
personRepository.myMethod(persons.take(3)).subscribe( id -> logger.info("Processed Person Id: " + id));
return null;
}
.......
}
When I use the interface method findById(Publisher<ID> id), it works, but when I try the custom method myMethod(Flux<Person> person) annotated with @Query, I get the following exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Value must not be null
Caused by: java.lang.IllegalArgumentException: **Value must not be null**
at org.springframework.util.Assert.notNull(Assert.java:201)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
reactor.core.publisher.Mono.map(Mono.java:3411)
org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
Error has been observed at the following site(s):
*__________Mono.map ⇢ at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
|_ Mono.flatMapMany ⇢ at org.springframework.data.r2dbc.repository.query.AbstractR2dbcQuery.execute(AbstractR2dbcQuery.java:88)
*____Flux.usingWhen ⇢ at org.springframework.data.repository.core.support.RepositoryMethodInvoker$ReactiveInvocationListenerDecorator.decorate(RepositoryMethodInvoker.java:242)
Original Stack Trace:
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.r2dbc.core.Parameter.from(Parameter.java:54)
at org.springframework.data.r2dbc.query.QueryMapper.getBindValue(QueryMapper.java:426)
at org.springframework.data.r2dbc.core.DefaultReactiveDataAccessStrategy.getBindValue(DefaultReactiveDataAccessStrategy.java:288)
at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.getBindValue(ExpressionEvaluatingParameterBinder.java:134)
at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bindExpressions(ExpressionEvaluatingParameterBinder.java:82)
at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bind(ExpressionEvaluatingParameterBinder.java:72)
at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery$ExpandedQuery.<init>(StringBasedR2dbcQuery.java:197)
at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.lambda$createQuery$0(StringBasedR2dbcQuery.java:156)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:276)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:148)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:94)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
at reactor.core.publisher.Flux.subscribe(Flux.java:8306)
I couldn't find a similar use case in the Spring reference documentation, and I would like to know whether this is supported.
1 Answer
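I tested the @Query annotation with a Flux as both the incoming parameter and the return type, and found that @Query indeed does not support a Publisher as an incoming parameter. My test code also failed with Value must not be null. If you remove @Query, however, the method actually works, and the same holds for other fields.
In my opinion, the @Query annotation does not currently seem to support subscribing to an incoming Publisher parameter (that is, it does not internally subscribe to it with .flatMap()).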
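One possible workaround, following from the observation above, is to keep the @Query method single-valued and do the subscription yourself in a default method on the repository. The sketch below is an assumption rather than something confirmed by the Spring documentation: the method name findPersonById is hypothetical, the ID type is assumed to be Long (matching the @Id field of Person), and the positional ? placeholder is replaced with a named :id parameter bound through @Param.

```java
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch only: assumes the Person entity from the question (Lombok generates getId()).
public interface PersonRepository extends ReactiveCrudRepository<Person, Long> {

    // Hypothetical single-value query: the bind value is a plain Long, not a Publisher.
    @Query("SELECT id, name FROM Person WHERE id = :id")
    Mono<Person> findPersonById(@Param("id") Long id);

    // Subscribes to the incoming Flux and runs the query once per emitted Person.
    default Flux<Person> myMethod(Flux<Person> persons) {
        return persons
                .map(Person::getId)
                .flatMap(this::findPersonById);
    }
}
```

The call site from the question, personRepository.myMethod(persons.take(3)), would stay the same. Note that this issues one query per element instead of a single batched query, so it may not be suitable for large streams.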