lyg
2 天以前 22f370322412074174cde20ecfd14ec03657ab63
db_struct_flow.py
@@ -1,4 +1,7 @@
import asyncio
import math
import os
import subprocess
import time
from datetime import datetime
@@ -8,115 +11,56 @@
from langchain_community.chat_models import ChatOpenAI
from langchain_core.prompts import HumanMessagePromptTemplate, SystemMessagePromptTemplate
import textwrap
import data_templates
from knowledgebase import utils
from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \
    update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format, make_attr
    update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format, make_attr, init_db_helper
from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt
from knowledgebase.db.models import TProject
from knowledgebase.db.models import TProject, init_base_db
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.llm import llm
# file_map = {
#     # "遥测源包设计报告": "./doc/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md",
#     # "遥测源包设计报告": "./doc/数管数字量快速源包.md",
#     # "遥测源包设计报告": "./doc/数管数字量中速源包.md",
#     # "遥测源包设计报告": "./doc/硬通道设备工作状态数据包.md",
#     # "遥测源包设计报告": "./doc/DIU遥测模块采集的DS量4.md",
#     "遥测源包设计报告": "./doc/DIU遥测模块模拟量.md",
#     "遥测大纲": "./doc/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md",
#     # "总线传输通信帧分配": "./doc/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md",
#     "总线传输通信帧分配": "./doc/总线.md",
#     "应用软件用户需求": "./doc/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md"
# }
# file_map = {
#     "遥测源包设计报告": "./docs/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md",
#     "遥测大纲": "./docs/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md",
#     "总线传输通信帧分配": "./docs/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md",
#     "应用软件用户需求": "./docs/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md"
# }
file_map = {
    "文档合并": "./doc/文档合并.md",
    "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md",
    "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md",
    "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md",
    "指令格式": "./doc/ZL格式(公开).docx.md"
}
# BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'
# API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e'
# MODEL_NAME = 'qwen2.5-72b-instruct'
BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'
API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e'
MODEL_NAME = 'qwen2.5-72b-instruct'
# BASE_URL = 'http://10.74.15.164:11434/v1/'
# BASE_URL = 'http://10.74.15.171:11434/v1/'
# API_KEY = 'ollama'
# MODEL_NAME = 'qwen2.5:32b-128k'
# MODEL_NAME = 'qwen2.5:72b-instruct'
# BASE_URL = 'http://chat.com/api'
# API_KEY = 'sk-49457e83f734475cb4cf7066c649d563'
# MODEL_NAME = 'qwen2.5:72b-120k'
# BASE_URL = 'http://10.74.15.171:8000/v1'
# API_KEY = 'EMPTY'
BASE_URL = 'http://10.74.15.171:8000/v1'
API_KEY = 'EMPTY'
# MODEL_NAME = 'QwQ:32b'
# MODEL_NAME = 'vllm-Qwen-72b-4bit'
MODEL_NAME = 'Qwen2.5-72B-Instruct-AWQ'
# MODEL_NAME = 'qwen2.5:72b-instruct'
USE_CACHE = True
assistant_msg = """
# 角色
你是一名资深的软件工程师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。
## 技能
### 技能 1:文档分析(包括 markdown 文档)
1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。
2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。
3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。
### 技能 2:通信协议分析
1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。
## 背景知识
###软件主要功能与运行机制总结如下:
1. 数据采集和处理:
   DIU负责根据卫星的工作状态或模式提供遥测数据,包括模拟量(AN)、总线信号(BL)以及温度(TH)和数字量(DS),并将这些信息打包,通过总线发送给SMU。
   SMU则收集硬通道上的遥测参数,并通过总线接收DIU采集的信息。
2. 多路复用与数据传输:
   遥测源包被组织成E-PDU,进一步复用为M-PDU,并填充到VCDU中构成遥测帧。
   利用CCSDS AOS CADU格式进行遥测数据的多路复用和传输。
3. 虚拟信道(VC)调度机制:
   通过常规遥测VC、突发数据VC、延时遥测VC、记录数据VC以及回放VC实现不同类型的数据下传。
4. 遥控指令处理:
   上行遥控包括直接指令和间接指令,需经过格式验证后转发给相应单机执行。
   遥控帧通过特定的虚拟信道(VC)进行传输。
这些知识需要你记住,再后续的处理中可以帮助你理解要处理的数据。
## 目标导向
1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。
## 规则
1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析,每次只分析同一个型号的文档。
2. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。
3. 文档都是对于数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布、各个字段的大小和位置等信息,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,长度字段使用 length 表示,位置字段使用 pos 表示,如果为变长使用“"变长"”表示。
4. 如果有层级,使用树形 JSON 输出,如果有子节点,子节点 key 使用children;需保证一次输出的数据结构统一,并且判断每个层级是什么类型,输出类型字段(type),类型字段的 key 使用 type,类型包括:型号(project)、设备(dev)、封装包(enc)、线性包(linear)、参数(para),封装包子级有数据包,所以type为enc,线性包子级只有参数,所以type为linear;每个层级都包含偏移位置(pos),每个层级的偏移位置从0开始。
5. 名称相关的字段的 key 使用name;代号、编号或者唯一标识相关的字段的key使用id,id由数字、英文字母、下划线组成且以英文字母开头,长度尽量简短;序号相关的字段的key使用number;偏移位置相关字段的key使用pos;其他没有举例的字段使用精简的翻译作为字段的key;每个结构必须包含name和id。
6. 遥测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级嵌套传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构。
7. 数据包字段包括:name、id、type、pos、length、children;参数字段包括:name、id、pos、type、length;必须包含pos和length字段。
8. 常用id参考:遥测(TM)、遥控(TC)、总线(BUS)、版本号(Ver)、应用过程标识(APID)。
9. 注意:一定要记得morkdown文档中会将一些特殊字符进行转义,以此来保证文档的正确性,这些转义符号(也就是反斜杠‘\’)不需要在结果中输出。
10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。
## 限制:
- 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
- 不输出任何注释等描述性信息。
你是一名资深的软件工程师。
"""
#
# ## 技能
# ### 技能 1:通信协议分析
# 1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。
#
# ## 目标导向
# 1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。
#
# ## 限制:
# - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
# - 不输出任何注释等描述性信息。
tc_system_msg = """
# 角色
你是一个资深软件工程师。
# 约束
- 输出内容必须根据文档和问题回答,不要创造其他内容;
- 输出内容必须是,JSON格式,不要输出其他文本。
- 输出内容根据文档内容输出。
"""
g_completion = None
@@ -151,7 +95,7 @@
        return text
def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals):
def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals, pkts: list):
    # 逻辑封装包,数据块传输的只有一个,取数的根据RT地址、子地址和帧号划分
    frame_num = pkt['frameNum']
    if trans_ser == '数据块传输':
@@ -178,7 +122,9 @@
    interval = f'{pkt["interval"]}'.replace(".", "_")
    if trans_ser == '取数':
        _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}'
        # 取数忽略周期
        # _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}'
        _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}'
    else:
        # 数据块传输
        if pkt['burst']:
@@ -326,6 +272,7 @@
class DbStructFlow:
    json_path = ''
    # 工程
    proj: TProject = None
    # 遥测源包列表,仅包名称、包id和hasParams
@@ -333,27 +280,65 @@
    # vc源包
    vc_pkts = []
    def __init__(self):
    def __init__(self, project_path: str):
        self.client = OpenAI(
            api_key=API_KEY,
            base_url=BASE_URL,
            # api_key="ollama",
            # base_url="http://192.168.1.48:11434/v1/",
        )
        self.json_path = f'{project_path}/json'
        self.db_dir = f'{project_path}/db'
        os.makedirs(f"{self.json_path}", exist_ok=True)
        os.makedirs(f"{self.json_path}/pkts", exist_ok=True)
        os.makedirs(f"{self.db_dir}", exist_ok=True)
        init_base_db(f'{self.db_dir}/db.db')
        init_db_helper()
        # self.llm = ChatOpenAI(model=MODEL_NAME, temperature=0, api_key=API_KEY, base_url=BASE_URL)
    def run(self):
    async def run(self):
        # 生成型号结构
        # 生成设备结构
        # 生成数据流结构 CADU
        # 生成VCDU结构
        # 生成遥测数据包结构
        self.proj = self.gen_project()
        tasks = []
        tasks.append(self.gen_device(self.proj))
        # devs = self.gen_device(self.proj)
        tasks.append(self.gen_tc())
        self.gen_tc()
        # 测试位置计算
        # print(self.handle_pos("Byte1_B0~Byte1_B0"))
        # print(self.handle_pos("Byte0_B0~Byte0_B7"))
        # print(self.handle_pos("Byte9_B0~Byte9_B7"))
        await asyncio.gather(*tasks)
        return ''
    def handle_pos(self, srt):
        pos_data = {
            "start": 0,
            "end": 0
        }
        pos = srt.split("~")
        for index, p in enumerate(pos):
            byte = p.split('_')
            for b in byte:
                if b.find("Byte") > -1:
                    value = b.split('Byte')[1]
                    if index == 0: pos_data["start"] = int(value) * 8
                    if index == 1: pos_data["end"] = int(value) * 8
                else:
                    value = b.split('B')[1]
                    if index == 0: pos_data["start"] += int(value)
                    if index == 1: pos_data["end"] += int(value)
        return {
            "pos": pos_data["start"],
            "length": pos_data["end"] - pos_data["start"] + 1,
        }
    def get_text_with_entity(self, entity_names: list[str]) -> str:
        """
@@ -363,6 +348,14 @@
        """
        return doc_dbh.get_text_with_entities(entity_names)
    def get_text_list_with_entity(self, entity_names: list[str]) -> str:
        """
        根据实体词获取文档文本列表
        :param entity_names: 实体词列表
        :return: [str] - 文本列表
        """
        return doc_dbh.get_texts_with_entities(entity_names)
    def _gen(self, msgs, msg, doc_text):
        # if files is None:
        #     files = [file_map['文档合并']]
@@ -370,34 +363,42 @@
        # doc_text = ''
        # for file in files:
        #     doc_text += '\n' + read_from_file(file)
        # 去除多余的缩进
        msg = textwrap.dedent(msg).strip()
        if len(messages) == 0:
            # 如果是第一次提问加入system消息
            messages.append({'role': 'system', 'content': assistant_msg})
            messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text})
        messages.append({'role': 'user', 'content': msg})
        completion = self.client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            stream=True,
            temperature=0.6,
            # top_p=0,
            timeout=30 * 60000,
            max_completion_tokens=32000,
            seed=0
            # stream_options={"include_usage": True}
        )
        g_completion = completion
        text = ''
        for chunk in completion:
            if chunk.choices[0].delta.content is not None:
                text += chunk.choices[0].delta.content
                print(chunk.choices[0].delta.content, end="")
        print("")
        g_completion = None
        for ai_msg in llm.stream(messages):
            text += ai_msg.content
            print(ai_msg.content, end='')
        print('')
        # completion = self.client.chat.completions.create(
        #     model=MODEL_NAME,
        #     messages=messages,
        #     stream=True,
        #     temperature=0,
        #     # top_p=0,
        #     timeout=30 * 60000,
        #     max_completion_tokens=32000,
        #     seed=0
        #     # stream_options={"include_usage": True}
        # )
        # g_completion = completion
        # text = ''
        # for chunk in completion:
        #     if chunk.choices[0].delta.content is not None:
        #         text += chunk.choices[0].delta.content
        #         print(chunk.choices[0].delta.content, end="")
        # print("")
        # g_completion = None
        return text
    def generate_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5, json_text=False):
    def generate_text(self, msg, cache_file, msgs=None, doc_text="", validation=None, try_cnt=5, json_text=False):
        if msgs is None:
            msgs = []
        if USE_CACHE and os.path.isfile(cache_file):
@@ -421,7 +422,7 @@
            print(f'耗时:{time.time() - s}')
        return text
    def generate_text_json(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5):
    def generate_text_json(self, msg, cache_file, msgs=None, doc_text="", validation=None, try_cnt=5):
        return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True)
    def generate_tc_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5):
@@ -432,19 +433,19 @@
    def gen_project(self):
        _msg = """
            根据文档内容输出分系统信息,分系统字段包括:名称和型号代号。仅输出分系统这一级。如果型号代号中有符号也要输出,保证输出完整。
            根据文档内容输出卫星的型号信息,输出字段包括:卫星的型号名称和卫星的型号代号。注意:如果没有单独描述型号名称或者型号代号,那么型号名称和型号代号是相同的,并且只输出一个层级。如果型号代号中有符号也要输出,保证输出完整。
            例如:{"name":"xxx","id":"xxx"}
        """
        print('型号信息:')
        doc_text = self.get_text_with_entity(['系统概述'])
        text = self.generate_text_json(_msg, 'out/型号信息.json', doc_text=doc_text)
        text = self.generate_text_json(_msg, f'{self.json_path}/型号信息.json', doc_text=doc_text)
        proj_dict = json.loads(text)
        code = proj_dict['id']
        name = proj_dict['name']
        proj = create_project(code, name, code, name, "", datetime.now())
        return proj
    def gen_device(self, proj):
    async def gen_device(self, proj):
        """
        设备列表生成规则:
        1.如文档中有1553协议描述,加入1553设备
@@ -496,7 +497,7 @@
]
"""
        print('设备列表:')
        cache_file = 'out/设备列表.json'
        cache_file = f'{self.json_path}/设备列表.json'
        def validation(gen_text):
            _devs = json.loads(gen_text)
@@ -509,18 +510,20 @@
        # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
        like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs))
        tasks = []
        for dev in like_smu_devs:
            dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK)
            devices.append(dev)
            # 创建数据流
            ds_tmfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMF1', 'TMFL', '1', 'TMF1',
                                                         '001')
            self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH)
            task = self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH)
            tasks.append(task)
            # ds_tcfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL',
            #                                              '006')
        hasBus = any(d['hasBus'] for d in devs)
        if hasBus:
        has_bus = any(d['hasBus'] for d in devs)
        if has_bus:
            # 总线设备
            dev = create_device("1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk)
            create_extend_info(proj_pk, "BusType", "总线类型", "ECSS_Standard", dev.C_DEV_PK)
@@ -529,11 +532,15 @@
            ds_u153, rs_u153, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '上行总线数据', 'U15E', 'B153',
                                                            '0', '1553', '001')
            # 创建总线结构
            self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME)
            task = self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME)
            tasks.append(task)
            await asyncio.gather(*tasks)
            ds_d153, rule_stream, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '下行总线数据', 'D15E', 'B153',
                                                                '1', '1553', '001', rs_u153.C_RULE_PK)
            create_ref_ds_rule_stream(proj_pk, rule_stream.C_STREAM_PK, rule_stream.C_STREAM_ID,
                                      rule_stream.C_STREAM_NAME, rule_stream.C_STREAM_DIR, rs_u153.C_STREAM_PK)
        else:
            await asyncio.gather(*tasks)
        # 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元”
        # like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs))
        # for dev in like_rtu_devs:
@@ -550,28 +557,19 @@
    def gen_insert_domain_params(self):
        _msg = """
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取插入域的参数列表,你要帮助我完成插入域参数列表的提取。
#需求
分析文档,输出插入域的参数列表,将所有参数全部输出。
参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数起始bit位置)、length(参数bit长度)、type(类型:para)。
注意:
参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数位置)、type(类型:para)。
# 要求
1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。
文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2;"Byte38~Byte74":表示从第38个字节到第74个字节,中间有37个字节,长度是298。
#约束
- 不要遗漏任何参数;
- 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
- 数据结构最外层为数组,数组元素为参数信息对象;
- 仅输出JSON文本。
#例子
位置信息转换为通用格式"Byte1_B6~Byte2_B0"进行输出,如果缺少内容要进行补全,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。
# 输出示例
[
  {
    "name": "遥测模式字",
    "id": "TMS215",
    "pos": 0,
    "length": 8,
    "pos": Byte0_B0~Byte0_B7,
    "type": "para"
  }
]
@@ -584,18 +582,50 @@
            assert len(params), '插入域参数列表不能为空'
        doc_text = self.get_text_with_entity(['插入域'])
        text = self.generate_text_json(_msg, './out/插入域参数列表.json', doc_text=doc_text, validation=validation)
        return json.loads(text)
        text = self.generate_text_json(_msg, f'{self.json_path}/插入域参数列表.json', doc_text=doc_text, validation=validation)
        json_list = json.loads(text)
        for j in json_list:
            if j['pos'] is not None:
                pos_data = self.handle_pos(j['pos'])
                j['pos'] = pos_data['pos']
                j['length'] = pos_data['length']
        return json_list
    def gen_tm_frame_data(self):
        _msg = """
        """
        files = [file_map['遥测大纲']]
    async def get_pkt_details(self, _pkt, vc):
        _pkt = await self.gen_pkt_details(_pkt['name'], _pkt['id'])
        epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
        if epdu and _pkt:
            _pkt['children'] = _pkt['datas']
            # todo 当数据包获取到东西但不是参数时,获取到的包结构有问题,需要过滤
            _pkt['length'] = 0
            _pkt['pos'] = 0
            if len(_pkt['children']) > 0:
                _last_par = _pkt['children'][len(_pkt['children']) - 1]
                _pkt['length'] = (_last_par['pos'] + _last_par['length'])
            if 'children' not in epdu:
                epdu['children'] = []
            # 添加解析规则后缀防止重复
            _pkt['id'] = _pkt['id'] + '_' + vc['VCID']
            # 给包名加代号前缀
            if not _pkt['name'].startswith(_pkt['id']):
                _pkt['name'] = _pkt['id'] + '_' + _pkt['name']
            epdu['children'].append(_pkt)
            apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
            ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
            sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
            apid = ''
            service = ''
            sub_service = ''
            if apid_node and apid_node['content']:
                apid = apid_node['content']
            if ser_node and ser_node['content']:
                service = f"{int(ser_node['content'], 16)}"
            if sub_ser_node and sub_ser_node['content']:
                sub_service = f"{int(sub_ser_node['content'], 16)}"
            _pkt['vals'] = \
                f"{apid}/{service}/{sub_service}/"
        def validation(gen_text):
            pass
    def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path):
    async def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path):
        # 插入域参数列表
        insert_domain = self.gen_insert_domain_params()
@@ -621,43 +651,24 @@
            'insertDomain': insert_domain,
        }
        cadu = data_templates.get_tm_frame(tm_data)
        # VC源包
        self.vc_pkts = self.gen_pkt_vc()
        # 遥测源包设计中的源包列表
        self.tm_pkts = self.gen_pkts()
        self.vc_pkts = await self.gen_pkt_vc()  # ,self.tm_pkts = self.gen_pkts()
        # 处理VC下面的遥测包数据
        tasks = []
        for vc in vcs:
            # 此VC下的遥测包过滤
            _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts)
            _vc_pkts = list(filter(lambda it: vc['id'] in it['vcs'], self.vc_pkts))
            for _pkt in _vc_pkts:
                # 判断遥测包是否有详细定义
                # if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
                #     continue
                # 获取包详情
                _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id'])
                epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
                if epdu and _pkt:
                    _pkt['children'] = _pkt['datas']
                    # todo 当数据包获取到东西但不是参数时,获取到的包结构有问题,需要过滤
                    if len(_pkt['children']) > 0:
                        _last_par = _pkt['children'][len(_pkt['children']) - 1]
                        _pkt['length'] = (_last_par['pos'] + _last_par['length'])
                        _pkt['pos'] = 0
                        if 'children' not in epdu:
                            epdu['children'] = []
                        # 添加解析规则后缀防止重复
                        _pkt['id'] = _pkt['id'] + '_' + vc['VCID']
                        # 给包名加代号前缀
                        if not _pkt['name'].startswith(_pkt['id']):
                            _pkt['name'] = _pkt['id'] + '_' + _pkt['name']
                        epdu['children'].append(_pkt)
                        apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
                        ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
                        sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
                        _pkt['vals'] = \
                            f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
                ret = self.get_pkt_details(_pkt, vc)
                tasks.append(ret)
        if len(tasks):
            await asyncio.gather(*tasks)
        # 重新计数起始偏移
        self.compute_length_pos(cadu['children'])
@@ -731,139 +742,200 @@
        print('虚拟信道:')
        doc_text = self.get_text_with_entity(['虚拟信道定义'])
        text = self.generate_text_json(_msg, "out/虚拟信道.json", doc_text=doc_text, validation=validation)
        text = self.generate_text_json(_msg, f"{self.json_path}/虚拟信道.json", doc_text=doc_text,
                                       validation=validation)
        vcs = json.loads(text)
        return vcs
    def gen_dev_pkts(self):
        _msg = """
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取设备以及设备下面的遥测包信息,你要帮助我完成提取。
#需求
输出文档中遥测源包类型定义描述的设备以及设备下面的遥测包。
#约束
- 数据结构:数组 > 设备 > 遥测包列表(pkts);
- 设备字段包括:名称(name)、代号(id);
- 源包字段包括:名称(name)、代号(id);
- 仅输出JSON文本。
#例子
"""
        print('设备遥测源包信息:')
        files = [file_map["遥测源包设计报告"]]
        text = self.generate_text_json(_msg, 'out/设备数据包.json', [], files)
        dev_pkts = json.loads(text)
        return dev_pkts
    def gen_pkt_details(self, pkt_name, pkt_id):
        cache_file = f'out/数据包-{pkt_name}.json'
        # _msg = f"""
        #     #角色
        #     你是一名资深的软件工程师。
        #     #指令
        #     我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
        #     #需求
        #     输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
        #     """ + """
        #     #约束
        #     - 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
        #     - 数据域中每一行对应一个参数;
        #     - 不要跨表格提取;
        #     - 字节位置中字节位置是从1开始的,bit位置是从0开始的;
        #     - bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
        #     - 仅输出json,不要输出其他任何字符。
        #     #例子:
        #     {"last_par_pos":128, "par_num": 20}
        #     """
        # text = self.generate_text_json(_msg, '', doc_text=doc_text)
        # result = json.loads(text)
        # last_par_pos = result['last_par_pos']
        # par_num = result['par_num']
        _msg = f"""
            #角色
            你是一名资深的软件工程师。
            #指令
            我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
            #需求
            输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
            """ + """
            遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
            包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
            数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
            包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
            包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
            表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
            #约束
            - 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
            - 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
            - 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
            - 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
            - pos字段:数值类型,从0开始计算,由长度(length)累加得到;
            - 应用过程标识:应用过程标识的定义如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头;
            - 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
            - 类似”保留(Rsv)“的行也要当参数生成;
            - 重复的行也要生成;
            - 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
            - 字节顺序:值为大端“B”,小端“L”,默认为“B”;
            - 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
            - 仅输出json,不要输出任何其他内容。
            #例子
            {
            "name": "数管缓变遥测包",
            "id": "PMS003",
            "type": "linear",
            "headers": [
                {
                "name": "包标识",
                "id": "packetIdentifier",
                "pos": 0,
                "content": "000",
                "length": 8,
                "type": "para"
                }
            ],
            "datas": [
                {
                "name": "XXX包",
                "id": "XXX",
                "pos": 0,
                "length": 8,
                "byteOrder": ""
                }
            ]
            """
        print(f'遥测源包“{pkt_name}”信息:')
        def validation(gen_text):
            _pkt = json.loads(gen_text)
            with open(f'out/tmp/{time.time()}.json', 'w') as f:
                f.write(gen_text)
            assert 'headers' in _pkt, '包结构中必须包含headers字段'
            assert 'datas' in _pkt, '包结构中必须包含datas字段'
            # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
            # assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
    async def gen_pkt_details(self, pkt_name, pkt_id):
        cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}.json"
        doc_text = self.get_text_with_entity([pkt_id])
        pkt = {
            "name": pkt_name,
            "id": pkt_id,
            "type": "linear",
            "headers": [],
            "datas": [],
        }
        if doc_text == '':
            return None
        text = self.generate_text_json(_msg, cache_file, [], doc_text, validation)
        pkt = json.loads(text)
            return pkt
        print(f'遥测源包“{pkt_name}”信息:')
        pkt_len = 0
        for par in pkt['datas']:
            par['pos'] = pkt_len
            pkt_len += par['length']
        pkt['length'] = pkt_len
        # 1. 获取包头和参数列表
        # 2. 遍历包头和参数列表,获取bit位置和长度,规范代号并生成,生成byteOrder
        async def get_header_params(_pkt_name, _doc_text: str):
            _msg = ("""
                    # 需求
                    提取文档中描述的遥测包包头信息。
                    包头信息包括:包版本号(Ver)、包类型(Type)、副导头标识(Subheader)、应用过程标识(apid)、序列标记(SequenceFlag)、包序列计数(SequenceCount)、包长(PacketLength)、服务(Service)、子服务(SubService)信息。
                    服务、子服务:一般在表格中的包头区域提取,如果表格中没有包头区域只有数据域则在标题中提取,例如:“在轨维护遥测包(APID=0x384) (3,255)”其中服务是3子服务是255;
                    表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析;
                    输出json,不要有注释。
                    # 输出例子
                    ```json
                    {
                        "Ver": "000",
                        "Type": "0",
                        "Subheader": "1",
                        "apid": "0",
                        "SequenceFlag": "11",
                        "SequenceCount": "00000000000000",
                        "PacketLength": "1",
                        "Service": "03",
                        "SubService": "FF"
                    }
                    ```""")
            # 截取前70行
            _doc_text = '\n'.join(_doc_text.splitlines()[0:100])
            tpl = os.path.dirname(__file__) + "/tpl/tm_pkt_headers_yg.json"
            tpl_text = utils.read_from_file(tpl)
            _cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}-包头参数.json"
            _text = await asyncio.to_thread(self.generate_text_json, _msg, _cache_file, [], _doc_text, None)
            result = json.loads(_text)
            if re.match(r'^(0x)?[01]{11}$', result['apid']):
                result['apid'] = hex(int(re.sub('0x', '', result['apid']), 2))
            for k in result:
                tpl_text = tpl_text.replace("{{" + k + "}}", result[k])
            return json.loads(tpl_text)
        async def get_data_area_params(_pkt_name, _doc_text: str):
            _msg = ("""
                        # 指令
                        我需要从文档中提取遥测源包的参数信息列表,你要帮我完成遥测源包的参数信息的提取。
                        # 需求
                        提取文档中描述的遥测包数据域中的所有参数,以及参数的位置、名称、代号信息,输出的信息要与文档中的文本要一致,不要遗漏任何参数。
                        如果文档中没有参数表则输出空数组。
                        严格按照输出示例中的格式输出,仅输出json。
                        # 要求
                        1个字节的长度为8位,使用B0-B7来表示。
                        所有位置信息需要转换为要求格式"Byte1_B6~Byte2_B0"进行输出,如果与要求格式不同的要进行补全或转换,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。
                        # 输出示例
                        ```json
                        [
                            {
                                "posText": "Byte1_B6~Byte2_B0",
                                "name": "xxx",
                                "id": "xxxxxx"
                            }
                        ]
                        ```
                        # 没有参数时的输出示例
                        ```json
                        []
                        ```""")
            _cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}-参数列表.json"
            if utils.file_exists(_cache_file):
                return json.loads(utils.read_from_file(_cache_file))
            title_line = _doc_text.splitlines()[0]
            tables = re.findall(r"```json(.*)```", _doc_text, re.DOTALL)
            if tables:
                table_text = tables[0]
                table_blocks = []
                if len(table_text)>50000:
                    table = json.loads(table_text)
                    header:list = table[0:20]
                    for i in range(math.ceil((len(table)-20)/200)):
                        body = table[20 + i * 200:20 + (i + 1) * 200]
                        block = []
                        block.extend(header)
                        block.extend(body)
                        table_blocks.append(f"{title_line}\n```json\n{json.dumps(block,indent=2,ensure_ascii=False)}\n```")
                else:
                    table_blocks.append(table_text)
                param_list = []
                block_idx = 0
                for tb_block in table_blocks:
                    _block_cache_file = f"{self.json_path}/pkts/数据包-{utils.to_file_name(pkt_name)}-参数列表-{block_idx}.json"
                    block_idx += 1
                    text = await asyncio.to_thread(self.generate_text_json, _msg, _block_cache_file, [], tb_block, None)
                    json_list = json.loads(text)
                    for par in json_list:
                        if not re.match('^Byte\d+_B[0-7]~Byte\d+_B[0-7]$', par['posText']):
                            par['posText'] = get_single_pos(par['posText'])
                        if not any(filter(lambda p: p['posText']==par['posText'], param_list)):
                            param_list.append(par)
                for par in param_list:
                    if par['posText'] is not None:
                        par['id'] = re.sub('[^_a-zA-Z0-9]', '_', par['id'])
                        pos_data = self.handle_pos(par['posText'])
                        par['pos'] = pos_data['pos']
                        par['length'] = pos_data['length']
                save_to_file(json.dumps(param_list, ensure_ascii=False, indent=2), _cache_file)
                return param_list
            else:
                return []
        # 单独处理未正确获取的位置信息
        def get_single_pos(txt):
            _msg = f"""
                1个字节的长度为8位,使用B0-B7来表示。
                将“{txt}”转换为要求格式"Byte1_B6~Byte2_B0"进行输出,如果与要求格式不同的要进行补全或转换,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。
                输出示例:Byte1_B6~Byte2_B0
                仅输出结果,不输出其他文字
            """
            def validation(return_txt):
                assert re.match('^Byte\d+_B[0-7]~Byte\d+_B[0-7]$', return_txt), '格式不正确'
            text = self.generate_text_json(_msg, "", doc_text="", validation=validation)
            return text
        params = []
        header_params, data_area_params = (
            await asyncio.gather(get_header_params(pkt_name, doc_text),
                                 get_data_area_params(pkt_name, doc_text)))
        params.extend(data_area_params)
        pkt['headers'] = header_params
        pkt['datas'] = data_area_params
        # async def get_param_info(para):
        #     _msg2 = """
        #         # 需求
        #         从文本中提取区间起始偏移位置和区间长度,单位为比特。文本中的内容为区间描述,其中:Byte<N> 表示第 N 个字节,N 从 1 开始,B<X> 表示第 X 个比特,X 从 0 - 7 ,区间为闭区间。
        #         所有数学计算是需要算数表达式,不需要计算结果。计算公式如下:
        #         - ByteN_BX起始和结束位置:(N - 1)*8 + X
        #         - ByteN 起始位置:(N - 1)*8
        #         - ByteN 结束位置:(N - 1) * 8 + 7
        #         - 长度:结束偏移位置 + 1 - 起始偏移位置,闭区间的长度需要结束位置加1再减去起始位置
        #         # 生成模板
        #         推理过程:简要说明提取信息及调用 tool 计算的过程。输出结果:按 JSON 格式输出,格式如下:
        #         {
        #           "offset": "起始偏移位置表达式",
        #           "length": "长度计算表达式"
        #         }
        #         文本:
        #         """+f"""
        #         {para['posText']}
        #     """
        #     text2 = await asyncio.to_thread(self.generate_text_json, _msg2, '', [], '')
        #     try:
        #         out = json.loads(text2)
        #         para['pos'] = eval(out['offset'])
        #         para['posRet'] = text2
        #         para['length'] = eval(out['length'])
        #         para['id'] = re.sub(r"[^0-9a-zA-Z_]", "_", para['code'])
        #         para['type'] = 'para'
        #     except Exception as e:
        #         print(e)
        # tasks = []
        # for param in params:
        #     tasks.append(get_param_info(param))
        #
        # s = time.time()
        # await asyncio.gather(*tasks)
        # e = time.time()
        if params:
            offset = params[0]['pos']
            for para in params:
                para['pos'] -= offset
        # print(f'======参数数量:{len(params)},耗时:{e - s}')
        utils.save_text_to_file(json.dumps(pkt, ensure_ascii=False, indent=4), cache_file)
        return pkt
    def gen_pkts(self):
    async def gen_pkts(self):
        _msg = """
#角色
你是一名资深软件工程师。
@@ -878,7 +950,7 @@
- name:名称中不要包含代号,仅从文档中提取源包名称;
- 如果没有代号,使用遥测包名称的英文翻译代替;
- 如果没有名称用代号代替;
- 不要漏掉任何遥测包;
- 注意,一定要输出所有的遥测包,不要漏掉任何一个遥测包;
- 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。
#例子
[
@@ -890,29 +962,24 @@
"""
        print(f'遥测源包列表:')
        doc_text = self.get_text_with_entity(['源包列表'])
        text = self.generate_text_json(_msg, 'out/源包列表.json', doc_text=doc_text)
        text = await asyncio.to_thread(self.generate_text_json, _msg, f'{self.json_path}/源包列表.json', doc_text=doc_text)
        pkt = json.loads(text)
        return pkt
    def gen_pkt_vc(self):
    async def gen_pkt_vc(self):
        _msg = """
#角色
你是一名资深软件工程师。
#指令
我需要从文档中提取所有遥测源包信息,你要帮助我完成遥测源包信息的提取。
#需求
根据文档内容输出遥测源包信息,顶级结构为数组,元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。
#约束
- 所属虚拟信道:必须是文档中描述的遥测虚拟信道代号(序号);
- 下传时机:与表格中定义的一致;
- 不要遗漏任何遥测源包。
#例子:
根据文档内容输出遥测源包信息,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs)。
所有字段仅使用文档内容输出。
表格中遥测源包不是按名称来排序的,按照文档中的表格中的遥测源包顺序进行输出。
每个包都要输出。
所属虚拟信道:通过表格中描述的下传时机和虚拟信道的划分,获取下传时机对应的虚拟信道代号(序号),并组织为一个数据进行输出,例如:下传时机为实时和延时,那么就表示该包的所属虚拟信道为VC1和VC3。如果没有匹配下传时机,就填入空数组。
# 输出示例:
[
  {
    "id": "PMS001",
    "name": "数管数字量快速源包",
    "vcs": ["VC1"],
    "timeTags": ["实时"]
    "vcs": ["VC1",'VC2']
  },
]
        """
@@ -923,7 +990,8 @@
            assert len(pkts), 'VC源包列表不能为空'
        doc_text = self.get_text_with_entity(['虚拟信道定义', '遥测源包下传时机'])
        text = self.generate_text_json(_msg, 'out/遥测VC源包.json', doc_text=doc_text, validation=validation)
        text = await asyncio.to_thread(
            lambda: self.generate_text_json(_msg, f'{self.json_path}/遥测VC源包.json', doc_text=doc_text, validation=validation))
        pkt_vcs = json.loads(text)
        return pkt_vcs
@@ -967,46 +1035,45 @@
}
"""
        print('遥测包格式:')
        text = self.generate_text_json(_msg, 'out/数据包格式.json', files=[file_map['遥测大纲']])
        text = self.generate_text_json(_msg, f'{self.json_path}/数据包格式.json', files=[file_map['遥测大纲']])
        pkt_formats = json.loads(text)
        return pkt_formats
    def compute_length_pos(self, items: list):
        length = 0
        pos = 0
        for child in items:
            if 'children' in child:
                self.compute_length_pos(child['children'])
            child['pos'] = pos
            if 'length' in child and isinstance(child['length'], int):
                length = length + child['length']
                pos = pos + child['length']
        items.sort(key=lambda x: x['pos'])
        # for child in items:
        #     if 'children' in child:
        #         self.compute_length_pos(child['children'])
        #     if 'length' in child and isinstance(child['length'], int):
        #         length = length + child['length']
        #         pos = pos + child['length']
        # node['length'] = length
    def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name):
    async def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name):
        _msg = """
#角色
你是一名资深的软件工程师
#指令
我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。
你是一名资深的软件工程师。
#需求
请析文档,列出总线通信包传输约定中描述的所有数据包列表,
数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。
请分析文档中的表格,按表格顺序输出表格中的所有源包信息;
数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
transSer(传输服务)、note(备注)、throughBus(是否经过总线)、transDirect(传输方向)。
文档中如果没有数据包表则输出:[]。
# 数据包字段说明
- frameNum(通信帧号):文档中通信帧号列的内容;
- subAddr(子地址/模式):值只能是:“深度”、“平铺”、数字或null,如果是“/”则是null;
- throughBus(是否经过总线)的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
- transSer(传输服务分三种):置数(SetData)、取数(GetData)、数据块传输(DataBlock),根据表格中的“传输服务”列进行判断;
#约束
- frameNum:使用文档中的文本不要做任何转换;
- subAddr:值为“深度”、“平铺”、“数字”或null;
- 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
- 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输);
- 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发;
- 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发;
- 不要漏掉任何一个数据包;
- 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
- 仅输出json。
- 按照表格中的顺序进行输出。
- 不要漏包。
#例子
[
    {
        "id": "PCS005",
        "name": "总线管理(内部指令)",
        "id": "P001",
        "name": "xxx",
        "apid": "418",
        "service": "(1, 2)",
        "length": 1,
@@ -1015,53 +1082,142 @@
        "frameNum": "1|2",
        "transSer": "DataBlock",
        "note": "",
        "rtAddr": 28,
        "rt": "数据接口单元XIU",
        "throughBus": true,
        "burst": true,
        "transDirect": "发"
        "transDirect": ""
    }
]
"""
        print('总线数据包:')
        def validation(gen_text):
            json.loads(gen_text)
            pkts2 = json.loads(gen_text)
            assert not next(filter(lambda pkt2: 'transSer' not in pkt2, pkts2), None), '总线包属性生成不完整,缺少transSer。'
        doc_text = self.get_text_with_entity(['RT地址分配', '分系统源包'])
        text = self.generate_text_json(_msg, 'out/总线.json', doc_text=doc_text,
        rt_doc_text = self.get_text_with_entity(['RT地址分配'])
        subsys_pkt_texts = self.get_text_list_with_entity(['分系统源包'])
        tasks = []
        rt_adds = []
        for subsys_pkt_text in subsys_pkt_texts:
            doc_text = f'{rt_doc_text}\n{subsys_pkt_text}'
            subsys = subsys_pkt_text[:subsys_pkt_text.index("\n")]
            # 单独获取RT地址,并应用到章节下所有包
            get_rt_msg = f"""返回{subsys}的RT地址,仅输出十进制的结果,不要输出其他内容,如果是系统管理单元(SMU)则返回0。"""
            rt_info = self.generate_text_json(get_rt_msg, "", doc_text=rt_doc_text)
            if rt_info == '0':
                continue
            rt_adds.append({
                "rt": subsys,
                "rt_addr": rt_info
            })
            # md5 = utils.generate_text_md5(subsys_pkt_text)
            task = asyncio.to_thread(self.generate_text_json, _msg,
                                     f"{self.json_path}/总线-{utils.to_file_name(subsys)}.json", doc_text=doc_text,
                                       validation=validation)
        pkts = json.loads(text)
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        pkts = []
        # 判断是否存在总线数据包.json
        if os.path.isfile(f"{self.json_path}/总线数据包列表.json"):
            pkts = read_from_file(f"{self.json_path}/总线数据包列表.json")
            pkts = json.loads(pkts)
        else:
            pktid_apid_map = {}
            for index, result in enumerate(results):
                pkts_diretions = []
                # 全角空格去除
                result = re.sub(r' ', '', result)
                _pkts = json.loads(result)
                rt_name = rt_adds[index]["rt"]
                for _pkt in _pkts:
                    # 应用RT地址
                    _pkt['rt'] = rt_name
                    _pkt['rtAddr'] = rt_adds[index]["rt_addr"]
                    _pkt['burst'] = "突发" in f"{_pkt['interval']}"
                    if _pkt['apid'] is None or not re.match(r'[0-9A-Fa-f]+', _pkt['apid']):
                        _pkt['apid'] = ''
                    if _pkt['id'] in pktid_apid_map:
                        if _pkt['apid']:
                            pktid_apid_map[_pkt['id']] = _pkt['apid']
                        else:
                            _pkt['apid'] = pktid_apid_map[_pkt['id']]
                    else:
                        pktid_apid_map[_pkt['id']] = _pkt['apid']
                        # 转为bit长度
                        if _pkt['length']:
                            if isinstance(_pkt['length'], str) and re.match(r'^\d+$', _pkt['length']):
                                _pkt['length'] = int(_pkt['len']) * 8
                            elif isinstance(_pkt['length'], int):
                                _pkt['length'] = _pkt['length'] * 8
                        pkts.append(_pkt)
                    # 获取待处理的传输方向信息
                    pkts_diretions.append({
                        'id': _pkt['id'],
                        'rt': _pkt['rt'],
                        'transDirect': _pkt['transDirect'],
                    })
                # 处理传输方向
                _msg = """
                    处理传入的json数组,每个数组对象中包含字段:rt(自身设备)、transDirect(传输方向)。
                    需要你给数组对象中多加一个字段,输出数组中单个对象的传输类型(transType),传输类型有两种值“收”和“发”,判断依据是根据传输方向的内容进行判断,由rt发送给SMU的传输类型是“收”,由SMU发送给rt的传输类型是“发”
                    rt字段为空的数据不用处理。
                    在transDirect字段中rt可能为缩写,缩写对应的rt名称可以从文档中进行读取。
                    输出结果将增加了字段的json数组直接输出,不用输出其他内容。
                    输出示例:[{"id": "PMK013", "rt": "中心控制单元CCU", "transDirect": "CCU→SMU→地面", "transType": "收"},{"id": "PMK055", "rt": "中心控制单元CCU", "transDirect": "SMU→CUU", "transType": "发"}]
                    """ + f"""
                    JSON:{pkts_diretions}
                """
                result_json = self.generate_text_json(_msg, f"{self.json_path}/总线-rt-{utils.to_file_name(rt_name)}.json", doc_text=rt_doc_text)
                # 将处理结果同步修改到result
                for pkt in _pkts:
                    for data in json.loads(result_json):
                        if "transType" in data:
                            if data['id'] == pkt['id']:
                                pkt['transType'] = data['transType']
                                break
        print(f"总线源包个数:{len(pkts)}")
        # 筛选经总线的数据包
        pkts = list(filter(lambda it: it['throughBus'], pkts))
        no_apid_pkts = list(filter(lambda it: not it['apid'], pkts))
        # 筛选有apid的数据包
        pkts = list(filter(lambda it: it['apid'], pkts))
        # 筛选rtAddr不为0的数据包,SMU的
        pkts = list(filter(lambda it: it['rtAddr'] != '0', pkts))
        # 储存所有总线包
        save_to_file(json.dumps(pkts, ensure_ascii=False, indent=2), f"{self.json_path}/总线数据包列表.json")
        tasks = []
        def _run(gen_pkt_details, pkt2):
            _pkt2 = asyncio.run(gen_pkt_details(pkt2['name'], pkt2['id']))
            if _pkt2 is not None:
                pkt2['children'] = []
                pkt2['children'].extend(_pkt2['datas'])
        for pkt in pkts:
            _pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
            if _pkt:
                pkt['children'] = []
                pkt['children'].extend(_pkt['datas'])
                pkt['length'] = _pkt['length']
            pkt_task = asyncio.to_thread(_run, self.gen_pkt_details, pkt)
            tasks.append(pkt_task)
        await asyncio.gather(*tasks)
        rt_pkt_map = {}
        for pkt in pkts:
            # 根据数据块传输和取数分组
            # 逻辑封装包的解析规则ID:RT[rt地址]SUB[子地址]S(S代表取数,方向是AA表示发送;R代表置数,方向是BB表示接受)
            # 取数:逻辑封装包根据子地址和帧号组合创建,有几个组合就创建几个逻辑封装包
            # 数据块:只有一个逻辑封装包
            if pkt['subAddr'] is not None and not isinstance(pkt['subAddr'], int) and pkt['subAddr'].find("/") > -1:
                pkt['subAddr'] = pkt['subAddr'].split("/")[0]
            # 处理子地址
            if pkt['burst']:
                # 突发包子地址是18~26
                pkt['subAddr'] = 26
            elif pkt['subAddr'] == '平铺' or pkt['subAddr'] is None:
            if pkt['subAddr'] == '平铺' or not pkt['subAddr']:
                # 平铺:11~26,没有填写的默认为平铺
                pkt['subAddr'] = 26
                pkt['subAddr'] = '11~26'
            elif pkt['subAddr'] == '深度':
                # 深度:11
                pkt['subAddr'] = 11
                pkt['subAddr'] = '11'
            pkt['burst'] = "突发" in f"{pkt['interval']}"
            # 处理帧号
            if pkt['burst']:
                # 突发:ALL
@@ -1082,12 +1238,15 @@
                # 取数
                pkt_id = f"RT{rt_addr}SUB{sub_addr}"
                vals = f"{rt_addr}/{sub_addr}/0xAA/{frame_no}/"
                rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals)
                rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals, pkts)
            elif trans_ser == 'DataBlock':
                # 数据块
                direct = '0xAA'
                rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, f"RT{rt_addr}SUB{sub_addr}{direct}",
                               f"{rt_addr}/{sub_addr}/{direct}/ALL/")
                if pkt['transDirect'] == '发':
                    direct = '0xBB'
                pkt_id = f"RT{rt_addr}SUB{sub_addr}{direct}"
                vals = f"{rt_addr}/{sub_addr}/{direct}/ALL/"
                rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, pkt_id, vals, pkts)
        _pkts = []
        for k in rt_pkt_map:
            _pkts.append(rt_pkt_map[k])
@@ -1123,16 +1282,26 @@
            rule_enc.C_KEY = sub_key
            update_rule_enc(rule_enc)
    def gen_tc(self):
    async def gen_tc(self):
        # 数据帧格式
        frame = self.gen_tc_transfer_frame_format()
        frame_task = self.gen_tc_transfer_frame_format()
        # 遥控包格式
        pkt_format = self.gen_tc_pkt_format()
        pkt_format_task = self.gen_tc_pkt_format()
        # 遥控包列表
        instructions = self.gen_tc_transfer_pkts()
        instructions_task = self.gen_tc_transfer_pkts()
        result = await asyncio.gather(frame_task, pkt_format_task, instructions_task)
        frame = result[0]
        pkt_format = result[1]
        instructions = result[2]
        tasks = []
        for inst in instructions:
            # 遥控指令数据区内容
            self.gen_tc_pkt_details(inst)
            tasks.append(self.gen_tc_details(inst))
        await asyncio.gather(*tasks)
        for inst in instructions:
            inst['type'] = 'insUnit'
            format_text = json.dumps(pkt_format, ensure_ascii=False)
            format_text = utils.replace_tpl_paras(format_text, inst)
@@ -1164,7 +1333,9 @@
            elif item['type'] == 'pkt':
                return '''{"MaxLength":1024,"IsSplit8":false,"Split8Start":null,"Split8End":null,"PadCode":null,"Alignment":null,"InputParams":[],"OutPutParams":[],"MatchItems":[]}'''
            elif item['type'] == 'pktSeqCnt':
                return json.dumps({"FirstPackValue":"PackCount","MiddlePackValue":"PackIndex","LastPackValue":"PackIndex","IndependPackValue":"InsUnitCount"})
                return json.dumps(
                    {"FirstPackValue": "PackCount", "MiddlePackValue": "PackIndex", "LastPackValue": "PackIndex",
                     "IndependPackValue": "InsUnitCount"})
            elif 'value' in item:
                return item['value']
@@ -1202,8 +1373,11 @@
            # 即时输入长度为null则是变长字段,需要把类型改为variableLength
            if field['type'] == 'input' and field['length'] is None:
                field['type'] = 'variableLength'
                if isinstance(field['value'], dict):
                    field['range'] = f'{field["value"]["minLength"]}~{field["value"]["maxLength"]}'
            # 枚举值默认值设置
            if field['type'] == 'enum' and len(field['enums']) and not next(filter(lambda x: 'default' in x and x['default'], field['enums']), None):
            if field['type'] == 'enum' and len(field['enums']) and not next(
                    filter(lambda x: 'default' in x and x['default'], field['enums']), None):
                field['enums'][0]['default'] = True
            # 校验和
            if field['type'] == 'checkSum':
@@ -1234,7 +1408,7 @@
        create_tc_format(None, frame)
    def gen_tc_transfer_frame_format(self):
    async def gen_tc_transfer_frame_format(self):
        _msg = '''
# 角色
你是一名资深的软件工程师。
@@ -1251,7 +1425,7 @@
# 数据类型
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
- sendFlag:发送标记,类似枚举,定义样例:[{"n":"name","v":"value","c":"code","default":true}],n表示名称,v表示值,c表示code(没有空着),default表示是默认值;
- checkSum:校验和,如果是校验和类型还需要分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
- checkSum:校验和,如果是校验和类型还需要分析校验和的算法,并保存在value的type中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
# 约束
- 以JSON格式输出;
- 仅输出JSON文本,不要输出任何其他文本。
@@ -1267,15 +1441,16 @@
            json.loads(gen_text)
        doc_text = self.get_text_with_entity(['遥控帧格式'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', doc_text=doc_text,
                                     validation=validation)
        text = await asyncio.to_thread(
            lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_frame.json', doc_text=doc_text,
                                          validation=validation))
        result: dict = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
        format_text = utils.replace_tpl_paras(format_text, result)
        frame = json.loads(format_text)
        return frame
    def gen_tc_pkt_format(self):
    async def gen_tc_pkt_format(self):
        _msg = '''
# 角色
你是一名资深的软件工程师。
@@ -1283,26 +1458,30 @@
分析遥控包格式,提取遥控包格式的字段定义。
# 需求
要提取值的包格式字段:
- 包版本号: const,二进制;
- 包类型: const,二进制;
- 数据区头标志: const,二进制;
- 序列标志: const,二进制;
- 包长:length,
- 副导头标志: const,二进制;
- 遥控包版本号: const,二进制;
- 命令正确应答: const,二进制;
- 源地址: const,十六进制。
- packetVersionNumber:包版本号,const,二进制;
- packetType:包类型,const,二进制;
- dataFieldHeaderFlag:数据区头标志,const,二进制;
- sequenceFlags:序列标志,const,二进制;
- ccsdsSecondaryHeaderFlag:副导头标志,const,二进制;
- tcPktVersionNumber:遥控包版本号,const,二进制;
- acknowledgmentFlag:命令正确应答,const,二进制;
- sourceAddr:源地址,const,十六进制。
# 数据类型
- 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头;
- 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义,
- 枚举值:enum,
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
- 即时输入:input。
# 长度类型的范围定义描述
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法,并保存在value的type中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
- 即时输入:input,如果是即时输入value的值为变长定义。
## 长度类型的范围定义描述
{"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"}
- start:起始字段code,长度包括起始字段,字段描述中说明了起始字段,
- end:结束字段code,长度包括结束字段,字段描述中说明了结束字段,
- formula:计算公式,如果没有计算相关描述则表示不需要计算公式。
## 即使输入类型的变长定义描述
{"minLength": "最小长度", "maxLength": "最大长度", "variableLength": true}
- minLength:最小长度,
- maxLength:最大长度,
- variableLength:是否是变长。
计算公式定义:
- BYTES:按字节计算;
- N-x:总字节数减x,例如总字节数减1的公式为N-1。
@@ -1311,8 +1490,8 @@
- 仅输出JSON文本,不要输出任何其他文本。
# 输出例子:
{
    "包版本号": "00B",
    "包类型": "1B",
    "packetVersionNumber": "00B",
    "packetType": "1B",
    ...
}
'''
@@ -1321,8 +1500,9 @@
            json.loads(gen_text)
        doc_text = self.get_text_with_entity(['遥控包格式'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', doc_text=doc_text,
                                     validation=validation)
        text = await asyncio.to_thread(
            lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_pkt.json', doc_text=doc_text,
                                          validation=validation))
        result = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_pkt_format.json')
@@ -1330,7 +1510,7 @@
        pkt_format = json.loads(format_text)
        return pkt_format
    def gen_tc_transfer_pkts(self):
    async def gen_tc_transfer_pkts(self):
        _msg = '''
# 角色
你是一名资深的软件工程师。
@@ -1338,17 +1518,22 @@
分析文档列出所有的遥控指令。
#  约束
- 应用过程标识:应用过程标识就是APID,一般会在名称后的括号中列出来;
- code:指令代号,没有就空着;
- name:指令名称,根据表格内容提取,注意名称需要提取完整,如果有多列则合并用-分割;
- 应用数据区:提取表格中的应用数据区内容。
- code:指令代号,如果没有填写或者是“/”则使用空字符串代替;
- name:指令名称,根据表格行内容提取,注意是行内容,注意名称需要提取完整,如果有多列则合并用-分割;
- shortName:指令名称,根据表格内容提取;
- apid: 应用过程标识符;
- serviceType:服务类型;
- serviceSubtype:服务子类型;
- dataArea:应用数据区,提取表格中的应用数据区内容。
# 输出例子:
[{
"name": "xxx",
"name": "aaa-xxx",
"shortName": "xxx"
"code":"pkt",
"应用过程标识符":"0xAA",
"服务类型":"0x1",
"服务子类型":"0x2",
"应用数据区": ""
"apid":"0xAA",
"serviceType":"0x1",
"serviceSubtype":"0x2",
"dataArea": ""
}]
'''
@@ -1356,14 +1541,15 @@
            json.loads(gen_text)
        doc_text = self.get_text_with_entity(['APID分配'])
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', doc_text=doc_text,
                                     validation=validation)
        text = await asyncio.to_thread(
            lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_pkts.json', doc_text=doc_text,
                                          validation=validation))
        pkts = json.loads(text)
        return pkts
    def gen_tc_pkt_details(self, pkt):
    async def gen_tc_details(self, pkt):
        result = []
        tc_name = pkt['name']
        tc_name = pkt['shortName']
        tc_code = pkt['code']
        pkt['name'] = f'{tc_code} {tc_name}'
        _msg = f"""
@@ -1380,10 +1566,10 @@
- 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头;
- 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义,
- 枚举值:enum,
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法是什么,并保存在value中,
- 即时输入:input,如果是即时输入value的值为空字符串。
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法是什么以及校验数据域范围,并保存在value中,例如:{ "type":"CRC-CCITT", "start": "START", "end":"END" },
- 即时输入:input,如果是即时输入value的值为变长定义。
# 长度类型的范围定义描述
## 长度类型的范围定义描述
{"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"}
- start:起始字段code,长度包括起始字段,字段描述中说明了起始字段,
- end:结束字段code,长度包括结束字段,字段描述中说明了结束字段,
@@ -1391,22 +1577,31 @@
计算公式定义:
- BYTES:按字节计算,字节数;
- N-x:总字节数减x,例如总字节数减1的公式为N-1。
## 即使输入类型的变长定义描述
{"minLength": "最小长度", "maxLength": "最大长度", "variableLength": true}
- minLength:最小长度,
- maxLength:最大长度,
- variableLength:是否是变长。
# 字段类型分析方法
- 根据字段描述分析字段的类型;
- 字段描述中明确指定了字段值的,类型为const;
- 字段描述中没有明确指定字段值,但是罗列了取值范围的,类型为enum;
- 字段描述中如果没有明确指定字段值也没有罗列取值范围的,类型为input;
- 字段如果是和“长度”有关,类型为length;
- 字段如果描述了当前指令中的数据域长度以及长度范围则是长度类型length,否则不是长度类型;
- 如果和数据域有关,类型为const;
- 字段如果和校验和有关,类型为checkSum,分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)。
- 校验和类型:字段如果与当前指令数据区的校验和有关则为校验和类型否则不是校验和类型,分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)。
# 约束
- code 如果没有明确定义则使用名称的英文翻译,尽量简短;
## 字段属性
- code 如果没有明确定义则使用名称的英文翻译,尽量简短,如果没有填写或者为斜线表示没有明确定义;
- length 自动转换为bit长度,必须是数值、null或范围定义,不能为0;
- value 根据字段描述提取字段值,字段值一般为数值类型,需要根据字段类型来分析,如果是length类型value的值为范围定义;
- enums 枚举类型的字段必须要有enums,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""};
- length类型字段的范围定义中的start和end必须是生成结果中的字段code,长度包括start和end,必须使用长度描述中的字段;
## 字段类型
- 长度类型字段的范围定义中的start和end必须是生成结果中的字段code,长度范围包括start和end,必须使用长度描述中的字段;
- 如果没有长度范围描述则不是长度类型;
- 校验和类型字段必须描述的是当前指令数据域校验和,如果描述的不是当前指令的数据域校验和则不是校验和类型;
- 输出数据结构为数组,数组元素为字段信息;
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
@@ -1427,11 +1622,26 @@
        "value": {"start": "data", "end": "data", "formula": "BYTES"}
    },
    {
        "name": "para3",
        "code": "para3",
        "length": 8,
        "type": "enum",
        "value": "",
        "enums": [{"n":"参数1","v":"0x0A","c":"Para1"}]
    },
    {
        "name": "数据",
        "code": "data",
        "length": null,
        "type": "input",
        "value": ""
    },
    {
        "name": "校验和",
        "code": "checksum",
        "length": 2,
        "type": "checkSum",
        "value": { "type": "CRC-CCITT", "start":"para1", "end":"data" }
    }
]
"""
@@ -1461,21 +1671,56 @@
        doc_text = self.get_text_with_entity([tc_name])
        if doc_text == '':
            doc_text = pkt['应用数据区']
        text = self.generate_tc_text(_msg,
                                     f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json',
                                     doc_text=doc_text,
                                     validation=validation)
            doc_text = self.get_text_with_tc_name(tc_name)
        if doc_text == '':
            doc_text = pkt['dataArea']
        text = await asyncio.to_thread(self.generate_tc_text, _msg,
                                       f"{self.json_path}/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json",
                                       doc_text=doc_text, validation=validation)
        result = json.loads(text)
        pkt['children'] = result
    def get_text_with_tc_name(self, tc_name: str):
        entities = doc_dbh.get_entities_by_type('指令格式配置')
        entity_names = '\n'.join([f'- {e.name}' for e in entities])
        msg = f"""
# 需求
请从下列指令名称中匹配一个与“{tc_name}”相似度最高的指令名称。
指令名称列表:
{entity_names}
"""
        name = self.generate_text(msg,None)
        entity = next(filter(lambda e: e.name == name, entities,None))
        if entity:
            return self.get_text_with_entity([entity.name])
        else:
            return ''
if __name__ == '__main__':
def tc_data_generate():
    exe_path = os.path.dirname(__file__) + "/db_tc_generator/InstructionGenerator.exe"
    db_path = os.path.dirname(__file__) + "/db.db"
    try:
        os.makedirs("./out/pkts", exist_ok=True)
        os.makedirs("./out/tmp", exist_ok=True)
        # 超时时间240秒
        result = subprocess.run([exe_path, db_path], timeout=240)
        print(result.stdout)
        print(result.returncode)
    except subprocess.TimeoutExpired:
        print("警告:指令数据生成失败!")
def main():
    try:
        project_path = r'D:\projects\KnowledgeBase'
        doc_dbh.set_project_path(project_path)
        # 启动大模型处理流程
        ret_text = DbStructFlow().run()
        asyncio.run(DbStructFlow(f'{project_path}').run())
        # 生成指令数据表
        tc_data_generate()
    except KeyboardInterrupt:
        if g_completion:
            g_completion.close()
if __name__ == '__main__':
    main()