嗨,有没有人能帮我用kafka连接给出这个java代码的连接
提前谢谢
package example.producer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.scribe.builder.*;
import org.scribe.builder.api.*;
import org.scribe.model.*;
import org.scribe.oauth.*;
public class TwitterStreamConsumer extends Thread {
private static final String STREAM_URI = "https://stream.twitter.com/1.1/statuses/filter.json";
public void run(){
try{
System.out.println("Starting Twitter public stream consumer thread.");
// Enter your consumer key and secret below
OAuthService service = new ServiceBuilder()
.provider(TwitterApi.class)
.apiKey("xxxxx")
.apiSecret("xxxxx")
.build();
// Set your access token
Token accessToken = new Token("xxxxx", "xxxxxx");
// Let's generate the request
//System.out.println("Connecting to Twitter Public Stream");
OAuthRequest request = new OAuthRequest(Verb.POST, STREAM_URI);
request.addHeader("version", "HTTP/1.1");
request.addHeader("host", "stream.twitter.com");
request.setConnectionKeepAlive(true);
request.addHeader("user-agent", "Twitter Stream Reader");
request.addBodyParameter("track", "**screenname**"); // Set keywords you'd like to track here
service.signRequest(accessToken, request);
Response response = request.send();
// Create a reader to read Twitter's stream
BufferedReader reader = new BufferedReader(new InputStreamReader(response.getStream()));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
}
catch (IOException ioe){
ioe.printStackTrace();
}
}
public static void main(String[] args){
final TwitterStreamConsumer streamConsumer = new TwitterStreamConsumer(); // final because we will later pull the latest Tweet
streamConsumer.start();
}
}
嗨,有人能给我建议如何将这个java代码与ApacheKafka连接起来吗。我试过很多方法,但都不对。有人能帮忙吗????
提前谢谢
4条答案
按热度按时间gijlo24d1#
响应类的getstream()方法声明为private,因此无法访问它以获取输入流。
上面的推特Kafka生产者的例子运行良好。我希望这有帮助!
9avjhtql2#
Kafka制作人使用Spark流。。
ldxq2e6h3#
Kafka制作人使用twitter散列标签
包com.multipleproducer.sparkstreaming.multiplekafkaproducersparkstreaming;
hts6caw34#
解决了这个问题会有帮助的