lyg
2025-05-08 494637879bc8f5dd9c3d43481927b4a0c07e2f34
db_struct_flow.py
@@ -6,50 +6,64 @@
import re
import json
from langchain_community.chat_models import ChatOpenAI
from langchain_core.prompts import HumanMessagePromptTemplate, SystemMessagePromptTemplate
import data_templates
from knowledgebase import utils
from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \
    update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format
from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt
from knowledgebase.db.models import TProject
file_map = {
    "文档合并": "./doc/文档合并.md",
    "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md",
    "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md",
    "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md",
    "应用软件用户需求": "./doc/XA-5D无人机软件用户需求(公开).docx.md",
    "指令格式": "./doc/ZL格式(公开).docx.md"
}
# file_map = {
#     # "遥测源包设计报告": "./doc/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md",
#     # "遥测源包设计报告": "./doc/数管数字量快速源包.md",
#     # "遥测源包设计报告": "./doc/数管数字量中速源包.md",
#     # "遥测源包设计报告": "./doc/硬通道设备工作状态数据包.md",
#     # "遥测源包设计报告": "./doc/DIU遥测模块采集的DS量4.md",
#     "遥测源包设计报告": "./doc/DIU遥测模块模拟量.md",
#     "遥测大纲": "./doc/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md",
#     # "总线传输通信帧分配": "./doc/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md",
#     "总线传输通信帧分配": "./doc/总线.md",
#     "应用软件用户需求": "./doc/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md"
# }
# file_map = {
#     "遥测源包设计报告": "./docs/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md",
#     "遥测大纲": "./docs/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md",
#     "总线传输通信帧分配": "./docs/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md",
#     "应用软件用户需求": "./docs/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md"
# }
# file_map = {
#     "文档合并": "./doc/文档合并.md",
#     "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md",
#     "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md",
#     "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md"
# }
file_map = {
    "文档合并": "./doc/文档合并.md",
    "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md",
    "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md",
    "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md",
    "指令格式": "./doc/ZL格式(公开).docx.md"
}
BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'
API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e'
MODEL_NAME = 'qwen2.5-14b-instruct-1m'
MODEL_NAME = 'qwen2.5-72b-instruct'
# BASE_URL = 'http://10.74.15.164:11434/v1/'
# API_KEY = 'ollama'
# MODEL_NAME = 'qwen2.5:32b-128k'
# BASE_URL = 'http://10.74.15.164:1001/api'
# API_KEY = 'sk-a909385bc14d4491a718b6ee264c3227'
# MODEL_NAME = 'qwen2.5:32b-128k'
# BASE_URL = 'http://chat.com/api'
# API_KEY = 'sk-49457e83f734475cb4cf7066c649d563'
# MODEL_NAME = 'qwen2.5:72b-120k'
# BASE_URL = 'http://10.74.15.171:8000/v1'
# API_KEY = 'EMPTY'
# MODEL_NAME = 'QwQ:32b'
# MODEL_NAME = 'vllm-Qwen-72b-4bit'
USE_CACHE = True
assistant_msg = """
# 角色
你是一个专业的文档通信分析师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。
你是一名资深的软件工程师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。
## 技能
### 技能 1:文档分析(包括 markdown 文档)
@@ -91,6 +105,7 @@
10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。
    
## 限制:
- id和code的命名规则:英文字母、数字、下划线组成,且以英文字母或下划线开头。
- 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
- 不输出任何注释等描述性信息。
"""
@@ -110,10 +125,16 @@
            f.write(text)
def remove_think_tag(text):
    pattern = r'<think>(.|\n)*?</think>'
    result = re.sub(pattern, '', text)
    return result
json_pat = re.compile(r'```json(.*?)```', re.DOTALL)
def remove_markdown(text):
def get_json_text(text):
    # 使用正则表达式提取json文本
    try:
        return json_pat.findall(text)[0]
@@ -310,6 +331,7 @@
            # api_key="ollama",
            # base_url="http://192.168.1.48:11434/v1/",
        )
        # self.llm = ChatOpenAI(model=MODEL_NAME, temperature=0, api_key=API_KEY, base_url=BASE_URL)
    def run(self):
        # 生成型号结构
@@ -319,9 +341,9 @@
        # 生成遥测数据包结构
        self.proj = self.gen_project()
        devs = self.gen_device(self.proj)
        # devs = self.gen_device(self.proj)
        # self.gen_tc()
        self.gen_tc()
        return ''
    def _gen(self, msgs, msg, files=None):
@@ -341,10 +363,10 @@
            model=MODEL_NAME,
            messages=messages,
            stream=True,
            temperature=0.0,
            top_p=0,
            temperature=0.6,
            # top_p=0,
            timeout=30 * 60000,
            max_completion_tokens=1000000,
            max_completion_tokens=32000,
            seed=0
            # stream_options={"include_usage": True}
        )
@@ -358,7 +380,7 @@
        g_completion = None
        return text
    def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5):
    def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5, json_text=False):
        if msgs is None:
            msgs = []
        if USE_CACHE and os.path.isfile(cache_file):
@@ -366,7 +388,9 @@
        else:
            s = time.time()
            text = self._gen(msgs, msg, files)
            text = remove_markdown(text)
            text = remove_think_tag(text)
            if json_text:
                text = get_json_text(text)
            if validation:
                try:
                    validation(text)
@@ -374,10 +398,14 @@
                    print(e)
                    if try_cnt <= 0:
                        raise RuntimeError('生成失败,重试次数太多,强制结束!')
                    return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt - 1)
            save_to_file(text, cache_file)
                    return self.generate_text_json(msg, cache_file, msgs, files, validation, try_cnt - 1)
            if cache_file:
                save_to_file(text, cache_file)
            print(f'耗时:{time.time() - s}')
        return text
    def generate_text_json(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5):
        return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt, True)
    def generate_tc_text(self, msg, cache_file, messages=None, files=None, validation=None, try_cnt=5):
        if messages is None:
@@ -388,7 +416,7 @@
        if len(messages) == 0:
            # 如果是第一次提问加入system消息
            messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text})
        return self.generate_text(msg, cache_file, messages, files, validation, try_cnt)
        return self.generate_text_json(msg, cache_file, messages, files, validation, try_cnt)
    def gen_project(self):
        #         _msg = """
@@ -424,13 +452,41 @@
        proj_pk = proj.C_PROJECT_PK
        devices = []
        _msg = f"""
输出分系统下的硬件产品(设备)列表,字段包括:名称(name)、代号(code),硬件产品名称一般会包含“管理单元”或者“接口单元”,如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符;
并且给每个硬件产品增加三个字段:第一个字段hasTcTm“是否包含遥控遥测”,判断该硬件产品是否包含遥控遥测的功能、
第二个字段hasTemperatureAnalog“是否包含温度量、模拟量等数据的采集”,判断该硬件产品是否包含温度量等信息的采集功能、
第三个字段hasBus“是否是总线硬件产品”,判断该设备是否属于总线硬件产品,是否有RT地址;每个字段的值都使用true或false来表示。
仅输出JSON,结构最外层为数组,数组元素为设备信息,不要输出JSON以外的任何字符。
        """
        _msg = """
# 角色
你是一名资深软件工程师。
# 指令
我需要从文档提取设备列表信息,你要帮助我完成设备列表信息提取。
# 需求
输出分系统下的硬件产品(设备)列表,硬件产品名称一般会包含“管理单元”或者“接口单元”;
# 字段包括:
- 名称(name):设备名称;
- 代号(code):设备代号;
- 是否包含遥控遥测(hasTcTm):标识该硬件产品是否包含遥控遥测的功能,布尔值true或false;
- 是否包含温度量模拟量等数据的采集(hasTemperatureAnalog):标识该硬件产品是否包含温度量等信息的采集功能,布尔值true或false;
- 是否有总线硬件产品(hasBus):标识该设备是否属于总线硬件产品,是否有RT地址,布尔值true或false;
# 约束
- 如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符;
- 数据结构最外层为数组,数组元素为设备信息
- 仅输出JSON,不要输出JSON以外的任何字符。
# 例子
[
    {
        "name": "系统管理单元",
        "code": "SMU",
        "hasTcTm": true,
        "hasTemperatureAnalog": false,
        "hasBus": true
    },
    {
        "name": "1553B总线",
        "code": "1553",
        "hasTcTm": true,
        "hasTemperatureAnalog": true,
        "hasBus": true
    }
]
"""
        print('设备列表:')
        cache_file = 'out/设备列表.json'
@@ -439,7 +495,7 @@
            assert isinstance(_devs, list), '数据结构最外层不是数组'
            assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元'
        text = self.generate_text(_msg, cache_file, files=[file_map['应用软件用户需求']], validation=validation)
        text = self.generate_text_json(_msg, cache_file, files=[file_map['应用软件用户需求']], validation=validation)
        devs = json.loads(text)
        # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
@@ -485,10 +541,30 @@
    def gen_insert_domain_params(self):
        _msg = """
分析文档,输出插入域的参数列表,将所有参数全部输出,不要有遗漏。
数据结构最外层为数组,数组元素为参数信息对象,参数信息字段包括:name、id、pos、length、type。
1个字节的长度为8位,使用B0-B7来表示,请认真计算参数长度。
文档中位置描述信息可能存在跨字节的情况,,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取插入域的参数列表,你要帮助我完成插入域参数列表的提取。
#需求
分析文档,输出插入域的参数列表,将所有参数全部输出。
参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数起始bit位置)、length(参数bit长度)、type(类型:para)。
注意:
1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。
文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。
#约束
- 不要遗漏任何参数;
- 数据结构最外层为数组,数组元素为参数信息对象;
- 仅输出JSON文本。
#例子
[
  {
    "name": "遥测模式字",
    "id": "TMS215",
    "pos": 0,
    "length": 8,
    "type": "para"
  }
]
"""
        print('插入域参数列表:')
        files = [file_map['遥测大纲']]
@@ -498,7 +574,7 @@
            assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组'
            assert len(params), '插入域参数列表不能为空'
        text = self.generate_text(_msg, './out/插入域参数列表.json', files=files, validation=validation)
        text = self.generate_text_json(_msg, './out/插入域参数列表.json', files=files, validation=validation)
        return json.loads(text)
    def gen_tm_frame_data(self):
@@ -613,12 +689,28 @@
    def gen_vc(self):
        _msg = """
请分析文档中的遥测包格式,输出遥测虚拟信道的划分,数据结构最外层为数组,数组元素为虚拟信道信息字典,字典包含以下键值对:
id: 虚拟信道代号
name: 虚拟信道名称
VCID: 虚拟信道VCID(二进制)
format: 根据虚拟信道类型获取对应的数据包的格式的名称
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取虚拟信道列表,你要帮助我完成虚拟信道列表的提取。
#需求
请分析文档中的遥测包格式以及遥测虚拟信道,输出遥测虚拟信道列表。
字段包括:id(虚拟信道代号)、name(虚拟信道名称)、VCID(虚拟信道VCID,二进制)、format(根据虚拟信道类型获取对应的数据包的格式的名称)
#上下文
深入理解文档中描述的关系,例如:文档中描述了常规遥测是常规数据的下传信道,并且还描述了分系统常规遥测参数包就是实时遥测参数包,并且文档中对实时遥测参数包的格式进行了描述,所以常规遥测VC应该输出为:{"id": "1", "name": "常规遥测VC", "VCID": "0", "format": "实时遥测参数包"}
#约束
- 数据结构最外层为数组,数组元素为虚拟信道信息;
- format:必须是数据包格式的名称;
- 仅输出JSON文本。
#例子:
[
    {
        "id": "VC0",
        "name": "空闲信道",
        "VCID": "111111",
        "format": "空闲包"
    }
]
"""
        def validation(gen_text):
@@ -626,17 +718,28 @@
            assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
        print('虚拟信道:')
        text = self.generate_text(_msg, "out/虚拟信道.json", files=[file_map['遥测大纲']], validation=validation)
        text = self.generate_text_json(_msg, "out/虚拟信道.json", files=[file_map['遥测大纲']], validation=validation)
        vcs = json.loads(text)
        return vcs
    def gen_dev_pkts(self):
        _msg = f"""
输出文档中遥测源包类型定义描述的设备以及设备下面的遥测包,数据结构:最外层为数组 > 设备 > 遥测包列表(pkts),设备字段包括:名称(name)、代号(id),源包字段包括:名称(name)、代号(id)
        """
        _msg = """
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取设备以及设备下面的遥测包信息,你要帮助我完成提取。
#需求
输出文档中遥测源包类型定义描述的设备以及设备下面的遥测包。
#约束
- 数据结构:数组 > 设备 > 遥测包列表(pkts);
- 设备字段包括:名称(name)、代号(id);
- 源包字段包括:名称(name)、代号(id);
- 仅输出JSON文本。
#例子
"""
        print('设备遥测源包信息:')
        files = [file_map["遥测源包设计报告"]]
        text = self.generate_text(_msg, 'out/设备数据包.json', [], files)
        text = self.generate_text_json(_msg, 'out/设备数据包.json', [], files)
        dev_pkts = json.loads(text)
        return dev_pkts
@@ -647,9 +750,20 @@
        files = [file_map['遥测源包设计报告']]
        print(f'文档中有无“{pkt_name}”的字段描述:', end='')
        _msg = f"""
文档中有遥测包“{pkt_name}”的字段表描述吗?遥测包名称必须完全匹配。输出:“无”或“有”,不要输出其他任何内容。
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中分析判读是否有某个遥测包的字段表描述,你要帮助我判断。
#问题
文档中有遥测包“{pkt_name}”的字段表描述吗?
注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。
根据文档内容输出。"""
#约束
- 根据文档内容输出;
- 遥测包名称必须完全匹配;
- 输出“无”或“有”,不要输出其他任何内容。
#例子
"""
        text = self.generate_text(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', [], files)
        return text == '有'
@@ -657,22 +771,100 @@
        cache_file = f'out/数据包-{pkt_name}.json'
        files = [file_map['遥测源包设计报告']]
        if not os.path.isfile(cache_file):
            # 先问最后一个参数的字节位置
            print(f'遥测源包“{pkt_name}”信息:')
            _msg = f"""
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包;
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
#需求
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
""" + """
#约束
- 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
- 数据域中每一行对应一个参数;
- 不要跨表格提取;
- 字节位置中字节位置是从1开始的,bit位置是从0开始的;
- bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
- 仅输出json,不要输出其他任何字符。
#例子:
{"last_par_pos":128, "par_num": 20}
"""
            text = self.generate_text_json(_msg, '', files=files)
            result = json.loads(text)
            last_par_pos = result['last_par_pos']
            par_num = result['par_num']
            _msg = f"""
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
#需求
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
注意:最后一个参数的起始bit偏移位置为{last_par_pos}。
""" + """
遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
包头属性字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
你需要理解数据包的位置信息,并且将所有输出单位统一转换为 bits,位置字段的输出格式必须为数值类型;
数据结构仅只包含遥测包,仅输出json,不要输出任何其他内容。"""
包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
#约束
- 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
- 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
- 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
- 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
- pos字段:数值类型,从0开始计算,由长度(length)累加得到;
- 应用过程标识:如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头,;
- 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
- 类似”保留(Rsv)“的行也要当参数生成;
- 重复的行也要生成;
- 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
- 字节顺序:值为大端“B”,小端“L”,默认为“B”;
- 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
- 仅输出json,不要输出任何其他内容。
#例子
{
  "name": "数管缓变遥测包",
  "id": "PMS003",
  "type": "linear",
  "headers": [
    {
      "name": "包标识",
      "id": "packetIdentifier",
      "pos": 0,
      "content": "000",
      "length": 8,
      "type": "para"
    }
  ],
  "datas": [
    {
      "name": "XXX包",
      "id": "XXX",
      "pos": 0,
      "length": 8,
      "byteOrder": ""
    }
  ]
"""
            print(f'遥测源包“{pkt_name}”信息:')
            def validation(gen_text):
                _pkt = json.loads(gen_text)
                with open(f'out/tmp/{time.time()}.json', 'w') as f:
                    f.write(gen_text)
                assert 'headers' in _pkt, '包结构中必须包含headers字段'
                assert 'datas' in _pkt, '包结构中必须包含datas字段'
                print(f'参数个数:{len(_pkt["datas"])}')
                # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
                assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
            text = self.generate_text(_msg, cache_file, [], files, validation)
            text = self.generate_text_json(_msg, cache_file, [], files, validation)
            pkt = json.loads(text)
        else:
            pkt = json.loads(read_from_file(cache_file))
@@ -684,23 +876,61 @@
        return pkt
    def gen_pkts(self):
        _msg = f"""
输出文档中描述的遥测包。
遥测包字段包括:名称(name)、代号(id)、hasParams,
名称中不要包含代号,
hasParams表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有参数表,
如果没有代号用名称的英文翻译代替,如果没有名称用代号代替,
数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。
        _msg = """
#角色
你是一名资深软件工程师。
#指令
我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。
#需求
输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)、是否有参数(hasParams)。
字段描述:
1.名称:遥测包的名称;
2.代号:遥测包的代号;
3.是否有参数:表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者类似”详见xxx“则是没有参数表。
#约束
- name:名称中不要包含代号,仅从文档中提取源包名称;
- hasParams:值为布尔值,true或false;
- 如果没有代号,使用遥测包名称的英文翻译代替;
- 如果没有名称用代号代替;
- 不要漏掉任何遥测包;
- 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。
#例子
[
    {
        "name": "数管数字量快速源包",
        "id": "PMS001",
        "hasParams": true
    }
]
"""
        print(f'遥测源包列表:')
        files = [file_map['遥测源包设计报告']]
        text = self.generate_text(_msg, 'out/源包列表.json', [], files)
        text = self.generate_text_json(_msg, 'out/源包列表.json', [], files)
        pkt = json.loads(text)
        return pkt
    def gen_pkt_vc(self):
        _msg = f"""
根据遥测源包下传时机定义,输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)
        _msg = """
#角色
你是一名资深软件工程师。
#指令
我需要从文档中提取遥测源包信息,你要帮助我完成遥测源包信息的提取。
#需求
根据”遥测源包下传时机定义“章节的内容输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。
#约束
- 从”遥测源包下传时机定义“章节中提取遥测源包信息;
- 所属虚拟信道:必须是文档中描述的遥测虚拟信道代号(序号);
- 下传时机:与表格中定义的一致;
- 不要遗漏任何遥测源包。
#例子:
[
  {
    "id": "PMS001",
    "name": "数管数字量快速源包",
    "vcs": ["VC1"],
    "timeTags": ["实时"]
  },
]
        """
        files = [file_map['遥测大纲']]
        print('遥测源包所属虚拟信道:')
@@ -709,19 +939,51 @@
            pkts = json.loads(gen_text)
            assert len(pkts), 'VC源包列表不能为空'
        text = self.generate_text(_msg, 'out/遥测VC源包.json', files=files, validation=validation)
        text = self.generate_text_json(_msg, 'out/遥测VC源包.json', files=files, validation=validation)
        pkt_vcs = json.loads(text)
        return pkt_vcs
    def gen_pkt_format(self):
        _msg = f"""
请仔细分系文档,输出各个数据包的格式,数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。
包格式children包括:版本号(id:Ver)、类型(id:TM_Type)、副导头标志(id:Vice_Head)、应用过程标识符(id:Proc_Sign)、分组标志(id:Group_Sign)、包序列计数(id:Package_Count)、包长(id:Pack_Len)、数据域(id:EPDU_DATA)。
children元素的字段包括:name、id、pos、length、type
注意:生成的JSON语法格式要合法。
        _msg = """
#角色
你是一名资深软件工程师。
#指令
我需要从文档中提取数据包的格式,你要帮助我完成数据包格式的提取。
#需求
请仔细分系文档,输出各个数据包的格式。
数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。
包格式字段包括:名称(name)、代号(id)、类型(type)、子级(children)。
children元素的字段包括:name、id、pos、length、type。
children元素包括:版本号(Ver)、类型(TM_Type)、副导头标志(Vice_Head)、应用过程标识符(Proc_Sign)、分组标志(Group_Sign)、包序列计数(Package_Count)、包长(Pack_Len)、数据域(EPDU_DATA)。
#约束
- 生成的JSON语法格式要合法。
#例子
{
        "name": "实时遥测参数包",
        "id": "EPDU",
        "type": "logic",
        "children": [
            {
                "name": "版本号",
                "id": "Ver",
                "pos": 0,
                "length": 3,
                "type": "para",
                "content": "0",
                "dataTy": "INVAR"
            },
            {
                "name": "数据域",
                "id": "EPDU_DATA",
                "pos": 3,
                "length": "变长",
                "type": "any"
            }
        ]
}
"""
        print('遥测包格式:')
        text = self.generate_text(_msg, 'out/数据包格式.json', files=[file_map['遥测大纲']])
        text = self.generate_text_json(_msg, 'out/数据包格式.json', files=[file_map['遥测大纲']])
        pkt_formats = json.loads(text)
        return pkt_formats
@@ -738,24 +1000,52 @@
        # node['length'] = length
    def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name):
        _msg = f"""
        _msg = """
#角色
你是一名资深的软件工程师
#指令
我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。
#需求
请析文档,列出总线通信包传输约定中描述的所有数据包列表,
数据包字段包括:id、name、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向),
数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
通信帧号:使用文档中的文本不要做任何转换。
subAddr:值为“深度”、“平铺”、“数字”或null。
是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线。
传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输)。
传输方向分:”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发。
是否突发的判断依据:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发。
数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。
#约束
- frameNum:使用文档中的文本不要做任何转换;
- subAddr:值为“深度”、“平铺”、“数字”或null;
- 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
- 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输);
- 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发;
- 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发;
- 不要漏掉任何一个数据包;
- 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
#例子
[
    {
        "id": "PCS005",
        "name": "总线管理(内部指令)",
        "apid": "418",
        "service": "(1, 2)",
        "length": 1,
        "interval": 1000,
        "subAddr": null,
        "frameNum": "1|2",
        "transSer": "DataBlock",
        "note": "",
        "rtAddr": 28,
        "rt": "数据接口单元XIU",
        "throughBus": true,
        "burst": true,
        "transDirect": "发"
    }
]
"""
        print('总线数据包:')
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_text(_msg, 'out/总线.json', files=[file_map['总线传输通信帧分配']], validation=validation)
        text = self.generate_text_json(_msg, 'out/总线.json', files=[file_map['总线传输通信帧分配']],
                                       validation=validation)
        pkts = json.loads(text)
        # 筛选经总线的数据包
        pkts = list(filter(lambda it: it['throughBus'], pkts))
@@ -854,43 +1144,85 @@
    def gen_tc(self):
        # 数据帧格式
        frame = self.gen_tc_transfer_frame()
        # 数据包格式
        pkt_format = self.gen_tc_transfer_pkt()
        # 数据包列表
        frame = self.gen_tc_transfer_frame_format()
        # 遥控包格式
        pkt_format = self.gen_tc_pkt_format()
        # 遥控包列表
        pkts = self.gen_tc_transfer_pkts()
        for pkt in pkts:
            pf = json.loads(json.dumps(pkt_format))
            # 遥控包数据区内容
            self.gen_tc_pkt_details(pkt)
            pkt['type'] = 'insUnit'
            format_text = json.dumps(pkt_format, ensure_ascii=False)
            format_text = utils.replace_tpl_paras(format_text, pkt)
            pf = json.loads(format_text)
            pf['name'] = pkt['name']
            ph = next(filter(lambda x: x['name'] == '主导头', pf['children']), None)
            apid = next(filter(lambda x: x['name'] == '应用进程标识符(APID)', ph['children']), None)
            apid['value'] = pkt['apid']
            apid['type'] = 'const'
            sh = next(filter(lambda x: x['name'] == '副导头', pf['children']), None)
            ser = next(filter(lambda x: x['name'] == '服务类型', sh['children']), None)
            sub_ser = next(filter(lambda x: x['name'] == '服务子类型', sh['children']), None)
            ser['value'] = pkt['server']
            ser['type'] = 'const'
            sub_ser['value'] = pkt['subServer']
            sub_ser['type'] = 'const'
            pf['code'] = pkt['code']
            data_area = next(filter(lambda x: x['name'] == '应用数据区', pf['children']))
            data_area['children'].append(pkt)
            frame['subPkts'].append(pf)
        self.order = 0
        def build_def(item: dict):
            if item['type'] == 'enum':
                return json.dumps({"EnumItems": item['enums'], "CanInput": True})
            if item['type'] in ['enum', 'sendFlag']:
                if isinstance(item['enums'], str):
                    enums = json.loads(item['enums'])
                else:
                    enums = item['enums']
                return json.dumps({"EnumItems": enums, "CanInput": True}, ensure_ascii=False)
            elif item['type'] == 'length':
                return None
            elif item['type'] == 'checkSum':
                return json.dumps({"ChecksumType": "CRC-CCITT"})
            elif item['type'] == 'subPkt':
                return json.dumps({"CanInput": False})
            elif item['type'] == 'combPkt':
            elif item['type'] in ['combPkt', 'insUnitList', 'input']:
                return None
            elif item['type'] == 'insUnit':
                return '{"MinLength":null,"MaxLength":null,"IsSubPackage":false,"InputParams":[],"OutPutParams":[],"MatchItems":[]}'
            elif item['type'] == 'pkt':
                return '''{"MaxLength":1024,"IsSplit8":false,"Split8Start":null,"Split8End":null,"PadCode":null,"Alignment":null,"InputParams":[],"OutPutParams":[],"MatchItems":[]}'''
            elif 'value' in item:
                return item['value']
        def create_tc_format(parent_pk, field):
        def make_attr(ty: str):
            """
            获取字段定义的ATTR。
            位掩码,用于标识节点类型。
            类型:0~2 BinaryType;
                3~5 DataType;
                6~8: InputFormat;
                9 : IsSubPackage;
                10: IsSendFlag;
                11~13: ProcessMethod;
                14~16: ExpressionType;
                17~19: EnumType
            :param ty:
            :return:
            """
        def create_tc_format(parent_pk, field, parent_parent_pk=None):
            """
            创建遥控格式
            数据库数据结构:
            帧字段 parent_pk=null, pk=pk_001, type=1
                匿名字段(子包) parent_pk=pk_001, pk=pk_002, type=22
                    字段1 parent_pk=pk_002, pk=pk_003, type=15
                    字段2 parent_pk=pk_002, pk=pk_004, type=15
                包字段 parent_pk=pk_001, pk=pk_005, type=1
                    匿名字段(子包) parent_pk=pk_005, pk=pk_006, type=22
                        字段3 parent_pk=pk_006, pk=pk_007, type=15
                    指令单元 parent_pk=pk_005, pk=pk_007, type=4
                        字段4 parent_pk=pk_007, pk=pk_008, type=15
            :param parent_pk: 父级pk
            :param field: 格式字段
            :param parent_parent_pk: 父级的父级pk
            :return:
            """
            field['order'] = self.order
            self.order += 1
            field['def'] = build_def(field)
@@ -898,89 +1230,61 @@
                field['bitWidth'] = field['length']
            field['bitOrder'] = None
            field['attr'] = 0
            if field['type'] == 'length':
            if field['type'] == 'length' and 'value' in field and field['value']:
                val = field['value']
                field['range'] = val['start'] + "~" + val['end']
                field['formula'] = val['formula']
            ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field)
            ins_format_pk = ins_format.C_INS_FORMAT_PK
            if 'children' in field:
                autocode = 1
                if field['type'] == 'pkt':
                    ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK,
                                                   {'order': self.order, 'type': 'subPkt',
                                                    'def': json.dumps({"CanInput": False})})
                    info = {
                        'order': self.order,
                        'type': 'subPkt',
                        'def': json.dumps({"CanInput": False})
                    }
                    ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format_pk, info)
                    self.order += 1
                for child in field['children']:
                    child['autocode'] = autocode
                    autocode += 1
                    create_tc_format(ins_format.C_INS_FORMAT_PK, child)
            # if 'subPkts' in field:
            #     for pkt in field['subPkts']:
            #         ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK,
            #                                        {'order': self.order, 'type': 'subPkt',
            #                                         'def': json.dumps({"CanInput": False})})
            #         create_tc_format(ins_format.C_INS_FORMAT_PK, pkt)
                    if field['type'] == 'insUnitList':
                        _parent_pk = parent_parent_pk
                    else:
                        _parent_pk = ins_format.C_INS_FORMAT_PK
                    create_tc_format(_parent_pk, child, ins_format_pk)
            if 'subPkts' in field:
                for _pkt in field['subPkts']:
                    create_tc_format(ins_format_pk, _pkt, parent_pk)
        create_tc_format(None, frame)
    def gen_tc_transfer_frame(self):
    def gen_tc_transfer_frame_format(self):
        _msg = '''
分析YK传送帧格式,提取YK传送帧的数据结构,不包括数据包的数据结构。
## 经验:
字段类型包括:
1.组合包:combPkt,
2.固定码字:const,
3.长度:length,
4.枚举值:enum,
5.校验和:checkSum,
6.数据区:subPkt。
根据字段描述分析字段的类型,分析方法:
1.字段描述中明确指定了字段值的,类型为const,
2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum,
3.字段描述中如果存在多层级描述则父级字段的类型为combPkt,
4.字段如果是和“长度”有关,类型为length,
5.如果和数据域有关,类型为subPkt,
6.字段如果和校验和有关,类型为checkSum。
字段值提取方法:
1.字段描述中明确指定了字段值,
2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"<code>","end":"<code>","formula":"N-1"},注意:start和end的值为字段code。
## 限制:
- length 自动转换为bit长度。
- value 根据字段描述提取。
- enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
字段数据结构:
主导头
    版本号、通过标志、控制命令标志、空闲位、HTQ标识、虚拟信道标识、帧长、帧序列号
传送帧数据域
帧差错控制域。
# 输出内容例子:
# 角色
你是一名资深的软件工程师。
# 指令
分析遥控传送帧格式,提取遥控传送帧格式的字段定义。
# 需求
要提取值的帧格式字段:
- 版本号:const,二进制,以B结尾;
- 通过标志:const,二进制,以B结尾;
- 控制命令标志:const,二进制,以B结尾;
- 空闲位:const,二进制,以B结尾;
- 航天器标识:const,十六进制,以0x开头;
- 虚拟信道标识:sendFlag,发送标记,默认为“任务注入帧”,所有的值都要列举出来;
# 数据类型
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
- sendFlag:发送标记,类似枚举,定义样例:[{"n":"name","v":"value","c":"code","default":true}],n表示名称,v表示值,c表示code(没有空着),default表示是默认值;
# 约束
- 以JSON格式输出;
- 仅输出JSON文本,不要输出任何其他文本。
# 输出例子:
{
    "name": "YK帧",
    "type": "pkt"
    "children":[
        {
            "name": "主导头",
            "code": "primaryHeader",
            "length": 2,
            "value": "00",
            "type": "combPkt",
            "children": [
                {
                    "name": "版本号",
                    "code": "verNum"
                    "length": 1,
                    "value": "00"
                }
            ]
        }
    ],
    "subPkts":[]
    "版本号": "00B",
    "通过标志": "0",
    ...
}
'''
@@ -989,80 +1293,39 @@
        text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', files=[file_map['指令格式']],
                                     validation=validation)
        frame = json.loads(text)
        result: dict = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
        format_text = utils.replace_tpl_paras(format_text, result)
        frame = json.loads(format_text)
        return frame
    def gen_tc_transfer_pkt(self):
    def gen_tc_pkt_format(self):
        _msg = '''
仅分析YK包格式,提取YK包数据结构。
## 经验:
字段类型包括:
1.组合包:combPkt,
2.固定码字:const,
3.长度:length,
4.枚举值:enum,
5.校验和:checkSum,
6.数据区:subPkt。
根据字段描述分析字段的类型,分析方法:
1.字段描述中明确指定了字段值的,类型为const,
2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum,
3.字段描述中如果存在多层级描述则父级字段的类型为combPkt,
4.字段如果是和“长度”有关,类型为length,
5.如果和数据域有关,类型为subPkt,
6.字段如果和校验和有关,类型为checkSum。
字段值提取方法:
1.字段描述中明确指定了字段值,
2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"<code>","end":"<code>","formula":"N-1"},注意:start和end的值为字段code。
## 限制:
- length 自动转换为bit长度。
- value 根据字段描述提取。
- enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
字段数据结构:
主导头
    包识别
        包版本号、包类型、数据区头标志、应用进程标识符(APID)
    包序列控制
        序列标志
        包序列计数
    包长
副导头
    CCSDS副导头标志
    YK包版本号
    命令正确应答(Ack)
    服务类型
    服务子类型
    源地址
应用数据区
帧差错控制域。
# 输出内容例子:
# 角色
你是一名资深的软件工程师。
# 指令
分析遥控包格式,提取遥控包格式的字段定义。
# 需求
要提取值的包格式字段:
- 包版本号: const,二进制;
- 包类型: const,二进制;
- 数据区头标志: const,二进制;
- 序列标志: const,二进制;
- 包长:length,
- 副导头标志: const,二进制;
- 遥控包版本号: const,二进制;
- 命令正确应答: const,二进制;
- 源地址: const,十六进制。
# 数据类型
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
# 约束
- 以JSON格式输出;
- 仅输出JSON文本,不要输出任何其他文本。
# 输出例子:
{
    "name": "YK包",
    "type": "pkt"
    "children":[
        {
            "name": "主导头",
            "code": "primaryHeader",
            "length": 2,
            "value": "00",
            "type": "combPkt",
            "children": [
                {
                    "name": "版本号",
                    "code": "verNum"
                    "length": 1,
                    "value": "00"
                }
            ]
        }
    ],
    "subPkts":[]
    "包版本号": "00B",
    "包类型": "1B",
    ...
}
'''
@@ -1071,19 +1334,26 @@
        text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', files=[file_map['指令格式']],
                                     validation=validation)
        pkt_format = json.loads(text)
        result = json.loads(text)
        format_text = utils.read_from_file('tpl/tc_pkt_format.json')
        format_text = utils.replace_tpl_paras(format_text, result)
        pkt_format = json.loads(format_text)
        return pkt_format
    def gen_tc_transfer_pkts(self):
        _msg = '''
# 角色
你是一名资深的软件工程师。
# 指令
分析文档列出所有的遥控源包。
## 数据结构如下:
# 输出例子:
[{
"name": "xxx",
"code":"pkt",
"apid":"0xAA",
"server":"0x1",
"subServer":"0x2"
"应用过程标识符":"0xAA",
"服务类型":"0x1",
"服务子类型":"0x2"
}]
'''
@@ -1095,10 +1365,67 @@
        pkts = json.loads(text)
        return pkts
    def gen_tc_pkt_details(self, pkt):
        result = []
        tc_name = pkt['name']
        tc_code = pkt['code']
        pkt['name'] = f'{tc_code} {tc_name}'
        _msg = f"""
# 角色
你是一个资深软件工程师。
# 指令
分析文档,从文档中提取遥控指令名称为“{tc_name}”代号为“{tc_code}”的指令应用数据区定义。
""" + """
# 约束
- code 如果没有明确定义则使用名称的英文翻译,尽量简短;
- length 自动转换为bit长度,必须是数值或null,不能为0;
- value 根据字段描述提取;
- enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""};
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
# 字段类型
- 固定码字:const,
- 长度:length,
- 枚举值:enum,
- 校验和:checkSum,
- 即时输入:input。
# 字段类型分析方法
- 根据字段描述分析字段的类型;
- 字段描述中明确指定了字段值的,类型为const;
- 字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum;
- 字段如果是和“长度”有关,类型为length;
- 如果和数据域有关,类型为const;
- 字段如果和校验和有关,类型为checkSum。
# 输出例子:
[
    {
        "name": "para1",
        "code": "para1",
        "length": 8,
        "type": "const",
        "value": "0xAA"
    }
    ...
]
"""
        def validation(gen_text):
            json.loads(gen_text)
        text = self.generate_tc_text(_msg, f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json',
                                     files=[file_map['指令格式']],
                                     validation=validation)
        result = json.loads(text)
        pkt['children'] = result
if __name__ == '__main__':
    try:
        os.makedirs("./out/pkts", exist_ok=True)
        os.makedirs("./out/tmp", exist_ok=True)
        # 启动大模型处理流程
        ret_text = DbStructFlow().run()
    except KeyboardInterrupt: