应用程序从kafka读取数据并写入mongodb。
下面是我看到的错误线。。。 java.io.NotSerializableException: com.mongodb.MongoCollectionImpl
拓扑开始的主类。
ZkHosts zkHosts=new ZkHosts("localhost:2181");
String topic_name="test";
String consumer_group_id="storm";
String zookeeper_root="";
SpoutConfig kafkaConfig=new SpoutConfig(zkHosts,topic_name, zookeeper_root, consumer_group_id);
kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
//Mongo Setup
MongoClient mongoClient = new MongoClient( "zz.yyy.xx.abc",27017 );
mongoClient.setWriteConcern(WriteConcern.SAFE);
MongoDatabase db = mongoClient.getDatabase("IOT");
MongoCollection<Document> iotSampleColl = db.getCollection("iot_sample");
MongoInsertBolt mongoInsertBolt = new MongoInsertBolt(iotSampleColl);
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("KafkaSpout", kafkaSpout);
builder.setBolt("MongoInsertBolt", mongoInsertBolt).allGrouping("KafkaSpout");
Config conf = new Config();
LocalCluster cluster=new LocalCluster();
try{
cluster.submitTopology("test", conf, builder.createTopology());
cluster.shutdown();
}catch (Exception e) {
System.out.println(e.getMessage());
}
写入mongodb的类,mongoinsertbolt.class:
private static final long serialVersionUID = 2504213456001787553L;
protected MongoCollection<Document> iotSampleColl;
public MongoInsertBolt(MongoCollection<Document> iotSampleColl) {
this.iotSampleColl = iotSampleColl;
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String word=tuple.getString(0);
Document packet = new Document();
packet.put("IOT_trans",word);
if((null == word) || (word.length() == 0))
{
return;
}
iotSampleColl.insertOne(packet);
System.out.println("Word is..."+word);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// TODO Auto-generated method stub
}
如果我做错了,请告诉我。提前谢谢大家。
1条答案
按热度按时间vxqlmq5t1#
只需将mongodb连接代码移到
prepare(...)
方法: