Sample code for using Hadoop MapReduce with Cassandra

mec1mxoz · asked 2021-06-03 · in Hadoop
Follow (0) | Answers (2) | Views (361)

I have been trying to get a sample MapReduce program that reads from and writes to Cassandra working, but I am hitting a runtime error. Source code:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
import java.util.Map.Entry;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This counts the occurrences of words in ColumnFamily
 *   cql3_worldcount ( user_id text,
 *                     category_id text,
 *                     sub_category_id text,
 *                     title text,
 *                     body text,
 *                     PRIMARY KEY (user_id, category_id, sub_category_id))
 *
 * For each word, we output the total number of occurrences across all body texts.
 *
 * When outputting to Cassandra, we write the word counts to column family
 *   output_words ( row_id1 text,
 *                  row_id2 text,
 *                  word text,
 *                  count_num text,
 *                  PRIMARY KEY ((row_id1, row_id2), word))
 * as a {word, count} to columns: word, count_num with a row key of "word sum"
 */
public class WordCount extends Configured implements Tool
{
    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

    static final String KEYSPACE = "cql3_worldcount";
    static final String COLUMN_FAMILY = "inputs";
    static final String OUTPUT_REDUCER_VAR = "output_reducer";
    static final String OUTPUT_COLUMN_FAMILY = "output_words";

    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
    private static final String PRIMARY_KEY = "row_key";

    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(0);
    }

    public static class TokenizerMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private ByteBuffer sourceColumn;

        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
            throws IOException, InterruptedException
        {
        }

        public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
        {
            for (Entry<String, ByteBuffer> column : columns.entrySet())
            {
                if (!"body".equalsIgnoreCase(column.getKey()))
                    continue;

                String value = ByteBufferUtil.string(column.getValue());

                logger.debug("read {}:{}={} from {}",
                             new Object[] {toString(keys), column.getKey(), value, context.getInputSplit()});

                StringTokenizer itr = new StringTokenizer(value);
                while (itr.hasMoreTokens())
                {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        private String toString(Map<String, ByteBuffer> keys)
        {
            String result = "";
            try
            {
                for (ByteBuffer key : keys.values())
                    result = result + ByteBufferUtil.string(key) + ":";
            }
            catch (CharacterCodingException e)
            {
                logger.error("Failed to print keys", e);
            }
            return result;
        }
    }

    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(key, new IntWritable(sum));
        }
    }

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        private Map<String, ByteBuffer> keys;
        private ByteBuffer key;

        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
            throws IOException, InterruptedException
        {
            keys = new LinkedHashMap<String, ByteBuffer>();
            String[] partitionKeys = context.getConfiguration().get(PRIMARY_KEY).split(",");
            keys.put("row_id1", ByteBufferUtil.bytes(partitionKeys[0]));
            keys.put("row_id2", ByteBufferUtil.bytes(partitionKeys[1]));
        }

        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(keys, getBindVariables(word, sum));
        }

        private List<ByteBuffer> getBindVariables(Text word, int sum)
        {
            List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
            keys.put("word", ByteBufferUtil.bytes(word.toString()));
            variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));
            return variables;
        }
    }

    public int run(String[] args) throws Exception
    {
        String outputReducerType = "filesystem";
        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
        {
            String[] s = args[0].split("=");
            if (s != null && s.length == 2)
                outputReducerType = s[1];
        }
        logger.info("output reducer type: " + outputReducerType);

        Job job = new Job(getConf(), "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);

        if (outputReducerType.equalsIgnoreCase("filesystem"))
        {
            job.setCombinerClass(ReducerToFilesystem.class);
            job.setReducerClass(ReducerToFilesystem.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
        }
        else
        {
            job.setReducerClass(ReducerToCassandra.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Map.class);
            job.setOutputValueClass(List.class);

            job.setOutputFormatClass(CqlOutputFormat.class);

            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
            job.getConfiguration().set(PRIMARY_KEY, "word,sum");
            String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
                           " SET count_num = ? ";
            CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        }

        job.setInputFormatClass(CqlPagingInputFormat.class);

        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");

        // this is the user defined filter clauses, you can comment it out if you want to count all titles
        CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");

        job.waitForCompletion(true);
        return 0;
    }
}

It compiles fine, but at runtime I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
        at WordCount.run(WordCount.java:230)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at WordCount.main(WordCount.java:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:160)
Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 8 more

I am using Hadoop 1.2.1 and Cassandra 2.0.4. Any help with this error, or any sample code or instructions for getting Hadoop MapReduce to work with Cassandra, would be much appreciated.

dy1byipe 1#

Add the Cassandra jars to Hadoop's classpath: in your Hadoop installation's conf/hadoop-env.sh, export HADOOP_CLASSPATH=<cassandra lib directory>/*:$HADOOP_CLASSPATH, as sketched below.
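For example, a minimal hadoop-env.sh change might look like the following; /opt/cassandra and /opt/hadoop are assumed install locations, so substitute your own paths:

    # /opt/hadoop/conf/hadoop-env.sh   (install paths here are assumptions)
    # Put the jars shipped with Cassandra -- including the apache-cassandra jar that
    # contains org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat -- on the classpath
    # of JVMs launched by the bin/hadoop command.
    export HADOOP_CLASSPATH=/opt/cassandra/lib/*:$HADOOP_CLASSPATH

HADOOP_CLASSPATH affects the client JVM started by the hadoop command, and the NoClassDefFoundError above is thrown in WordCount.run() on that client side, so this should be enough to get past it.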

6yjfywim 2#

To solve this, copy the Cassandra jar files into Hadoop's lib directory.
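A rough sketch of that approach (the install paths are assumptions; Hadoop 1.x picks up extra jars from its lib directory):

    # Install paths are assumptions -- substitute your own Cassandra and Hadoop directories.
    cp /opt/cassandra/lib/*.jar /opt/hadoop/lib/
    # Restart the MapReduce daemons so the JobTracker and TaskTrackers see the new jars.
    /opt/hadoop/bin/stop-mapred.sh
    /opt/hadoop/bin/start-mapred.sh

Unlike the HADOOP_CLASSPATH route, this should also make the classes visible to the task JVMs, at the cost of copying the jars to every node and restarting the daemons.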
