lyg
2025-05-22 c099e6662b8a6e320ac314d31eda9b40455e5aa7
knowledgebase/gen_base_db/json_generate.py
@@ -9,15 +9,32 @@
import time
import json
import data_templates
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.llm import llm
from knowledgebase import utils
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage,SystemMessage
from langchain_core.messages import HumanMessage, SystemMessage
import textwrap
from knowledgebase.log import Log
USE_CACHE = True
class JsonGenerate:
    project: dict
    devs: list[dict]
    cadu: dict
    vcs: list[dict]
    tm_pkts: list[dict]
    vc_pkts: list[dict]
    bus_pkts: list[dict]
    tc_frame: dict
    tc_pkt_format: dict
    tc_pkts: dict
    def __init__(self):
        self.llm = llm
        self.systemPrompt = """
@@ -68,9 +85,18 @@
            - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
            - 不输出任何注释等描述性信息。
        """
    # 模型调用
    def call_model(self, msg, cache_file, doc, validation=None, try_cnt=3):
    def call_model(self, msg: str, cache_file: str, doc: str, validation=None, try_cnt=3) -> str:
        """
        调用大模型
        :param msg: 问题
        :param cache_file: 生成结果缓存文件
        :param doc: 文档文本
        :param validation: 校验函数,(text: str)-> None
        :param try_cnt: 失败重试次数
        :return: 生成的文本
        """
        if USE_CACHE and os.path.isfile(cache_file):
            with open(cache_file, 'r', encoding='utf-8') as f:
                text = f.read()
@@ -81,44 +107,64 @@
                messages.append(HumanMessage(info))
            prompt = ChatPromptTemplate.from_messages(messages)
            chain = prompt | self.llm
            # 去除多余的缩进
            msg = textwrap.dedent(msg).strip()
            resp = chain.invoke({"msg": msg})
            text = resp.content
            if validation:
                try:
                    validation(text)
                except BaseException as e:
                    print(e)
                    Log.error(e)
                    if try_cnt <= 0:
                        raise RuntimeError('生成失败,重试次数太多,强制结束!')
                    return self.call_model(msg, cache_file, validation, try_cnt - 1)
            if cache_file:
                with open(cache_file, 'w', encoding='utf-8') as f:
                    f.write(text)
            print(f'耗时:{time.time() - s}')
            Log.info(f'耗时:{time.time() - s}')
        return text
    # :param type: int - None 全部、1 遥测、2 遥控
    def run(self,type):
    @staticmethod
    def get_text_with_entity(entity_names: list[str]) -> str:
        """
        根据实体词获取文档文本
        :param entity_names: str - 实体词名称
        :return: str - 文本内容
        """
        return doc_dbh.get_text_with_entities(entity_names)
    @staticmethod
    def get_texts_with_entity(entity_names: list[str]) -> list[str]:
        """
        根据实体词获取文档文本
        :param entity_names: str - 实体词名称
        :return: str - 文本内容
        """
        return doc_dbh.get_texts_with_entities(entity_names)
    def run(self):
        # 根据文档,生成结构化数据
        if type is not None:
            if type == 1:
                self.handle_yc_structured_data()
            if type == 2:
                self.handle_yk_structured_data()
        else:
            self.handle_yc_structured_data()
            self.handle_yk_structured_data()
    # 遥测-start
    def handle_yc_structured_data(self):
        self.handle_tm_structured_data()
        self.handle_tc_structured_data()
    # region start 遥测
    def handle_tm_structured_data(self):
        self.gen_project()
        self.gen_device()
    # 获取项目信息
    def gen_project(self):
        _msg = '根据文档输出型号信息,型号字段包括:名称和代号。仅输出型号这一级。例如:{"name":"xxx","id":"xxx"}'
        result = self.call_model(_msg, 'out/型号信息.json', ['这里是文档中抽取的内容'])
        print('型号信息:' + result)
        _msg = """
        # 指令
        根据文档内容分析型号信息,型号字段包括:名称和代号。
        # 例子
        {"name":"xxx","id":"xxx"}
        """
        doc_text = self.get_text_with_entity(['系统概述'])
        text = self.call_model(_msg, 'out/型号信息.json', doc_text)
        self.project = json.loads(text)
        Log.info('型号信息:' + self.project)
    # 获取设备信息
    def gen_device(self):
@@ -155,39 +201,100 @@
                }
            ]
        """
        result = self.call_model(_msg, 'out/设备列表.json', ['这里是文档中抽取的内容'])
        print('设备列表:' + result)
        doc_text = self.get_text_with_entity(['系统概述', '总线管理'])
        text = self.call_model(_msg, 'out/设备列表.json', doc_text)
        Log.info('设备列表:' + text)
        devs = json.loads(result)
        self.devs = json.loads(text)
        # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
        like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs))
        like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), self.devs))
        for dev in like_smu_devs:
            self.gen_tm_frame(dev)
        # 总线
        self.gen_bus()
        hasBus = any(d['hasBus'] for d in self.devs)
        if hasBus:
            self.gen_bus()
    def gen_tm_frame(self,dev):
    def gen_tm_frame(self, dev):
        # 插入域参数列表
        self.gen_insert_domain_params(dev)
        insert_domain = self.gen_insert_domain_params(dev)
        # VC源包格式
        vc_pkt_fields = data_templates.vc_pkt_fields
        # 获取虚拟信道 vc
        vcs = self.gen_vc(dev)
        self.vcs = self.gen_vc(dev)
        for vc in self.vcs:
            vc['children'] = []
            vc['VCID'] = str(int(vc['VCID'], 2))
            for field in vc_pkt_fields:
                if field['name'] == '数据域':
                    field['children'] = []
                vc['children'].append(dict(field))
        def build_vcid_content(vcs):
            _vcs = []
            for _vc in vcs:
                _vcs.append(_vc['name'] + ',' + _vc['VCID'])
            return ' '.join(_vcs)
        # VCID 字段内容
        vcid_content = build_vcid_content(self.vcs)
        # 遥测帧结构由模板生成,只需提供特定参数
        tm_data = {
            "vcidContent": vcid_content,
            'insertDomain': insert_domain,
        }
        self.cadu = data_templates.get_tm_frame(tm_data)
        # 获取vc源包
        vc_pkts = self.gen_pkt_vc(dev)
        self.vc_pkts = self.gen_pkt_vc(dev)
        # 获取源包列表
        tm_pkts = self.gen_pkts(dev)
        self.tm_pkts = self.gen_pkts(dev)
        # 获取VC下面的遥测包数据
        for vc in vcs:
        for vc in self.vcs:
            # 此VC下的遥测包过滤
            _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), vc_pkts)
            _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts)
            for _pkt in _vc_pkts:
                # 判断遥测包是否有详细定义
                if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], tm_pkts), None):
                if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
                    continue
                # 获取包详情
                _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id'])
                epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
                if epdu and _pkt:
                    _pkt['children'] = _pkt['datas']
                    _last_par = _pkt['children'][len(_pkt['children']) - 1]
                    _pkt['length'] = (_last_par['pos'] + _last_par['length'])
                    _pkt['pos'] = 0
                    if 'children' not in epdu:
                        epdu['children'] = []
                    # 添加解析规则后缀防止重复
                    _pkt['id'] = _pkt['id'] + '_' + vc['VCID']
                    # 给包名加代号前缀
                    if not _pkt['name'].startswith(_pkt['id']):
                        _pkt['name'] = _pkt['id'] + '_' + _pkt['name']
                    epdu['children'].append(_pkt)
                    apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
                    ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
                    sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
                    _pkt['vals'] = \
                        f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
        # 重新计数起始偏移
        self.compute_length_pos(self.cadu['children'])
    def gen_insert_domain_params(self,dev):
    def compute_length_pos(self, items: list):
        length = 0
        pos = 0
        for child in items:
            if 'children' in child:
                self.compute_length_pos(child['children'])
            child['pos'] = pos
            if 'length' in child and isinstance(child['length'], int):
                length = length + child['length']
                pos = pos + child['length']
        # node['length'] = length
    def gen_insert_domain_params(self, dev):
        _msg = """
            # 指令
            我需要从文档中提取插入域的参数列表,你要帮助我完成插入域参数列表的提取。
@@ -203,23 +310,28 @@
            - 仅输出JSON文本。
            # 例子
            [
            {
                "name": "遥测模式字",
                "id": "TMS215",
                "pos": 0,
                "length": 8,
                "type": "para"
            }
                {
                    "name": "遥测模式字",
                    "id": "TMS215",
                    "pos": 0,
                    "length": 8,
                    "type": "para"
                }
            ]
        """
        def validation(gen_text):
            params = json.loads(gen_text)
            assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组'
            assert len(params), '插入域参数列表不能为空'
        result = self.call_model(_msg, 'out/'+dev.code+'_插入域参数列表.json', ['这里是文档中抽取的内容'], validation)
        print('插入域参数列表:' + result)
    def gen_vc(self,dev):
        doc_text = self.get_text_with_entity(['插入域'])
        result = self.call_model(_msg, 'out/' + dev.code + '_插入域参数列表.json', doc_text,
                                 validation)
        Log.info('插入域参数列表:' + result)
        return json.loads(result)
    def gen_vc(self, dev):
        _msg = """
            # 指令
            我需要从文档中提取虚拟信道列表,你要帮助我完成虚拟信道列表的提取。
@@ -246,11 +358,13 @@
        def validation(gen_text):
            vcs = json.loads(gen_text)
            assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
        result = self.call_model(_msg, 'out/'+dev.code+'_虚拟信道.json', ['这里是文档中抽取的内容'], validation)
        print('虚拟信道:' + result)
        doc_text = self.get_text_with_entity(['虚拟信道定义'])
        result = self.call_model(_msg, 'out/' + dev.code + '_虚拟信道.json', doc_text, validation)
        Log.info('虚拟信道:' + result)
        return json.loads(result)
    def gen_pkt_vc(self,dev):
    def gen_pkt_vc(self, dev):
        _msg = """
            # 指令
            我需要从文档中提取遥测源包信息,你要帮助我完成遥测源包信息的提取。
@@ -263,22 +377,25 @@
            - 不要遗漏任何遥测源包。
            # 例子:
            [
            {
                "id": "PMS001",
                "name": "数管数字量快速源包",
                "vcs": ["VC1"],
                "timeTags": ["实时"]
            },
                {
                    "id": "PMS001",
                    "name": "数管数字量快速源包",
                    "vcs": ["VC1"],
                    "timeTags": ["实时"]
                },
            ]
        """
        def validation(gen_text):
            pkts = json.loads(gen_text)
            assert len(pkts), 'VC源包列表不能为空'
        result = self.call_model(_msg, 'out/'+dev.code+'_遥测VC源包.json', ['这里是文档中抽取的内容'], validation)
        print('遥测源包所属虚拟信道:' + result)
        return json.loads(result)
    def gen_pkts(self,dev):
        doc_text = self.get_text_with_entity(['遥测源包下传时机'])
        text = self.call_model(_msg, 'out/' + dev.code + '_遥测源包下传时机.json', doc_text, validation)
        Log.info('遥测源包所属虚拟信道:' + text)
        return json.loads(text)
    def gen_pkts(self, dev):
        _msg = """
            # 指令
            我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。
@@ -304,190 +421,196 @@
                }
            ]
        """
        result = self.call_model(_msg, 'out/'+dev.code+'_源包列表.json', ['这里是文档中抽取的内容'])
        print('遥测源包列表:' + result)
        doc_text = self.get_text_with_entity(['源包列表'])
        result = self.call_model(_msg, 'out/' + dev.code + '_源包列表.json', doc_text)
        Log.info('遥测源包列表:' + result)
        return json.loads(result)
    def gen_pkt_details(self, pkt_name, pkt_id):
        _msg = f"""
            # 指令
            我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
            # 需求
            输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
            # 约束
            - 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
            - 数据域中每一行对应一个参数;
            - 不要跨表格提取;
            - 字节位置中字节位置是从1开始的,bit位置是从0开始的;
            - bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
            - 仅输出json,不要输出其他任何字符。
            # 例子:
            {"last_par_pos":128, "par_num": 20}
        """
        text = self.call_model(_msg, '', ['这里是文档中抽取的内容'])
        result = json.loads(text)
        last_par_pos = result['last_par_pos']
        par_num = result['par_num']
        cache_file = f'out/数据包-{pkt_name}.json'
        if not os.path.isfile(cache_file):
            # 先问最后一个参数的字节位置
            Log.info(f'遥测源包“{pkt_name}”信息:')
            _msg = f"""
                # 指令
                我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
                # 需求
                输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
                # 约束
                - 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
                - 数据域中每一行对应一个参数;
                - 不要跨表格提取;
                - 字节位置中字节位置是从1开始的,bit位置是从0开始的;
                - bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
                - 仅输出json,不要输出其他任何字符。
                # 例子:
                {"last_par_pos":128, "par_num": 20}
            """
            doc_text = self.get_text_with_entity([pkt_id])
            text = self.call_model(_msg, '', doc_text)
            result = json.loads(text)
            last_par_pos = result['last_par_pos']
            par_num = result['par_num']
        _msg = f"""
            # 指令
            我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
            # 需求
            输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
            注意:最后一个参数的起始bit偏移位置为{last_par_pos}。
            """ + """
            遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
            包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
            数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
            包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
            包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
            表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
            # 约束
            - 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
            - 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
            - 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
            - 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
            - pos字段:数值类型,从0开始计算,由长度(length)累加得到;
            - 应用过程标识:如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头,;
            - 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
            - 类似”保留(Rsv)“的行也要当参数生成;
            - 重复的行也要生成;
            - 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
            - 字节顺序:值为大端“B”,小端“L”,默认为“B”;
            - 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
            - 仅输出json,不要输出任何其他内容。
            # 例子
            {
            "name": "数管缓变遥测包",
            "id": "PMS003",
            "type": "linear",
            "headers": [
            _msg = f"""
                # 指令
                我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
                # 需求
                输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
                注意:最后一个参数的起始bit偏移位置为{last_par_pos}。
                """ + """
                遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
                包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
                数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
                包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
                包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
                表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
                # 约束
                - 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
                - 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
                - 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
                - 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
                - pos字段:数值类型,从0开始计算,由长度(length)累加得到;
                - 应用过程标识:如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头,;
                - 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
                - 类似”保留(Rsv)“的行也要当参数生成;
                - 重复的行也要生成;
                - 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
                - 字节顺序:值为大端“B”,小端“L”,默认为“B”;
                - 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
                - 仅输出json,不要输出任何其他内容。
                # 例子
                {
                "name": "包标识",
                "id": "packetIdentifier",
                "pos": 0,
                "content": "000",
                "length": 8,
                "type": "para"
                }
            ],
            "datas": [
                {
                "name": "XXX包",
                "id": "XXX",
                "pos": 0,
                "length": 8,
                "byteOrder": ""
                }
            ]
        """
                "name": "数管缓变遥测包",
                "id": "PMS003",
                "type": "linear",
                "headers": [
                    {
                    "name": "包标识",
                    "id": "packetIdentifier",
                    "pos": 0,
                    "content": "000",
                    "length": 8,
                    "type": "para"
                    }
                ],
                "datas": [
                    {
                    "name": "XXX包",
                    "id": "XXX",
                    "pos": 0,
                    "length": 8,
                    "byteOrder": ""
                    }
                ]
            """
        def validation(gen_text):
            _pkt = json.loads(gen_text)
            with open(f'out/tmp/{time.time()}.json', 'w') as f:
                f.write(gen_text)
            assert 'headers' in _pkt, '包结构中必须包含headers字段'
            assert 'datas' in _pkt, '包结构中必须包含datas字段'
            print(f'参数个数:{len(_pkt["datas"])}')
            # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
            assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
            def _validation(gen_text):
                _pkt = json.loads(gen_text)
                with open(f'out/tmp/{time.time()}.json', 'w') as f:
                    f.write(gen_text)
                assert 'headers' in _pkt, '包结构中必须包含headers字段'
                assert 'datas' in _pkt, '包结构中必须包含datas字段'
                Log.info(f'参数个数:{len(_pkt["datas"])}')
                # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
                assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
        result = self.call_model(_msg, f'out/数据包-{pkt_name}.json', [], ['这里是文档中抽取的内容'], validation)
        print(f'数据包“{pkt_name}”信息:'+result)
            result = self.call_model(_msg, f'out/数据包-{pkt_name}.json', doc_text, _validation)
            Log.info(f'数据包“{pkt_name}”信息:' + result)
            pkt = json.loads(result)
        else:
            pkt = json.loads(utils.read_from_file(cache_file))
        pkt_len = 0
        for par in pkt['datas']:
            par['pos'] = pkt_len
            pkt_len += par['length']
        pkt['length'] = pkt_len
        return pkt
    def gen_bus(self):
        _msg = """
            # 指令
            我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。
            # 需求
            请析文档,列出总线通信包传输约定中描述的所有数据包列表,
            数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
            transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。
            # 约束
            - frameNum:使用文档中的文本不要做任何转换;
            - subAddr:值为“深度”、“平铺”、“数字”或null;
            - 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
            - 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输);
            - 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发;
            - 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发;
            - 不要漏掉任何一个数据包;
            - 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
            # 例子
            [
                {
                    "id": "PCS005",
                    "name": "总线管理(内部指令)",
                    "apid": "418",
                    "service": "(1, 2)",
                    "length": 1,
                    "interval": 1000,
                    "subAddr": null,
                    "frameNum": "1|2",
                    "transSer": "DataBlock",
                    "note": "",
                    "rtAddr": 28,
                    "rt": "数据接口单元XIU",
                    "throughBus": true,
                    "burst": true,
                    "transDirect": "发"
                }
            ]
        """
        self.bus_pkts = []
        doc_text_list = self.get_texts_with_entity(['分系统源包'])
        for doc_text in doc_text_list:
            _msg = """
                # 指令
                我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。
                # 需求
                请析文档,列出总线通信包传输约定中描述的所有数据包列表,
                数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
                transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。
                # 约束
                - frameNum:使用文档中的文本不要做任何转换;
                - subAddr:值为“深度”、“平铺”、“数字”或null;
                - 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
                - 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输);
                - 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发;
                - 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发;
                - 不要漏掉任何一个数据包;
                - 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
                # 例子
                [
                    {
                        "id": "PCS005",
                        "name": "总线管理(内部指令)",
                        "apid": "418",
                        "service": "(1, 2)",
                        "length": 1,
                        "interval": 1000,
                        "subAddr": null,
                        "frameNum": "1|2",
                        "transSer": "DataBlock",
                        "note": "",
                        "rtAddr": 28,
                        "rt": "数据接口单元XIU",
                        "throughBus": true,
                        "burst": true,
                        "transDirect": "发"
                    }
                ]
            """
        def validation(gen_text):
            json.loads(gen_text)
            def validation(gen_text):
                json.loads(gen_text)
        result = self.call_model(_msg, 'out/总线.json', ['这里是文档中抽取的内容'], validation)
        print('总线数据包:' + result)
        pkts = json.loads(result)
        # 筛选经总线的数据包
        pkts = list(filter(lambda it: it['throughBus'], pkts))
        # 筛选有apid的数据包
        pkts = list(filter(lambda it: it['apid'], pkts))
            result = self.call_model(_msg, 'out/总线.json', doc_text, validation)
            Log.info('总线数据包:' + result)
        pkts2 = []
        for pkt in pkts:
            if self.pkt_in_tm_pkts(pkt["name"]):
                pkts2.append(pkt)
        for pkt in pkts2:
            self.gen_pkt_details(pkt['name'], pkt['id'])
            pkts = json.loads(result)
            # 筛选经总线的数据包
            pkts = list(filter(lambda it: it['throughBus'], pkts))
            # 筛选有apid的数据包
            pkts = list(filter(lambda it: it['apid'], pkts))
    def pkt_in_tm_pkts(self, pkt_name):
        _msg = f"""
            # 指令
            我需要从文档中分析判读是否有某个遥测包的字段表描述,你要帮助我判断。
            # 问题
            文档中有遥测包“{pkt_name}”的字段表描述吗?
            注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。
            # 约束
            - 根据文档内容输出;
            - 遥测包名称必须完全匹配;
            - 输出“无”或“有”,不要输出其他任何内容。
            # 例子
            有
        """
        text = self.call_model(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', ['这里是文档中抽取的内容'])
        print(f'文档中有无“{pkt_name}”的字段描述:'+ text)
        return text == '有'
            # pkts2 = []
            # todo 这一步应该通过数据库筛选,数据库中存储了每个数据包的代号实体
            # for pkt in pkts:
            #     if self.pkt_in_tm_pkts(pkt["name"]):
            #         pkts2.append(pkt)
            for pkt in pkts:
                self.gen_pkt_details(pkt['name'], pkt['id'])
                _pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
                if _pkt:
                    pkt['children'] = []
                    pkt['children'].extend(_pkt['datas'])
                    pkt['length'] = _pkt['length']
            self.bus_pkts.extend(pkts)
    # 遥测-end
    # endregion 遥测-end
    # 遥控-start
    def handle_yk_structured_data(self):
    # region start 遥控
    def handle_tc_structured_data(self):
        # 数据帧格式
        self.gen_tc_transfer_frame_format()
        self.tc_frame = self.gen_tc_transfer_frame_format()
        # 遥控包格式
        self.gen_tc_pkt_format()
        self.tc_pkt_format = self.gen_tc_pkt_format()
        # 遥控包列表
        pkts = self.gen_tc_transfer_pkts()
        for pkt in pkts:
        self.tc_pkts = self.gen_tc_transfer_pkts()
        for pkt in self.tc_pkts:
            # 遥控包数据区内容
            self.gen_tc_pkt_details(pkt)
    def gen_tc_transfer_frame_format(self):
        _msg = '''
            # 指令
@@ -517,8 +640,14 @@
        def validation(gen_text):
            json.loads(gen_text)
        result = self.call_model(_msg, 'out/tc_transfer_frame.json', ['这里是文档中抽取的内容'],validation)
        print('遥控帧格式:' + result)
        doc_text = self.get_text_with_entity(['遥控帧格式'])
        text = self.call_model(_msg, 'out/tc_transfer_frame.json', doc_text, validation)
        result: dict = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
        format_text = utils.replace_tpl_paras(format_text, result)
        frame = json.loads(format_text)
        Log.info('遥控帧格式:' + format_text)
        return frame
    def gen_tc_pkt_format(self):
        _msg = '''
@@ -551,28 +680,40 @@
        def validation(gen_text):
            json.loads(gen_text)
        result = self.call_model(_msg, 'out/tc_transfer_pkt.json', ['这里是文档中抽取的内容'],validation)
        print('遥控包格式:' + result)
        doc_text = self.get_text_with_entity(['遥控包格式'])
        text = self.call_model(_msg, 'out/tc_transfer_pkt.json', doc_text, validation)
        result = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_pkt_format.json')
        format_text = utils.replace_tpl_paras(format_text, result)
        pkt_format = json.loads(format_text)
        Log.info('遥控包格式:' + format_text)
        return pkt_format
    def gen_tc_transfer_pkts(self):
        _msg = '''
            # 指令
            分析文档列出所有的遥控源包。
            # 输出例子:
            [{
            "name": "xxx",
            "code":"pkt",
            "应用过程标识符":"0xAA",
            "服务类型":"0x1",
            "服务子类型":"0x2"
            }]
        '''
        doc_text_list = self.get_texts_with_entity(['APID分配'])
        pkts = []
        for doc_text in doc_text_list:
            _msg = '''
                # 指令
                分析文档列出所有的遥控源包。
                # 输出例子:
                [{
                "name": "xxx",
                "code":"pkt",
                "应用过程标识符":"0xAA",
                "服务类型":"0x1",
                "服务子类型":"0x2"
                }]
            '''
        def validation(gen_text):
            json.loads(gen_text)
            def validation(gen_text):
                json.loads(gen_text)
        result = self.call_model(_msg, 'out/tc_transfer_pkts.json', ['这里是文档中抽取的内容'],validation)
        print('遥控包列表:' + result)
            text = self.call_model(_msg, 'out/tc_transfer_pkts.json', doc_text, validation)
            Log.info('遥控包列表:' + text)
            pkts.extend(json.loads(text))
        return pkts
    def gen_tc_pkt_details(self, pkt):
        tc_name = pkt['name']
@@ -620,7 +761,8 @@
        def validation(gen_text):
            json.loads(gen_text)
        result = self.call_model(_msg, f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json', ['这里是文档中抽取的内容'],validation)
        print('遥控指令数据域:' + result)
        result = self.call_model(_msg, f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json',
                                 ['这里是文档中抽取的内容'], validation)
        Log.info('遥控指令数据域:' + result)
    # 遥控-end
    # endregion 遥控-end