使Java中的现有代码并行/多线程

zazmityj  于 2023-04-19  发布在  Java
关注(0)|答案(4)|浏览(150)

我有一个非常简单的爬虫。我想让我目前的代码运行在几个线程。你能提供我一个小教程或文章,以帮助我achive这个测试?
我最初是一名.Net开发人员,在.Net中,我在多线程中运行代码没有任何问题,但不幸的是,我对Java中的线程一无所知。
我的爬虫是一个命令行软件,所以不用担心GUI。
先谢谢你。

y3bcpkx1

y3bcpkx11#

Java通过Thread类来实现多线程。使现有代码多线程化的最常见方法之一是使用Runnable接口来定义在线程启动时要调用的内容,然后启动它。

public class SomeFunctions
{
  public static void FunctionA() {}
  public static void FunctionB() {}
  public static void FunctionC() {}
}

// ...
Thread t1 = new Thread(new Runnable() {
   public void run() {
      SomeFunctions.FunctionA();
   }
});
t1.start();

// (rinse and repeat for the other functions)

当然,一旦你进入多线程领域,你就会遇到并发问题,需要确保一切都是适当同步的,等等,但是任何语言都会有这些问题。
如果您担心同步,可以使用一些工具。最简单的是Java内置的递归互斥函数,即“synchronized”关键字。通过java.util.concurrent和java.util.concurrent.locks包中的各种类(如信号量和ReadWriteLock),也可以使用更经典的方法
http://download.oracle.com/javase/6/docs/api/java/util/concurrent/locks/package-summary.html

svmlkihl

svmlkihl2#

你可以看看我的网络爬虫的例子。

import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * A web crawler with a Worker pool
 * 
 * @author Adriaan
 */
public class WebCrawler implements Manager {

        private Set<Worker> workers = new HashSet<Worker>();
        private List<String> toCrawl = new ArrayList<String>();
        private Set<String> crawled = new HashSet<String>();
        private Set<String> hosts = new HashSet<String>();
        private Set<String> results = new HashSet<String>();
        private int maxResults;

        public WebCrawler(String url, int numberOfWorkers, int maxResults) {
                this.maxResults = maxResults;
                toCrawl.add(url);
                createWorkers(numberOfWorkers);
        }

        public void createWorkers(int numberOfWorkers) {
                for (int i = 0; i < numberOfWorkers; i++) {
                        workers.add(new Worker(this));
                }
        }

        private void stopWorkers() {
                for (Worker worker : workers) {
                        worker.terminate();
                }
        }

        public synchronized Job getNewJob() {
                while (toCrawl.size() == 0) {
                        try {
                                wait();
                        } catch (InterruptedException e) {
                                // ignore
                        }
                }
                return new EmailAddressCrawlJob().setDescription(toCrawl.remove(0));
        }

        public synchronized void jobCompleted(Job job) {
                // System.out.println("crawled: " + job.getDescription());
                crawled.add(job.getDescription());
                String host = getHost(job.getDescription());
                boolean knownHost = hosts.contains(host);
                if (!knownHost) {
                        System.out.println("host: " + host);
                        hosts.add(host);
                }
                for (String url : job.getNewDescriptions()) {
                        if (!crawled.contains(url)) {
                                if (knownHost) {
                                        toCrawl.add(toCrawl.size() - 1, url);
                                } else {
                                        toCrawl.add(url);
                                }
                        }
                }
                for (String result : job.getResults()) {
                        if (results.add(result)) {
                                System.out.println("result: " + result);
                        }
                }
                notifyAll();
                if (results.size() >= maxResults) {
                        stopWorkers();
                        System.out.println("Crawled hosts:");
                        for (String crawledHost : hosts) {
                                System.out.println(crawledHost);
                        }
                        Set<String> uncrawledHosts = new HashSet<String>();
                        for (String toCrawlUrl : toCrawl) {
                                uncrawledHosts.add(getHost(toCrawlUrl));
                        }
                        System.out.println("Uncrawled hosts:");
                        for (String unCrawledHost : uncrawledHosts) {
                                System.out.println(unCrawledHost);
                        }
                }
                if (crawled.size() % 10 == 0) {
                        System.out.println("crawled=" + crawled.size() + " toCrawl="
                                        + toCrawl.size() + " results=" + results.size() + " hosts="
                                        + hosts.size() + " lastHost=" + host);
                }
        }

        public String getHost(String host) {
                int hostStart = host.indexOf("://") + 3;
                if (hostStart > 0) {
                        int hostEnd = host.indexOf("/", hostStart);
                        if (hostEnd < 0) {
                                hostEnd = host.length();
                        }
                        host = host.substring(hostStart, hostEnd);
                }
                return host;
        }

        public static void main(String[] args) throws MalformedURLException {
                new WebCrawler("http://www.nu.nl/", 5, 20);
        }
}

工人

**
 * A Worker proactively gets a Job, executes it and notifies its manager that
 * the Job is completed.
 * 
 * @author Adriaan
 */
public class Worker extends Thread {

        private final Manager manager;
        private Job job = null;
        private boolean isWorking;

        public Worker(Manager manager) {
                this.manager = manager;
                isWorking = true;
                start();
        }

        @Override
        public void run() {
                System.out.println("Worker " + Thread.currentThread().getId()
                                + " starting ");
                while (isWorking) {
                        job = manager.getNewJob();
                        job.execute();
                        manager.jobCompleted(job);
                }
        }

        public void terminate() {
                isWorking = false;
        }
}

管理员界面

/**
 * Manager interface for Workers
 * 
 * @author Adriaan
 */
public interface Manager {

        /**
         * Gets a new job
         * 
         * @return
         */
        public Job getNewJob();

        /**
         * Indicates the job is completed
         * 
         * @param job
         */
        public void jobCompleted(Job job);
}

工作

import java.util.HashSet;
import java.util.Set;

/**
 * A Job is a unit of work defined by a String (the description). During execution the 
 * job can obtain results and new job descriptions.
 *
 * @author Adriaan
 */
public abstract class Job {

        private String description;
        private Set<String> results = new HashSet<String>();
        private Set<String> newDescriptions = new HashSet<String>();

        /**
         * Sets the job description
         * 
         * @param description
         * @return this for chaining
         */
        public Job setDescription(String description) {
                this.description = description;
                return this;
        }

        /**
         * Executes the job
         */
        public abstract void execute();

        /**
         * Gets the results obtained
         * 
         * @return
         */
        public Set<String> getResults() {
                return results;
        }

        /**
         * Gets the now job descriptions obtained
         * 
         * @return
         */
        public Set<String> getNewDescriptions() {
                return newDescriptions;
        }

        /**
         * Gets the job description
         * 
         * @return
         */
        public String getDescription() {
                return description;
        }

        /**
         * Allows the implementation to add an obtained result
         * 
         * @param result
         */
        void addResult(String result) {
                results.add(result);
        }

        /**
         * Allows the implementation to add an obtained description
         * 
         * @param result
         */
        void addNewDescription(String newDescription) {
                newDescriptions.add(newDescription);
        }
}

在页面上抓取电子邮件地址的作业:

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A Job which crawls HTTP or HTTPS URL's for email adresses, collecting new
 * URL's to crawl along the way.
 * 
 * @author Adriaan
 */
public class EmailAddressCrawlJob extends Job {

        @Override
        public void execute() {
                try {
                        URL url = new URL(getDescription());
                        if (url != null) {
                                String text = readText(url);
                                extractNewDescriptions(text, url);
                                extractResults(text);
                        }
                } catch (MalformedURLException e) {
                        System.err.println("Bad url " + getDescription());
                }
        }

        private String readText(URL url) {
                URLConnection connection;
                try {
                        connection = url.openConnection();
                        InputStream input = connection.getInputStream();
                        byte[] buffer = new byte[1000];
                        int num = input.read(buffer);
                        if (num > 0) {
                                StringBuilder builder = new StringBuilder();
                                builder.append(new String(buffer, 0, num));
                                while (num != -1) {
                                        num = input.read(buffer);
                                        if (num != -1) {
                                                builder.append(new String(buffer, 0, num));
                                        }
                                }
                                return builder.toString();
                        }
                } catch (IOException e) {
                        //System.err.println("Could not read from " + url);
                }
                return "";
        }

        private void extractNewDescriptions(String text, URL url) {

                // URL extracting code from Sun example
                String lowerCaseContent = text.toLowerCase();
                int index = 0;
                while ((index = lowerCaseContent.indexOf("<a", index)) != -1) {

                        if ((index = lowerCaseContent.indexOf("href", index)) == -1) {
                                break;
                        }

                        if ((index = lowerCaseContent.indexOf("=", index)) == -1) {
                                break;
                        }

                        index++;
                        String remaining = text.substring(index);
                        StringTokenizer st = new StringTokenizer(remaining, "\t\n\r\">#");
                        String strLink = st.nextToken();

                        if (strLink.startsWith("javascript:")) {
                                continue;
                        }

                        URL urlLink;
                        try {
                                urlLink = new URL(url, strLink);
                                strLink = urlLink.toString();
                        } catch (MalformedURLException e) {
                                // System.err.println("Could not create url: " + target
                                // + " + " + strLink);
                                continue;
                        }
                        // only look at http links
                        String protocol = urlLink.getProtocol();
                        if (protocol.compareTo("http") != 0
                                        && protocol.compareTo("https") != 0) {
                                // System.err.println("Ignoring: " + protocol
                                // + " protocol in " + urlLink);
                                continue;
                        }
                        addNewDescription(urlLink.toString());
                }
        }

        private void extractResults(String text) {
                Pattern p = Pattern
                                .compile("([\\w\\-]([\\.\\w])+[\\w]+@([\\w\\-]+\\.)+[A-Za-z]{2,4})");
                Matcher m = p.matcher(text);
                while (m.find()) {
                        addResult(m.group(1));
                }
        }
}

我知道这个答案有点冗长,但我认为OP可能最好用一个工作示例来帮助,而且不久前我碰巧做了一个。

xxb16uws

xxb16uws3#

一个非常基本的java程序,将给予多线程的抽象概念。

public class MyThread extends Thread {

  String word;

  public MyThread(String rm){
    word = rm;
  }

  public void run(){

    try {

      for(;;){
        System.out.println(word);
        Thread.sleep(1000);
      }

    } catch(InterruptedException e) {

      System.out.println("sleep interrupted");      
    }
  }

  public static void main(String[] args) {

    Thread t1=new MyThread("First Thread");
    Thread t2=new MyThread("Second Thread");
    t1.start();
    t2.start();
  }
}

输出为..

First Thread
Second Thread
First Thread
Second Thread
First Thread

用这个PPT,它会帮助你的基础知识。
Here

vsmadaxz

vsmadaxz4#

使用Runnable而不是扩展Thread:

与其扩展Thread类,不如实现Runnable接口,它将运行线程的任务与线程本身分离。通过实现Runnable接口,run()方法将在传递给构造函数的对象上调用,我们可以使用单个Thread对象执行多个Runnable对象。

睡眠时间使用常量:

与其硬编码睡眠时间(1000毫秒),不如使用一个常量,使其更易读,更易于更改。并在sleep()调用中使用它。

public class MyRunnable implements Runnable {
    private String word;

    public MyRunnable(String word) {
        this.word = word;
    }

    @Override
    public void run() {
        try {
            while (true) {
                System.out.println(word);
                Thread.sleep(SLEEP_TIME);
            }
        } catch (InterruptedException e) {
            System.out.println("sleep interrupted");
        }
    }

    private static final int SLEEP_TIME = 1000;

    public static void main(String[] args) {
        Thread t1 = new Thread(new MyRunnable("First Thread"));
        Thread t2 = new Thread(new MyRunnable("Second Thread"));
        t1.start();
        t2.start();
    }
}

相关问题