lyg
2025-05-22 c099e6662b8a6e320ac314d31eda9b40455e5aa7
db_struct_flow.py
@@ -12,10 +12,12 @@
import data_templates
from knowledgebase import utils
from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \
    update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format
    update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format, make_attr
from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt
from knowledgebase.db.models import TProject
from knowledgebase.db.doc_db_helper import doc_dbh
# file_map = {
#     # "遥测源包设计报告": "./doc/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md",
@@ -105,9 +107,16 @@
10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。
    
## 限制:
- id和code的命名规则:英文字母、数字、下划线组成,且以英文字母或下划线开头。
- 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
- 不输出任何注释等描述性信息。
"""
tc_system_msg = """
# 角色
你是一个资深软件工程师。
# 约束
- 输出内容必须根据文档和问题回答,不要创造其他内容;
- 输出内容必须是,JSON格式,不要输出其他文本。
"""
g_completion = None
@@ -346,13 +355,21 @@
        self.gen_tc()
        return ''
    def _gen(self, msgs, msg, files=None):
        if files is None:
            files = [file_map['文档合并']]
    def get_text_with_entity(self, entity_names: list[str]) -> str:
        """
        根据实体词获取文档文本
        :param entity_names: str - 实体词名称
        :return: str - 文本内容
        """
        return doc_dbh.get_text_with_entities(entity_names)
    def _gen(self, msgs, msg, doc_text):
        # if files is None:
        #     files = [file_map['文档合并']]
        messages = [] if msgs is None else msgs
        doc_text = ''
        for file in files:
            doc_text += '\n' + read_from_file(file)
        # doc_text = ''
        # for file in files:
        #     doc_text += '\n' + read_from_file(file)
        if len(messages) == 0:
            # 如果是第一次提问加入system消息
            messages.append({'role': 'system', 'content': assistant_msg})
@@ -380,14 +397,14 @@
        g_completion = None
        return text
    def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5, json_text=False):
    def generate_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5, json_text=False):
        if msgs is None:
            msgs = []
        if USE_CACHE and os.path.isfile(cache_file):
            text = read_from_file(cache_file)
        else:
            s = time.time()
            text = self._gen(msgs, msg, files)
            text = self._gen(msgs, msg, doc_text)
            text = remove_think_tag(text)
            if json_text:
                text = get_json_text(text)
@@ -398,39 +415,30 @@
                    print(e)
                    if try_cnt <= 0:
                        raise RuntimeError('生成失败,重试次数太多,强制结束!')
                    return self.generate_text_json(msg, cache_file, msgs, files, validation, try_cnt - 1)
                    return self.generate_text_json(msg, cache_file, msgs, doc_text, validation, try_cnt - 1)
            if cache_file:
                save_to_file(text, cache_file)
            print(f'耗时:{time.time() - s}')
        return text
    def generate_text_json(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5):
        return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt, True)
    def generate_text_json(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5):
        return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True)
    def generate_tc_text(self, msg, cache_file, messages=None, files=None, validation=None, try_cnt=5):
        if messages is None:
            messages = []
        doc_text = ''
        for file in files:
            doc_text += '\n' + read_from_file(file)
        if len(messages) == 0:
            # 如果是第一次提问加入system消息
            messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text})
        return self.generate_text_json(msg, cache_file, messages, files, validation, try_cnt)
    def generate_tc_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5):
        msgs = [
            {'role': 'system', 'content': tc_system_msg},
            {'role': 'user', 'content': "以下是文档内容:\n" + doc_text}]
        return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True)
    def gen_project(self):
        #         _msg = """
        # 根据文档输出型号信息,型号字段包括:名称和代号。仅输出型号这一级。
        # 例如:{"name":"xxx","id":"xxx"}
        # """
        #         print('型号信息:')
        #         text = self.generate_text(_msg, 'out/型号信息.json', files=[file_map['应用软件用户需求']])
        #         proj_dict = json.loads(text)
        # 工程信息从系统获取
        proj_dict = {
            "id": "JB200001",
            "name": "HY-4A"
        }
        _msg = """
            根据文档内容输出分系统信息,分系统字段包括:名称和型号代号。仅输出分系统这一级。如果型号代号中有符号也要输出,保证输出完整。
            例如:{"name":"xxx","id":"xxx"}
        """
        print('型号信息:')
        doc_text = self.get_text_with_entity(['系统概述'])
        text = self.generate_text_json(_msg, 'out/型号信息.json', doc_text=doc_text)
        proj_dict = json.loads(text)
        code = proj_dict['id']
        name = proj_dict['name']
        proj = create_project(code, name, code, name, "", datetime.now())
@@ -495,7 +503,8 @@
            assert isinstance(_devs, list), '数据结构最外层不是数组'
            assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元'
        text = self.generate_text_json(_msg, cache_file, files=[file_map['应用软件用户需求']], validation=validation)
        doc_text = self.get_text_with_entity(['系统概述', '总线管理'])
        text = self.generate_text_json(_msg, cache_file, doc_text=doc_text, validation=validation)
        devs = json.loads(text)
        # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
@@ -550,9 +559,10 @@
参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数起始bit位置)、length(参数bit长度)、type(类型:para)。
注意:
1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。
文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。
文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2;"Byte38~Byte74":表示从第38个字节到第74个字节,中间有37个字节,长度是298。
#约束
- 不要遗漏任何参数;
- 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
- 数据结构最外层为数组,数组元素为参数信息对象;
- 仅输出JSON文本。
#例子
@@ -567,14 +577,14 @@
]
"""
        print('插入域参数列表:')
        files = [file_map['遥测大纲']]
        def validation(gen_text):
            params = json.loads(gen_text)
            assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组'
            assert len(params), '插入域参数列表不能为空'
        text = self.generate_text_json(_msg, './out/插入域参数列表.json', files=files, validation=validation)
        doc_text = self.get_text_with_entity(['插入域'])
        text = self.generate_text_json(_msg, './out/插入域参数列表.json', doc_text=doc_text, validation=validation)
        return json.loads(text)
    def gen_tm_frame_data(self):
@@ -623,29 +633,31 @@
            _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts)
            for _pkt in _vc_pkts:
                # 判断遥测包是否有详细定义
                if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
                    continue
                # if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
                #     continue
                # 获取包详情
                _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id'])
                epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
                if epdu and _pkt:
                    _pkt['children'] = _pkt['datas']
                    _last_par = _pkt['children'][len(_pkt['children']) - 1]
                    _pkt['length'] = (_last_par['pos'] + _last_par['length'])
                    _pkt['pos'] = 0
                    if 'children' not in epdu:
                        epdu['children'] = []
                    # 添加解析规则后缀防止重复
                    _pkt['id'] = _pkt['id'] + '_' + vc['VCID']
                    # 给包名加代号前缀
                    if not _pkt['name'].startswith(_pkt['id']):
                        _pkt['name'] = _pkt['id'] + '_' + _pkt['name']
                    epdu['children'].append(_pkt)
                    apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
                    ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
                    sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
                    _pkt['vals'] = \
                        f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
                    # todo 当数据包获取到东西但不是参数时,获取到的包结构有问题,需要过滤
                    if len(_pkt['children']) > 0:
                        _last_par = _pkt['children'][len(_pkt['children']) - 1]
                        _pkt['length'] = (_last_par['pos'] + _last_par['length'])
                        _pkt['pos'] = 0
                        if 'children' not in epdu:
                            epdu['children'] = []
                        # 添加解析规则后缀防止重复
                        _pkt['id'] = _pkt['id'] + '_' + vc['VCID']
                        # 给包名加代号前缀
                        if not _pkt['name'].startswith(_pkt['id']):
                            _pkt['name'] = _pkt['id'] + '_' + _pkt['name']
                        epdu['children'].append(_pkt)
                        apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
                        ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
                        sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
                        _pkt['vals'] = \
                            f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
        # 重新计数起始偏移
        self.compute_length_pos(cadu['children'])
@@ -718,7 +730,8 @@
            assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
        print('虚拟信道:')
        text = self.generate_text_json(_msg, "out/虚拟信道.json", files=[file_map['遥测大纲']], validation=validation)
        doc_text = self.get_text_with_entity(['虚拟信道定义'])
        text = self.generate_text_json(_msg, "out/虚拟信道.json", doc_text=doc_text, validation=validation)
        vcs = json.loads(text)
        return vcs
@@ -743,131 +756,106 @@
        dev_pkts = json.loads(text)
        return dev_pkts
    def pkt_in_tm_pkts(self, pkt_name):
        cache_file = f'out/数据包-{pkt_name}.json'
        if os.path.isfile(cache_file):
            return True
        files = [file_map['遥测源包设计报告']]
        print(f'文档中有无“{pkt_name}”的字段描述:', end='')
        _msg = f"""
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中分析判读是否有某个遥测包的字段表描述,你要帮助我判断。
#问题
文档中有遥测包“{pkt_name}”的字段表描述吗?
注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。
#约束
- 根据文档内容输出;
- 遥测包名称必须完全匹配;
- 输出“无”或“有”,不要输出其他任何内容。
#例子
"""
        text = self.generate_text(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', [], files)
        return text == '有'
    def gen_pkt_details(self, pkt_name, pkt_id):
        cache_file = f'out/数据包-{pkt_name}.json'
        files = [file_map['遥测源包设计报告']]
        if not os.path.isfile(cache_file):
            # 先问最后一个参数的字节位置
            print(f'遥测源包“{pkt_name}”信息:')
            _msg = f"""
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
#需求
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
""" + """
#约束
- 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
- 数据域中每一行对应一个参数;
- 不要跨表格提取;
- 字节位置中字节位置是从1开始的,bit位置是从0开始的;
- bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
- 仅输出json,不要输出其他任何字符。
#例子:
{"last_par_pos":128, "par_num": 20}
"""
            text = self.generate_text_json(_msg, '', files=files)
            result = json.loads(text)
            last_par_pos = result['last_par_pos']
            par_num = result['par_num']
            _msg = f"""
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
#需求
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
注意:最后一个参数的起始bit偏移位置为{last_par_pos}。
""" + """
遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
        # _msg = f"""
        #     #角色
        #     你是一名资深的软件工程师。
        #     #指令
        #     我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
        #     #需求
        #     输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
        #     """ + """
        #     #约束
        #     - 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
        #     - 数据域中每一行对应一个参数;
        #     - 不要跨表格提取;
        #     - 字节位置中字节位置是从1开始的,bit位置是从0开始的;
        #     - bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
        #     - 仅输出json,不要输出其他任何字符。
        #     #例子:
        #     {"last_par_pos":128, "par_num": 20}
        #     """
包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
        # text = self.generate_text_json(_msg, '', doc_text=doc_text)
        # result = json.loads(text)
        # last_par_pos = result['last_par_pos']
        # par_num = result['par_num']
表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
#约束
- 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
- 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
- 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
- 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
- pos字段:数值类型,从0开始计算,由长度(length)累加得到;
- 应用过程标识:如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头,;
- 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
- 类似”保留(Rsv)“的行也要当参数生成;
- 重复的行也要生成;
- 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
- 字节顺序:值为大端“B”,小端“L”,默认为“B”;
- 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
- 仅输出json,不要输出任何其他内容。
#例子
{
  "name": "数管缓变遥测包",
  "id": "PMS003",
  "type": "linear",
  "headers": [
    {
      "name": "包标识",
      "id": "packetIdentifier",
      "pos": 0,
      "content": "000",
      "length": 8,
      "type": "para"
    }
  ],
  "datas": [
    {
      "name": "XXX包",
      "id": "XXX",
      "pos": 0,
      "length": 8,
      "byteOrder": ""
    }
  ]
"""
            print(f'遥测源包“{pkt_name}”信息:')
        _msg = f"""
            #角色
            你是一名资深的软件工程师。
            #指令
            我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
            #需求
            输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
            """ + """
            遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
            包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
            数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
            def validation(gen_text):
                _pkt = json.loads(gen_text)
                with open(f'out/tmp/{time.time()}.json', 'w') as f:
                    f.write(gen_text)
                assert 'headers' in _pkt, '包结构中必须包含headers字段'
                assert 'datas' in _pkt, '包结构中必须包含datas字段'
                print(f'参数个数:{len(_pkt["datas"])}')
                # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
                assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
            包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
            包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
            text = self.generate_text_json(_msg, cache_file, [], files, validation)
            pkt = json.loads(text)
        else:
            pkt = json.loads(read_from_file(cache_file))
            表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
            #约束
            - 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
            - 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
            - 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
            - 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
            - pos字段:数值类型,从0开始计算,由长度(length)累加得到;
            - 应用过程标识:应用过程标识的定义如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头;
            - 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
            - 类似”保留(Rsv)“的行也要当参数生成;
            - 重复的行也要生成;
            - 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
            - 字节顺序:值为大端“B”,小端“L”,默认为“B”;
            - 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
            - 仅输出json,不要输出任何其他内容。
            #例子
            {
            "name": "数管缓变遥测包",
            "id": "PMS003",
            "type": "linear",
            "headers": [
                {
                "name": "包标识",
                "id": "packetIdentifier",
                "pos": 0,
                "content": "000",
                "length": 8,
                "type": "para"
                }
            ],
            "datas": [
                {
                "name": "XXX包",
                "id": "XXX",
                "pos": 0,
                "length": 8,
                "byteOrder": ""
                }
            ]
            """
        print(f'遥测源包“{pkt_name}”信息:')
        def validation(gen_text):
            _pkt = json.loads(gen_text)
            with open(f'out/tmp/{time.time()}.json', 'w') as f:
                f.write(gen_text)
            assert 'headers' in _pkt, '包结构中必须包含headers字段'
            assert 'datas' in _pkt, '包结构中必须包含datas字段'
            # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
            # assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
        doc_text = self.get_text_with_entity([pkt_id])
        if doc_text == '':
            return None
        text = self.generate_text_json(_msg, cache_file, [], doc_text, validation)
        pkt = json.loads(text)
        pkt_len = 0
        for par in pkt['datas']:
            par['pos'] = pkt_len
@@ -882,14 +870,12 @@
#指令
我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。
#需求
输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)、是否有参数(hasParams)。
输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)。
字段描述:
1.名称:遥测包的名称;
2.代号:遥测包的代号;
3.是否有参数:表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者类似”详见xxx“则是没有参数表。
#约束
- name:名称中不要包含代号,仅从文档中提取源包名称;
- hasParams:值为布尔值,true或false;
- 如果没有代号,使用遥测包名称的英文翻译代替;
- 如果没有名称用代号代替;
- 不要漏掉任何遥测包;
@@ -899,13 +885,12 @@
    {
        "name": "数管数字量快速源包",
        "id": "PMS001",
        "hasParams": true
    }
]
"""
        print(f'遥测源包列表:')
        files = [file_map['遥测源包设计报告']]
        text = self.generate_text_json(_msg, 'out/源包列表.json', [], files)
        doc_text = self.get_text_with_entity(['源包列表'])
        text = self.generate_text_json(_msg, 'out/源包列表.json', doc_text=doc_text)
        pkt = json.loads(text)
        return pkt
@@ -914,11 +899,10 @@
#角色
你是一名资深软件工程师。
#指令
我需要从文档中提取遥测源包信息,你要帮助我完成遥测源包信息的提取。
我需要从文档中提取所有遥测源包信息,你要帮助我完成遥测源包信息的提取。
#需求
根据”遥测源包下传时机定义“章节的内容输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。
根据文档内容输出遥测源包信息,顶级结构为数组,元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。
#约束
- 从”遥测源包下传时机定义“章节中提取遥测源包信息;
- 所属虚拟信道:必须是文档中描述的遥测虚拟信道代号(序号);
- 下传时机:与表格中定义的一致;
- 不要遗漏任何遥测源包。
@@ -932,14 +916,14 @@
  },
]
        """
        files = [file_map['遥测大纲']]
        print('遥测源包所属虚拟信道:')
        def validation(gen_text):
            pkts = json.loads(gen_text)
            assert len(pkts), 'VC源包列表不能为空'
        text = self.generate_text_json(_msg, 'out/遥测VC源包.json', files=files, validation=validation)
        doc_text = self.get_text_with_entity(['虚拟信道定义', '遥测源包下传时机'])
        text = self.generate_text_json(_msg, 'out/遥测VC源包.json', doc_text=doc_text, validation=validation)
        pkt_vcs = json.loads(text)
        return pkt_vcs
@@ -1044,7 +1028,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_text_json(_msg, 'out/总线.json', files=[file_map['总线传输通信帧分配']],
        doc_text = self.get_text_with_entity(['RT地址分配', '分系统源包'])
        text = self.generate_text_json(_msg, 'out/总线.json', doc_text=doc_text,
                                       validation=validation)
        pkts = json.loads(text)
        # 筛选经总线的数据包
@@ -1053,11 +1038,7 @@
        # 筛选有apid的数据包
        pkts = list(filter(lambda it: it['apid'], pkts))
        pkts2 = []
        for pkt in pkts:
            if self.pkt_in_tm_pkts(pkt["name"]):
                pkts2.append(pkt)
        for pkt in pkts2:
            _pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
            if _pkt:
                pkt['children'] = []
@@ -1148,18 +1129,18 @@
        # 遥控包格式
        pkt_format = self.gen_tc_pkt_format()
        # 遥控包列表
        pkts = self.gen_tc_transfer_pkts()
        for pkt in pkts:
            # 遥控包数据区内容
            self.gen_tc_pkt_details(pkt)
            pkt['type'] = 'insUnit'
        instructions = self.gen_tc_transfer_pkts()
        for inst in instructions:
            # 遥控指令数据区内容
            self.gen_tc_pkt_details(inst)
            inst['type'] = 'insUnit'
            format_text = json.dumps(pkt_format, ensure_ascii=False)
            format_text = utils.replace_tpl_paras(format_text, pkt)
            format_text = utils.replace_tpl_paras(format_text, inst)
            pf = json.loads(format_text)
            pf['name'] = pkt['name']
            pf['code'] = pkt['code']
            pf['name'] = inst['name']
            pf['code'] = inst['code']
            data_area = next(filter(lambda x: x['name'] == '应用数据区', pf['children']))
            data_area['children'].append(pkt)
            data_area['children'].append(inst)
            frame['subPkts'].append(pf)
        self.order = 0
@@ -1173,7 +1154,7 @@
            elif item['type'] == 'length':
                return None
            elif item['type'] == 'checkSum':
                return json.dumps({"ChecksumType": "CRC-CCITT"})
                return json.dumps({"ChecksumType": item['value']['type']})
            elif item['type'] == 'subPkt':
                return json.dumps({"CanInput": False})
            elif item['type'] in ['combPkt', 'insUnitList', 'input']:
@@ -1182,26 +1163,10 @@
                return '{"MinLength":null,"MaxLength":null,"IsSubPackage":false,"InputParams":[],"OutPutParams":[],"MatchItems":[]}'
            elif item['type'] == 'pkt':
                return '''{"MaxLength":1024,"IsSplit8":false,"Split8Start":null,"Split8End":null,"PadCode":null,"Alignment":null,"InputParams":[],"OutPutParams":[],"MatchItems":[]}'''
            elif item['type'] == 'pktSeqCnt':
                return json.dumps({"FirstPackValue":"PackCount","MiddlePackValue":"PackIndex","LastPackValue":"PackIndex","IndependPackValue":"InsUnitCount"})
            elif 'value' in item:
                return item['value']
        def make_attr(ty: str):
            """
            获取字段定义的ATTR。
            位掩码,用于标识节点类型。
            类型:0~2 BinaryType;
                3~5 DataType;
                6~8: InputFormat;
                9 : IsSubPackage;
                10: IsSendFlag;
                11~13: ProcessMethod;
                14~16: ExpressionType;
                17~19: EnumType
            :param ty:
            :return:
            """
        def create_tc_format(parent_pk, field, parent_parent_pk=None):
            """
@@ -1229,11 +1194,20 @@
            if 'length' in field:
                field['bitWidth'] = field['length']
            field['bitOrder'] = None
            field['attr'] = 0
            field['attr'] = make_attr(field)
            if field['type'] == 'length' and 'value' in field and field['value']:
                val = field['value']
                field['range'] = val['start'] + "~" + val['end']
                field['formula'] = val['formula']
            # 即时输入长度为null则是变长字段,需要把类型改为variableLength
            if field['type'] == 'input' and field['length'] is None:
                field['type'] = 'variableLength'
            # 枚举值默认值设置
            if field['type'] == 'enum' and len(field['enums']) and not next(filter(lambda x: 'default' in x and x['default'], field['enums']), None):
                field['enums'][0]['default'] = True
            # 校验和
            if field['type'] == 'checkSum':
                field['range'] = f'{field["value"]["start"]}~{field["value"]["end"]}'
            ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field)
            ins_format_pk = ins_format.C_INS_FORMAT_PK
            if 'children' in field:
@@ -1272,11 +1246,12 @@
- 通过标志:const,二进制,以B结尾;
- 控制命令标志:const,二进制,以B结尾;
- 空闲位:const,二进制,以B结尾;
- 航天器标识:const,十六进制,以0x开头;
- 航天器标识:const,十六进制,以0x开头,如果是二进制或十进制需要转换为十六进制;
- 虚拟信道标识:sendFlag,发送标记,默认为“任务注入帧”,所有的值都要列举出来;
# 数据类型
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
- sendFlag:发送标记,类似枚举,定义样例:[{"n":"name","v":"value","c":"code","default":true}],n表示名称,v表示值,c表示code(没有空着),default表示是默认值;
- checkSum:校验和,如果是校验和类型还需要分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
# 约束
- 以JSON格式输出;
- 仅输出JSON文本,不要输出任何其他文本。
@@ -1291,7 +1266,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', files=[file_map['指令格式']],
        doc_text = self.get_text_with_entity(['遥控帧格式'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', doc_text=doc_text,
                                     validation=validation)
        result: dict = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
@@ -1317,7 +1293,19 @@
- 命令正确应答: const,二进制;
- 源地址: const,十六进制。
# 数据类型
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
- 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头;
- 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义,
- 枚举值:enum,
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
- 即时输入:input。
# 长度类型的范围定义描述
{"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"}
- start:起始字段code,长度包括起始字段,字段描述中说明了起始字段,
- end:结束字段code,长度包括结束字段,字段描述中说明了结束字段,
- formula:计算公式,如果没有计算相关描述则表示不需要计算公式。
计算公式定义:
- BYTES:按字节计算;
- N-x:总字节数减x,例如总字节数减1的公式为N-1。
# 约束
- 以JSON格式输出;
- 仅输出JSON文本,不要输出任何其他文本。
@@ -1332,7 +1320,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', files=[file_map['指令格式']],
        doc_text = self.get_text_with_entity(['遥控包格式'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', doc_text=doc_text,
                                     validation=validation)
        result = json.loads(text)
@@ -1346,21 +1335,27 @@
# 角色
你是一名资深的软件工程师。
# 指令
分析文档列出所有的遥控源包。
分析文档列出所有的遥控指令。
#  约束
- 应用过程标识:应用过程标识就是APID,一般会在名称后的括号中列出来;
- code:指令代号,没有就空着;
- 应用数据区:提取表格中的应用数据区内容。
# 输出例子:
[{
"name": "xxx",
"code":"pkt",
"应用过程标识符":"0xAA",
"服务类型":"0x1",
"服务子类型":"0x2"
"服务子类型":"0x2",
"应用数据区": ""
}]
'''
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', files=[file_map['指令格式']],
        doc_text = self.get_text_with_entity(['APID分配'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', doc_text=doc_text,
                                     validation=validation)
        pkts = json.loads(text)
        return pkts
@@ -1376,28 +1371,43 @@
# 指令
分析文档,从文档中提取遥控指令名称为“{tc_name}”代号为“{tc_code}”的指令应用数据区定义。
有些文档内容非常简单仅仅包含特定字节的内容描述,如果是这种文档,则每个特定字节的内容描述定义为一个字段,字段类型根据字节内容确定。
""" + """
# 约束
- code 如果没有明确定义则使用名称的英文翻译,尽量简短;
- length 自动转换为bit长度,必须是数值或null,不能为0;
- value 根据字段描述提取;
- enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""};
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
# 字段类型
- 固定码字:const,
- 长度:length,
- 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头;
- 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义,
- 枚举值:enum,
- 校验和:checkSum,
- 即时输入:input。
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法是什么,并保存在value中,
- 即时输入:input,如果是即时输入value的值为空字符串。
# 长度类型的范围定义描述
{"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"}
- start:起始字段code,长度包括起始字段,字段描述中说明了起始字段,
- end:结束字段code,长度包括结束字段,字段描述中说明了结束字段,
- formula:计算公式,如果没有计算相关描述则表示不需要计算公式。
计算公式定义:
- BYTES:按字节计算;
- N-x:总字节数减x,例如总字节数减1的公式为N-1。
# 字段类型分析方法
- 根据字段描述分析字段的类型;
- 字段描述中明确指定了字段值的,类型为const;
- 字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum;
- 字段描述中没有明确指定字段值,但是罗列了取值范围的,类型为enum;
- 字段描述中如果没有明确指定字段值也没有罗列取值范围的,类型为input;
- 字段如果是和“长度”有关,类型为length;
- 如果和数据域有关,类型为const;
- 字段如果和校验和有关,类型为checkSum。
- 字段如果和校验和有关,类型为checkSum,分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)。
# 约束
- code 如果没有明确定义则使用名称的英文翻译,尽量简短;
- length 自动转换为bit长度,必须是数值、null或范围定义,不能为0;
- value 根据字段描述提取字段值,字段值一般为数值类型,需要根据字段类型来分析,如果是length类型value的值为范围定义;
- enums 枚举类型的字段必须要有enums,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""};
- length类型的范围定义中的start和end必须是生成结果中的字段code;
- 输出数据结构为数组,数组元素为字段信息;
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
# 输出例子:
[
@@ -1407,16 +1417,53 @@
        "length": 8,
        "type": "const",
        "value": "0xAA"
    },
    {
        "name": "para2",
        "code": "para2",
        "length": 8,
        "type": "length",
        "value": {"start": "para1", "end": "data", "formula": "BYTES"}
    },
    {
        "name": "数据",
        "code": "data",
        "length": null,
        "type": "input",
        "value": ""
    }
    ...
]
"""
        def validation(gen_text):
            json.loads(gen_text)
            fields = json.loads(gen_text)
            for field in fields:
                if field['type'] == 'length':
                    if field['value'] is None:
                        raise Exception('length类型的value不能为空')
                    if 'start' not in field['value'] or 'end' not in field['value']:
                        raise Exception('length类型的value必须包含start和end')
                    if field['value']['start'] not in [f['code'] for f in fields]:
                        raise Exception('length类型的value的start字段必须在fields中')
                    if field['value']['end'] not in [f['code'] for f in fields]:
                        raise Exception('length类型的value的end字段必须在fields中')
                elif field['type'] == 'enum':
                    if 'enums' not in field:
                        raise Exception('enum类型的field必须包含enums')
                    if len(field['enums']) == 0:
                        raise Exception('enum类型的field的enums不能为空')
                    for enum in field['enums']:
                        if 'n' not in enum or 'v' not in enum:
                            raise Exception('enum类型的field的enums的元素必须包含n、v、c')
                        if enum['n'] == '' or enum['v'] == '':
                            raise Exception('enum类型的field的enums的元素不能为空')
        text = self.generate_tc_text(_msg, f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json',
                                     files=[file_map['指令格式']],
        doc_text = self.get_text_with_entity([tc_name])
        if doc_text == '':
            doc_text = pkt['应用数据区']
        text = self.generate_tc_text(_msg,
                                     f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json',
                                     doc_text=doc_text,
                                     validation=validation)
        result = json.loads(text)
        pkt['children'] = result