我正在尝试将kafka用作控制器中的消息队列(so任务):
@RequestMapping("users")
@RestController
public class UserController extends BaseController {
private static final String KAFKA_TOPIC = "virto_users";
@Autowired
private KafkaTemplate<String, String> mKafkaTemplate;
@PutMapping(value = "{id}")
public ResponseEntity<String> put(@PathVariable UUID id,
@RequestBody String profile) {
String url = ServerConfig.USERS_HOST + "/users/" + id;
ResponseEntity<String> entity = request.put(url, profile);
HttpStatus status = entity.getStatusCode();
if (status == HttpStatus.CREATED || status == HttpStatus.NO_CONTENT) {
return entity;
}
JSONObject json = new JSONObject();
json.put("id", id);
json.put("profile", profile);
sendMessage(json.toString());
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
public void sendMessage(String msg) {
mKafkaTemplate.send(KAFKA_TOPIC, msg);
}
@KafkaListener(topics = KAFKA_TOPIC, groupId = KafkaConfig.GROUP_ID)
public void listen(String message) {
JSONObject json = new JSONObject(message);
UUID id = UUID.fromString(json.getString("id"));
String profile = json.getString("profile");
put(id, profile);
}
}
如果我的内部服务可用并返回已创建或无内容,则一切正常,我可以返回结果,否则我需要返回无内容状态,并将此消息放入我的队列中重试,直到它被处理为止。我像上面那样做的,但这看起来不是个好办法。我只是想问一些建议,我如何才能改善这个解决方案,或它是正常的。
暂无答案!
目前还没有任何答案,快来回答吧!