在zookeeper中使用zab协议进行广播

0tdrvxhp  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(279)

早上好,
我是新的zookeeper和它的协议,我对它的广播协议zab感兴趣。
你能给我提供一个简单的java代码,它使用zookeeper的zab协议吗?我一直在搜索,但我没有成功地找到一个代码,显示如何使用zab。
事实上,我需要的很简单,我有一个mapreduce代码,我希望所有的Map者在找到更好的x值(即更大的值)时更新一个变量(比如x)。在这种情况下,领导者必须比较旧值和新值,然后向所有Map者广播实际的最佳值。我怎么能用java做这样的事?
提前谢谢,谢谢

vatpfxk5

vatpfxk51#

你不需要使用zab协议。您可以按照以下步骤操作:
你在zookeeper上有一个znode say/bigvalue。所有的Map程序在启动时读取存储在其中的值。他们还监视znode上的数据变化。每当Map器获得更好的值时,它就会用更好的值更新znode。所有Map程序都将收到数据更改事件的通知,它们将读取新的最佳值,并重新建立数据更改监视。这样,它们与最新的最佳值同步,并且可以在有更好的值时更新最新的最佳值。
实际上,zkclient是一个非常好的库,可以与zookeeper一起使用,它隐藏了很多复杂性(https://github.com/sgroschupf/zkclient ). 下面的示例演示了如何监视znode“/bigvalue”的任何数据更改。

  1. package geet.org;
  2. import java.io.UnsupportedEncodingException;
  3. import org.I0Itec.zkclient.IZkDataListener;
  4. import org.I0Itec.zkclient.ZkClient;
  5. import org.I0Itec.zkclient.exception.ZkMarshallingError;
  6. import org.I0Itec.zkclient.exception.ZkNodeExistsException;
  7. import org.I0Itec.zkclient.serialize.ZkSerializer;
  8. import org.apache.zookeeper.data.Stat;
  9. public class ZkExample implements IZkDataListener, ZkSerializer {
  10. public static void main(String[] args) {
  11. String znode = "/bigvalue";
  12. ZkExample ins = new ZkExample();
  13. ZkClient cl = new ZkClient("127.0.0.1", 30000, 30000,
  14. ins);
  15. try {
  16. cl.createPersistent(znode);
  17. } catch (ZkNodeExistsException e) {
  18. System.out.println(e.getMessage());
  19. }
  20. // Change the data for fun
  21. Stat stat = new Stat();
  22. String data = cl.readData(znode, stat);
  23. System.out.println("Current data " + data + "version = " + stat.getVersion());
  24. cl.writeData(znode, "My new data ", stat.getVersion());
  25. cl.subscribeDataChanges(znode, ins);
  26. try {
  27. Thread.sleep(36000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. @Override
  33. public void handleDataChange(String dataPath, Object data) throws Exception {
  34. System.out.println("Detected data change");
  35. System.out.println("New data for " + dataPath + " " + (String)data);
  36. }
  37. @Override
  38. public void handleDataDeleted(String dataPath) throws Exception {
  39. System.out.println("Data deleted " + dataPath);
  40. }
  41. @Override
  42. public byte[] serialize(Object data) throws ZkMarshallingError {
  43. if (data instanceof String){
  44. try {
  45. return ((String) data).getBytes("UTF-8");
  46. } catch (UnsupportedEncodingException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. return null;
  51. }
  52. @Override
  53. public Object deserialize(byte[] bytes) throws ZkMarshallingError {
  54. try {
  55. return new String(bytes, "UTF-8");
  56. } catch (UnsupportedEncodingException e) {
  57. e.printStackTrace();
  58. }
  59. return null;
  60. }
  61. }
展开查看全部

相关问题