[Java] Java21 VirtualThread 虚拟线程池爬取 salttiger电子书百度盘链接 Spring Boot 3.2列表

Source(源码实现需JDK21 SpringBoot3.2)

salttiger电子书 网站的电子书挺精华的,想到用爬虫一次批量都拿下.原理: 找到列表页面,并发爬取详细页面的百度盘链接.

VirtualThread 虚拟线程的最佳实践是不使用线程池,因为它足够轻量,可以支撑百万级并发。但爬虫会受到对方网站的各种资源限制(例如每秒并发数限制、IP 限制等),所以仍然需要控制并发数量。

这里借用了 FixedVirtualThreadExecutorService,一个固定并发数量的虚拟线程池类。

原理是利用 Semaphore 信号量控制同时运行的任务数量。如果想要等待所有已提交的任务结束后再继续运行,可以使用 invokeAll(),或者使用 shutdown + awaitTermination:

// Stop accepting new tasks, then wait briefly for the submitted ones to finish.
executorService.shutdown();
try {
    // Grace period elapsed with tasks still running: cancel whatever is left.
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    // The waiting thread itself was interrupted: cancel the remaining tasks ...
    executorService.shutdownNow();
    // ... and restore the interrupt status so code further up the stack can see it.
    Thread.currentThread().interrupt();
}

虚拟线程池

/**
 * An {@link ExecutorService} that starts one virtual thread per task but lets at most
 * {@code poolSize} tasks run their body at the same time.
 *
 * <p>Every submitted task is wrapped so that it first acquires a permit from a
 * {@link Semaphore} and releases it when the task body finishes. All lifecycle and
 * bulk operations are delegated to the executor returned by
 * {@link Executors#newVirtualThreadPerTaskExecutor()}.
 *
 * <p>Any exception thrown by a task (including an interruption while waiting for a
 * permit) is rethrown wrapped in an {@link IllegalStateException}, so a failed
 * {@link Future#get()} reports an {@link ExecutionException} whose cause is that
 * {@code IllegalStateException}.
 */
public class FixedVirtualThreadExecutorService implements ExecutorService {
    /** Executor that actually runs the wrapped tasks, one virtual thread each. */
    private final ExecutorService delegate = Executors.newVirtualThreadPerTaskExecutor();

    /** Permits bounding how many task bodies may run concurrently. */
    private final Semaphore semaphore;

    /** Number of permits the semaphore was created with. */
    private final int poolSize;

    /**
     * Creates an executor that runs at most {@code poolSize} task bodies concurrently.
     *
     * @param poolSize maximum number of concurrently running tasks; must be positive
     * @throws IllegalArgumentException if {@code poolSize} is zero or negative, because
     *         with no permits available every task would block forever
     */
    public FixedVirtualThreadExecutorService(int poolSize) {
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive: " + poolSize);
        }
        this.poolSize = poolSize;
        this.semaphore = new Semaphore(this.poolSize);
    }

    /**
     * Wraps a task so that its body only runs while holding a semaphore permit.
     *
     * <p>The permit is acquired <em>before</em> entering the try/finally that releases
     * it, so an interrupted acquire never releases a permit it did not obtain.
     *
     * @param task the task to guard
     * @return a callable that acquires a permit, runs {@code task}, and releases the permit
     * @throws NullPointerException if {@code task} is null
     */
    private <T> Callable<T> throttle(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        return () -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            try {
                return task.call();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new IllegalStateException(e);
            } finally {
                semaphore.release();
            }
        };
    }

    /**
     * Applies {@link #throttle(Callable)} to every task, preserving iteration order.
     *
     * @throws NullPointerException if {@code tasks} or any element is null
     */
    private <T> List<Callable<T>> throttleAll(Collection<? extends Callable<T>> tasks) {
        if (tasks == null) {
            throw new NullPointerException("tasks");
        }
        return tasks.stream()
                .<Callable<T>>map(task -> throttle(task))
                .collect(Collectors.toList());
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    /** Submits a value-returning task that runs once a permit is available. */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(throttle(task));
    }

    /** Submits a runnable whose future yields {@code result} on successful completion. */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(throttle(Executors.callable(task, result)));
    }

    /** Submits a runnable whose future yields {@code null} on successful completion. */
    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(throttle(Executors.callable(task)));
    }

    /**
     * Runs all tasks under the concurrency limit and returns only after every one of
     * them has completed, as the {@link ExecutorService#invokeAll(Collection)} contract
     * requires. Futures are returned in the iteration order of {@code tasks}.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(throttleAll(tasks));
    }

    /**
     * Timed variant of {@link #invokeAll(Collection)}. Waiting and the handling of
     * tasks that have not finished when the timeout expires are left to the
     * underlying executor's timed {@code invokeAll}.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(throttleAll(tasks), timeout, unit);
    }

    /**
     * Runs the tasks under the concurrency limit and returns the result of one that
     * completed successfully, delegating the selection to the underlying executor.
     *
     * @throws ExecutionException if no task completes successfully
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(throttleAll(tasks));
    }

    /**
     * Timed variant of {@link #invokeAny(Collection)}.
     *
     * @throws TimeoutException if the timeout elapses before any task succeeds
     * @throws ExecutionException if no task completes successfully
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(throttleAll(tasks), timeout, unit);
    }

    @Override
    public void close() {
        delegate.close();
    }

    /**
     * Runs {@code command} on a new virtual thread once a permit is available.
     * A failure surfaces on that thread as an {@link IllegalStateException}.
     */
    @Override
    public void execute(Runnable command) {
        Callable<Object> guarded = throttle(Executors.callable(command));
        delegate.execute(() -> {
            try {
                guarded.call();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        });
    }
}

爬虫实现 JSoup + 虚拟线程

找到列表页面,jsoup解析页面获得所有书籍的链接,并发生成tasks,invokeAll执行所有任务完毕,获得详情页的百度盘链接.

package com.gakaki.demo.service;

import cn.hutool.core.util.ObjectUtil;
import com.gakaki.demo.model.SaltTigerBookItem;
import com.gakaki.demo.model.SaltTigerBookTag;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jsoup.Jsoup;
import org.jsoup.internal.StringUtil;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Scrapes the salttiger.com archive: collects every book link from the archive page
 * and then fetches the individual detail pages concurrently to fill in download
 * links, description, tags and related metadata.
 */
@Slf4j
@Service
public class SatigerBookService {

    /** Archive page that lists all books. */
    private static final String ARCHIVE_URL = "https://salttiger.com/archives/";

    /** Maximum number of detail pages fetched at the same time. */
    private static final int DETAIL_CONCURRENCY = 3;

    /** Only this many books get their detail page fetched, to keep the demo small. */
    private static final int DETAIL_LIMIT = 15;

    /**
     * Downloads the archive page and returns one item per book link found on it.
     * Only {@code title} and {@code url} are populated here.
     *
     * @return the books listed on the archive page, in page order
     */
    @SneakyThrows
    public List<SaltTigerBookItem> fetchBooks() {
        Document doc = Jsoup.connect(ARCHIVE_URL).get();

        List<SaltTigerBookItem> items = new ArrayList<>();
        for (Element listItem : doc.select("ul.car-list li")) {
            for (Element bookLink : listItem.select("ul.car-monthlisting li a")) {
                SaltTigerBookItem item = SaltTigerBookItem.builder().build();
                item.setTitle(bookLink.text());
                item.setUrl(bookLink.attr("href"));
                items.add(item);
            }
        }
        return items;
    }

    /**
     * Returns the first substring of {@code str} that matches {@code regexStr}.
     *
     * @param regexStr regular expression to search for
     * @param str      text to search; may be null
     * @return the first match, or {@code null} if {@code str} is null or nothing matches
     */
    public static String regexFind(String regexStr, String str) {
        if (str == null) {
            return null;
        }
        Matcher matcher = Pattern.compile(regexStr).matcher(str);
        return matcher.find() ? matcher.group() : null;
    }

    /**
     * Downloads the detail page at {@code item.getUrl()} and copies the data found in
     * each {@code <article>} element into {@code item}.
     *
     * <p>Runtime failures while parsing are logged and leave the item partially
     * filled; I/O failures from the download propagate to the caller.
     *
     * @param item the listing entry to enrich; it is modified in place
     * @return the same {@code item} instance
     */
    @SneakyThrows
    public SaltTigerBookItem fetchDetail(SaltTigerBookItem item) {
        try {
            Document doc = Jsoup.connect(item.getUrl()).get();
            for (Element article : doc.select("article")) {
                applyArticle(item, article);
            }
        } catch (RuntimeException e) {
            log.warn("Failed to parse detail page for {}", item, e);
        }
        return item;
    }

    /** Copies every field that can be extracted from one article element into {@code item}. */
    private void applyArticle(SaltTigerBookItem item, Element article) {
        item.setId(article.id());
        item.setThumbnil(article.select("div > p:nth-child(1) > img").attr("src"));

        String strongText = article.select("strong").text();
        // Publication date as printed on the page, e.g. "2020.12".
        item.setPubDate(regexFind("\\d{4}\\.\\d{1,2}", strongText));

        Element officialLink = article.select("div > p:nth-child(1) > strong > a:nth-child(2)").first();
        if (officialLink != null) {
            item.setOfficalPress(officialLink.text());
            item.setOfficalUrl(officialLink.attr("href"));
        }

        for (Element ed2kLink : article.select("article strong > a[href*=ed2k]")) {
            item.getOtherLinks().add(ed2kLink.attr("href"));
        }

        Element baiduLink = article.select("article strong > a[href*=baidu]").first();
        if (baiduLink != null) {
            String baiduUrl = baiduLink.attr("href");
            item.setBaiduUrl(baiduUrl);
            String baiduCode = extractBaiduCode(baiduUrl, strongText);
            // A missing code must not abort the remaining extraction below.
            if (baiduCode != null) {
                item.setBaiduCode(baiduCode);
            }
        }

        String description = regexFind("<p>内容简介([\\s\\S]*)", article.select("div.entry-content").html());
        if (!StringUtil.isBlank(description)) {
            item.setDescription(description.replace("<p>内容简介:</p>", ""));
        }

        item.setCreatedAt(article.select("footer > a:nth-child(1) > time").attr("datetime"));
        // NOTE(review): the title is inserted without URL encoding — confirm the consumer tolerates that.
        item.setZlibSearchUrl(String.format("https://zlibrary-asia.se/s/%s?", item.getTitle()));

        for (Element tagLink : article.select("footer > a[rel*=tag]")) {
            SaltTigerBookTag tag = SaltTigerBookTag.builder().build();
            tag.setUrl(tagLink.attr("href"));
            tag.setName(tagLink.text());
            item.getTags().add(tag);
        }
    }

    /**
     * Extracts the Baidu extraction code, either from a {@code pwd=} part of the link
     * or from the article's bold text.
     *
     * @return the code, or {@code null} when neither source contains one
     */
    private static String extractBaiduCode(String baiduUrl, String strongText) {
        if (baiduUrl.contains("pwd")) {
            String fromUrl = regexFind("pwd=.*", baiduUrl);
            return fromUrl == null ? null : fromUrl.replace("pwd=", "");
        }
        // NOTE(review): the pattern requires exactly four spaces before the colon — verify against the live page.
        String fromText = regexFind("提取码    :\\w{1,4}", strongText);
        return fromText == null ? null : fromText.replace("提取码    :", "");
    }

    /**
     * Fetches the archive listing and then the detail pages of the first
     * {@value #DETAIL_LIMIT} books, at most {@value #DETAIL_CONCURRENCY} at a time.
     *
     * <p>If a detail page cannot be fetched, the failure is logged and the book stays
     * in the result with only its listing data.
     *
     * @return the first {@value #DETAIL_LIMIT} books in archive order
     */
    @SneakyThrows
    public List<SaltTigerBookItem> fetchBookDetails() {
        List<SaltTigerBookItem> items = this.fetchBooks().stream()
                .limit(DETAIL_LIMIT)
                .collect(Collectors.toList());

        List<Callable<SaltTigerBookItem>> tasks = items.stream()
                .map(item -> (Callable<SaltTigerBookItem>) () -> fetchDetail(item))
                .collect(Collectors.toList());

        // Start from the listing-only entries so a failed task still leaves its book in place.
        List<SaltTigerBookItem> results = new ArrayList<>(items);

        ExecutorService executorService = new FixedVirtualThreadExecutorService(DETAIL_CONCURRENCY);
        try {
            List<Future<SaltTigerBookItem>> futures = executorService.invokeAll(tasks);

            for (int i = 0; i < futures.size(); i++) {
                try {
                    SaltTigerBookItem detailed = futures.get(i).get();
                    log.info("{}", detailed);
                    results.set(i, detailed);
                } catch (ExecutionException e) {
                    log.warn("Failed to fetch detail for {}", items.get(i), e);
                } catch (InterruptedException e) {
                    // Stop waiting, but keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                    log.warn("Interrupted while waiting for book details", e);
                    break;
                }
            }
        } finally {
            // Always release the executor, even when invokeAll itself fails.
            executorService.shutdown();
        }
        return results;
    }
}