使用rdkafka将多个组id分配给多个使用者

fykwrbwg  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(293)

我在php中使用rdkafka来实现并行运行。
但是第一个消费者消耗了主题的所有消息,所以第二个消费者没有得到主题的消息。
所以我对不同的消费者使用了不同的组id,同样的问题也会出现。
请帮帮我。 $ conf->set('group.id','myconsumergroup'); $ conf->set('metadata.broker.list','127.0.0.2'); $ topicconf=new rdkafka\topicconf(); $ topicconf->set('auto.offset.reset','最小'); $ conf->setdefaulttopicconf( $ 主题配置); $ consumer1=新rdkafka\kafkaconsumer( $ 形态); $ consumer1->订阅(['dbtest']);while(真){ $ j=0; $ 消息= $ 消费1->消费(120*1000);
开关( $ 消息->错误){case rd\u kafka\u resp\u err\u no\u error: $ dataex=json\u解码( $ 消息->有效载荷,真);变量转储( $ 数据); $ sql=“插入emp(名称,电子邮件)值('”)。 $ dataex['名称']。“,'”。 $ dataex['电子邮件']。“)”;

`$`servername = "localhost";
    `$`username = "A";
    `$`password = "ASD";
    `$`dbname = "test";
    `$`conn = new mysqli(`$`servername, `$`username, `$`password, `$`dbname);
    if (`$`conn->connect_error) {
        die("Connection failed: " . `$`conn->connect_error);
    }

    if (`$`conn->query(`$`sql) === TRUE) {
        echo "New record created successfully to datbase ".`$`dbname."/n";
    } else {
        echo "Error: " . `$`sql . "<br>" . `$`conn->error;
    }
    `$`conn->close();
    echo "produced `$`j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception(`$`message->errstr(), `$`message->err);
    break;
}
$j++;

}呼应“这里”; $ conf->set('group.id','myconsumergroup1'); $ conf->set('metadata.broker.list','127.0.0.2'); $ topicconf=new rdkafka\topicconf(); $ topicconf->set('auto.offset.reset','最小'); $ conf->setdefaulttopicconf( $ 主题配置); $ consumer2=新rdkafka\kafkaconsumer( $ 形态);呼应“索纳利”; $ consumer2->订阅(['dbtest']);while(真){ $ 消息2= $ 消费2->消费(120*1000);开关( $ message2->err){case rd\u kafka\u resp\u err\u no\u错误: $ dataex=json\u解码( $ 消息2->有效载荷,真)``$sql=“插入emp(名称,电子邮件)值('”..$dataex['name']。“,'”。 $ dataex['电子邮件']。“)”$servername1=“本地主机”$用户名1=“a”$password1=“asd”$dbname1=“测试1”; $ conn1=新mysqli( $ 服务器名1, $ 用户名1, $ 密码1, $ 数据库名称1);如果( $ conn1->connect\u error){die(“连接失败:”)。 $ conn1->连接(错误);}

if (`$`conn1->query(`$`sql) === TRUE) {
        echo "New record created successfully ".`$`dbname1;
    } else {
        echo "Error: " . `$`sql . "<br>" . `$`conn1->error;
    }
    `$`conn1->close();
    echo "produced `$`j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception(`$`message->errstr(), `$`message->err);
    break;
}
`$`j++;

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题