java在hadoop上实现apriori算法

kyvafyod  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(396)

我试图用hadoop实现apriori算法。我已经实现了apriori算法的非分布式版本,但是我对hadoop和mapreduce的不熟悉导致了一些问题。
我想要实现算法的方法分为两个阶段:
1) 在第一阶段,map reduce作业将对原始事务数据集进行操作。此阶段的输出是一个包含所有1项集及其对1的支持的文件。
2) 在第二阶段,我想读入前一阶段的输出,然后构造新的项集。重要的是,我想在mapper中确定在数据集中是否仍然找到任何新的项集。我设想,如果我将原始数据集作为输入发送到Map器,它将对原始文件进行分区,以便每个Map器只扫描部分数据集。然而,候选列表需要根据前一阶段的所有输出构建。这将在循环中迭代固定次数的过程。
我的问题是如何具体地确保我能够访问每个Map器中的完整项集,以及能够访问原始数据集来计算每个阶段中的新支持。
感谢您的任何建议、意见、建议或回答。
编辑:根据反馈,我只想更具体地说明我在这里问什么。

xpcnnkqh

xpcnnkqh1#

在开始之前,我建议您阅读hadoop map reduce教程。
步骤1:将数据文件加载到hdfs。假设您的数据是txt文件,每一组都是一行。

  1. a b c
  2. a c d e
  3. a e f
  4. a f z
  5. ...

步骤2:按照MapReduce教程构建自己的apriori类。

  1. public void map(Object key, Text value, Context context
  2. ) throws IOException, InterruptedException {
  3. // Seprate the line into tokens by space
  4. StringTokenizer itr = new StringTokenizer(value.toString());
  5. while (itr.hasMoreTokens()) {
  6. // Add the token into a writable set
  7. ... put the element into a writable set ...
  8. }
  9. context.write(word, one);
  10. }

步骤3:运行mapreducejar文件。输出将在hdfs中的一个文件中。你会有这样的感觉:

  1. a b 3 (number of occurrence)
  2. a b c 5
  3. a d 2
  4. ...

根据输出文件,可以计算关系。
另一方面,您可能需要考虑使用比map reduce更高级别的抽象,比如级联或apachespark。

展开查看全部
ryhaxcpt

ryhaxcpt2#

我使用hadoop流在apachespark和hadoopmapreduce中实现了aes算法。我知道这和apriori不一样,但是你可以尝试使用我的方法。
使用hadoop streming mapreduce实现aes的简单示例。
aes-hadoop流的项目结构
1n\u reducer.py/1n\u combiner是相同的代码,但没有约束。

  1. import sys
  2. CONSTRAINT = 1000
  3. def do_reduce(word, _values):
  4. return word, sum(_values)
  5. prev_key = None
  6. values = []
  7. for line in sys.stdin:
  8. key, value = line.split("\t")
  9. if key != prev_key and prev_key is not None:
  10. result_key, result_value = do_reduce(prev_key, values)
  11. if result_value > CONSTRAINT:
  12. print(result_key + "\t" + str(result_value))
  13. values = []
  14. prev_key = key
  15. values.append(int(value))
  16. if prev_key is not None:
  17. result_key, result_value = do_reduce(prev_key, values)
  18. if result_value > CONSTRAINT:
  19. print(result_key + "\t" + str(result_value))

基本Map器.py:

  1. import sys
  2. def count_usage():
  3. for line in sys.stdin:
  4. elements = line.rstrip("\n").rsplit(",")
  5. for item in elements:
  6. print("{item}\t{count}".format(item=item, count=1))
  7. if __name__ == "__main__":
  8. count_usage()

2n\u mapper.py使用上一次迭代的结果。在回答您的问题时,您可以读取上一次迭代的输出,以这样的方式形成项集。

  1. import itertools
  2. import sys
  3. sys.path.append('.')
  4. N_DIM = 2
  5. def get_2n_items():
  6. items = set()
  7. with open("part-00000") as inf:
  8. for line in inf:
  9. parts = line.split('\t')
  10. if len(parts) > 1:
  11. items.add(parts[0])
  12. return items
  13. def count_usage_of_2n_items():
  14. all_items_set = get_2n_items()
  15. for line in sys.stdin:
  16. items = line.rstrip("\n").rsplit(",") # 74743 43355 53554
  17. exist_in_items = set()
  18. for item in items:
  19. if item in all_items_set:
  20. exist_in_items.add(item)
  21. for combination in itertools.combinations(exist_in_items, N_DIM):
  22. combination = sorted(combination)
  23. print("{el1},{el2}\t{count}".format(el1=combination[0], el2=combination[1], count=1))
  24. if __name__ == "__main__":
  25. count_usage_of_2n_items()

根据我的经验,如果唯一组合(项目集)的数量太大(100k+),apriori算法不适合hadoop。如果您发现了一个使用hadoopmapreduce(流媒体或javamapreduce实现)实现apriori算法的优雅解决方案,请与社区分享。
如果你需要更多的代码片段请要求。

展开查看全部

相关问题