lyg
2024-06-12 70f88e715c980d0a6d411cdfbac4a13e90f03daf
src/main.mjs
@@ -5,7 +5,7 @@
import axios from "axios";
import * as fs from "fs";
import path from "path";
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
import { Worker, isMainThread, parentPort, workerData, threadId } from 'worker_threads';
import { HttpsProxyAgent } from "https-proxy-agent";
import { resolve } from "path";
@@ -19,7 +19,7 @@
  if (!fs.existsSync('./logs')) {
    fs.mkdirSync('./logs', { recursive: true });
  }
  logFile = fs.createWriteStream(`./logs/logs-${config.startRow}-${config.endRow}.log`, { flags: 'a', encoding: 'utf8' });
  logFile = fs.createWriteStream(`./logs/logs-${config.startRow}-${config.endRow}-thread${threadId}.log`, { flags: 'a', encoding: 'utf8' });
  console.log = function (...text) {
    text = `${new Date().toLocaleString()} ${text.join(' ') ?? ''}`;
    _log(text);
@@ -259,8 +259,13 @@
    return;
  }
  await retry(() => {
    const timeoutTime = 10 * 60 * 1000;
    const source = axios.CancelToken.source();
    const timeout = setTimeout(() => {
      source.cancel("timeout");
    }, timeoutTime);
    return new Promise((resolve, reject) => myAxios
      .get(url, { responseType: "stream" })
      .get(url, { responseType: "stream", timeout: timeoutTime, cancelToken: source.token })
      .then((response) => {
        const len = response.headers['content-length'];
        if (ext !== "pdf" && ext !== "txt" && len > 200 * 1024 * 1024) {
@@ -275,6 +280,7 @@
        const out = fs.createWriteStream(filepath);
        stream.pipe(out);
        stream.on("end", () => {
          clearTimeout(timeout);
          book.state = `下载完成`;
          book.format = ext;
          book.file = filepath;
@@ -283,6 +289,7 @@
          resolve(true);
        });
        stream.on("error", (err) => {
          clearTimeout(timeout);
          console.error(err);
          book.state = "下载失败";
          book.url = url;
@@ -297,6 +304,7 @@
        });
      })
      .catch((e) => {
        clearTimeout(timeout);
        console.error(e);
        book.state = "下载失败";
        book.url = url;
@@ -313,9 +321,29 @@
  return alreadyDownloadedBooks.includes(id);
}
function nextBook() {
  return new Promise(resolve => {
    const cb = (message) => {
      if (message.type === 'book') {
        resolve(message.data);
        parentPort.removeListener('message', cb);
      }
    };
    parentPort.on('message', cb);
    parentPort.postMessage({ type: 'get-book', threadId });
  });
}
async function downloadBooks(books) {
  driver = await createDriver();
  for (const book of books) {
  for (; ;) {
    const book = await nextBook();
    if (!book) {
      break;
    }
    books.push(book);
    if (config.endOfTime && Date.now() - startTime > 1000 * 60 * config.endOfTime) {
      // 定时退出
      break;
@@ -388,7 +416,7 @@
  }
  const buffer = xlsx.build([{ name: "Sheet1", data }]);
  fs.writeFile("./【第二批二次处理后】交付清单.xlsx", buffer, (err) => { });
  fs.writeFileSync("./【第二批二次处理后】交付清单.xlsx", buffer, (err) => { });
  console.log("保存完成: ./【第二批二次处理后】交付清单.xlsx");
}
@@ -438,7 +466,7 @@
function main() {
  initLogger();
  const books = getBooksFromExcel(config.startRow, config.endRow);
  const books = [];
  downloadBooks(books)
    .then(() => {
      console.log(`全部完成,共下载${bookCount}本,成功下载${successCount}本,跳过${skipCount}本,失败${bookCount - skipCount - successCount}本,耗时: ${msFormat(Date.now() - startTime)}。`);
@@ -461,13 +489,13 @@
if (isMainThread) {
  initLogger();
  const alreadyDownloadedBooks = getAlreadyDownloadedBooks();
  console.log(`线程数:${config.threadSize}, 开始行:${config.startRow}, 结束行:${config.endRow}`);
  let startRow = config.startRow;
  let endRow = config.endRow;
  const { startRow, endRow, threadSize } = config;
  console.log(`线程数:${threadSize}, 开始行:${startRow}, 结束行:${endRow}`);
  let finishCnt = 0;
  const finishBooks = [];
  const threadSize = config.threadSize;
  const thBookSize = (endRow - startRow) / threadSize
  const thBookSize = (endRow - startRow) / threadSize;
  const books = getBooksFromExcel(startRow, endRow);
  for (let sr = startRow; sr < endRow; sr += thBookSize) {
    let er = sr + thBookSize;
    if (er > endRow) {
@@ -478,15 +506,15 @@
      if (message.type === 'books') {
        finishBooks.push(...message.data);
        finishCnt++;
        if (finishCnt >= config.threadSize) {
        if (finishCnt >= threadSize) {
          saveBooks(finishBooks);
        }
      } else if (message.type === 'get-book') {
        worker.postMessage({ type: "book", data: books.shift() });
      }
    });
  }
} else {
  config.startRow = workerData.startRow;
  config.endRow = workerData.endRow;
  alreadyDownloadedBooks = workerData.alreadyDownloadedBooks;
  main();
}