bigdata—分布式运行模式下kafka producer的唯一事务ID

bqjvbblv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(408)

我有一个大数据应用程序,它是基于流程consume->process->Product的。我在摄取管道中使用kafka,并且使用事务生产者来生成消息。我的应用程序的所有部分都运行良好,但是在为事务生产者生成id时有一个小问题。脚本:
假设我的应用程序运行在一台机器上,我示例化了2个消费者,他们有自己的生产者,因此例如,假设生产者1有事务id->consumer-0-producer生产者2有事务id->consumer-1-producer现在这两个生产者发起的事务不会相互干扰,这就是我想要的。伪代码如下所示:

ExecutorService executorService// responsible for starting my consumers
for (int i = 0; i < 2; i++) {
    prod_trans_id = "consumer-" + str(i) + "-producer"
    Custom_Consumer consumer = new Custom_Consumer(prod_trans_id)
    executorService.submit(consumer)
}

如果我的应用程序在一台机器上运行,那么这就非常好了,但是,情况并非如此,因为应用程序需要在多台机器上运行,因此当相同的代码在机器2上运行时,由机器2上的使用者示例化的生产者将具有与机器1上相同的事务id。我希望事务ID的生成方式既不相互冲突,又具有可复制性,这意味着如果应用程序崩溃/停止(比如有人这样做) service application stop 然后 service application start )当它重新联机时,它应该使用与以前相同的事务ID。我想到了基于uuid的方法,但是,uuid是随机的,当一台机器上的应用程序死机并重新联机时,uuid就不一样了。

w1jd8yoj

w1jd8yoj1#

private final static String HOSTNAME_COMMAND = "hostname";

public static String getHostName() {
        BufferedReader inputStreamReader = null;
        BufferedReader errorStreamReader = null;
        try {
            Process process = Runtime.getRuntime().exec(HOSTNAME_COMMAND);
            inputStreamReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            errorStreamReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            if (errorStreamReader.readLine() != null) {
                throw new RuntimeException(String.format("Failed to get the hostname, exception message: %s",
                        errorStreamReader.readLine()));
            }
            return inputStreamReader.readLine();
        } catch (IOException e) {
            try {
                if (inputStreamReader != null) {
                    inputStreamReader.close();
                }
                if (errorStreamReader != null) {
                    errorStreamReader.close();
                }
            } catch (IOException e1) {
                LogExceptionTrace.logExceptionStackTrace(e1);
                throw new RuntimeException(e1);
            }
            LogExceptionTrace.logExceptionStackTrace(e);
            throw new RuntimeException(e);
        }
    }

然后按如下方式使用主机名:

final String producerTransactionalID = String.format("%s_producer", this.consumerName);

其中使用者名称设置如下:

for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
            String consumerName = String.format("%s-worker-%d", hostName, i);
            Executor executor = new Executor(
                    Configuration, consumerName
            );
            Executors.add(executor);
            futures.add(executorService.submit(executor));
        }

相关问题