我正在尝试使用springboot实现一个多租户微服务。我已经实现了web层和持久层。在web层,我实现了一个过滤器,在原型bean中设置租户id(使用threadlocaltargetsource),在持久化层,我使用了hibernate多租户配置(每个租户的模式),它们工作正常,数据持久化在适当的模式中。目前我正在消息层上实现相同的行为,使用springkaka库,到目前为止ir的工作方式与我预期的一样,但是我想知道是否有更好的方法来实现它。
这是我的密码:
这是管理kafkamessagelistenercontainer的类:
@Component
public class MessagingListenerContainer {
private final MessagingProperties messagingProperties;
private KafkaMessageListenerContainer<String, String> container;
@PostConstruct
public void init() {
ContainerProperties containerProps = new ContainerProperties(
messagingProperties.getConsumer().getTopicsAsList());
containerProps.setMessageListener(buildCustomMessageListener());
container = createContainer(containerProps);
container.start();
}
@Bean
public MessageListener<String, String> buildCustomMessageListener() {
return new CustomMessageListener();
}
private KafkaMessageListenerContainer<String, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
…
return container;
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
…
return props;
}
@PreDestroy
public void finish() {
container.stop();
}
}
这是custommessagelistener:
@Slf4j
public class CustomMessageListener implements MessageListener<String, String> {
@Autowired
private TenantStore tenantStore; // Prototype Bean
@Autowired
private List<ServiceListener> services;
@Override
public void onMessage(ConsumerRecord<String, String> record) {
log.info(“Tenant {} | Payload: {} | Record: {}", record.key(),
record.value(), record.toString());
tenantStore.setTenantId(record.key()); // Currently tenant is been setting as key
services.stream().forEach(sl -> sl.onMessage(record.value()));
}
}
这是一个测试服务,将使用消息数据和租户:
@Slf4j
@Service
public class ConsumerService implements ServiceListener {
private final MessagesRepository messages;
private final TenantStore tenantStore;
@Override
public void onMessage(String message) {
log.info("ConsumerService {}, tenant {}", message, tenantStore.getTenantId());
messages.save(new Message(message));
}
}
谢谢你的时间!
1条答案
按热度按时间2w3kk1z51#
只是要澄清一下(如果我错了,请纠正我):您对所有租户使用相同的主题。根据每个租户区分消息的方法是使用消息键,在您的情况下,消息键是租户id。
通过使用消息头来存储租户id而不是密钥,可以做一些轻微的改进。通过这样做,您将不局限于基于租户对消息进行分区。
虽然您描述的模型可以工作,但它有一个主要的安全问题。如果有人访问了你的主题,那么你将泄露所有租户的数据。
一种更安全的方法是使用主题命名约定和acl(访问控制列表)。你可以在这里找到一个简短的解释。简而言之,您可以使用后缀或前缀将租户的名称包含在主题名称中。e、 g:orders\u tenanta,orders\u tenantb或tenanta\u orders,tenantb\u orders
然后,使用acl可以限制哪些应用程序可以连接到这些特定主题。如果您的一个租户需要将其应用程序直接连接到您的kafka集群,那么此场景也很有用。