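Do you think this is possible? I am using java.util.concurrent and java.net HttpURLConnection to send one request from each thread in a pool, but I have run into a problem:
Some connections never finish and hold their thread for a long time. I can see that the pool never terminates because some threads never complete.
This uses a lot of bandwidth, and when the connection slows down almost every request fails with a connect timeout or read timeout exception. Is there a better solution? I have tried asynchronous libraries such as Java reactive WebFlux and io.vertx.webclient, but the results were no better.
Here is part of my code: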
public static void cancelMobile(User user, int split, int start, int end, String nickNameRecv, boolean useProxy, int timeout) {
    long startTime = System.currentTimeMillis();
    makeRandomOTPQueue(start, end);
    int size = 50000;//randomOTP.size();
    System.out.println("LUONG: " + split + " TOTAL: " + size);
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(size);
    RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
    ThreadPoolExecutor threadPoolExecutor = /*(ThreadPoolExecutor) Executors.newCachedThreadPool();*/
            new ThreadPoolExecutor(split,
                    1000, Long.MAX_VALUE, TimeUnit.NANOSECONDS, workQueue, handler);
    ScheduledExecutorService scheduledThreadPoolExecutor = Executors.newScheduledThreadPool(split);
    threadPoolExecutor.allowCoreThreadTimeOut(false);
    List<TransferSMSOTP> transferSMSOTPList = new LinkedList<>();
    for (int j = 0; j < size; j++) {
        threadPoolExecutor.submit(new TransferSMSOTP(user, String.valueOf(randomOTP.get(j)),
                useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout));
        // scheduledThreadPoolExecutor.schedule(new TransferSMSOTP(user, String.valueOf(randomOTP.get(j)),
        //         useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout),1000, TimeUnit.MILLISECONDS);
    }
    threadPoolExecutor.shutdown();
    while (!threadPoolExecutor.isTerminated()) {
    }
    ThreadPoolExecutor poolFail = new ThreadPoolExecutor(split,
            randomOTP.size(), 1, TimeUnit.MILLISECONDS, workQueue, handler);
    System.out.println("RETRY amount: " + failList.size());
    LinkedList<Integer> clone = failList;
    System.out.println("waiting pop fail otp");
    long backup = System.currentTimeMillis();
    for (int i = 0; i < clone.size(); i++) {
        try {
            poolFail.execute(new TransferSMSOTP(user, String.valueOf(clone.remove()),
                    useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout));
        } catch (Exception e) {
            System.out.println("cannot pop: " + e.getMessage());
        }
        // if (System.currentTimeMillis() - backup > TimeUnit.MINUTES.toMillis(2)) {
        //     System.out.println("Exit pop");
        //     poolFail.shutdownNow();
        // }
    }
    poolFail.shutdown();
    System.out.println("waiting handle failed otp");
    while (!poolFail.isTerminated()) {
        if (System.currentTimeMillis() - startTime > TimeUnit.MINUTES.toMillis(5)) {
            System.out.println("Exit fail handle");
            poolFail.shutdownNow();
            // System.exit(0);
            break;
        }
    }
    System.out.println("UNHANDLED amount: " + failList.size() + ", total time of job: " + (System.currentTimeMillis() - startTime));
}
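One detail of the pool setup above may be worth checking: a ThreadPoolExecutor only creates threads beyond its core size once the work queue is full, so with a queue sized to hold every task the maximum of 1000 is probably never reached, and the empty `while (!isTerminated())` loops spin a CPU core while waiting. A minimal sketch of an alternative, assuming a fixed pool and a bounded wait are acceptable (the class and method names `BoundedRunner` and `runAll` are made up for illustration):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original code.
final class BoundedRunner {

    /**
     * Runs every task on a fixed-size pool and waits at most maxWait.
     * Returns how many queued tasks were dropped without ever starting.
     */
    static int runAll(List<? extends Runnable> tasks, int threads, long maxWait, TimeUnit unit) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Runnable task : tasks) {
            pool.execute(task);
        }
        pool.shutdown(); // no new tasks accepted; queued tasks still run
        try {
            // Blocks without spinning until all tasks finish or the wait expires.
            if (pool.awaitTermination(maxWait, unit)) {
                return 0;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
        }
        // Wait expired or caller interrupted: interrupt workers and drop the queue.
        return pool.shutdownNow().size();
    }
}
```

A call could look like `BoundedRunner.runAll(tasks, split, 5, TimeUnit.MINUTES)`. Interrupting a worker does not necessarily abort a blocking socket read, so the connect and read timeouts on each connection are still what frees a stuck thread.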
In my Runnable class:
public void run() {
    try {
        String url = this.user.getServer().getUrlTransfer();
        Document data = new Document();
        data.put("nickNameRecv", this.nickNameRecv);
        data.put("otp", otp);
        String authorization = "Bearer " +
                this.user.getAuth();
        URL urlCon = new URL(url);
        HttpURLConnection connection;
        // _proxy is null when useProxy is false, so check it before reading its fields
        if (_proxy == null) {
            connection = (HttpURLConnection) urlCon.openConnection();
        } else {
            final String authUser = this._proxy.getProxyUserName();
            final String authPassword = this._proxy.getProxyPassword();
            // Authenticator.setDefault is JVM-wide: every thread shares the last value set
            Authenticator.setDefault(new ProxyAuthenticator(authUser, authPassword));
            Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(this._proxy.getProxyHost(), this._proxy.getProxyPort()));
            connection = (HttpURLConnection) urlCon.openConnection(proxy);
        }
        connection.setRequestMethod("POST");
        connection.setConnectTimeout(timeout);
        connection.setReadTimeout(timeout);
        connection.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.toString());
        connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
        connection.setRequestProperty(HttpHeaders.AUTHORIZATION, authorization);
        connection.setRequestProperty("client-browser", "Chrome 83");
        // connection.setRequestProperty("client-deviceid", rand(32, -1, "abcdef1234567890"));
        connection.setRequestProperty("client-operatingsystem", "Windows");
        String param = data.toJson();
        connection.setRequestProperty("Content-Length", String.valueOf(param.length()));
        connection.setRequestProperty(HttpHeaders.USER_AGENT, Main.agent.get(new Random().nextInt(Main.agent.size())));
        connection.setDoOutput(true);
        OutputStream outputStream = connection.getOutputStream();
        outputStream.write(param.getBytes());
        InputStream inputStream = connection.getInputStream();
        ObjectMapper mapper = new ObjectMapper();
        Map res = mapper.readValue(inputStream, Map.class);
        outputStream.close();
        inputStream.close();
        connection.disconnect();
        ProcessGame.done++;
        Main.f.updateResult();
    } catch (Exception e) {
        ProcessGame.failList.add(Integer.parseInt(this.otp));
    }
}
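The run() method above updates `ProcessGame.done` and `ProcessGame.failList` from many threads at once. `done++` is not atomic and a LinkedList is not thread-safe, so some results may be lost or the list may be corrupted. A minimal sketch of thread-safe bookkeeping, assuming the counter and list can be replaced by a small holder class (`TransferResults` and its method names are hypothetical):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical replacement for the shared done counter and failList.
final class TransferResults {
    private final AtomicInteger done = new AtomicInteger();
    private final Queue<Integer> failed = new ConcurrentLinkedQueue<>();

    /** Called by a worker after a successful request. */
    void recordSuccess() {
        done.incrementAndGet();
    }

    /** Called by a worker when a request fails and should be retried. */
    void recordFailure(int otp) {
        failed.add(otp);
    }

    int doneCount() {
        return done.get();
    }

    int failedCount() {
        return failed.size();
    }

    /** Removes and returns the next failed OTP, or null when none are left. */
    Integer pollFailed() {
        return failed.poll();
    }
}
```

A retry pass could then drain the queue with `Integer otp; while ((otp = results.pollFailed()) != null) { ... }`. A loop of the form `for (int i = 0; i < list.size(); i++) list.remove()` stops about halfway, because `i` grows while the size shrinks.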
The best result I have had is about 20000 of 99999 requests returning HTTP status 200. Is there a better solution?
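One possible direction, offered as a sketch rather than a known fix: when most requests time out as the link slows down, the limit may be the number of requests in flight rather than the client library, and that would also explain why the asynchronous clients did no better. The helper below caps concurrent asynchronous calls with a Semaphore. `Throttle` and `runThrottled` are made-up names, and the calls are passed in as suppliers so the sketch does not depend on any particular HTTP client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper: limits how many asynchronous calls are pending at once.
final class Throttle {

    /**
     * Starts each call only while fewer than maxInFlight earlier calls are pending.
     * The calling thread blocks whenever the limit is reached.
     */
    static <T> List<CompletableFuture<T>> runThrottled(
            List<Supplier<CompletableFuture<T>>> calls, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<T>> results = new ArrayList<>(calls.size());
        for (Supplier<CompletableFuture<T>> call : calls) {
            permits.acquireUninterruptibly(); // wait for a free slot
            CompletableFuture<T> future;
            try {
                future = call.get(); // start the asynchronous call
            } catch (RuntimeException e) {
                future = CompletableFuture.failedFuture(e);
            }
            // Free the slot whether the call succeeded or failed.
            future.whenComplete((value, error) -> permits.release());
            results.add(future);
        }
        return results;
    }
}
```

With the JDK's `java.net.http.HttpClient` (Java 11 and later), each supplier could be `() -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString())`, with a per-request `timeout(Duration)` set on the `HttpRequest` builder. A suitable value for `maxInFlight` depends on the server and the available bandwidth and would have to be found by measurement.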