import { Client } from '@elastic/elasticsearch';
|
import * as fs from 'fs';
|
import * as mysql from 'mysql2';
|
import xlsx from "node-xlsx";
|
// Name of the Elasticsearch index used by the create, import and export steps in this file.
const INDEX_NAME = 'zlib_3';
|
|
// Shared Elasticsearch client, configured for the node at http://localhost:9200.
const client = new Client({ node: 'http://localhost:9200' });
|
import * as rl from 'node:readline/promises'
|
|
/**
 * Creates the Elasticsearch index `INDEX_NAME` with an explicit field mapping,
 * unless an index with that name already exists.
 *
 * Any failure (existence check or creation) is logged with `console.error`
 * and not rethrown, so the returned promise always resolves.
 *
 * @returns {Promise<void>}
 */
async function createIndex() {
  try {
    if (await client.indices.exists({ index: INDEX_NAME })) {
      return;
    }

    // Request fields are passed at the top level, the same way `client.search`
    // is called elsewhere in this file, rather than through the legacy `body`
    // wrapper.
    await client.indices.create({
      index: INDEX_NAME,
      mappings: {
        properties: {
          id: { type: 'keyword' },
          zid: { type: 'keyword' },
          ext: { type: 'text' },
          title: { type: 'text' },
          author: { type: 'text' },
          isbn: { type: 'text' },
          file: { type: 'text' },
          filesize: { type: 'long' },
          publisher: { type: 'text' },
          language: { type: 'text' },
          year: { type: 'text' },
          pages: { type: 'text' },
        },
      },
    });
  } catch (e) {
    console.error(e);
  }
}
|
|
/**
 * Maps one parsed JSONL record to the flat document stored in the index.
 * Missing text fields fall back to '' and missing `zid` / `filesize` to 0.
 *
 * @param {object} obj - One parsed record; fields are read from `obj.aacid` and `obj.metadata`.
 * @returns {object} The document to index.
 */
function toBookDocument(obj) {
  const metadata = obj?.metadata ?? {};

  let filesize = metadata.filesize_reported;
  if (!filesize) {
    // Fall back to the size nested under `annabookinfo.response`; anything
    // that does not parse as an integer becomes 0 rather than NaN.
    const parsed = Number.parseInt(metadata.annabookinfo?.response?.filesize, 10);
    filesize = Number.isNaN(parsed) ? 0 : parsed;
  }

  const extension = metadata.extension ?? '';
  return {
    id: obj?.aacid ?? '',
    zid: metadata.zlibrary_id ?? 0,
    ext: extension,
    filesize,
    file: `${metadata.md5_reported ?? ''}.${extension}`,
    title: metadata.title ?? '',
    author: metadata.author ?? '',
    publisher: metadata.publisher ?? '',
    language: metadata.language ?? '',
    isbn: metadata.isbns?.[0] ?? '',
    year: metadata.year ?? '',
    pages: metadata.pages ?? '',
  };
}

/**
 * Sends the given documents to Elasticsearch in a single bulk request.
 * A rejected request is logged and counted as 0 indexed documents.
 *
 * @param {object[]} books - Documents produced by `toBookDocument`.
 * @returns {Promise<number>} Number of documents without an item-level error.
 */
async function bulkIndexBooks(books) {
  if (books.length === 0) {
    return 0;
  }
  // Bulk payload: an action line followed by the document, for every book.
  const operations = books.flatMap((book) => [
    { index: { _index: INDEX_NAME, _id: book.id } },
    book,
  ]);
  try {
    const response = await client.bulk({ operations });
    console.log(response.errors);
    if (!response.errors) {
      return books.length;
    }
    const failed = (response.items ?? []).filter((item) => item.index?.error);
    console.error(`bulk request reported ${failed.length} failed item(s)`, failed[0]?.index?.error);
    return books.length - failed.length;
  } catch (e) {
    console.error(e);
    return 0;
  }
}

/**
 * Reads a JSONL dump line by line and bulk-indexes the records into `INDEX_NAME`.
 *
 * Records are buffered and flushed every `batchSize` documents. The remaining
 * partial batch is flushed after the loop: a 'close' listener attached once
 * the `for await` loop has finished never fires, because the interface has
 * already closed by then.
 *
 * @param {string} [file] - Path of the JSONL file; defaults to the original hard-coded dump.
 * @param {number} [batchSize=100000] - Number of documents per bulk request.
 * @returns {Promise<number>} Number of documents indexed without error.
 * @throws {SyntaxError} If a non-blank line is not valid JSON.
 */
async function startImport(
  file = "E:\\annas_archive_meta__aacid__zlib3_records__20240809T171652Z--20240824T040729Z.jsonl.seekable",
  batchSize = 100000,
) {
  const rs = fs.createReadStream(file, 'utf-8');
  const readLine = rl.createInterface({ input: rs, crlfDelay: Infinity });
  const books = [];
  let cnt = 0;

  const flush = async () => {
    cnt += await bulkIndexBooks(books);
    console.log(`已导入${cnt}条记录`);
    books.length = 0;
  };

  try {
    for await (const line of readLine) {
      // Blank lines (e.g. a trailing newline) carry no record.
      if (line.trim() === '') {
        continue;
      }
      books.push(toBookDocument(JSON.parse(line)));
      if (books.length >= batchSize) {
        await flush();
      }
    }
    if (books.length > 0) {
      await flush();
    }
  } finally {
    readLine.close();
    rs.destroy();
  }

  // `connection` is not declared anywhere in the visible file; guard the
  // cleanup so it cannot throw a ReferenceError when no such binding exists.
  if (typeof connection !== 'undefined') {
    connection.destroy();
  }
  console.log('导入完成');
  return cnt;
}
|
// await createIndex();
|
// await startImport();
|
|
/**
 * Builds a lookup of books listed in previously produced spreadsheets.
 *
 * For every workbook, the first sheet is read, its first row is skipped as a
 * header, and each remaining row contributes the key
 * `${isbn}${title}${author}` taken from columns 1, 2 and 3 (trimmed).
 *
 * @param {string[]} [bookLists] - Paths of the .xlsx files to read. The default
 *   list is empty (all historical paths are commented out), which yields an
 *   empty map.
 * @returns {Map<string, boolean>} Map whose keys identify the listed books.
 */
function getBookMap(
  bookLists = [
    // "c:/Users/lyg/Downloads/书单z-0-100w.xlsx",
    // "c:/Users/lyg/Downloads/书单z-100w-200w.xlsx",
    // "c:/Users/lyg/Downloads/书单z-200w-300w.xlsx",
    // "c:/Users/lyg/Downloads/书单z-300w-400w.xlsx",
    // "c:/Users/lyg/Downloads/书单z-400w-500w.xlsx",
    // "c:/Users/lyg/Downloads/书单z-500w-5027926.xlsx",
  ],
) {
  // Cells are not guaranteed to be strings (a numeric ISBN cell can come back
  // as a number), so coerce before trimming instead of calling `.trim()` on
  // the raw value.
  const cellText = (cell) => String(cell ?? '').trim();

  const bookMap = new Map();
  for (const bookList of bookLists) {
    // A workbook without sheets contributes no rows.
    const rows = xlsx.parse(bookList)[0]?.data ?? [];
    for (const row of rows.slice(1)) {
      const isbn = cellText(row?.[1]);
      const title = cellText(row?.[2]);
      const author = cellText(row?.[3]);
      bookMap.set(`${isbn}${title}${author}`, true);
    }
  }
  return bookMap;
}
|
|
/**
 * Scrolls through the matching documents of `INDEX_NAME` and writes the ones
 * that qualify to spreadsheet files via `saveBooks`.
 *
 * The search requires a wildcard match on `isbn` and `language`, a `filesize`
 * strictly between 1024 and 1024 * 1024 * 300, and excludes the languages
 * english / en / chinese / cn. A hit is exported when
 *  - its `${isbn}${title}${author}` key is neither in `getBookMap()` nor
 *    already seen during this run, and
 *  - its `isbn` consists of exactly 10 or exactly 13 digits.
 *
 * Books are handed to `saveBooks` in batches of 500000, plus one final
 * partial batch. Scrolling stops on the first empty page, so the loop
 * terminates even when no book is pending at the end, and the scroll context
 * is released afterwards.
 *
 * @returns {Promise<{scanned: number, exported: number, files: number}>}
 *   Number of hits read, books exported and files written.
 */
async function exportBooks() {
  const FILE_BATCH_SIZE = 500000;
  const SCROLL_KEEP_ALIVE = '3h';
  const ISBN_PATTERN = /^(\d{10}|\d{13})$/;

  const hisBookMap = getBookMap();
  // Only membership is needed for de-duplication, so keep keys, not documents.
  const seenKeys = new Set();
  const books = [];

  let cnt = 0;
  let bookCnt = 0;
  let fileCnt = 1;

  let page = await client.search({
    index: INDEX_NAME,
    scroll: SCROLL_KEEP_ALIVE,
    size: 10000,
    _source: ['id', 'zid', 'language', 'title', 'author', 'publisher', 'isbn', 'ext', 'file'],
    query: {
      bool: {
        must: [
          { wildcard: { isbn: '*' } },
          { wildcard: { language: '*' } },
          {
            range: {
              filesize: {
                gt: 1024,
                lt: 1024 * 1024 * 300,
              },
            },
          },
        ],
        must_not: [
          { terms: { language: ['english', 'en', 'chinese', 'cn'] } },
        ],
      },
    },
  });
  let scrollId = page._scroll_id;

  try {
    while (page.hits.hits.length > 0) {
      for (const hit of page.hits.hits) {
        const source = hit._source;
        const bkId = `${source.isbn}${source.title}${source.author}`;
        if (hisBookMap.has(bkId) || seenKeys.has(bkId)) {
          continue;
        }
        if (!ISBN_PATTERN.test(source.isbn)) {
          continue;
        }

        seenKeys.add(bkId);
        books.push(source);
        bookCnt++;

        if (books.length >= FILE_BATCH_SIZE) {
          saveBooks(books, fileCnt++);
          books.length = 0;
        }
      }
      cnt += page.hits.hits.length;

      page = await client.scroll({
        scroll_id: scrollId,
        scroll: SCROLL_KEEP_ALIVE,
      });
      scrollId = page._scroll_id ?? scrollId;
    }

    if (books.length > 0) {
      saveBooks(books, fileCnt++);
    }
  } finally {
    // Release the server-side scroll context instead of letting it live
    // until the keep-alive expires; a failure here must not mask the result.
    if (scrollId) {
      try {
        await client.clearScroll({ scroll_id: scrollId });
      } catch (e) {
        console.error(e);
      }
    }
  }

  return { scanned: cnt, exported: bookCnt, files: fileCnt - 1 };
}
|
|
/**
 * Writes one batch of books to an .xlsx workbook.
 *
 * The sheet starts with a fixed header row, followed by one row per book.
 * The file is written synchronously to a relative path of the form
 * `zlib3_books_<fileCnt>[<number of books>].xlsx`.
 *
 * @param {object[]} books - Book documents; the fields id, zid, language,
 *   title, author, publisher, isbn, ext and file are read from each.
 * @param {number} fileCnt - Sequence number used in the file name.
 * @returns {void}
 */
function saveBooks(books, fileCnt) {
  const header = ['id', 'zid', '语种', '书名', '作者', '出版社', 'ISBN码', '文件格式', 'file'];

  const rows = [header];
  for (const { id, zid, language, title, author, publisher, isbn, ext, file } of books) {
    rows.push([id, zid, language, title, author, publisher, isbn, ext, file]);
  }

  const workbook = xlsx.build([{ data: rows }]);
  const outputPath = `zlib3_books_${fileCnt}[${books.length}].xlsx`;
  fs.writeFileSync(outputPath, workbook, 'binary');
}
|
|
// Run the export. The rejection is handled explicitly so a failure is logged
// and reflected in the exit code rather than left as a floating promise.
exportBooks().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
|