Merge branch 'master' of http://182.92.203.7:2001/r/KnowledgeBase
| | |
| | | self.session.commit() |
| | | return paragraph_entity_link.id |
| | | |
| | | def get_entity(self, entity): |
| | | # Note: Python's `and` does not translate into SQL; pass the conditions separately so they are ANDed together. |
| | | ret = self.session.query(TEntity).where( |
| | | TEntity.name == entity.name, TEntity.type == entity.type, TEntity.doc_type == entity.doc_type).first() |
| | | if ret: |
| | | return ret |
| | | |
| | | def add_entity(self, entity): |
| | | """ |
| | | Add an entity |
| | |
| | | def get_docs(self) -> list[TDoc]: |
| | | return self.session.query(TDoc).all() |
| | | |
| | | def get_texts_with_entities(self, entity_names: list[str]): |
| | | """ |
| | | Get the list of text contents associated with the given entity terms. |
| | | :param entity_names: list[str] - entity terms |
| | | :return: list[str] - list of texts |
| | | """ |
| | | if not entity_names: |
| | | return [] |
| | |
| | | _entity_ids = [entity.id for entity in _entities] |
| | | links = self.session.query(TParagraphEntityLink).where(TParagraphEntityLink.entity_id.in_(_entity_ids)).all() |
| | | _paragraphs = [link.paragraph for link in links] |
| | | return [self.get_paragraph_full_text(p) for p in _paragraphs] |
| | | def get_text_with_entities(self, entity_names: list[str]) -> str: |
| | | """ |
| | | Get the text content associated with the given entity terms, joined with newlines. |
| | | :param entity_names: list[str] - entity terms |
| | | :return: str - text |
| | | """ |
| | | texts = self.get_texts_with_entities(entity_names) |
| | | return '\n'.join(texts) |
| | | |
| | | def get_entities_by_names(self, names: list[str]): |
| | | _entities = self.session.query(TEntity).where(TEntity.name.in_(names)).all() |
| | | return _entities |
| | | |
| | | def get_paragraph_full_text(self, p: TParagraph): |
| | | result = p.text if p.title_level == 0 else p.title_num + ' ' + p.text |
| | | return result + '\n' + '\n'.join([self.get_paragraph_full_text(c) for c in p.children]) |
| | | |
| | | def get_entities_by_doc_type(self, doc_type): |
| | | _entities = self.session.query(TEntity).where(TEntity.doc_type == doc_type).all() |
| | | return _entities |
| | | |
| | | def commit(self): |
| | | self.session.commit() |
| | | |
| | | |
| | | doc_dbh = DocDbHelper() |
| | | |
| | | # if __name__ == '__main__': |
| | | # text = doc_dbh.get_text_with_entities(['遥控包格式']) |
| | | # print(text) |
| | | # doc_db = DocDbHelper() |
| | | # # doc_db.insert_entities() |
| | | # doc = doc_db.add_doc(DocInfo(file='aaa', file_name='test')) |
| | | # p1 = doc_db.add_paragraph(doc.id, None, ParagraphInfo(text='test1', title_level=1, num=1, num_level=1)) |
| | | # p2 = doc_db.add_paragraph(doc.id, p1.id, ParagraphInfo(text='test2', title_level=2, num=1, num_level=2)) |
| | | # p3 = doc_db.add_paragraph(doc.id, p2.id, ParagraphInfo(text='test3', title_level=3, num=1, num_level=3)) |
| | | # doc_db.add_paragraph_ref_link(TParagraphRefLink(parent_id=p1.id, child_id=p3.id)) |
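| | | |
| | | For reference, a minimal usage sketch of the query helpers above; the entity name is one already used in this repo's examples, and `doc_dbh` is the module-level helper created above: |
| | | ```python |
| | | # Illustrative only: fetch every paragraph text linked to the entity "遥控包格式". |
| | | texts = doc_dbh.get_texts_with_entities(['遥控包格式']) |
| | | for t in texts: |
| | |     print(t) |
| | | # Or get the same content joined into a single string: |
| | | print(doc_dbh.get_text_with_entities(['遥控包格式'])) |
| | | ``` |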
| | |
| | | is_del = Column(Integer) |
| | | |
| | | |
| | | # class TTmPacket(Base): |
| | | # __tablename__ = 't_tm_packets' |
| | | # id = Column(Integer, primary_key=True) |
| | | # name = Column(Text) |
| | | # code = Column(Text) |
| | | # apid = Column(Integer) |
| | | # is_del = Column(Integer) |
| | | # |
| | | # |
| | | # class TTmPacketParagraphLink(Base): |
| | | # __tablename__ = 't_tm_packet_paragraph_links' |
| | | # id = Column(Integer, primary_key=True) |
| | | # tm_packet_id = Column(Integer, ForeignKey('t_tm_packets.id')) |
| | | # paragraph_id = Column(Integer, ForeignKey('t_paragraphs.id')) |
| | | # tm_packet = relationship("TTmPacket", foreign_keys=[tm_packet_id], uselist=False) |
| | | # paragraph = relationship("TParagraph", foreign_keys=[paragraph_id], uselist=False) |
| | | # is_del = Column(Integer) |
| | | |
| | | |
| | | def init_doc_db(): |
| | | """ |
| | | Initialize the document database |
| | |
| | | # mysql |
| | | Log.info("连接并初始化文档数据库...") |
| | | engine = create_engine('mysql+pymysql://root:123456@192.168.3.145:3306/knowledgebase', echo=False) |
| | | # engine = create_engine('sqlite:///doc_db.db', echo=False) |
| | | Base.metadata.create_all(engine) |
| | | SessionFactory = sessionmaker(bind=engine) |
| | | Session = scoped_session(SessionFactory) |
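| | | |
| | | A minimal sketch of how the scoped session might be consumed, assuming init_doc_db() returns the `Session` registry created above (the tail of the function is not shown in this hunk): |
| | | ```python |
| | | from knowledgebase.db.doc_db_models import TEntity |
| | | |
| | | Session = init_doc_db()  # assumption: the function returns the scoped_session registry |
| | | session = Session()      # thread-local session from the registry |
| | | try: |
| | |     entities = session.query(TEntity).all() |
| | | finally: |
| | |     Session.remove()     # release the thread-local session |
| | | ``` |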
| | |
| | | # @version: |
| | | # @description: Process documents: split them and save the resulting chapters to the database. |
| | | from langchain_core.messages import HumanMessage |
| | | from langchain_core.output_parsers import JsonOutputParser |
| | | from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate |
| | | |
| | | from knowledgebase.db.doc_db_models import TEntity |
| | | from knowledgebase.doc.docx_split import DocSplit |
| | | import asyncio |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | |
| | | from knowledgebase.doc.entity_recognition import EntityRecognition |
| | | import os.path |
| | | |
| | | from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType |
| | | from knowledgebase.llm import llm |
| | | from knowledgebase.log import Log |
| | | from knowledgebase import utils |
| | |
| | | """ |
| | | Log.info(f'开始处理文档:{docx_file}') |
| | | self.docx_file = docx_file |
| | | self.doc_type = self.get_doc_type() |
| | | self.doc_split = DocSplit(docx_file, self.doc_type) |
| | | self.entity_recognition = EntityRecognition(self.doc_type) |
| | | self.doc_id = 0 |
| | | |
| | |
| | | Log.info(f'识别结果:{resp.content}') |
| | | return resp.content |
| | | |
| | | def get_tm_pkt_info(self, paragraph: ParagraphInfo): |
| | | if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]: |
| | | return '' |
| | | prompt = HumanMessagePromptTemplate.from_template(''' |
| | | # 指令 |
| | | 识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。 |
| | | 识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。 |
| | | 提取的遥测包信息包括:包名称,包代号,APID。 |
| | | # 约束 |
| | | - 如果文本内容是目录则不要输出任何字符; |
| | | - 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符; |
| | | - 文本结构通常是:包名称、代号和APID在开头,后面紧接着是包头和参数定义表; |
| | | - 如果没有识别到遥测包信息不要输出任何字符; |
| | | - 识别失败,不要输出任何内容,包括解释性文本; |
| | | - 输出json格式。 |
| | | # 符合要求的文本结构 |
| | | 1.1.1 code xxx包(APID=0x123) |
| | | ```json |
| | | 表格内容 |
| | | ``` |
| | | # 示例 - 识别到数据包 |
| | | {{ |
| | | "name": "xxx包", |
| | | "code": "xxx", |
| | | "apid": 123 |
| | | }} |
| | | # 示例 - 未识别到数据包 |
| | | "" |
| | | # 文本内容: |
| | | {text} |
| | | ''') |
| | | chain = prompt.prompt | llm | JsonOutputParser() |
| | | resp = chain.invoke({"text": paragraph.full_text}) |
| | | return resp |
| | | |
| | | async def gen_chapter_entities(self, paragraph: ParagraphInfo): |
| | | # Get the entity terms for this chapter |
| | | entity_names = await asyncio.to_thread(lambda: self.entity_recognition.run(paragraph.full_text)) |
| | | Log.info(f'章节{paragraph.title_num}实体词:{entity_names}') |
| | | if entity_names: |
| | | paragraph.entities = doc_dbh.get_entities_by_names(entity_names) |
| | | # Get telemetry packet info |
| | | pkt = self.get_tm_pkt_info(paragraph) |
| | | if pkt: |
| | | entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='') |
| | | e = doc_dbh.get_entity(entity) |
| | | if e: |
| | | # The entity already exists: link the existing record to this paragraph instead of re-adding it. |
| | | paragraph.entities.append(e) |
| | | return |
| | | doc_dbh.add_entity(entity) |
| | | Log.info(f"新增Entity:{entity.name},id:{entity.id}") |
| | | paragraph.entities.append(entity) |
| | | |
| | | def process(self): |
| | | self.doc_split.split() |
| | |
| | | batch_paragraphs = self.doc_split.paragraphs[i:i + batch_size] |
| | | tasks = [] |
| | | for paragraph in batch_paragraphs: |
| | | tasks.append(self.gen_sect_entities(paragraph)) |
| | | tasks.append(self.gen_chapter_entities(paragraph)) |
| | | |
| | | async def run(): |
| | | await asyncio.gather(*tasks) |
| | |
| | | for paragraph in doc.paragraphs: |
| | | doc_dbh.add_paragraph(self.doc_id, None, paragraph) |
| | | Log.info('保存段落和段落实体词关系到数据库完成') |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | files = [ |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机分系统遥测源包设计报告(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机软件用户需求(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测大纲(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测信号分配表(公开).docx", |
| | | # r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机指令格式与编码定义(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\指令格式(公开).docx" |
| | | ] |
| | | for file in files: |
| | | doc_processor = DocProcessor(file) |
| | | doc_processor.process() |
| | | |
| | | # doc_dbh.get_docs() |
| | |
| | | |
| | | """ |
| | | |
| | | def __init__(self, docx_file: str, docx_type: str): |
| | | """ |
| | | Split a docx document into paragraphs. |
| | | :param docx_file: path of the docx file to split |
| | | :param docx_type: document type |
| | | """ |
| | | self.docx_file = docx_file |
| | | self.docx_type = docx_type |
| | | self.image_to_text = ImageToText() |
| | | self.paragraphs: list[ParagraphInfo] = [] |
| | | self.paragraph_tree: list[ParagraphInfo] = [] |
| | |
| | | # Replace the original list contents in place to avoid repeated remove calls |
| | | self.paragraphs[:] = _paragraphs |
| | | self.paragraph_tree = result |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx' |
| | | # docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\table_test.docx' |
| | | doc_split = DocSplit(docx_file, "总线通信协议") |
| | | doc_split.split() |
| | | # er = EntityRecognition() |
| | | # db = Neo4jHelper() |
| | | # for trunk in doc_split.trunks: |
| | | # print('段落文本:') |
| | | # print(trunk) |
| | | # print('实体词:') |
| | | # print(er.run(trunk)) |
| | | # entities = er.run(trunk) |
| | | # db.create_page_node() |
| | | print("\n".join([x.full_text_with_children for x in doc_split.paragraphs])) |
| | | print() |
| | |
| | | class EntityHelper: |
| | | # Map from document type to its recognition prompt |
| | | doc_prompt_map: dict |
| | | # All entities |
| | | entities: list[TEntity] |
| | | |
| | | def __init__(self): |
| | | Log.info("初始化EntityHelper") |
| | | current_dir = os.path.dirname(__file__) |
| | | self.entities = doc_dbh.get_all_entities() |
| | | self.doc_prompt_map = {} |
| | | entity_names = [entity.name for entity in self.entities] |
| | | with open(f'{current_dir}/../../tpl/entities.json', 'r', encoding='utf-8') as f: |
| | | text = f.read() |
| | | obj = json.loads(text) |
| | |
| | | prompts = obj2[doc_ty]['prompts'] |
| | | self.doc_prompt_map[doc_ty] = prompts |
| | | for entity in obj2[doc_ty]['entities']: |
| | | if entity in entity_names: |
| | | continue |
| | | _entity = TEntity(name=entity, type=ty, doc_type=doc_ty, |
| | | prompts=obj2[doc_ty]['entities'][entity]) |
| | | if doc_dbh.get_entity(_entity): |
| | | continue |
| | | doc_dbh.add_entity(_entity) |
| | | self.entities.append(_entity) |
| | | Log.info(f"新增Entity:{entity},id:{_entity.id}") |
| | | |
| | | |
| | | entity_helper = EntityHelper() |
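| | | |
| | | The nested loop above implies a particular shape for tpl/entities.json. A sketch of that assumed structure, inferred from this code and the fragment at the end of this diff (the concrete keys are illustrative only): |
| | | ```python |
| | | # Assumed layout: entity type (ty) -> doc type (doc_ty) -> {"prompts", "entities"}. |
| | | example = { |
| | |     "遥测": {                      # entity type -> ty (illustrative) |
| | |         "遥测大纲": {               # doc type -> doc_ty |
| | |             "prompts": "文件名通常包含“遥测大纲”关键字", |
| | |             "entities": {"虚拟信道定义": "章节名包含“虚拟信道”"} |
| | |         } |
| | |     } |
| | | } |
| | | ``` |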
| | |
| | | import json |
| | | |
| | | from knowledgebase import utils |
| | | from knowledgebase.doc.entity_helper import entity_helper |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | | from knowledgebase.log import Log |
| | | |
| | | llm = ChatOpenAI(temperature=0, |
| | |
| | | |
| | | def __init__(self, doc_type: str): |
| | | # List of entity terms |
| | | entities = doc_dbh.get_entities_by_doc_type(doc_type) |
| | | entity_list = ','.join([entity.name for entity in entities]) + "。" |
| | | entity_rules = ";\n".join([f"- {entity.name}:{entity.prompts}" for entity in entities]) + "。" |
| | | tpl = """ |
| | |
| | | # @description: Document-related data classes |
| | | from dataclasses import dataclass |
| | | import typing |
| | | from enum import Enum |
| | | |
| | | from knowledgebase.db.doc_db_models import TEntity |
| | | |
| | |
| | | self.file = file |
| | | self.file_type = file_type |
| | | self.paragraphs: typing.List[ParagraphInfo] = paragraphs |
| | | |
| | | |
| | | class _DocType: |
| | | tm_outline = '遥测大纲' |
| | | user_requirements = '用户需求' |
| | | tm_pkt_design = '源包设计' |
| | | bus_comm_proto = '总线通信协议' |
| | | tc_format = '指令格式' |
| | | tc_cmd_table = '遥控指令表' |
| | | |
| | | |
| | | DocType = _DocType() |
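| | | |
| | | A small, illustrative usage of these constants together with the db helper defined earlier (e.g., from a separate script): |
| | | ```python |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | | from knowledgebase.doc.models import DocType |
| | | |
| | | # Fetch all entity terms registered for telemetry source-packet design documents. |
| | | tm_pkt_entities = doc_dbh.get_entities_by_doc_type(DocType.tm_pkt_design) |
| | | print([e.name for e in tm_pkt_entities]) |
| | | ``` |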
| | |
| | | """ |
| | | return doc_dbh.get_text_with_entities(entity_names) |
| | | |
| | | @staticmethod |
| | | def get_texts_with_entity(entity_names: list[str]) -> list[str]: |
| | | """ |
| | | Get document texts by entity terms. |
| | | :param entity_names: list[str] - entity term names |
| | | :return: list[str] - list of text contents |
| | | """ |
| | | return doc_dbh.get_texts_with_entities(entity_names) |
| | | |
| | | def run(self): |
| | | # Generate structured data from the documents |
| | | self.handle_tm_structured_data() |
| | |
| | | def validation(gen_text): |
| | | vcs = json.loads(gen_text) |
| | | assert all(re.match('^[0-1]+$', it['VCID']) for it in vcs), '生成的VCID必须是二进制' |
| | | |
| | | doc_text = self.get_text_with_entity(['虚拟信道定义']) |
| | | result = self.call_model(_msg, 'out/' + dev.code + '_虚拟信道.json', doc_text, validation) |
| | | Log.info('虚拟信道:' + result) |
| | |
| | | pkts = json.loads(gen_text) |
| | | assert len(pkts), 'VC源包列表不能为空' |
| | | |
| | | doc_text = self.get_text_with_entity(['遥测源包下传时机']) |
| | | text = self.call_model(_msg, 'out/' + dev.code + '_遥测源包下传时机.json', doc_text, validation) |
| | | Log.info('遥测源包所属虚拟信道:' + text) |
| | | return json.loads(text) |
| | | |
| | |
| | | } |
| | | ] |
| | | """ |
| | | doc_text = self.get_text_with_entity(['源包列表']) |
| | | result = self.call_model(_msg, 'out/' + dev.code + '_源包列表.json', doc_text) |
| | | Log.info('遥测源包列表:' + result) |
| | | return json.loads(result) |
| | | |
| | |
| | | # 例子: |
| | | {"last_par_pos":128, "par_num": 20} |
| | | """ |
| | | doc_text = self.get_text_with_entity([pkt_id]) |
| | | text = self.call_model(_msg, '', doc_text) |
| | | result = json.loads(text) |
| | | last_par_pos = result['last_par_pos'] |
| | | par_num = result['par_num'] |
| | |
| | | ] |
| | | """ |
| | | |
| | | def _validation(gen_text): |
| | | _pkt = json.loads(gen_text) |
| | | with open(f'out/tmp/{time.time()}.json', 'w') as f: |
| | | f.write(gen_text) |
| | |
| | | # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}' |
| | | assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!' |
| | | |
| | | result = self.call_model(_msg, f'out/数据包-{pkt_name}.json', doc_text, _validation) |
| | | Log.info(f'数据包“{pkt_name}”信息:' + result) |
| | | pkt = json.loads(result) |
| | | else: |
| | |
| | | return pkt |
| | | |
| | | def gen_bus(self): |
| | | _msg = """ |
| | | # 指令 |
| | | 我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。 |
| | | # 需求 |
| | | 请析文档,列出总线通信包传输约定中描述的所有数据包列表, |
| | | 数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、 |
| | | transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。 |
| | | # 约束 |
| | | - frameNum:使用文档中的文本不要做任何转换; |
| | | - subAddr:值为“深度”、“平铺”、“数字”或null; |
| | | - 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线; |
| | | - 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输); |
| | | - 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发; |
| | | - 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发; |
| | | - 不要漏掉任何一个数据包; |
| | | - 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。 |
| | | # 例子 |
| | | [ |
| | | { |
| | | "id": "PCS005", |
| | | "name": "总线管理(内部指令)", |
| | | "apid": "418", |
| | | "service": "(1, 2)", |
| | | "length": 1, |
| | | "interval": 1000, |
| | | "subAddr": null, |
| | | "frameNum": "1|2", |
| | | "transSer": "DataBlock", |
| | | "note": "", |
| | | "rtAddr": 28, |
| | | "rt": "数据接口单元XIU", |
| | | "throughBus": true, |
| | | "burst": true, |
| | | "transDirect": "发" |
| | | } |
| | | ] |
| | | """ |
| | | self.bus_pkts = [] |
| | | doc_text_list = self.get_texts_with_entity(['分系统源包']) |
| | | for doc_text in doc_text_list: |
| | | _msg = """ |
| | | # 指令 |
| | | 我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。 |
| | | # 需求 |
| | | 请分析文档,列出总线通信包传输约定中描述的所有数据包列表, |
| | | 数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、 |
| | | transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。 |
| | | # 约束 |
| | | - frameNum:使用文档中的文本不要做任何转换; |
| | | - subAddr:值为“深度”、“平铺”、“数字”或null; |
| | | - 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线; |
| | | - 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输); |
| | | - 传输方向分“收”和“发”,传输服务如果是“取数”是“收”,如果是“数据块传输”则根据包所在的分系统以及表格的“传输方向”列进行判断,判断对于SMU来说是收还是发; |
| | | - 是否突发:根据表格中的“传输周期”列进行判断,如果填写了类似“突发”的文字表示是突发否则表示不是突发; |
| | | - 不要漏掉任何一个数据包; |
| | | - 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。 |
| | | # 例子 |
| | | [ |
| | | { |
| | | "id": "PCS005", |
| | | "name": "总线管理(内部指令)", |
| | | "apid": "418", |
| | | "service": "(1, 2)", |
| | | "length": 1, |
| | | "interval": 1000, |
| | | "subAddr": null, |
| | | "frameNum": "1|2", |
| | | "transSer": "DataBlock", |
| | | "note": "", |
| | | "rtAddr": 28, |
| | | "rt": "数据接口单元XIU", |
| | | "throughBus": true, |
| | | "burst": true, |
| | | "transDirect": "发" |
| | | } |
| | | ] |
| | | """ |
| | | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | |
| | | result = self.call_model(_msg, 'out/总线.json', doc_text, validation) |
| | | Log.info('总线数据包:' + result) |
| | | |
| | | pkts = json.loads(result) |
| | | # Keep only packets that go over the bus |
| | | pkts = list(filter(lambda it: it['throughBus'], pkts)) |
| | | # Keep only packets that have an APID |
| | | pkts = list(filter(lambda it: it['apid'], pkts)) |
| | | |
| | | pkts2 = [] |
| | | # TODO: this filtering should be done via the database, which already stores every telemetry packet and the paragraph text that defines it |
| | | for pkt in pkts: |
| | | if self.pkt_in_tm_pkts(pkt["name"]): |
| | | pkts2.append(pkt) |
| | | for pkt in pkts2: |
| | | _pkt = self.gen_pkt_details(pkt['name'], pkt['id']) |
| | | if _pkt: |
| | | pkt['children'] = [] |
| | | pkt['children'].extend(_pkt['datas']) |
| | | pkt['length'] = _pkt['length'] |
| | | self.bus_pkts = pkts |
| | | |
| | | def pkt_in_tm_pkts(self, pkt_name): |
| | | _msg = f""" |
| | | # 指令 |
| | | 我需要从文档中分析判读是否有某个遥测包的字段表描述,你要帮助我判断。 |
| | | # 问题 |
| | | 文档中有遥测包“{pkt_name}”的字段表描述吗? |
| | | 注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。 |
| | | # 约束 |
| | | - 根据文档内容输出; |
| | | - 遥测包名称必须完全匹配; |
| | | - 输出“无”或“有”,不要输出其他任何内容。 |
| | | # 例子 |
| | | 有 |
| | | """ |
| | | text = self.call_model(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', ['这里是文档中抽取的内容']) |
| | | Log.info(f'文档中有无“{pkt_name}”的字段描述:' + text) |
| | | return text == '有' |
| | | # pkts2 = [] |
| | | # todo: this filtering should be done via the database, which stores the code entity of every packet |
| | | # for pkt in pkts: |
| | | # if self.pkt_in_tm_pkts(pkt["name"]): |
| | | # pkts2.append(pkt) |
| | | for pkt in pkts: |
| | | _pkt = self.gen_pkt_details(pkt['name'], pkt['id']) |
| | | if _pkt: |
| | | pkt['children'] = [] |
| | | pkt['children'].extend(_pkt['datas']) |
| | | pkt['length'] = _pkt['length'] |
| | | self.bus_pkts.extend(pkts) |
| | | |
| | | # endregion 遥测-end |
| | | |
| | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | |
| | | doc_text = self.get_text_with_entity(['遥控帧格式']) |
| | | text = self.call_model(_msg, 'out/tc_transfer_frame.json', doc_text, validation) |
| | | result: dict = json.loads(text) |
| | | format_text = utils.read_from_file('tpl/tc_transfer_frame.json') |
| | | format_text = utils.replace_tpl_paras(format_text, result) |
| | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | |
| | | doc_text = self.get_text_with_entity(['遥控包格式']) |
| | | text = self.call_model(_msg, 'out/tc_transfer_pkt.json', doc_text, validation) |
| | | result = json.loads(text) |
| | | |
| | | format_text = utils.read_from_file('tpl/tc_pkt_format.json') |
| | |
| | | return pkt_format |
| | | |
| | | def gen_tc_transfer_pkts(self): |
| | | doc_text_list = self.get_texts_with_entity(['APID分配']) |
| | | pkts = [] |
| | | for doc_text in doc_text_list: |
| | | _msg = ''' |
| | | # 指令 |
| | | 分析文档列出所有的遥控源包。 |
| | | # 输出例子: |
| | | [{ |
| | | "name": "xxx", |
| | | "code":"pkt", |
| | | "应用过程标识符":"0xAA", |
| | | "服务类型":"0x1", |
| | | "服务子类型":"0x2" |
| | | }] |
| | | ''' |
| | | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | |
| | | text = self.call_model(_msg, 'out/tc_transfer_pkts.json', doc_text, validation) |
| | | Log.info('遥控包列表:' + text) |
| | | pkts.extend(json.loads(text)) |
| | | return pkts |
| | | |
| | | def gen_tc_pkt_details(self, pkt): |
| | | tc_name = pkt['name'] |
| | |
| | | logger.setLevel(logging.DEBUG) |
| | | |
| | | # Create a file handler |
| | | file_handler = logging.FileHandler('logs.log', encoding='utf-8') |
| | | file_handler.setLevel(logging.DEBUG) |
| | | |
| | | # Create a console handler |
| | |
| | | import math |
| | | import os |
| | | import random |
| | | import time |
| | | |
| | | from knowledgebase.markitdown import MarkItDown |
| | | |
| | | from doc_to_docx import doc_to_docx |
| | | |
| | | |
| | | def process_docs(directory): |
| | | # Iterate over all files in the directory |
| | | for filename in os.listdir(directory): |
| | | # Check whether it is a .doc file |
| | | if filename.endswith(".doc"): |
| | | # Convert it to .docx |
| | | doc_to_docx(directory + filename, directory + filename.replace(".doc", ".docx")) |
| | | |
| | | |
| | | md = MarkItDown() |
| | | |
| | | |
| | | def to_markdown(dst_dir: str): |
| | | text = '' |
| | | # Iterate over all files in the folder |
| | | for file in os.listdir(dst_dir): |
| | | # Check whether it is a .docx file |
| | | if file.endswith(".docx"): |
| | | # Convert it to markdown |
| | | result = md.convert(dst_dir + file) |
| | | text = result.text_content |
| | | out_file = dst_dir + file + '.md' |
| | | with open(out_file, 'w', encoding='utf-8') as f: |
| | | f.write(text) |
| | | return out_file |
| | | |
| | | |
| | | # 1. Parse documents |
| | | # 2. Ingest documents |
| | | # 3. Start LangFlow |
| | | def main(): |
| | | doc_dir = ".\\doc\\" |
| | | # Process documents |
| | | # process_docs(doc_dir) |
| | | # Convert the documents to markdown |
| | | md_file = to_markdown(doc_dir) |
| | | |
| | | md_file = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.md' |
| | | # Start the LLM processing flow |
| | | # ret_text = LangFlow([md_file]).run() |
| | | # Save the result |
| | | # with open('D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.text', 'w', encoding='utf-8') as f: |
| | | # f.write(ret_text) |
| | | |
| | | |
| | | def get_bit_mask(start, end): |
| | | bits = math.ceil((end + 1) / 8) * 8 |
| | | if bits == 0: |
| | | bits = 8 |
| | | mask = 0 |
| | | for i in range(start, end + 1): |
| | | mask |= 1 << (bits - i - 1) |
| | | return mask |
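| | | |
| | | A quick worked example of get_bit_mask, assuming bit 0 is the most significant bit of the byte-aligned field (which is what the shift above implies): |
| | | ```python |
| | | assert get_bit_mask(0, 3) == 0b11110000   # bits 0..3 of one byte -> high nibble |
| | | assert get_bit_mask(4, 7) == 0b00001111   # bits 4..7 of one byte -> low nibble |
| | | assert get_bit_mask(3, 9) == 0b0001111111000000  # a span crossing a byte boundary pads to 16 bits |
| | | ``` |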
| | | |
| | | |
| | | # if __name__ == '__main__': |
| | | # main() |
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | # |
| | | # @author: |
| | | # @date: |
| | | # @version: |
| | | # @description: |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | | |
| | | |
| | | def test(): |
| | | text = doc_dbh.get_text_with_entities(['遥控包格式']) |
| | | print(text) |
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | # |
| | | # @author: |
| | | # @date: |
| | | # @version: |
| | | # @description: |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | | from knowledgebase.doc.doc_processor import DocProcessor |
| | | |
| | | |
| | | def test_process(): |
| | | files = [ |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机分系统遥测源包设计报告(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机软件用户需求(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测大纲(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测信号分配表(公开).docx", |
| | | # r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机指令格式与编码定义(公开).docx", |
| | | r"D:\workspace\PythonProjects\KnowledgeBase\doc\指令格式(公开).docx" |
| | | ] |
| | | for file in files: |
| | | doc_processor = DocProcessor(file) |
| | | doc_processor.process() |
| | | |
| | | |
| | | def test_get_text_by_entity(): |
| | | text = doc_dbh.get_text_with_entities(['分系统源包']) |
| | | print(text) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | # test_process() |
| | | test_get_text_by_entity() |
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | # |
| | | # @author: |
| | | # @date: |
| | | # @version: |
| | | # @description: |
| | | from knowledgebase.doc.docx_split import DocSplit |
| | | |
| | | |
| | | class TestDocxSplit: |
| | | def test_split(self): |
| | | docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx' |
| | | # docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\table_test.docx' |
| | | doc_split = DocSplit(docx_file, "总线通信协议") |
| | | doc_split.split() |
| | | print("\n".join([x.full_text_with_children for x in doc_split.paragraphs])) |
| | |
| | | "遥测格式定义": "一般在“遥测格式”章节,内容包含”遥测帧“ ”遥测包“具体格式的定义", |
| | | "虚拟信道定义": "章节名包含“虚拟信道”,内容包含虚拟信道的划分,各源包在各虚拟信道下传分配", |
| | | "插入域": "章节名包含“插入域”,内容为一张表格,定义了插入域中的遥测参数", |
| | | "源包参数表": "章节名包含“源包设计”,内容为多个源包具体参数的表格,每个源包单独一张表格" |
| | | "源包参数表": "章节名包含“源包设计”,内容为多个源包具体参数的表格,每个源包单独一张表格", |
| | | "遥测源包下传时机": "章节名包含类似“遥测源包下传时机”的文本,内容为一个表格描述遥测源包下传时机" |
| | | } |
| | | }, |
| | | "源包设计": { |
| | | "prompts": "文件名通常包含“源包”关键字", |
| | | "entities": { |
| | | "源包参数表": "通常为叶子节点,章节名通常为 “xxx包”,内容为源包参数表格,定义了包头、数据域具体内容" |
| | | "源包参数表": "通常为叶子节点,章节名通常为 “xxx包”,内容为源包参数表格,定义了包头、数据域具体内容", |
| | | "源包列表": "章节名包含“遥测源包类型定义”的文本内容" |
| | | } |
| | | } |
| | | }, |