lyg
2 天以前 22f370322412074174cde20ecfd14ec03657ab63
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# -*- coding: utf-8 -*-
# @file: doc_processor.py
# @author: lyg
# @date: 2025-5-13
# @version: 
# @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
 
from knowledgebase.db.doc_db_models import TEntity
from knowledgebase.doc.docx_split import DocSplit
import asyncio
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.doc.entity_helper import get_entity_helper
from knowledgebase.doc.entity_recognition import EntityRecognition
import os.path
 
from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType
from knowledgebase.llm import llm
from knowledgebase.log import Log
from knowledgebase import utils
 
 
class DocProcessor:
    def __init__(self, docx_file: str):
        """
        文档处理
        :param docx_file: 要处理的文档
        """
        Log.info(f'开始处理文档:{docx_file}')
        self.docx_file = docx_file
        self.doc_type = self.get_doc_type()
        self.doc_split = DocSplit(docx_file, self.doc_type)
        self.entity_recognition = EntityRecognition(self.doc_type)
        self.doc_id = 0
 
    def get_doc_type(self):
        entity_helper = get_entity_helper()
        Log.info(f'识别文档类型:{self.docx_file}')
        rules = ';\n'.join([f'- {it}:{entity_helper.doc_prompt_map[it]}' for it in entity_helper.doc_prompt_map.keys()])
        msg = HumanMessage(f'''
# 指令
请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。
文件名:{os.path.basename(self.docx_file)}
# 识别规则
{rules}
# 示例
遥测大纲
''')
        resp = llm.invoke([msg])
        Log.info(f'识别结果:{resp.content}')
        return resp.content
 
    async def get_tc_info(self, paragraph: ParagraphInfo):
        if self.doc_type not in [DocType.tc_format]:
            return ''
        prompt = HumanMessagePromptTemplate.from_template('''
# 指令
请从下面的文本中识别指令信息,如果识别失败不要输出任何字符。
指令信息包括:指令名称。
# 识别规则
- 文本内容为遥控指令数据域或遥控指令应用数据的定义描述。
# 约束
- 如果文本内容是目录则不要输出任何字符;
- 指令名称在章节标题中,提取指令名称要和文本中的严格一致;
- 如果没有识别到指令信息不要输出任何字符;
- 识别失败,不要输出任何内容,包括解释性文本;
- 输出json格式。
# 示例 - 识别到指令
{{
    "name": "xxx"
}}
# 示例 - 未识别到指令
""
# 文本内容:
{text}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = await chain.ainvoke({"text": paragraph.full_text})
        import json
        # Log.info(f'>>>>>>指令识别:\n{paragraph.full_text}')
        # Log.info(f'<<<<<<指令:{json.dumps(resp, ensure_ascii=False)}')
        return resp
 
    async def get_tm_pkt_info(self, paragraph: ParagraphInfo):
        if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]:
            return ''
        prompt = HumanMessagePromptTemplate.from_template('''
# 指令
识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。
识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。
提取的遥测包信息包括:包名称,包代号。
# 约束
- 如果文本内容是目录则不要输出任何字符;
- 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符;
- 文本结构通常是:包名称、代号和APID(应用过程标识)在开头(应用过程标识也有可能在表格中),后面紧接着是包头和参数定义表;
- 如果没有识别到遥测包信息不要输出任何字符;
- 识别失败,不要输出任何内容,包括解释性文本;
- 输出json格式。
# 符合要求的文本结构1
1.1.1 code xxx包(APID=0x123)
```json
表格内容
``` 
# 符合要求的文本结构2
1.1.1 code xxx包
```json
表格内容
应用过程标识
...
``` 
# 示例 - 识别到数据包
{{
    "name": "xxx包",
    "code": "TMS001"
}}
# 示例 - 未识别到数据包
""
# 文本内容:
{text}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = await chain.ainvoke({"text": paragraph.full_text})
        return resp
 
    async def get_chapter_refs(self, paragraph: ParagraphInfo, toc: [str]) -> [str]:
        if self.doc_type not in [DocType.tc_format]:
            return ''
        toc_text = '\n'.join(toc)
        prompt = HumanMessagePromptTemplate.from_template(f'''
# 角色
你是一名资深的软件工程师。
# 指令
帮助我完成对文本中引用关系的抽取,判断当前文本中是否包含了引用信息,例如包含以下关键字:“详见1.1”、“见1.1”、“具体见1.1”、“见附录”等。
如果包含引用,将引用与“目录内容”中的目录条目进行匹配。
将匹配到的目录条目输出,输出格式为json格式。
# 约束
- 是否包含引用的判断条件中必须包含引用相关的描述,例如:“详见1.1”、“见1.1”、“具体见1.1”、“见附录”等;
- 注意不要自己引用自己;
- 仅提取目录内容中包含的条目,如果目录内容不包含则不提取;
- 如果仅靠标题号码无法确定目录条目的,根据文本内容匹配对应的目录条目;
- 输出的内容必须是目录中的条目;
- 输出json格式,不要输出任何json以外的字符。
# 输出案例
["1.1 xxx"]
# 目录内容:
{toc_text}
# 文本内容:
{{text}}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = await chain.ainvoke({"text": paragraph.full_text})
        return resp
 
    async def gen_chapter_entities(self, paragraph: ParagraphInfo, paragraphs: [ParagraphInfo], toc: [str]):
        # 获取章节实体词
        entity_names_task = self.entity_recognition.run(paragraph.full_text)
        # 获取指令信息
        cmd_task = self.get_tc_info(paragraph)
        # 获取遥测包信息
        pkt_task = self.get_tm_pkt_info(paragraph)
        # 获取文档引用
        refs_task = self.get_chapter_refs(paragraph, toc)
        entity_names, cmd, pkt, chapter_refs = await asyncio.gather(entity_names_task, cmd_task, pkt_task, refs_task)
 
        Log.info(f'章节{paragraph.title_num}实体词:{entity_names}')
        Log.info(f'章节{paragraph.title_num}引用:{chapter_refs}')
        if entity_names:
            paragraph.entities = doc_dbh.get_entities_by_names(entity_names)
 
        if pkt:
            entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='')
            e = doc_dbh.get_entity(entity)
            if e:
                entity.id = e.id
            else:
                doc_dbh.add_entity(entity)
                Log.info(f"新增Entity:{entity.name},id:{entity.id}")
            paragraph.entities.append(entity)
 
        if cmd:
            entity = TEntity(name=cmd['name'], type='指令格式配置', prompts='', doc_type='')
            e = doc_dbh.get_entity(entity)
            if e:
                entity.id = e.id
            else:
                doc_dbh.add_entity(entity)
                Log.info(f"新增Entity:{entity.name},id:{entity.id}")
            paragraph.entities.append(entity)
        # 获取引用信息
        if chapter_refs:
            for ref in chapter_refs:
                _p = next(filter(lambda p: ref == p.title, self.doc_split.paragraphs), None)
                if _p:
                    if paragraph != _p:
                        paragraph.refs.append(_p)
 
    def process(self):
        self.doc_split.split()
        # 分批并发处理,每批10个
        tasks = []
        toc = []
        for p in self.doc_split.paragraphs:
            if p.title_level:
                toc.append(p.title)
        for paragraph in self.doc_split.paragraphs:
            tasks.append(self.gen_chapter_entities(paragraph, self.doc_split.paragraphs, toc))
 
        async def run():
            await asyncio.gather(*tasks)
 
        asyncio.run(run())
        # 保存到数据库
        self.save_to_db()
 
    def save_to_db(self):
        """
        保存段落和段落实体词关系到数据库。
        """
        Log.info('保存段落和段落实体词关系到数据库...')
        with open(self.docx_file, 'rb') as f:
            file_bytes = f.read()
            md5 = utils.generate_bytes_md5(file_bytes)
        doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree)
        self.doc_id = doc_dbh.add_doc(doc)
        for paragraph in doc.paragraphs:
            doc_dbh.add_paragraph(self.doc_id, None, paragraph)
        for paragraph in self.doc_split.paragraphs:
            for ref_paragraph in paragraph.refs:
                doc_dbh.add_paragraph_ref_link(paragraph.id, ref_paragraph.id)
                Log.info(f"{paragraph.title} 引用了-> {ref_paragraph.title}")
        Log.info('保存段落和段落实体词关系到数据库完成')