如何将这个 Twitter API 程序与 Kafka producer 连接起来

wvt8vs2t  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(268)

嗨,有没有人能帮我把下面这段Java代码与Kafka连接起来?
提前谢谢

package example.producer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.scribe.builder.*;
import org.scribe.builder.api.*;
import org.scribe.model.*;
import org.scribe.oauth.*;

/**
 * Reads the Twitter statuses/filter stream on its own thread and echoes every
 * received line to standard output.
 *
 * <p>The request is signed with OAuth through the Scribe library. The consumer
 * key/secret, the access token and the track keyword below are placeholders
 * that must be replaced before running.
 */
public class TwitterStreamConsumer  extends Thread {

    private static final String STREAM_URI = "https://stream.twitter.com/1.1/statuses/filter.json";

    /**
     * Builds and signs the POST request, sends it, then prints the response
     * stream line by line until the stream ends. An {@link IOException} while
     * reading is reported with a stack trace and ends the thread.
     */
    @Override
    public void run(){
        System.out.println("Starting Twitter public stream consumer thread.");

        // Enter your consumer key and secret below
        OAuthService service = new ServiceBuilder()
                .provider(TwitterApi.class)
                .apiKey("xxxxx")
                .apiSecret("xxxxx")
                .build();

        // Set your access token
        Token accessToken = new Token("xxxxx", "xxxxxx");

        // Let's generate the request
        OAuthRequest request = new OAuthRequest(Verb.POST, STREAM_URI);
        request.addHeader("version", "HTTP/1.1");
        request.addHeader("host", "stream.twitter.com");
        request.setConnectionKeepAlive(true);
        request.addHeader("user-agent", "Twitter Stream Reader");
        request.addBodyParameter("track", "**screenname**"); // Set keywords you'd like to track here
        service.signRequest(accessToken, request);
        Response response = request.send();

        // try-with-resources closes the reader (and the underlying HTTP stream)
        // even when reading fails; the charset is explicit so decoding does not
        // depend on the platform default.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                response.getStream(), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        catch (IOException ioe){
            ioe.printStackTrace();
        }

    }

    /**
     * Starts a single consumer thread.
     *
     * @param args ignored
     */
    public static void main(String[] args){

        final TwitterStreamConsumer streamConsumer = new TwitterStreamConsumer();
        streamConsumer.start();
    }
}

嗨,有人能建议一下如何将这段Java代码与Apache Kafka连接起来吗?我试过很多方法,但都不对。有人能帮忙吗?
提前谢谢

gijlo24d

gijlo24d1#

Response类的getStream()方法被声明为private,因此无法通过它获取输入流。

// Fragment: two class-level constants followed by the statements of a method
// whose header is not shown (the final closing brace belongs to that method).
// It fetches a Twitter response body through Scribe and forwards it, line by
// line, to a Kafka topic.

// Name of the Kafka topic every line is published to.
private static final String topic = "twitter-feed-topic";

// Despite its name, this constant holds the statuses/home_timeline.json URL on
// api.twitter.com, which is requested once with GET below.
private static final String STREAM_URI = "https://api.twitter.com/1.1/statuses/home_timeline.json";

System.out.println("Starting Twitter public stream consumer thread");

    // Producer configuration: broker address and String serialization.
    Properties properties = new Properties();

    properties.put("metadata.broker.list", "localhost:9092");
    properties.put("serializer.class", "kafka.serializer.StringEncoder");

    ProducerConfig config = new ProducerConfig(properties);

    Producer<String, String> producer = new Producer<String, String>(config);

    // "Api Key" / "Api Secret key" are placeholders for real credentials.
    OAuthService service = new ServiceBuilder()
            .provider(TwitterApi.class)
            .apiKey("Api Key")
            .apiSecret("Api Secret key")
            .build();

    // Used to read the verifier code typed by the user on standard input.
    Scanner in = new Scanner(System.in);

    // Obtain the Request Token

    Token requestToken = service.getRequestToken();

    // Print the authorization URL; the user opens it and receives a verifier code.

    System.out.println(service.getAuthorizationUrl(requestToken));

    System.out.println("enter the verifier number ");
    System.out.print(">>");
    Verifier verifier = new Verifier(in.nextLine());

    // Exchange the request token and verifier for the access token

    Token accessToken = service.getAccessToken(requestToken, verifier);

    // Sign and send a single GET request.
    OAuthRequest request = new OAuthRequest(Verb.GET, STREAM_URI, service);
    service.signRequest(accessToken, request);
    Response response = request.send();

    System.out.println(response.getBody());

    // The body is taken as a String via getBody() and re-wrapped in an
    // in-memory stream so it can be split into lines with readLine().
    InputStream is = new ByteArrayInputStream((response.getBody()).getBytes(StandardCharsets.UTF_8));
    BufferedReader inputReader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
    String line;
    try {
        // Each line of the body becomes one Kafka message without a key.
        while ((line = inputReader.readLine()) != null) {
            KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, line);
            producer.send(message);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    // NOTE(review): the producer is never closed in the visible code — confirm
    // whether the enclosing class does so elsewhere.
}

上面的推特Kafka生产者的例子运行良好。我希望这有帮助!

9avjhtql

9avjhtql2#

使用Spark Streaming的Kafka生产者。

package com.multipleproducer.sparkstreaming.Multiplekafkaproducersparkstreaming;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

/**
 * Listens to the Twitter filter stream through twitter4j and publishes the
 * user record of every matching tweet's author to a Kafka topic.
 */
public class Twitterdata {

    /** Kafka topic the user records are published to. */
    private static final String TOPIC = "twitterdata";

    /**
     * Configures a Kafka producer and a twitter4j stream, registers a listener
     * that forwards each status' user data to Kafka, and starts the keyword
     * filter stream.
     *
     * @param ConsumerKey       OAuth consumer key
     * @param ConsumerSecret    OAuth consumer secret
     * @param AccessToken       OAuth access token
     * @param AccessTokenSecret OAuth access token secret
     */
    public static void Run(String ConsumerKey, String ConsumerSecret,
            String AccessToken, String AccessTokenSecret  ) {

        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("client.id","camus");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        // final: captured by the anonymous listener below
        final Producer<String, String> producer = new Producer<String, String>(producerConfig);

        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true);
        cb.setOAuthConsumerKey(ConsumerKey);
        cb.setOAuthConsumerSecret(ConsumerSecret);
        cb.setOAuthAccessToken(AccessToken);
        cb.setOAuthAccessTokenSecret(AccessTokenSecret);
        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

        System.out.println("##################### TWITTER___ STARTED ###########################");
        StatusListener listener = new StatusListener() {

            public void onDeletionNotice(
                    StatusDeletionNotice statusDeletionNotice) {
                // deletion notices are not forwarded
            }

            public void onException(Exception ex) {
                // Report stream errors instead of dropping them silently.
                ex.printStackTrace();
            }

            public void onScrubGeo(long userId, long upToStatusId) {
                // geo scrub notices are not forwarded
            }

            public void onStallWarning(StallWarning warning) {
                // stall warnings are not forwarded
            }

            public void onStatus(Status data) {
                // Publish the textual form of the author's user object, with
                // the implementation class name stripped from it.
                twitter4j.User user = data.getUser();
                String userdata = user.toString();
                userdata = userdata.replaceAll("UserJSONImpl", "");
                System.out.println();

                KeyedMessage<String, String> info = new KeyedMessage<String,String>(TOPIC,userdata);
                producer.send(info);
                System.out.println(info);

                /* Alternative payload: build a JSON object field by field.
                JSONObject obj=new JSONObject();
                obj.put("UserId", user.getId());
                obj.put("Name",user.getName());
                obj.put("ScreenName",user.getScreenName());
                obj.put("CreatedAt",user.getCreatedAt());
                obj.put("Location",user.getLocation());
                obj.put("TimeZone",user.getTimeZone() );
                obj.put("Lang",user.getLang());
                obj.put("UtcOffset",user.getUtcOffset());
                obj.put("Description",user.getDescription());
                obj.put("FavouritesCount", user.getFavouritesCount());
                obj.put("FollowersCount",user.getFollowersCount());
                obj.put("FriendsCount",user.getFriendsCount());
                obj.put("ListedCount", user.getListedCount());
                obj.put("URL",user.getURL());
                obj.put("StatusesCount",user.getStatusesCount());
                obj.put("OriginalProfileImageURL",user.getOriginalProfileImageURLHttps());
                obj.put("Tweets",data.getText());
                obj.put("CurrentUserRetweetId",data.getCurrentUserRetweetId());
                obj.put("InReplyToUserId",data.getInReplyToUserId());
                obj.put("InReplyToScreenName", data.getInReplyToScreenName());
                obj.put("getSource", data.getSource());

                System.out.println(obj);*/

            }

            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                // track limitation notices are not forwarded
            }
        };
        FilterQuery fq = new FilterQuery();
        String keywords[] = { "BigTappIndia" };

        fq.track(keywords);
        // To filter by location instead:
        //double[][] location = { { Latitude }, { Longitude } };
        //fq.locations(location);
        twitterStream.addListener(listener);
        // Start only the filtered stream. The original also called
        // twitterStream.sample() afterwards, which switches the connection to
        // the random sample stream and discards the keyword filter.
        twitterStream.filter(fq);

    }

    /**
     * Starts the stream with placeholder credentials; replace them with real
     * values before running.
     *
     * @param args ignored
     */
    public static void main(String args[]){

        Twitterdata.Run("consumerkey",
                "ConsumerSecret",
                "AccessToken",
                "AccessTokenSecret");

    }

}
ldxq2e6h

ldxq2e6h3#

使用Twitter话题标签(hashtag)的Kafka生产者
package com.multipleproducer.sparkstreaming.multiplekafkaproducersparkstreaming;

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

/**
 * Hello world!
 *
 */
/**
 * Reads tweets matching a hashtag from the Twitter streaming API with the
 * Hosebird client (hbc) and forwards a fixed number of them to a Kafka topic.
 */
public class TwitterHash_tag 
{
    private static final String topic = "Hash_tag";

    /** Number of messages forwarded before the client shuts down. */
    private static final int MESSAGE_LIMIT = 10;

    /**
     * Connects to the statuses/filter endpoint, takes {@code MESSAGE_LIMIT}
     * messages from the client's queue, sends each one to Kafka and prints it,
     * then closes the producer and stops the client.
     *
     * @param consumerKey    OAuth consumer key
     * @param consumerSecret OAuth consumer secret
     * @param token          OAuth access token
     * @param secret         OAuth access token secret
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the next message
     */
    public static void run(String consumerKey, String consumerSecret,
            String token, String secret) throws InterruptedException {

        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("client.id","camus");
        ProducerConfig producerConfig = new ProducerConfig(properties);

        Producer<String, String> producer = new Producer<String, String>(producerConfig);
        Client client = null;
        try {
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();

            // add some track terms in hashtag
            // NOTE(review): the empty first term looks accidental — confirm
            // before removing it.
            endpoint.trackTerms(Lists.newArrayList("",
                    "#India"));

            Authentication auth = new OAuth1(consumerKey, consumerSecret, token,
                    secret);

            // Create a new BasicClient. By default gzip is enabled.
            client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                    .endpoint(endpoint).authentication(auth)
                    .processor(new StringDelimitedProcessor(queue)).build();

            // Establish a connection
            client.connect();

            // An interrupt during take() now propagates to the caller. The
            // original caught it and then sent a null message to Kafka.
            for (int msgRead = 0; msgRead < MESSAGE_LIMIT; msgRead++) {
                KeyedMessage<String, String> message =
                        new KeyedMessage<String, String>(topic, queue.take());
                producer.send(message);
                System.out.println(message);
            }
        } finally {
            // Release both connections even when sending or waiting fails.
            producer.close();
            if (client != null) {
                client.stop();
            }
        }

    }

    /**
     * Runs the forwarder with placeholder credentials; replace them with real
     * values before running.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while waiting for messages
     */
    public static void main( String[] args ) throws InterruptedException
    {

        TwitterHash_tag.run("consumerKey", "consumerSecretkey",
                " AccessToken", "AccessTokenSecret");

    }
}
hts6caw3

hts6caw34#

package example.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import org.scribe.builder.*;
import org.scribe.builder.api.*;
import org.scribe.model.*;
import org.scribe.oauth.*;

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;

/**
 * Reads track keywords from a CSV file (one per line), opens the Twitter
 * statuses/filter stream for each keyword through Scribe, and forwards every
 * received line to a Kafka topic.
 */
public class TwitterStreamConsumer7  extends Thread {

    private static final String STREAM_URI = "https://stream.twitter.com/1.1/statuses/filter.json";

    /**
     * Processes the keyword file line by line. The OAuth service, the access
     * token and the Kafka producer are created once and reused for every
     * keyword; the file reader and the producer are always closed on exit.
     */
    @Override
    public void run(){
        try (BufferedReader bf = new BufferedReader(
                new FileReader("/home/trainings/Desktop/Streamin/input1.csv"))) {

            // Enter your consumer key and secret below
            OAuthService service = new ServiceBuilder()
                    .provider(TwitterApi.class)
                    .apiKey("ZRra7TMrssssssssssssssssxxxxxxxxxxx")
                    .apiSecret("LgUhEY4R8xxxxxxxxxxxxxxxxxxQw069D")
                    .build();

            // Set your access token
            Token accessToken = new Token("349211dddddddddddddddhyv4AL01lMRVN", "gqNqPuWoSxxxxxxxxxxxxxxxxxxxxkz1xCrzxWMUgd3kZ");

            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // NOTE(review): example.producer.SimplePartitioner is not shown
            // here — it must be on the classpath.
            props.put("partitioner.class", "example.producer.SimplePartitioner");
            props.put("request.required.acks", "1");
            props.put("retry.backoff.ms", "150");
            props.put("message.send.max.retries","10");
            props.put("topic.metadata.refresh.interval.ms","0");

            final Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            try {
                String screenName;
                while ((screenName = bf.readLine()) != null) {
                    // A blank CSV line would be sent as an empty track term.
                    if (screenName.trim().isEmpty()) {
                        continue;
                    }
                    System.out.println("Starting Twitter public stream consumer thread.");
                    System.out.println(screenName);
                    forwardStream(service, accessToken, producer, screenName);
                }
            } finally {
                producer.close();
            }
        }
        catch (IOException ioe){
            ioe.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }

    }

    /**
     * Opens the filter stream for one keyword and sends each received line to
     * the "twitter_events5" topic, pausing one second between messages. Returns
     * when the stream ends.
     */
    private static void forwardStream(OAuthService service, Token accessToken,
            Producer<String, String> producer, String keyword)
            throws IOException, InterruptedException {
        // Let's generate the request
        OAuthRequest request = new OAuthRequest(Verb.POST, STREAM_URI);
        request.addHeader("version", "HTTP/1.1");
        request.addHeader("host", "stream.twitter.com");
        request.setConnectionKeepAlive(true);
        request.addHeader("user-agent", "Twitter Stream Reader");
        request.addBodyParameter("track", keyword); // Set keywords you'd like to track here
        service.signRequest(accessToken, request);
        Response response = request.send();

        // try-with-resources closes the HTTP stream when reading stops; the
        // charset is explicit so decoding does not depend on the platform default.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                response.getStream(), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("twitter_events5",line);
                System.out.println(line);
                producer.send(data);
                Thread.sleep(1000);
            }
        }
    }

    /**
     * Starts a single consumer thread.
     *
     * @param args ignored
     */
    public static void main(String[] args){

        TwitterStreamConsumer7 streamConsumer = new TwitterStreamConsumer7();
        streamConsumer.start();

    }
}

希望这能帮助你解决问题。

相关问题