Source(源码实现需JDK21 SpringBoot3.2)
salttiger电子书 网站的电子书挺精华的,想到用爬虫一次批量都拿下.原理: 找到列表页面,并发爬取详细页面的百度盘链接.
VirtualThread(虚拟线程)的最佳实践是不使用线程池,因为它足够轻量,可以支撑百万级并发……但爬虫场景下耐不住对方网站的各种资源限制,例如单个 IP 每秒并发数限制等。
这里借用了FixedVirtualThreadExecutorService的 固定线程数量的虚拟线程池类
原理是利用 Semaphore 信号量控制同时运行的任务数量(size)。如果想要等待所有已提交的任务结束后再继续运行,可以使用 invokeAll(),或者使用 shutdown + awaitTermination:
// Stop accepting new tasks, then give the in-flight ones a bounded time to finish.
executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        // Timed out: cancel whatever is still running.
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
    // Restore the interrupt flag so code further up the stack can still observe it.
    Thread.currentThread().interrupt();
}
/**
 * An {@link ExecutorService} that starts one virtual thread per task but lets at most
 * {@code poolSize} tasks execute at the same time.
 *
 * <p>Virtual threads are cheap enough that pooling them is unnecessary; the limit exists
 * only to throttle work against an external resource (for example a web site that
 * restricts concurrent requests). The limit is enforced with a {@link Semaphore}: every
 * task acquires a permit before it runs and releases it when it finishes.
 *
 * <p>All lifecycle and bulk operations are delegated to the executor returned by
 * {@link Executors#newVirtualThreadPerTaskExecutor()}, so {@code invokeAll} and
 * {@code invokeAny} follow the {@link ExecutorService} contract: {@code invokeAll}
 * returns only when every task is done, {@code invokeAny} returns the result of any task
 * that completed successfully, and a task failure is reported as the cause of the
 * {@link ExecutionException} thrown by {@link Future#get()}.
 */
public class FixedVirtualThreadExecutorService implements ExecutorService {

    /** Starts a new virtual thread for every task handed to it. */
    private final ExecutorService delegate = Executors.newVirtualThreadPerTaskExecutor();

    /** Holds one permit per task that is allowed to run concurrently. */
    private final Semaphore semaphore;

    /** Maximum number of tasks that may run at the same time. */
    private final int poolSize;

    /**
     * Creates an executor that runs at most {@code poolSize} tasks concurrently.
     *
     * @param poolSize maximum number of concurrently running tasks; must be at least 1
     * @throws IllegalArgumentException if {@code poolSize} is smaller than 1, because no
     *     task could ever obtain a permit
     */
    public FixedVirtualThreadExecutorService(int poolSize) {
        if (poolSize < 1) {
            throw new IllegalArgumentException("poolSize must be at least 1 but was " + poolSize);
        }
        this.poolSize = poolSize;
        this.semaphore = new Semaphore(this.poolSize);
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(throttled(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(throttled(Executors.callable(task, result)));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(throttled(Executors.callable(task)));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(throttledAll(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(throttledAll(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(throttledAll(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(throttledAll(tasks), timeout, unit);
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        delegate.execute(() -> {
            // Acquire outside the try/finally: a permit that was never obtained must not be released.
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a free slot", e);
            }
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        });
    }

    /**
     * Wraps {@code task} so that it holds a permit for exactly the duration of its run.
     *
     * @throws NullPointerException if {@code task} is null
     */
    private <T> Callable<T> throttled(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        return () -> {
            // Acquire outside the try/finally: a permit that was never obtained must not be released.
            semaphore.acquire();
            try {
                return task.call();
            } finally {
                semaphore.release();
            }
        };
    }

    /** Applies {@link #throttled(Callable)} to every task, preserving iteration order. */
    private <T> List<Callable<T>> throttledAll(Collection<? extends Callable<T>> tasks) {
        return tasks.stream()
                .map(task -> this.<T>throttled(task))
                .collect(Collectors.toList());
    }
}
找到列表页面,jsoup解析页面获得所有书籍的链接,并发生成tasks,invokeAll执行所有任务完毕,获得详情页的百度盘链接.
package com.gakaki.demo.service;
import cn.hutool.core.util.ObjectUtil;
import com.gakaki.demo.model.SaltTigerBookItem;
import com.gakaki.demo.model.SaltTigerBookTag;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jsoup.Jsoup;
import org.jsoup.internal.StringUtil;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Slf4j
@Service
public class SatigerBookService {
@SneakyThrows
public List<SaltTigerBookItem> fetchBooks() {
String url = "https://salttiger.com/archives/";
Document doc = Jsoup.connect(url).get();
List<SaltTigerBookItem> items = new ArrayList<>();
Elements listItems = doc.select("ul.car-list li"); // Example selector
for (Element listItem : listItems) {
String createdAt = listItem.select("span.car-yearmonth").text();
Elements bookLinks = listItem.select("ul.car-monthlisting li a");
for (Element bookLink : bookLinks) {
String title = bookLink.text();
String bookUrl = bookLink.attr("href");
// System.out.println("Title: " + title + ", URL: " + bookUrl + ", Created At: " + createdAt);
SaltTigerBookItem item = SaltTigerBookItem.builder().build();
item.setTitle(title);
item.setUrl(bookUrl);
items.add(item);
}
}
return items;
}
public static String regexFind(String regexStr, String str) {
Pattern pattern = Pattern.compile(regexStr);
Matcher matcher = pattern.matcher(str);
if (matcher.find()) {
return matcher.group();
}
return null;
}
@SneakyThrows
public SaltTigerBookItem fetchDetail(SaltTigerBookItem item) {
try {
Document doc = Jsoup.connect(item.getUrl()).get();
Elements articles = doc.select("article");
for (Element article : articles) {
item.setId(article.id());
item.setThumbnil(article.select("div > p:nth-child(1) > img").attr("src"));
String tmpText = article.select("strong").text();
item.setPubDate(regexFind("\\d{4}\\.\\d{1,2}", tmpText)); // 出版时间:2020.12
Element officalA = article.select("div > p:nth-child(1) > strong > a:nth-child(2)").first();
if (ObjectUtil.isNotNull(officalA)) {
item.setOfficalPress(officalA.text());
item.setOfficalUrl(officalA.attr("href"));
}
for (Element it : article.select("article strong > a[href*=ed2k]")) {
item.getOtherLinks().add(it.attr("href"));
}
Element officalBaidu = article.select("article strong > a[href*=baidu]").first();
if (ObjectUtil.isNotNull(officalBaidu)) {
item.setBaiduUrl(officalBaidu.attr("href"));
if (item.getBaiduUrl().contains("pwd")) {
item.setBaiduCode(regexFind("pwd=.*", item.getBaiduUrl()).replace("pwd=", ""));
} else {
item.setBaiduCode(regexFind("提取码 :\\w{1,4}", tmpText).replace("提取码 :", ""));
}
}
var description = regexFind("<p>内容简介([\\s\\S]*)", article.select("div.entry-content").html());
if (!StringUtil.isBlank(description)) {
item.setDescription(description.replace("<p>内容简介:</p>", ""));
}
item.setCreatedAt(article.select("footer > a:nth-child(1) > time").attr("datetime"));
item.setZlibSearchUrl(String.format("https://zlibrary-asia.se/s/%s?", item.getTitle()));
for (Element e : article.select("footer > a[rel*=tag]")) {
SaltTigerBookTag tag = SaltTigerBookTag.builder().build(); // 假设您有一个Tag类来存储标签数据
tag.setUrl(e.attr("href"));
tag.setName(e.text());
item.getTags().add(tag);
}
// 假设您有一个方法来处理JSON输出
// JSONArray salttigerItems = new JSONArray();
// JSONObject itemJson = item.toJson();
// salttigerItems.put(itemJson);
// // 写入JSON数据到文件
// writeToJsonFile(salttigerItems.toString(), "saltTiger.json");
// // 处理zlibrary链接
// List<String> totalZlibraryLinks = new ArrayList<>();
// for (Object salttigerItemObj : salttigerItems.toList()) {
// JSONObject salttigerItem = (JSONObject) salttigerItemObj;
// totalZlibraryLinks.add(salttigerItem.getString("zlibSearchUrl"));
// }
// writeToJsonFile(new JSONArray(totalZlibraryLinks).toString(), "zlibrary.json");
}
} catch (RuntimeException e) {
e.printStackTrace();
}
return item;
}
@SneakyThrows
public List<SaltTigerBookItem> fetchBookDetails() {
List<SaltTigerBookItem> items = this.fetchBooks();
ExecutorService executorService = new FixedVirtualThreadExecutorService(3);
//为了 代码简单 只用30个
items = items.stream().limit(15).collect(Collectors.toList());
// 创建一个任务列表
// List<Callable<SaltTigerBookItem>> tasks = items.stream().map(item ->
// new Callable<SaltTigerBookItem>() {
// @Override
// public SaltTigerBookItem call() throws Exception {
// return fetchDetail(item);
// }
// }).collect(Collectors.toList());
List<Callable<SaltTigerBookItem>> tasks = items.stream()
.map(item -> (Callable<SaltTigerBookItem>) () -> fetchDetail(item))
.collect(Collectors.toList());
List<SaltTigerBookItem> itemsFinal = new ArrayList<>();
// 使用invokeAll执行所有任务,并等待它们完成
List<Future<SaltTigerBookItem>> futures = executorService.invokeAll(tasks);
// 遍历Future列表,获取每个任务的结果
for (Future<SaltTigerBookItem> future : futures) {
try {
System.out.println(future.get());
itemsFinal.add(future.get());
} catch (InterruptedException | ExecutionException e) {
// 异常处理逻辑
e.printStackTrace();
}
}
executorService.shutdown();
return items;
}
}