13个文件已修改
3个文件已添加
909 ■■■■ 已修改文件
.gitignore 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db_struct_flow.py 375 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/db/doc_db_helper.py 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/db/doc_db_models.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/doc_processor.py 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/docx_split.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/entity_helper.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/entity_recognition.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/models.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/gen_base_db/json_generate.py 206 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/log/__init__.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
testcases/test_doc_db_helper.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
testcases/test_doc_processor.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
testcases/test_docx_split.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tpl/entities.json 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -7,4 +7,5 @@
/out*
/packages
__pycache__
/static/
/static/
*.log
db_struct_flow.py
@@ -17,6 +17,8 @@
from knowledgebase.db.models import TProject
from knowledgebase.db.doc_db_helper import doc_dbh
# file_map = {
#     # "遥测源包设计报告": "./doc/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md",
#     # "遥测源包设计报告": "./doc/数管数字量快速源包.md",
@@ -105,13 +107,11 @@
10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。
    
## 限制:
- id和code的命名规则:英文字母、数字、下划线组成,且以英文字母或下划线开头。
- 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
- 不输出任何注释等描述性信息。
"""
g_completion = None
def read_from_file(cache_file):
    with open(cache_file, 'r', encoding='utf-8') as f:
@@ -345,14 +345,22 @@
        self.gen_tc()
        return ''
    def get_text_with_entity(self, entity_names: list[str]) -> str:
        """
        根据实体词获取文档文本
        :param entity_names: str - 实体词名称
        :return: str - 文本内容
        """
        return doc_dbh.get_text_with_entities(entity_names)
    def _gen(self, msgs, msg, files=None):
        if files is None:
            files = [file_map['文档合并']]
    def _gen(self, msgs, msg, doc_text):
        # if files is None:
        #     files = [file_map['文档合并']]
        messages = [] if msgs is None else msgs
        doc_text = ''
        for file in files:
            doc_text += '\n' + read_from_file(file)
        # doc_text = ''
        # for file in files:
        #     doc_text += '\n' + read_from_file(file)
        if len(messages) == 0:
            # 如果是第一次提问加入system消息
            messages.append({'role': 'system', 'content': assistant_msg})
@@ -380,14 +388,14 @@
        g_completion = None
        return text
    def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5, json_text=False):
    def generate_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5, json_text=False):
        if msgs is None:
            msgs = []
        if USE_CACHE and os.path.isfile(cache_file):
            text = read_from_file(cache_file)
        else:
            s = time.time()
            text = self._gen(msgs, msg, files)
            text = self._gen(msgs, msg, doc_text)
            text = remove_think_tag(text)
            if json_text:
                text = get_json_text(text)
@@ -398,39 +406,27 @@
                    print(e)
                    if try_cnt <= 0:
                        raise RuntimeError('生成失败,重试次数太多,强制结束!')
                    return self.generate_text_json(msg, cache_file, msgs, files, validation, try_cnt - 1)
                    return self.generate_text_json(msg, cache_file, msgs, doc_text, validation, try_cnt - 1)
            if cache_file:
                save_to_file(text, cache_file)
            print(f'耗时:{time.time() - s}')
        return text
    def generate_text_json(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5):
        return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt, True)
    def generate_text_json(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5):
        return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True)
    def generate_tc_text(self, msg, cache_file, messages=None, files=None, validation=None, try_cnt=5):
        if messages is None:
            messages = []
        doc_text = ''
        for file in files:
            doc_text += '\n' + read_from_file(file)
        if len(messages) == 0:
            # 如果是第一次提问加入system消息
            messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text})
        return self.generate_text_json(msg, cache_file, messages, files, validation, try_cnt)
    def generate_tc_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5):
        return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True)
    def gen_project(self):
        #         _msg = """
        # 根据文档输出型号信息,型号字段包括:名称和代号。仅输出型号这一级。
        # 例如:{"name":"xxx","id":"xxx"}
        # """
        #         print('型号信息:')
        #         text = self.generate_text(_msg, 'out/型号信息.json', files=[file_map['应用软件用户需求']])
        #         proj_dict = json.loads(text)
        # 工程信息从系统获取
        proj_dict = {
            "id": "JB200001",
            "name": "HY-4A"
        }
        _msg = """
            根据文档内容输出分系统信息,分系统字段包括:名称和型号代号。仅输出分系统这一级。如果型号代号中有符号也要输出,保证输出完整。
            例如:{"name":"xxx","id":"xxx"}
        """
        print('型号信息:')
        doc_text = self.get_text_with_entity(['系统概述'])
        text = self.generate_text_json(_msg, 'out/型号信息.json', doc_text=doc_text)
        proj_dict = json.loads(text)
        code = proj_dict['id']
        name = proj_dict['name']
        proj = create_project(code, name, code, name, "", datetime.now())
@@ -495,7 +491,8 @@
            assert isinstance(_devs, list), '数据结构最外层不是数组'
            assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元'
        text = self.generate_text_json(_msg, cache_file, files=[file_map['应用软件用户需求']], validation=validation)
        doc_text = self.get_text_with_entity(['系统概述','总线管理'])
        text = self.generate_text_json(_msg, cache_file, doc_text=doc_text, validation=validation)
        devs = json.loads(text)
        # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
@@ -550,9 +547,10 @@
参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数起始bit位置)、length(参数bit长度)、type(类型:para)。
注意:
1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。
文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。
文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2;"Byte38~Byte74":表示从第38个字节到第74个字节,中间有37个字节,长度是298。
#约束
- 不要遗漏任何参数;
- 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
- 数据结构最外层为数组,数组元素为参数信息对象;
- 仅输出JSON文本。
#例子
@@ -567,14 +565,14 @@
]
"""
        print('插入域参数列表:')
        files = [file_map['遥测大纲']]
        def validation(gen_text):
            params = json.loads(gen_text)
            assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组'
            assert len(params), '插入域参数列表不能为空'
        text = self.generate_text_json(_msg, './out/插入域参数列表.json', files=files, validation=validation)
        doc_text = self.get_text_with_entity(['插入域'])
        text = self.generate_text_json(_msg, './out/插入域参数列表.json', doc_text=doc_text, validation=validation)
        return json.loads(text)
    def gen_tm_frame_data(self):
@@ -623,29 +621,31 @@
            _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts)
            for _pkt in _vc_pkts:
                # 判断遥测包是否有详细定义
                if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
                    continue
                # if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
                #     continue
                # 获取包详情
                _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id'])
                epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
                if epdu and _pkt:
                    _pkt['children'] = _pkt['datas']
                    _last_par = _pkt['children'][len(_pkt['children']) - 1]
                    _pkt['length'] = (_last_par['pos'] + _last_par['length'])
                    _pkt['pos'] = 0
                    if 'children' not in epdu:
                        epdu['children'] = []
                    # 添加解析规则后缀防止重复
                    _pkt['id'] = _pkt['id'] + '_' + vc['VCID']
                    # 给包名加代号前缀
                    if not _pkt['name'].startswith(_pkt['id']):
                        _pkt['name'] = _pkt['id'] + '_' + _pkt['name']
                    epdu['children'].append(_pkt)
                    apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
                    ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
                    sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
                    _pkt['vals'] = \
                        f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
                    # todo 当数据包获取到东西但不是参数时,获取到的包结构有问题,需要过滤
                    if len(_pkt['children']) > 0:
                        _last_par = _pkt['children'][len(_pkt['children']) - 1]
                        _pkt['length'] = (_last_par['pos'] + _last_par['length'])
                        _pkt['pos'] = 0
                        if 'children' not in epdu:
                            epdu['children'] = []
                        # 添加解析规则后缀防止重复
                        _pkt['id'] = _pkt['id'] + '_' + vc['VCID']
                        # 给包名加代号前缀
                        if not _pkt['name'].startswith(_pkt['id']):
                            _pkt['name'] = _pkt['id'] + '_' + _pkt['name']
                        epdu['children'].append(_pkt)
                        apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
                        ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
                        sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
                        _pkt['vals'] = \
                            f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
        # 重新计数起始偏移
        self.compute_length_pos(cadu['children'])
@@ -718,7 +718,8 @@
            assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
        print('虚拟信道:')
        text = self.generate_text_json(_msg, "out/虚拟信道.json", files=[file_map['遥测大纲']], validation=validation)
        doc_text = self.get_text_with_entity(['虚拟信道定义'])
        text = self.generate_text_json(_msg, "out/虚拟信道.json", doc_text=doc_text, validation=validation)
        vcs = json.loads(text)
        return vcs
@@ -743,131 +744,106 @@
        dev_pkts = json.loads(text)
        return dev_pkts
    def pkt_in_tm_pkts(self, pkt_name):
        cache_file = f'out/数据包-{pkt_name}.json'
        if os.path.isfile(cache_file):
            return True
        files = [file_map['遥测源包设计报告']]
        print(f'文档中有无“{pkt_name}”的字段描述:', end='')
        _msg = f"""
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中分析判读是否有某个遥测包的字段表描述,你要帮助我判断。
#问题
文档中有遥测包“{pkt_name}”的字段表描述吗?
注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。
#约束
- 根据文档内容输出;
- 遥测包名称必须完全匹配;
- 输出“无”或“有”,不要输出其他任何内容。
#例子
"""
        text = self.generate_text(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', [], files)
        return text == '有'
    def gen_pkt_details(self, pkt_name, pkt_id):
        cache_file = f'out/数据包-{pkt_name}.json'
        files = [file_map['遥测源包设计报告']]
        if not os.path.isfile(cache_file):
            # 先问最后一个参数的字节位置
            print(f'遥测源包“{pkt_name}”信息:')
            _msg = f"""
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
#需求
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
""" + """
#约束
- 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
- 数据域中每一行对应一个参数;
- 不要跨表格提取;
- 字节位置中字节位置是从1开始的,bit位置是从0开始的;
- bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
- 仅输出json,不要输出其他任何字符。
#例子:
{"last_par_pos":128, "par_num": 20}
"""
            text = self.generate_text_json(_msg, '', files=files)
            result = json.loads(text)
            last_par_pos = result['last_par_pos']
            par_num = result['par_num']
        # _msg = f"""
        #     #角色
        #     你是一名资深的软件工程师。
        #     #指令
        #     我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
        #     #需求
        #     输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
        #     """ + """
        #     #约束
        #     - 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
        #     - 数据域中每一行对应一个参数;
        #     - 不要跨表格提取;
        #     - 字节位置中字节位置是从1开始的,bit位置是从0开始的;
        #     - bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
        #     - 仅输出json,不要输出其他任何字符。
        #     #例子:
        #     {"last_par_pos":128, "par_num": 20}
        #     """
        # text = self.generate_text_json(_msg, '', doc_text=doc_text)
        # result = json.loads(text)
        # last_par_pos = result['last_par_pos']
        # par_num = result['par_num']
            _msg = f"""
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
#需求
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
注意:最后一个参数的起始bit偏移位置为{last_par_pos}。
""" + """
遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
        _msg = f"""
            #角色
            你是一名资深的软件工程师。
            #指令
            我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
            #需求
            输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
            """ + """
            遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
            包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
            数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
            包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
            包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
#约束
- 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
- 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
- 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
- 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
- pos字段:数值类型,从0开始计算,由长度(length)累加得到;
- 应用过程标识:如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头,;
- 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
- 类似”保留(Rsv)“的行也要当参数生成;
- 重复的行也要生成;
- 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
- 字节顺序:值为大端“B”,小端“L”,默认为“B”;
- 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
- 仅输出json,不要输出任何其他内容。
#例子
{
  "name": "数管缓变遥测包",
  "id": "PMS003",
  "type": "linear",
  "headers": [
    {
      "name": "包标识",
      "id": "packetIdentifier",
      "pos": 0,
      "content": "000",
      "length": 8,
      "type": "para"
    }
  ],
  "datas": [
    {
      "name": "XXX包",
      "id": "XXX",
      "pos": 0,
      "length": 8,
      "byteOrder": ""
    }
  ]
"""
            print(f'遥测源包“{pkt_name}”信息:')
            表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
            #约束
            - 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
            - 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
            - 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
            - 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
            - pos字段:数值类型,从0开始计算,由长度(length)累加得到;
            - 应用过程标识:应用过程标识的定义如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头;
            - 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
            - 类似”保留(Rsv)“的行也要当参数生成;
            - 重复的行也要生成;
            - 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
            - 字节顺序:值为大端“B”,小端“L”,默认为“B”;
            - 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
            - 仅输出json,不要输出任何其他内容。
            #例子
            {
            "name": "数管缓变遥测包",
            "id": "PMS003",
            "type": "linear",
            "headers": [
                {
                "name": "包标识",
                "id": "packetIdentifier",
                "pos": 0,
                "content": "000",
                "length": 8,
                "type": "para"
                }
            ],
            "datas": [
                {
                "name": "XXX包",
                "id": "XXX",
                "pos": 0,
                "length": 8,
                "byteOrder": ""
                }
            ]
            """
            def validation(gen_text):
                _pkt = json.loads(gen_text)
                with open(f'out/tmp/{time.time()}.json', 'w') as f:
                    f.write(gen_text)
                assert 'headers' in _pkt, '包结构中必须包含headers字段'
                assert 'datas' in _pkt, '包结构中必须包含datas字段'
                print(f'参数个数:{len(_pkt["datas"])}')
                # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
                assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
        print(f'遥测源包“{pkt_name}”信息:')
            text = self.generate_text_json(_msg, cache_file, [], files, validation)
            pkt = json.loads(text)
        else:
            pkt = json.loads(read_from_file(cache_file))
        def validation(gen_text):
            _pkt = json.loads(gen_text)
            with open(f'out/tmp/{time.time()}.json', 'w') as f:
                f.write(gen_text)
            assert 'headers' in _pkt, '包结构中必须包含headers字段'
            assert 'datas' in _pkt, '包结构中必须包含datas字段'
            # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
            # assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
        doc_text = self.get_text_with_entity([pkt_id])
        if doc_text == '':
            return None
        text = self.generate_text_json(_msg, cache_file, [], doc_text, validation)
        pkt = json.loads(text)
        pkt_len = 0
        for par in pkt['datas']:
            par['pos'] = pkt_len
@@ -882,14 +858,12 @@
#指令
我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。
#需求
输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)、是否有参数(hasParams)。
输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)。
字段描述:
1.名称:遥测包的名称;
2.代号:遥测包的代号;
3.是否有参数:表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者类似”详见xxx“则是没有参数表。
#约束
- name:名称中不要包含代号,仅从文档中提取源包名称;
- hasParams:值为布尔值,true或false;
- 如果没有代号,使用遥测包名称的英文翻译代替;
- 如果没有名称用代号代替;
- 不要漏掉任何遥测包;
@@ -899,13 +873,12 @@
    {
        "name": "数管数字量快速源包",
        "id": "PMS001",
        "hasParams": true
    }
]
"""
        print(f'遥测源包列表:')
        files = [file_map['遥测源包设计报告']]
        text = self.generate_text_json(_msg, 'out/源包列表.json', [], files)
        doc_text = self.get_text_with_entity(['源包列表'])
        text = self.generate_text_json(_msg, 'out/源包列表.json', doc_text=doc_text)
        pkt = json.loads(text)
        return pkt
@@ -914,11 +887,10 @@
#角色
你是一名资深软件工程师。
#指令
我需要从文档中提取遥测源包信息,你要帮助我完成遥测源包信息的提取。
我需要从文档中提取所有遥测源包信息,你要帮助我完成遥测源包信息的提取。
#需求
根据”遥测源包下传时机定义“章节的内容输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。
根据文档内容输出遥测源包信息,顶级结构为数组,元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。
#约束
- 从”遥测源包下传时机定义“章节中提取遥测源包信息;
- 所属虚拟信道:必须是文档中描述的遥测虚拟信道代号(序号);
- 下传时机:与表格中定义的一致;
- 不要遗漏任何遥测源包。
@@ -932,14 +904,13 @@
  },
]
        """
        files = [file_map['遥测大纲']]
        print('遥测源包所属虚拟信道:')
        def validation(gen_text):
            pkts = json.loads(gen_text)
            assert len(pkts), 'VC源包列表不能为空'
        text = self.generate_text_json(_msg, 'out/遥测VC源包.json', files=files, validation=validation)
        doc_text = self.get_text_with_entity(['虚拟信道定义','遥测源包下传时机'])
        text = self.generate_text_json(_msg, 'out/遥测VC源包.json', doc_text=doc_text, validation=validation)
        pkt_vcs = json.loads(text)
        return pkt_vcs
@@ -1044,7 +1015,10 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_text_json(_msg, 'out/总线.json', files=[file_map['总线传输通信帧分配']],
        doc_text = self.get_text_with_entity(['RT地址分配','分系统源包'])
        text = self.generate_text_json(_msg, 'out/总线.json', doc_text=doc_text,
                                       validation=validation)
        pkts = json.loads(text)
        # 筛选经总线的数据包
@@ -1053,11 +1027,7 @@
        # 筛选有apid的数据包
        pkts = list(filter(lambda it: it['apid'], pkts))
        pkts2 = []
        for pkt in pkts:
            if self.pkt_in_tm_pkts(pkt["name"]):
                pkts2.append(pkt)
        for pkt in pkts2:
            _pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
            if _pkt:
                pkt['children'] = []
@@ -1291,7 +1261,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', files=[file_map['指令格式']],
        doc_text = self.get_text_with_entity(['遥控帧格式'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', doc_text=doc_text,
                                     validation=validation)
        result: dict = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
@@ -1332,7 +1303,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', files=[file_map['指令格式']],
        doc_text = self.get_text_with_entity(['遥控包格式'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', doc_text=doc_text,
                                     validation=validation)
        result = json.loads(text)
@@ -1346,7 +1318,10 @@
# 角色
你是一名资深的软件工程师。
# 指令
分析文档列出所有的遥控源包。
分析文档列出所有的遥控指令。
#  约束
- 应用过程标识:应用过程标识就是APID,一般会在名称后的括号中列出来
- code是指令代号,没有就空着
# 输出例子:
[{
"name": "xxx",
@@ -1360,7 +1335,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', files=[file_map['指令格式']],
        doc_text = self.get_text_with_entity(['APID分配'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', doc_text=doc_text,
                                     validation=validation)
        pkts = json.loads(text)
        return pkts
@@ -1415,8 +1391,11 @@
        def validation(gen_text):
            json.loads(gen_text)
        doc_text = self.get_text_with_entity([tc_name])
        if doc_text == '':
            return None
        text = self.generate_tc_text(_msg, f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json',
                                     files=[file_map['指令格式']],
                                     doc_text=doc_text,
                                     validation=validation)
        result = json.loads(text)
        pkt['children'] = result
knowledgebase/db/doc_db_helper.py
@@ -82,6 +82,12 @@
        self.session.commit()
        return paragraph_entity_link.id
    def get_entity(self, entity):
        ret = self.session.query(TEntity).where(
            TEntity.name == entity.name and TEntity.type == entity.type and TEntity.doc_type == entity.doc_type).first()
        if ret:
            return ret
    def add_entity(self, entity):
        """
        添加实体
@@ -106,11 +112,11 @@
    def get_docs(self) -> list[TDoc]:
        return self.session.query(TDoc).all()
    def get_text_with_entities(self, entity_names: list[str]) -> str:
    def get_texts_with_entities(self, entity_names: list[str]):
        """
        根据实体词获取文本内容
        根据实体词获取文本内容列表
        :param entity_names: list[str] - 实体词
        :return: str - 文本
        :return: list[str] - 文本列表
        """
        if not entity_names:
            return ""
@@ -118,26 +124,30 @@
        _entitie_ids = [entity.id for entity in _entities]
        links = self.session.query(TParagraphEntityLink).where(TParagraphEntityLink.entity_id.in_(_entitie_ids)).all()
        _paragraphs = [link.paragraph for link in links]
        return [self.get_paragraph_full_text(p) for p in _paragraphs]
    def get_text_with_entities(self, entity_names: list[str]) -> str:
        """
        根据实体词获取文本内容
        :param entity_names: list[str] - 实体词
        :return: str - 文本
        """
        texts = self.get_texts_with_entities(entity_names)
        return '\n'.join(texts)
        return '\n'.join([self.get_paragraph_full_text(p) for p in _paragraphs])
    def get_entities_by_names(self, names: list[str]):
        _entities = self.session.query(TEntity).where(TEntity.name.in_(names)).all()
        return _entities
    def get_paragraph_full_text(self, p: TParagraph):
        result = p.text if p.title_level == 0 else p.title_num + ' ' + p.text
        return result + '\n' + '\n'.join([self.get_paragraph_full_text(p) for p in p.children])
    def get_entities_by_doc_type(self, doc_type):
        _entities = self.session.query(TEntity).where(TEntity.doc_type == doc_type).all()
        return _entities
    def commit(self):
        self.session.commit()
doc_dbh = DocDbHelper()
# if __name__ == '__main__':
#     text = doc_dbh.get_text_with_entities(['遥控包格式'])
#     print(text)
#     doc_db = DocDbHelper()
#     # doc_db.insert_entities()
#     doc = doc_db.add_doc(DocInfo(file='aaa', file_name='test'))
#     p1 = doc_db.add_paragraph(doc.id, None, ParagraphInfo(text='test1', title_level=1, num=1, num_level=1))
#     p2 = doc_db.add_paragraph(doc.id, p1.id, ParagraphInfo(text='test2', title_level=2, num=1, num_level=2))
#     p3 = doc_db.add_paragraph(doc.id, p2.id, ParagraphInfo(text='test3', title_level=3, num=1, num_level=3))
#     doc_db.add_paragraph_ref_link(TParagraphRefLink(parent_id=p1.id, child_id=p3.id))
knowledgebase/db/doc_db_models.py
@@ -120,6 +120,25 @@
    is_del = Column(Integer)
# class TTmPacket(Base):
#     __tablename__ = 't_tm_packets'
#     id = Column(Integer, primary_key=True)
#     name = Column(Text)
#     code = Column(Text)
#     apid = Column(Integer)
#     is_del = Column(Integer)
#
#
# class TTmPacketParagraphLink(Base):
#     __tablename__ = 't_tm_packet_paragraph_links'
#     id = Column(Integer, primary_key=True)
#     tm_packet_id = Column(Integer, ForeignKey('t_tm_packets.id'))
#     paragraph_id = Column(Integer, ForeignKey('t_paragraphs.id'))
#     tm_packet = relationship("TTmPacket", foreign_keys=[tm_packet_id], uselist=False)
#     paragraph = relationship("TParagraph", foreign_keys=[paragraph_id], uselist=False)
#     is_del = Column(Integer)
def init_doc_db():
    """
    初始化文档数据库
@@ -128,6 +147,7 @@
    # mysql
    Log.info("连接并初始化文档数据库...")
    engine = create_engine('mysql+pymysql://root:123456@192.168.3.145:3306/knowledgebase', echo=False)
    # engine = create_engine('sqlite:///doc_db.db', echo=False)
    Base.metadata.create_all(engine)
    SessionFactory = sessionmaker(bind=engine)
    Session = scoped_session(SessionFactory)
knowledgebase/doc/doc_processor.py
@@ -5,7 +5,10 @@
# @version: 
# @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from knowledgebase.db.doc_db_models import TEntity
from knowledgebase.doc.docx_split import DocSplit
import asyncio
from knowledgebase.db.doc_db_helper import doc_dbh
@@ -13,7 +16,7 @@
from knowledgebase.doc.entity_recognition import EntityRecognition
import os.path
from knowledgebase.doc.models import DocInfo, ParagraphInfo
from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType
from knowledgebase.llm import llm
from knowledgebase.log import Log
from knowledgebase import utils
@@ -27,8 +30,8 @@
        """
        Log.info(f'开始处理文档:{docx_file}')
        self.docx_file = docx_file
        self.doc_split = DocSplit(docx_file)
        self.doc_type = self.get_doc_type()
        self.doc_split = DocSplit(docx_file, self.doc_type)
        self.entity_recognition = EntityRecognition(self.doc_type)
        self.doc_id = 0
@@ -48,14 +51,58 @@
        Log.info(f'识别结果:{resp.content}')
        return resp.content
    async def gen_sect_entities(self, paragraph: ParagraphInfo):
        # Log.info(f'生成章节实体词:{paragraph.full_text}')
    def get_tm_pkt_info(self, paragraph: ParagraphInfo):
        if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]:
            return ''
        prompt = HumanMessagePromptTemplate.from_template('''
# 指令
识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。
识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。
提取的遥测包信息包括:包名称,包代号,APID。
# 约束
- 如果文本内容是目录则不要输出任何字符;
- 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符;
- 文本结构通常是:包名称、代号和APID在开头,后面紧接着是包头和参数定义表;
- 如果没有识别到遥测包信息不要输出任何字符;
- 识别失败,不要输出任何内容,包括解释性文本;
- 输出json格式。
# 复合要求的文本结构
1.1.1 code xxx包(APID=0x123)
```json
表格内容
```
# 示例 - 识别到数据包
{{
    "name": "xxx包",
    "code": "xxx",
    "apid": 123
}}
# 示例 - 未识别到数据包
""
# 文本内容:
{text}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = chain.invoke({"text": paragraph.full_text})
        return resp
    async def gen_chapter_entities(self, paragraph: ParagraphInfo):
        # 获取章节实体词
        entities = await asyncio.to_thread(lambda: self.entity_recognition.run(paragraph.full_text))
        Log.info(f'章节实体词:{entities}')
        if entities:
            paragraph.entities = [next(filter(lambda x: x.name == e, entity_helper.entities), None) for e in entities]
            paragraph.entities = [e for e in paragraph.entities if e]
        entity_names = await asyncio.to_thread(lambda: self.entity_recognition.run(paragraph.full_text))
        Log.info(f'章节{paragraph.title_num}实体词:{entity_names}')
        if entity_names:
            paragraph.entities = doc_dbh.get_entities_by_names(entity_names)
        # 获取遥测包信息
        pkt = self.get_tm_pkt_info(paragraph)
        if pkt:
            entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='')
            e = doc_dbh.get_entity(entity)
            if e:
                entity.id = e.id
                return e
            doc_dbh.add_entity(entity)
            Log.info(f"新增Entity:{entity.name},id:{entity.id}")
            paragraph.entities.append(entity)
    def process(self):
        self.doc_split.split()
@@ -65,7 +112,7 @@
            batch_paragraphs = self.doc_split.paragraphs[i:i + batch_size]
            tasks = []
            for paragraph in batch_paragraphs:
                tasks.append(self.gen_sect_entities(paragraph))
                tasks.append(self.gen_chapter_entities(paragraph))
            async def run():
                await asyncio.gather(*tasks)
@@ -87,20 +134,3 @@
        for paragraph in doc.paragraphs:
            doc_dbh.add_paragraph(self.doc_id, None, paragraph)
        Log.info('保存段落和段落实体词关系到数据库完成')
if __name__ == '__main__':
    files = [
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机分系统遥测源包设计报告(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机软件用户需求(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测大纲(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测信号分配表(公开).docx",
        # r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机指令格式与编码定义(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\指令格式(公开).docx"
    ]
    for file in files:
        doc_processor = DocProcessor(file)
        doc_processor.process()
    # doc_dbh.get_docs()
knowledgebase/doc/docx_split.py
@@ -27,12 +27,14 @@
    """
    def __init__(self, docx_file: str):
    def __init__(self, docx_file: str, docx_type: str):
        """
        docx文档拆分
        :param docx_file: 要拆分的docx文件路径
        :param docx_type: 文档类型
        """
        self.docx_file = docx_file
        self.docx_type = docx_type
        self.image_to_text = ImageToText()
        self.paragraphs: list[ParagraphInfo] = []
        self.paragraph_tree: list[ParagraphInfo] = []
@@ -291,21 +293,3 @@
        # 替换原始列表内容,避免多次 remove 操作
        self.paragraphs[:] = _paragraphs
        self.paragraph_tree = result
if __name__ == '__main__':
    docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx'
    # docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\table_test.docx'
    doc_split = DocSplit(docx_file)
    doc_split.split()
    # er = EntityRecognition()
    # db = Neo4jHelper()
    # for trunk in doc_split.trunks:
    #     print('段落文本:')
    #     print(trunk)
    #     print('实体词:')
    #     print(er.run(trunk))
    # entities = er.run(trunk)
    # db.create_page_node()
    print("\n".join([x.full_text_with_children for x in doc_split.paragraphs]))
    print()
knowledgebase/doc/entity_helper.py
@@ -15,15 +15,11 @@
class EntityHelper:
    # 文档类型和识别提示词map
    doc_prompt_map: dict
    # 所有实体
    entities: list[TEntity]
    def __init__(self):
        Log.info("初始化EntityHelper")
        current_dir = os.path.dirname(__file__)
        self.entities = doc_dbh.get_all_entities()
        self.doc_prompt_map = {}
        entity_names = [entity.name for entity in self.entities]
        with open(f'{current_dir}/../../tpl/entities.json', 'r', encoding='utf-8') as f:
            text = f.read()
            obj = json.loads(text)
@@ -33,13 +29,11 @@
                    prompts = obj2[doc_ty]['prompts']
                    self.doc_prompt_map[doc_ty] = prompts
                    for entity in obj2[doc_ty]['entities']:
                        if entity in entity_names:
                            continue
                        _entity = TEntity(name=entity, type=ty, doc_type=doc_ty,
                                          prompts=obj2[doc_ty]['entities'][entity])
                        if doc_dbh.get_entity(_entity):
                            continue
                        doc_dbh.add_entity(_entity)
                        self.entities.append(_entity)
                        Log.info(f"新增Entity:{entity},id:{_entity.id}")
entity_helper = EntityHelper()
knowledgebase/doc/entity_recognition.py
@@ -11,7 +11,7 @@
import json
from knowledgebase import utils
from knowledgebase.doc.entity_helper import entity_helper
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.log import Log
llm = ChatOpenAI(temperature=0,
@@ -31,7 +31,7 @@
    def __init__(self, doc_type: str):
        # 实体词列表
        entities = list(filter(lambda x: x.doc_type == doc_type, entity_helper.entities))
        entities = doc_dbh.get_entities_by_doc_type(doc_type)
        entity_list = ','.join([entity.name for entity in entities]) + "。"
        entity_rules = ";\n".join([f"- {entity.name}:{entity.prompts}" for entity in entities]) + "。"
        tpl = """
knowledgebase/doc/models.py
@@ -6,6 +6,7 @@
# @description: 文档相关数据类
from dataclasses import dataclass
import typing
from enum import Enum
from knowledgebase.db.doc_db_models import TEntity
@@ -115,3 +116,15 @@
        self.file = file
        self.file_type = file_type
        self.paragraphs: typing.List[ParagraphInfo] = paragraphs
class _DocType:
    tm_outline = '遥测大纲'
    user_requirements = '用户需求'
    tm_pkt_design = '源包设计'
    bus_comm_proto = '总线通信协议'
    tc_format = '指令格式'
    tc_cmd_table = '遥控指令表'
DocType = _DocType()
knowledgebase/gen_base_db/json_generate.py
@@ -134,6 +134,15 @@
        """
        return doc_dbh.get_text_with_entities(entity_names)
    @staticmethod
    def get_texts_with_entity(entity_names: list[str]) -> list[str]:
        """
        根据实体词获取文档文本
        :param entity_names: str - 实体词名称
        :return: str - 文本内容
        """
        return doc_dbh.get_texts_with_entities(entity_names)
    def run(self):
        # 根据文档,生成结构化数据
        self.handle_tm_structured_data()
@@ -349,6 +358,7 @@
        def validation(gen_text):
            vcs = json.loads(gen_text)
            assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
        doc_text = self.get_text_with_entity(['虚拟信道定义'])
        result = self.call_model(_msg, 'out/' + dev.code + '_虚拟信道.json', doc_text, validation)
        Log.info('虚拟信道:' + result)
@@ -380,7 +390,8 @@
            pkts = json.loads(gen_text)
            assert len(pkts), 'VC源包列表不能为空'
        text = self.call_model(_msg, 'out/' + dev.code + '_遥测源包下传时机.json', ['遥测源包下传时机'], validation)
        doc_text = self.get_text_with_entity(['遥测源包下传时机'])
        text = self.call_model(_msg, 'out/' + dev.code + '_遥测源包下传时机.json', doc_text, validation)
        Log.info('遥测源包所属虚拟信道:' + text)
        return json.loads(text)
@@ -410,7 +421,8 @@
                }
            ]
        """
        result = self.call_model(_msg, 'out/' + dev.code + '_源包列表.json', ['这里是文档中抽取的内容'])
        doc_text = self.get_text_with_entity(['源包列表'])
        result = self.call_model(_msg, 'out/' + dev.code + '_源包列表.json', doc_text)
        Log.info('遥测源包列表:' + result)
        return json.loads(result)
@@ -434,7 +446,8 @@
                # 例子:
                {"last_par_pos":128, "par_num": 20}
            """
            text = self.call_model(_msg, '', ['这里是文档中抽取的内容'])
            doc_text = self.get_text_with_entity([pkt_id])
            text = self.call_model(_msg, '', doc_text)
            result = json.loads(text)
            last_par_pos = result['last_par_pos']
            par_num = result['par_num']
@@ -494,7 +507,7 @@
                ]
            """
            def validation(gen_text):
            def _validation(gen_text):
                _pkt = json.loads(gen_text)
                with open(f'out/tmp/{time.time()}.json', 'w') as f:
                    f.write(gen_text)
@@ -504,7 +517,7 @@
                # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
                assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
            result = self.call_model(_msg, f'out/数据包-{pkt_name}.json', [], ['这里是文档中抽取的内容'], validation)
            result = self.call_model(_msg, f'out/数据包-{pkt_name}.json', doc_text, _validation)
            Log.info(f'数据包“{pkt_name}”信息:' + result)
            pkt = json.loads(result)
        else:
@@ -517,87 +530,72 @@
        return pkt
    def gen_bus(self):
        _msg = """
            # 指令
            我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。
            # 需求
            请析文档,列出总线通信包传输约定中描述的所有数据包列表,
            数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
            transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。
            # 约束
            - frameNum:使用文档中的文本不要做任何转换;
            - subAddr:值为“深度”、“平铺”、“数字”或null;
            - 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
            - 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输);
            - 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发;
            - 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发;
            - 不要漏掉任何一个数据包;
            - 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
            # 例子
            [
                {
                    "id": "PCS005",
                    "name": "总线管理(内部指令)",
                    "apid": "418",
                    "service": "(1, 2)",
                    "length": 1,
                    "interval": 1000,
                    "subAddr": null,
                    "frameNum": "1|2",
                    "transSer": "DataBlock",
                    "note": "",
                    "rtAddr": 28,
                    "rt": "数据接口单元XIU",
                    "throughBus": true,
                    "burst": true,
                    "transDirect": "发"
                }
            ]
        """
        self.bus_pkts = []
        doc_text_list = self.get_texts_with_entity(['分系统源包'])
        for doc_text in doc_text_list:
            _msg = """
                # 指令
                我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。
                # 需求
                请析文档,列出总线通信包传输约定中描述的所有数据包列表,
                数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
                transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。
                # 约束
                - frameNum:使用文档中的文本不要做任何转换;
                - subAddr:值为“深度”、“平铺”、“数字”或null;
                - 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
                - 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输);
                - 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发;
                - 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发;
                - 不要漏掉任何一个数据包;
                - 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
                # 例子
                [
                    {
                        "id": "PCS005",
                        "name": "总线管理(内部指令)",
                        "apid": "418",
                        "service": "(1, 2)",
                        "length": 1,
                        "interval": 1000,
                        "subAddr": null,
                        "frameNum": "1|2",
                        "transSer": "DataBlock",
                        "note": "",
                        "rtAddr": 28,
                        "rt": "数据接口单元XIU",
                        "throughBus": true,
                        "burst": true,
                        "transDirect": "发"
                    }
                ]
            """
        def validation(gen_text):
            json.loads(gen_text)
            def validation(gen_text):
                json.loads(gen_text)
        result = self.call_model(_msg, 'out/总线.json', ['这里是文档中抽取的内容'], validation)
        Log.info('总线数据包:' + result)
            result = self.call_model(_msg, 'out/总线.json', doc_text, validation)
            Log.info('总线数据包:' + result)
        pkts = json.loads(result)
        # 筛选经总线的数据包
        pkts = list(filter(lambda it: it['throughBus'], pkts))
        # 筛选有apid的数据包
        pkts = list(filter(lambda it: it['apid'], pkts))
            pkts = json.loads(result)
            # 筛选经总线的数据包
            pkts = list(filter(lambda it: it['throughBus'], pkts))
            # 筛选有apid的数据包
            pkts = list(filter(lambda it: it['apid'], pkts))
        pkts2 = []
        # todo 这一步应该通过数据库筛选,数据库中已经有所有遥测包以及遥测包对应的定义段落文本
        for pkt in pkts:
            if self.pkt_in_tm_pkts(pkt["name"]):
                pkts2.append(pkt)
        for pkt in pkts2:
            self.gen_pkt_details(pkt['name'], pkt['id'])
            _pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
            if _pkt:
                pkt['children'] = []
                pkt['children'].extend(_pkt['datas'])
                pkt['length'] = _pkt['length']
        self.bus_pkts = pkts
    def pkt_in_tm_pkts(self, pkt_name):
        _msg = f"""
            # 指令
            我需要从文档中分析判读是否有某个遥测包的字段表描述,你要帮助我判断。
            # 问题
            文档中有遥测包“{pkt_name}”的字段表描述吗?
            注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。
            # 约束
            - 根据文档内容输出;
            - 遥测包名称必须完全匹配;
            - 输出“无”或“有”,不要输出其他任何内容。
            # 例子
            有
        """
        text = self.call_model(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', ['这里是文档中抽取的内容'])
        Log.info(f'文档中有无“{pkt_name}”的字段描述:' + text)
        return text == '有'
            # pkts2 = []
            # todo 这一步应该通过数据库筛选,数据库中存储了每个数据包的代号实体
            # for pkt in pkts:
            #     if self.pkt_in_tm_pkts(pkt["name"]):
            #         pkts2.append(pkt)
            for pkt in pkts:
                self.gen_pkt_details(pkt['name'], pkt['id'])
                _pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
                if _pkt:
                    pkt['children'] = []
                    pkt['children'].extend(_pkt['datas'])
                    pkt['length'] = _pkt['length']
            self.bus_pkts.extend(pkts)
    # endregion 遥测-end
@@ -642,7 +640,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.call_model(_msg, 'out/tc_transfer_frame.json', ['这里是文档中抽取的内容'], validation)
        doc_text = self.get_text_with_entity(['遥控帧格式'])
        text = self.call_model(_msg, 'out/tc_transfer_frame.json', doc_text, validation)
        result: dict = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
        format_text = utils.replace_tpl_paras(format_text, result)
@@ -681,7 +680,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.call_model(_msg, 'out/tc_transfer_pkt.json', ['这里是文档中抽取的内容'], validation)
        doc_text = self.get_text_with_entity(['遥控包格式'])
        text = self.call_model(_msg, 'out/tc_transfer_pkt.json', doc_text, validation)
        result = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_pkt_format.json')
@@ -691,25 +691,29 @@
        return pkt_format
    def gen_tc_transfer_pkts(self):
        _msg = '''
            # 指令
            分析文档列出所有的遥控源包。
            # 输出例子:
            [{
            "name": "xxx",
            "code":"pkt",
            "应用过程标识符":"0xAA",
            "服务类型":"0x1",
            "服务子类型":"0x2"
            }]
        '''
        doc_text_list = self.get_texts_with_entity(['APID分配'])
        pkts = []
        for doc_text in doc_text_list:
            _msg = '''
                # 指令
                分析文档列出所有的遥控源包。
                # 输出例子:
                [{
                "name": "xxx",
                "code":"pkt",
                "应用过程标识符":"0xAA",
                "服务类型":"0x1",
                "服务子类型":"0x2"
                }]
            '''
        def validation(gen_text):
            json.loads(gen_text)
            def validation(gen_text):
                json.loads(gen_text)
        text = self.call_model(_msg, 'out/tc_transfer_pkts.json', ['这里是文档中抽取的内容'], validation)
        Log.info('遥控包列表:' + text)
        return json.loads(text)
            text = self.call_model(_msg, 'out/tc_transfer_pkts.json', doc_text, validation)
            Log.info('遥控包列表:' + text)
            pkts.extend(json.loads(text))
        return pkts
    def gen_tc_pkt_details(self, pkt):
        tc_name = pkt['name']
knowledgebase/log/__init__.py
@@ -10,7 +10,7 @@
logger.setLevel(logging.DEBUG)
# 创建一个文件处理器
file_handler = logging.FileHandler('logs.log')
file_handler = logging.FileHandler('logs.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
# 创建一个控制台处理器
main.py
@@ -1,67 +0,0 @@
import math
import os
import random
import time
from knowledgebase.markitdown import MarkItDown
from doc_to_docx import doc_to_docx
def process_docs(directory):
    # 遍历目录下的所有文件
    for filename in os.listdir(directory):
        # 判断是否为 doc 文件
        if filename.endswith(".doc"):
            # 转换为 docx
            doc_to_docx(directory + filename, directory + filename.replace(".doc", ".docx"))
md = MarkItDown()
def to_markdown(dst_dir: str):
    text = ''
    # 遍历文件夹下的所有文件
    for file in os.listdir(dst_dir):
        # 判断是否为 docx 文件
        if file.endswith(".docx"):
            # 转换为 md
            result = md.convert(dst_dir + file)
            text = result.text_content
            out_file = dst_dir + file + '.md'
            with open(out_file, 'w', encoding='utf-8') as f:
                f.write(text)
    return out_file
# 1.解析文档
# 2.输入文档
# 3.启动LangFlow
def main():
    doc_dir = ".\\doc\\"
    # 处理文档
    # process_docs(doc_dir)
    # 文档转换为markdown
    md_file = to_markdown(doc_dir)
    md_file = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.md'
    # 启动大模型处理流程
    # ret_text = LangFlow([md_file]).run()
    # 保存结果
    # with open('D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.text', 'w', encoding='utf-8') as f:
    #     f.write(ret_text)
def get_bit_mask(start, end):
    bits = math.ceil((end + 1) / 8) * 8
    if bits == 0:
        bits = 8
    mask = 0
    for i in range(start, end + 1):
        mask |= 1 << (bits - i - 1)
    return mask
# if __name__ == '__main__':
#     main()
testcases/test_doc_db_helper.py
New file
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
from knowledgebase.db.doc_db_helper import doc_dbh
def test():
    text = doc_dbh.get_text_with_entities(['遥控包格式'])
    print(text)
testcases/test_doc_processor.py
New file
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.doc.doc_processor import DocProcessor
def test_process():
    files = [
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机分系统遥测源包设计报告(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机软件用户需求(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测大纲(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测信号分配表(公开).docx",
        # r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机指令格式与编码定义(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\指令格式(公开).docx"
    ]
    for file in files:
        doc_processor = DocProcessor(file)
        doc_processor.process()
def test_get_text_by_entity():
    text = doc_dbh.get_text_with_entities(['分系统源包'])
    print(text)
if __name__ == '__main__':
    # test_process()
    test_get_text_by_entity()
testcases/test_docx_split.py
New file
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
from knowledgebase.doc.docx_split import DocSplit
class TestDocxSplit:
    def test_split(self):
        docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx'
        # docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\table_test.docx'
        doc_split = DocSplit(docx_file, "总线通信协议")
        doc_split.split()
        print("\n".join([x.full_text_with_children for x in doc_split.paragraphs]))
tpl/entities.json
@@ -15,13 +15,15 @@
        "遥测格式定义": "一般在“遥测格式”章节,内容包含”遥测帧“ ”遥测包“具体格式的定义",
        "虚拟信道定义": "章节名包含“虚拟信道”,内容包含虚拟信道的划分,各源包在各虚拟信道下传分配",
        "插入域": "章节名包含“插入域”,内容为一张表格,定义了插入域中的遥测参数",
        "源包参数表": "章节名包含“源包设计”,内容为多个源包具体参数的表格,每个源包单独一张表格"
        "源包参数表": "章节名包含“源包设计”,内容为多个源包具体参数的表格,每个源包单独一张表格",
        "遥测源包下传时机": "章节名包含类似“遥测源包下传时机”的文本,内容为一个表格描述遥测源包下传时机"
      }
    },
    "源包设计": {
      "prompts": "文件名通常包含“源包”关键字",
      "entities": {
        "源包参数表": "通常为叶子节点,章节名通常为 “xxx包”,内容为源包参数表格,定义了包头、数据域具体内容"
        "源包参数表": "通常为叶子节点,章节名通常为 “xxx包”,内容为源包参数表格,定义了包头、数据域具体内容",
        "源包列表": "章节名包含“遥测源包类型定义”的文本内容"
      }
    }
  },