lyg
2 天以前 22f370322412074174cde20ecfd14ec03657ab63
main.py
@@ -1,65 +1,34 @@
import math
import asyncio
import os
import sys
from knowledgebase.markitdown import MarkItDown
from doc_to_docx import doc_to_docx
from db_struct_flow import DbStructFlow, tc_data_generate
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.doc.doc_processor import DocProcessor
from knowledgebase.doc.entity_helper import init_entity_helper
def process_docs(directory):
    # 遍历目录下的所有文件
    for filename in os.listdir(directory):
        # 判断是否为 doc 文件
        if filename.endswith(".doc"):
            # 转换为 docx
            doc_to_docx(directory + filename, directory + filename.replace(".doc", ".docx"))
def doc_split(project_path):
    docs_path = f'{project_path}/docs'
    files = os.listdir(docs_path)
    files = [f'{docs_path}/{x}' for x in filter(lambda x: x.endswith('.docx'), files)]
    for file in files:
        DocProcessor(file).process()
md = MarkItDown()
def to_markdown(dst_dir: str):
    text = ''
    # 遍历文件夹下的所有文件
    for file in os.listdir(dst_dir):
        # 判断是否为 docx 文件
        if file.endswith(".docx"):
            # 转换为 md
            result = md.convert(dst_dir + file)
            text = result.text_content
            out_file = dst_dir + file + '.md'
            with open(out_file, 'w', encoding='utf-8') as f:
                f.write(text)
    return out_file
# 1.解析文档
# 2.输入文档
# 3.启动LangFlow
def main():
    doc_dir = ".\\doc\\"
    # 处理文档
    # process_docs(doc_dir)
    # 文档转换为markdown
    md_file = to_markdown(doc_dir)
    md_file = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.md'
    project_path = sys.argv[1]
    if not project_path:
        print("missing project path. eg: python main.py <path/to/project>")
        return
    # 拆分文档
    doc_dbh.set_project_path(project_path)
    init_entity_helper()
    # doc_split(project_path)
    # 启动大模型处理流程
    # ret_text = LangFlow([md_file]).run()
    # 保存结果
    # with open('D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.text', 'w', encoding='utf-8') as f:
    #     f.write(ret_text)
def get_bit_mask(start, end):
    bits = math.ceil((end + 1) / 8) * 8
    if bits == 0:
        bits = 8
    mask = 0
    for i in range(start, end + 1):
        mask |= 1 << (bits - i - 1)
    return mask
if __name__ == '__main__':
    asyncio.run(DbStructFlow(project_path).run())
    # 生成指令数据表
    tc_data_generate()
if __name__ == "__main__":
    main()