import os import time from datetime import datetime from openai import OpenAI import re import json from langchain_community.chat_models import ChatOpenAI from langchain_core.prompts import HumanMessagePromptTemplate, SystemMessagePromptTemplate import data_templates from knowledgebase import utils from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \ update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format, make_attr from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt from knowledgebase.db.models import TProject from knowledgebase.db.doc_db_helper import doc_dbh # file_map = { # # "遥测源包设计报告": "./doc/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md", # # "遥测源包设计报告": "./doc/数管数字量快速源包.md", # # "遥测源包设计报告": "./doc/数管数字量中速源包.md", # # "遥测源包设计报告": "./doc/硬通道设备工作状态数据包.md", # # "遥测源包设计报告": "./doc/DIU遥测模块采集的DS量4.md", # "遥测源包设计报告": "./doc/DIU遥测模块模拟量.md", # "遥测大纲": "./doc/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md", # # "总线传输通信帧分配": "./doc/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md", # "总线传输通信帧分配": "./doc/总线.md", # "应用软件用户需求": "./doc/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md" # } # file_map = { # "遥测源包设计报告": "./docs/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md", # "遥测大纲": "./docs/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md", # "总线传输通信帧分配": "./docs/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md", # "应用软件用户需求": "./docs/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md" # } file_map = { "文档合并": "./doc/文档合并.md", "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md", "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md", "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md", "指令格式": "./doc/ZL格式(公开).docx.md" } BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1' API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e' MODEL_NAME = 'qwen2.5-72b-instruct' # BASE_URL = 'http://10.74.15.164:11434/v1/' # API_KEY = 'ollama' # MODEL_NAME = 'qwen2.5:32b-128k' # BASE_URL = 'http://chat.com/api' # API_KEY = 'sk-49457e83f734475cb4cf7066c649d563' # MODEL_NAME = 'qwen2.5:72b-120k' # BASE_URL = 'http://10.74.15.171:8000/v1' # API_KEY = 'EMPTY' # MODEL_NAME = 'QwQ:32b' # MODEL_NAME = 'vllm-Qwen-72b-4bit' USE_CACHE = True assistant_msg = """ # 角色 你是一名资深的软件工程师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。 ## 技能 ### 技能 1:文档分析(包括 markdown 文档) 1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。 2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。 3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。 ### 技能 2:通信协议分析 1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。 ## 背景知识 ###软件主要功能与运行机制总结如下: 1. 数据采集和处理: DIU负责根据卫星的工作状态或模式提供遥测数据,包括模拟量(AN)、总线信号(BL)以及温度(TH)和数字量(DS),并将这些信息打包,通过总线发送给SMU。 SMU则收集硬通道上的遥测参数,并通过总线接收DIU采集的信息。 2. 多路复用与数据传输: 遥测源包被组织成E-PDU,进一步复用为M-PDU,并填充到VCDU中构成遥测帧。 利用CCSDS AOS CADU格式进行遥测数据的多路复用和传输。 3. 虚拟信道(VC)调度机制: 通过常规遥测VC、突发数据VC、延时遥测VC、记录数据VC以及回放VC实现不同类型的数据下传。 4. 遥控指令处理: 上行遥控包括直接指令和间接指令,需经过格式验证后转发给相应单机执行。 遥控帧通过特定的虚拟信道(VC)进行传输。 这些知识需要你记住,再后续的处理中可以帮助你理解要处理的数据。 ## 目标导向 1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。 ## 规则 1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析,每次只分析同一个型号的文档。 2. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。 3. 文档都是对于数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布、各个字段的大小和位置等信息,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,长度字段使用 length 表示,位置字段使用 pos 表示,如果为变长使用“"变长"”表示。 4. 如果有层级,使用树形 JSON 输出,如果有子节点,子节点 key 使用children;需保证一次输出的数据结构统一,并且判断每个层级是什么类型,输出类型字段(type),类型字段的 key 使用 type,类型包括:型号(project)、设备(dev)、封装包(enc)、线性包(linear)、参数(para),封装包子级有数据包,所以type为enc,线性包子级只有参数,所以type为linear;每个层级都包含偏移位置(pos),每个层级的偏移位置从0开始。 5. 名称相关的字段的 key 使用name;代号、编号或者唯一标识相关的字段的key使用id,id由数字、英文字母、下划线组成且以英文字母开头,长度尽量简短;序号相关的字段的key使用number;偏移位置相关字段的key使用pos;其他没有举例的字段使用精简的翻译作为字段的key;每个结构必须包含name和id。 6. 遥测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级嵌套传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构。 7. 数据包字段包括:name、id、type、pos、length、children;参数字段包括:name、id、pos、type、length;必须包含pos和length字段。 8. 常用id参考:遥测(TM)、遥控(TC)、总线(BUS)、版本号(Ver)、应用过程标识(APID)。 9. 注意:一定要记得morkdown文档中会将一些特殊字符进行转义,以此来保证文档的正确性,这些转义符号(也就是反斜杠‘\’)不需要在结果中输出。 10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。 ## 限制: - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。 - 不输出任何注释等描述性信息。 """ tc_system_msg = """ # 角色 你是一个资深软件工程师。 # 约束 - 输出内容必须根据文档和问题回答,不要创造其他内容; - 输出内容必须是,JSON格式,不要输出其他文本。 """ g_completion = None def read_from_file(cache_file): with open(cache_file, 'r', encoding='utf-8') as f: text = f.read() return text def save_to_file(text, file_path): if USE_CACHE: with open(file_path, 'w', encoding='utf-8') as f: f.write(text) def remove_think_tag(text): pattern = r'(.|\n)*?' result = re.sub(pattern, '', text) return result json_pat = re.compile(r'```json(.*?)```', re.DOTALL) def get_json_text(text): # 使用正则表达式提取json文本 try: return json_pat.findall(text)[0] except IndexError: return text def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals): # 逻辑封装包,数据块传输的只有一个,取数的根据RT地址、子地址和帧号划分 frame_num = pkt['frameNum'] if trans_ser == '数据块传输': # 数据块传输根据RT地址和子地址划分 key = f'{pkt["rt"]}_{pkt["subAddr"]}' name = f'{pkt["rt"]}_{pkt["subAddr"]}_{trans_ser}' else: # 取数根据RT地址、子地址和帧号划分 key = f'{pkt["rt"]}_{pkt["subAddr"]}_{pkt["frameNum"]}' name = f'{pkt["rt"]}_{pkt["subAddr"]}_帧号{frame_num}_{trans_ser}' # if key not in rt_pkt_map: rt_pkt_map[key] = { "name": name, "id": pkt_id, "type": "logic", "pos": 0, "content": "CYCLEBUFFER,Message,28,0xFFFF", "length": "", "vals": vals, "children": [] } frame = f'{pkt["frameNum"]}' interval = f'{pkt["interval"]}'.replace(".", "_") if trans_ser == '取数': _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}' else: # 数据块传输 if pkt['burst']: _key = f'RT{pkt["rtAddr"]}FrameALL' else: _key = f'RT{pkt["rtAddr"]}Frame{frame}Per{interval}' _pkt = next(filter(lambda it: it['name'] == _key, rt_pkt_map[key]['children']), None) if _pkt is None: ext_info = None if trans_ser == '数据块传输' and not pkt['burst']: # 数据块传输且有周期的包需要 ext_info = [{"id": "PeriodTriger", "name": "时分复用总线触发属性", "val": f"{pkt['interval']}"}, {"id": "FrameNumber", "name": "时分复用协议帧号", "val": frame}] _pkt = { "name": _key, "id": _key, "type": "enc", "pos": 0, "content": "1:N;EPDU", "length": "length", "extInfo": ext_info, "children": [ { "id": "C02_ver", "name": "遥测版本", "type": "para", "pos": 0, "length": 3, "dataTy": "INVAR", "content": "0" }, { "id": "C02_type", "name": "类型", "type": "para", "pos": 3, "length": 1, "dataTy": "INVAR", "content": "0" }, { "id": "C02_viceHead", "name": "副导头标识", "type": "para", "pos": 4, "length": 1, "content": "1", "dataTy": "INVAR" }, { "id": "C02_PackSign", "name": "APID", "type": "para", "pos": 5, "length": 11, "is_key": True, "dataTy": "ENUM" }, { "id": "C02_SerCtr_1", "name": "序列标记", "type": "para", "pos": 16, "length": 2, "content": "3" }, { "id": "C02_SerCtr_2", "name": "包序计数", "type": "para", "pos": 18, "length": 14, "content": "0:167772:1", "dataTy": "INCREASE" }, { "id": "C02_PackLen", "name": "包长", "type": "para", "pos": 32, "length": 16, "content": "1Bytes/C02_Data.length+1", "dataTy": "LEN" }, { "id": "C02_Ser", "name": "服务", "type": "para", "pos": 48, "length": 8, "is_key": True, "dataTy": "ENUM" }, { "id": "C02_SubSer", "name": "子服务", "type": "para", "pos": 56, "length": 8, "is_key": True, "dataTy": "ENUM" }, { "id": "C02_Data", "name": "数据区", "type": "linear", "pos": 64, "length": 'length-current', "children": [] }, ] } rt_pkt_map[key]['children'].append(_pkt) # 数据区下面的包 data_area = next(filter(lambda it: it['name'] == '数据区', _pkt['children']), None) ser_sub_ser: str = pkt['service'] ser = '' sub_ser = '' if ser_sub_ser: nums = re.findall(r'\d+', ser_sub_ser) if len(nums) == 2: ser = nums[0] sub_ser = nums[1] if 'children' not in pkt: pkt['children'] = [] p_name = pkt['id'] + '_' + pkt['name'] data_area['children'].append({ "name": p_name, "id": pkt["id"], "type": "linear", "pos": 0, "length": pkt["length"], "vals": f"0x{pkt['apid']}/{ser}/{sub_ser}/", "children": pkt['children'], }) def build_vcid_content(vcs): _vcs = [] for vc in vcs: _vcs.append(vc['name'] + ',' + vc['VCID']) return ' '.join(_vcs) class DbStructFlow: # 工程 proj: TProject = None # 遥测源包列表,仅包名称、包id和hasParams tm_pkts = [] # vc源包 vc_pkts = [] def __init__(self): self.client = OpenAI( api_key=API_KEY, base_url=BASE_URL, # api_key="ollama", # base_url="http://192.168.1.48:11434/v1/", ) # self.llm = ChatOpenAI(model=MODEL_NAME, temperature=0, api_key=API_KEY, base_url=BASE_URL) def run(self): # 生成型号结构 # 生成设备结构 # 生成数据流结构 CADU # 生成VCDU结构 # 生成遥测数据包结构 self.proj = self.gen_project() # devs = self.gen_device(self.proj) self.gen_tc() return '' def get_text_with_entity(self, entity_names: list[str]) -> str: """ 根据实体词获取文档文本 :param entity_names: str - 实体词名称 :return: str - 文本内容 """ return doc_dbh.get_text_with_entities(entity_names) def _gen(self, msgs, msg, doc_text): # if files is None: # files = [file_map['文档合并']] messages = [] if msgs is None else msgs # doc_text = '' # for file in files: # doc_text += '\n' + read_from_file(file) if len(messages) == 0: # 如果是第一次提问加入system消息 messages.append({'role': 'system', 'content': assistant_msg}) messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text}) messages.append({'role': 'user', 'content': msg}) completion = self.client.chat.completions.create( model=MODEL_NAME, messages=messages, stream=True, temperature=0.6, # top_p=0, timeout=30 * 60000, max_completion_tokens=32000, seed=0 # stream_options={"include_usage": True} ) g_completion = completion text = '' for chunk in completion: if chunk.choices[0].delta.content is not None: text += chunk.choices[0].delta.content print(chunk.choices[0].delta.content, end="") print("") g_completion = None return text def generate_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5, json_text=False): if msgs is None: msgs = [] if USE_CACHE and os.path.isfile(cache_file): text = read_from_file(cache_file) else: s = time.time() text = self._gen(msgs, msg, doc_text) text = remove_think_tag(text) if json_text: text = get_json_text(text) if validation: try: validation(text) except BaseException as e: print(e) if try_cnt <= 0: raise RuntimeError('生成失败,重试次数太多,强制结束!') return self.generate_text_json(msg, cache_file, msgs, doc_text, validation, try_cnt - 1) if cache_file: save_to_file(text, cache_file) print(f'耗时:{time.time() - s}') return text def generate_text_json(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5): return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True) def generate_tc_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5): msgs = [ {'role': 'system', 'content': tc_system_msg}, {'role': 'user', 'content': "以下是文档内容:\n" + doc_text}] return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True) def gen_project(self): _msg = """ 根据文档内容输出分系统信息,分系统字段包括:名称和型号代号。仅输出分系统这一级。如果型号代号中有符号也要输出,保证输出完整。 例如:{"name":"xxx","id":"xxx"} """ print('型号信息:') doc_text = self.get_text_with_entity(['系统概述']) text = self.generate_text_json(_msg, 'out/型号信息.json', doc_text=doc_text) proj_dict = json.loads(text) code = proj_dict['id'] name = proj_dict['name'] proj = create_project(code, name, code, name, "", datetime.now()) return proj def gen_device(self, proj): """ 设备列表生成规则: 1.如文档中有1553协议描述,加入1553设备 2.如是类SMU软件(遥测遥控包含,BC或者RT),加入对应相关设备,文档只有设备名称和设备ID,设备类型90%是标准类型 3.如是类RTU软件,加入对应相关设备,文档里面有设备名称和设备ID,同上 4.如基于软平台,如是SMU软件,加入SMU工控机设备,待定 设备类型:工控机[0]、1553B[1] :param proj: :return: """ proj_pk = proj.C_PROJECT_PK devices = [] _msg = """ # 角色 你是一名资深软件工程师。 # 指令 我需要从文档提取设备列表信息,你要帮助我完成设备列表信息提取。 # 需求 输出分系统下的硬件产品(设备)列表,硬件产品名称一般会包含“管理单元”或者“接口单元”; # 字段包括: - 名称(name):设备名称; - 代号(code):设备代号; - 是否包含遥控遥测(hasTcTm):标识该硬件产品是否包含遥控遥测的功能,布尔值true或false; - 是否包含温度量模拟量等数据的采集(hasTemperatureAnalog):标识该硬件产品是否包含温度量等信息的采集功能,布尔值true或false; - 是否有总线硬件产品(hasBus):标识该设备是否属于总线硬件产品,是否有RT地址,布尔值true或false; # 约束 - 如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符; - 数据结构最外层为数组,数组元素为设备信息 - 仅输出JSON,不要输出JSON以外的任何字符。 # 例子 [ { "name": "系统管理单元", "code": "SMU", "hasTcTm": true, "hasTemperatureAnalog": false, "hasBus": true }, { "name": "1553B总线", "code": "1553", "hasTcTm": true, "hasTemperatureAnalog": true, "hasBus": true } ] """ print('设备列表:') cache_file = 'out/设备列表.json' def validation(gen_text): _devs = json.loads(gen_text) assert isinstance(_devs, list), '数据结构最外层不是数组' assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元' doc_text = self.get_text_with_entity(['系统概述', '总线管理']) text = self.generate_text_json(_msg, cache_file, doc_text=doc_text, validation=validation) devs = json.loads(text) # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元” like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs)) for dev in like_smu_devs: dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) devices.append(dev) # 创建数据流 ds_tmfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMF1', 'TMFL', '1', 'TMF1', '001') self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH) # ds_tcfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL', # '006') hasBus = any(d['hasBus'] for d in devs) if hasBus: # 总线设备 dev = create_device("1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk) create_extend_info(proj_pk, "BusType", "总线类型", "ECSS_Standard", dev.C_DEV_PK) devices.append(dev) # 创建数据流 ds_u153, rs_u153, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '上行总线数据', 'U15E', 'B153', '0', '1553', '001') # 创建总线结构 self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME) ds_d153, rule_stream, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '下行总线数据', 'D15E', 'B153', '1', '1553', '001', rs_u153.C_RULE_PK) create_ref_ds_rule_stream(proj_pk, rule_stream.C_STREAM_PK, rule_stream.C_STREAM_ID, rule_stream.C_STREAM_NAME, rule_stream.C_STREAM_DIR, rs_u153.C_STREAM_PK) # 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元” # like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs)) # for dev in like_rtu_devs: # dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) # for dev in like_rtu_devs: # dev = create_device(dev['code'], dev['name'], '0', '', proj.C_PROJECT_PK) # devices.append(dev) # # 创建数据流 # ds_tmfl = create_data_stream(proj.C_PROJECT_PK, '温度量', 'TMFL', 'TMFL', '1', 'TMFL', '001') # ds_tcfl = create_data_stream(proj.C_PROJECT_PK, '模拟量', 'TCFL', 'TCFL', '0', 'TCFL', '006') return devices def gen_insert_domain_params(self): _msg = """ #角色 你是一名资深的软件工程师。 #指令 我需要从文档中提取插入域的参数列表,你要帮助我完成插入域参数列表的提取。 #需求 分析文档,输出插入域的参数列表,将所有参数全部输出。 参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数起始bit位置)、length(参数bit长度)、type(类型:para)。 注意: 1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。 文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2;"Byte38~Byte74":表示从第38个字节到第74个字节,中间有37个字节,长度是298。 #约束 - 不要遗漏任何参数; - 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1; - 数据结构最外层为数组,数组元素为参数信息对象; - 仅输出JSON文本。 #例子 [ { "name": "遥测模式字", "id": "TMS215", "pos": 0, "length": 8, "type": "para" } ] """ print('插入域参数列表:') def validation(gen_text): params = json.loads(gen_text) assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组' assert len(params), '插入域参数列表不能为空' doc_text = self.get_text_with_entity(['插入域']) text = self.generate_text_json(_msg, './out/插入域参数列表.json', doc_text=doc_text, validation=validation) return json.loads(text) def gen_tm_frame_data(self): _msg = """ """ files = [file_map['遥测大纲']] def validation(gen_text): pass def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path): # 插入域参数列表 insert_domain = self.gen_insert_domain_params() # VC源包格式 vc_pkt_fields = data_templates.vc_pkt_fields # self.gen_pkt_format() # 获取虚拟信道 vc vcs = self.gen_vc() for vc in vcs: vc['children'] = [] vc['VCID'] = str(int(vc['VCID'], 2)) for field in vc_pkt_fields: if field['name'] == '数据域': field['children'] = [] vc['children'].append(dict(field)) # VCID 字段内容 vcid_content = build_vcid_content(vcs) # 遥测帧结构由模板生成,只需提供特定参数 tm_data = { "vcidContent": vcid_content, 'insertDomain': insert_domain, } cadu = data_templates.get_tm_frame(tm_data) # VC源包 self.vc_pkts = self.gen_pkt_vc() # 遥测源包设计中的源包列表 self.tm_pkts = self.gen_pkts() # 处理VC下面的遥测包数据 for vc in vcs: # 此VC下的遥测包过滤 _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts) for _pkt in _vc_pkts: # 判断遥测包是否有详细定义 # if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None): # continue # 获取包详情 _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id']) epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None) if epdu and _pkt: _pkt['children'] = _pkt['datas'] # todo 当数据包获取到东西但不是参数时,获取到的包结构有问题,需要过滤 if len(_pkt['children']) > 0: _last_par = _pkt['children'][len(_pkt['children']) - 1] _pkt['length'] = (_last_par['pos'] + _last_par['length']) _pkt['pos'] = 0 if 'children' not in epdu: epdu['children'] = [] # 添加解析规则后缀防止重复 _pkt['id'] = _pkt['id'] + '_' + vc['VCID'] # 给包名加代号前缀 if not _pkt['name'].startswith(_pkt['id']): _pkt['name'] = _pkt['id'] + '_' + _pkt['name'] epdu['children'].append(_pkt) apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None) ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None) sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None) _pkt['vals'] = \ f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/" # 重新计数起始偏移 self.compute_length_pos(cadu['children']) # 将数据插入数据库 seq = 1 for cadu_it in cadu['children']: if cadu_it['name'] == 'VCDU': # VCDU # 将信道替换到数据域位置 vc_data = next(filter(lambda it: it['name'].__contains__('数据域'), cadu_it['children']), None) if vc_data: idx = cadu_it['children'].index(vc_data) cadu_it['children'].pop(idx) for vc in vcs: # 处理虚拟信道属性 vc['type'] = 'logic' vc['length'] = vc_data['length'] vc['pos'] = vc_data['pos'] vc['content'] = 'CCSDSMPDU' vcid = vc['VCID'] vc['condition'] = f'VCID=={vcid}' # 将虚拟信道插入到VCDU cadu_it['children'].insert(idx, vc) idx += 1 for vc in vcs: self.compute_length_pos(vc['children']) # 设置VCID的content vcid_node = next(filter(lambda it: it['name'].__contains__('VCID'), cadu_it['children']), None) if vcid_node: vcid_node['content'] = vcid_content create_enc_pkt(proj_pk, rule_pk, cadu_it, rule_pk, seq, name_path, ds, '001', 'ENC') else: # 参数 create_prop_enc(proj_pk, rule_pk, cadu_it, get_data_ty(cadu_it), seq) seq += 1 return cadu def gen_vc(self): _msg = """ #角色 你是一名资深的软件工程师。 #指令 我需要从文档中提取虚拟信道列表,你要帮助我完成虚拟信道列表的提取。 #需求 请分析文档中的遥测包格式以及遥测虚拟信道,输出遥测虚拟信道列表。 字段包括:id(虚拟信道代号)、name(虚拟信道名称)、VCID(虚拟信道VCID,二进制)、format(根据虚拟信道类型获取对应的数据包的格式的名称) #上下文 深入理解文档中描述的关系,例如:文档中描述了常规遥测是常规数据的下传信道,并且还描述了分系统常规遥测参数包就是实时遥测参数包,并且文档中对实时遥测参数包的格式进行了描述,所以常规遥测VC应该输出为:{"id": "1", "name": "常规遥测VC", "VCID": "0", "format": "实时遥测参数包"} #约束 - 数据结构最外层为数组,数组元素为虚拟信道信息; - format:必须是数据包格式的名称; - 仅输出JSON文本。 #例子: [ { "id": "VC0", "name": "空闲信道", "VCID": "111111", "format": "空闲包" } ] """ def validation(gen_text): vcs = json.loads(gen_text) assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制' print('虚拟信道:') doc_text = self.get_text_with_entity(['虚拟信道定义']) text = self.generate_text_json(_msg, "out/虚拟信道.json", doc_text=doc_text, validation=validation) vcs = json.loads(text) return vcs def gen_dev_pkts(self): _msg = """ #角色 你是一名资深的软件工程师。 #指令 我需要从文档中提取设备以及设备下面的遥测包信息,你要帮助我完成提取。 #需求 输出文档中遥测源包类型定义描述的设备以及设备下面的遥测包。 #约束 - 数据结构:数组 > 设备 > 遥测包列表(pkts); - 设备字段包括:名称(name)、代号(id); - 源包字段包括:名称(name)、代号(id); - 仅输出JSON文本。 #例子 """ print('设备遥测源包信息:') files = [file_map["遥测源包设计报告"]] text = self.generate_text_json(_msg, 'out/设备数据包.json', [], files) dev_pkts = json.loads(text) return dev_pkts def gen_pkt_details(self, pkt_name, pkt_id): cache_file = f'out/数据包-{pkt_name}.json' # _msg = f""" # #角色 # 你是一名资深的软件工程师。 # #指令 # 我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。 # #需求 # 输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。 # """ + """ # #约束 # - 遥测源包的内容在一个表格中定义,表格结束则包内容结束; # - 数据域中每一行对应一个参数; # - 不要跨表格提取; # - 字节位置中字节位置是从1开始的,bit位置是从0开始的; # - bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数; # - 仅输出json,不要输出其他任何字符。 # #例子: # {"last_par_pos":128, "par_num": 20} # """ # text = self.generate_text_json(_msg, '', doc_text=doc_text) # result = json.loads(text) # last_par_pos = result['last_par_pos'] # par_num = result['par_num'] _msg = f""" #角色 你是一名资深的软件工程师。 #指令 我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。 #需求 输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。 """ + """ 遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear; 包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para; 数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para; 包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。 包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。 表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。 #约束 - 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头; - 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短; - 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1; - 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits; - pos字段:数值类型,从0开始计算,由长度(length)累加得到; - 应用过程标识:应用过程标识的定义如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头; - 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数; - 类似”保留(Rsv)“的行也要当参数生成; - 重复的行也要生成; - 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中; - 字节顺序:值为大端“B”,小端“L”,默认为“B”; - 输出严格按照文档中的内容生成,不要创造文档中不存在的内容; - 仅输出json,不要输出任何其他内容。 #例子 { "name": "数管缓变遥测包", "id": "PMS003", "type": "linear", "headers": [ { "name": "包标识", "id": "packetIdentifier", "pos": 0, "content": "000", "length": 8, "type": "para" } ], "datas": [ { "name": "XXX包", "id": "XXX", "pos": 0, "length": 8, "byteOrder": "" } ] """ print(f'遥测源包“{pkt_name}”信息:') def validation(gen_text): _pkt = json.loads(gen_text) with open(f'out/tmp/{time.time()}.json', 'w') as f: f.write(gen_text) assert 'headers' in _pkt, '包结构中必须包含headers字段' assert 'datas' in _pkt, '包结构中必须包含datas字段' # assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}' # assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!' doc_text = self.get_text_with_entity([pkt_id]) if doc_text == '': return None text = self.generate_text_json(_msg, cache_file, [], doc_text, validation) pkt = json.loads(text) pkt_len = 0 for par in pkt['datas']: par['pos'] = pkt_len pkt_len += par['length'] pkt['length'] = pkt_len return pkt def gen_pkts(self): _msg = """ #角色 你是一名资深软件工程师。 #指令 我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。 #需求 输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)。 字段描述: 1.名称:遥测包的名称; 2.代号:遥测包的代号; #约束 - name:名称中不要包含代号,仅从文档中提取源包名称; - 如果没有代号,使用遥测包名称的英文翻译代替; - 如果没有名称用代号代替; - 不要漏掉任何遥测包; - 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。 #例子 [ { "name": "数管数字量快速源包", "id": "PMS001", } ] """ print(f'遥测源包列表:') doc_text = self.get_text_with_entity(['源包列表']) text = self.generate_text_json(_msg, 'out/源包列表.json', doc_text=doc_text) pkt = json.loads(text) return pkt def gen_pkt_vc(self): _msg = """ #角色 你是一名资深软件工程师。 #指令 我需要从文档中提取所有遥测源包信息,你要帮助我完成遥测源包信息的提取。 #需求 根据文档内容输出遥测源包信息,顶级结构为数组,元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。 #约束 - 所属虚拟信道:必须是文档中描述的遥测虚拟信道代号(序号); - 下传时机:与表格中定义的一致; - 不要遗漏任何遥测源包。 #例子: [ { "id": "PMS001", "name": "数管数字量快速源包", "vcs": ["VC1"], "timeTags": ["实时"] }, ] """ print('遥测源包所属虚拟信道:') def validation(gen_text): pkts = json.loads(gen_text) assert len(pkts), 'VC源包列表不能为空' doc_text = self.get_text_with_entity(['虚拟信道定义', '遥测源包下传时机']) text = self.generate_text_json(_msg, 'out/遥测VC源包.json', doc_text=doc_text, validation=validation) pkt_vcs = json.loads(text) return pkt_vcs def gen_pkt_format(self): _msg = """ #角色 你是一名资深软件工程师。 #指令 我需要从文档中提取数据包的格式,你要帮助我完成数据包格式的提取。 #需求 请仔细分系文档,输出各个数据包的格式。 数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。 包格式字段包括:名称(name)、代号(id)、类型(type)、子级(children)。 children元素的字段包括:name、id、pos、length、type。 children元素包括:版本号(Ver)、类型(TM_Type)、副导头标志(Vice_Head)、应用过程标识符(Proc_Sign)、分组标志(Group_Sign)、包序列计数(Package_Count)、包长(Pack_Len)、数据域(EPDU_DATA)。 #约束 - 生成的JSON语法格式要合法。 #例子 { "name": "实时遥测参数包", "id": "EPDU", "type": "logic", "children": [ { "name": "版本号", "id": "Ver", "pos": 0, "length": 3, "type": "para", "content": "0", "dataTy": "INVAR" }, { "name": "数据域", "id": "EPDU_DATA", "pos": 3, "length": "变长", "type": "any" } ] } """ print('遥测包格式:') text = self.generate_text_json(_msg, 'out/数据包格式.json', files=[file_map['遥测大纲']]) pkt_formats = json.loads(text) return pkt_formats def compute_length_pos(self, items: list): length = 0 pos = 0 for child in items: if 'children' in child: self.compute_length_pos(child['children']) child['pos'] = pos if 'length' in child and isinstance(child['length'], int): length = length + child['length'] pos = pos + child['length'] # node['length'] = length def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name): _msg = """ #角色 你是一名资深的软件工程师 #指令 我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。 #需求 请析文档,列出总线通信包传输约定中描述的所有数据包列表, 数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、 transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。 #约束 - frameNum:使用文档中的文本不要做任何转换; - subAddr:值为“深度”、“平铺”、“数字”或null; - 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线; - 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输); - 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发; - 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发; - 不要漏掉任何一个数据包; - 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。 #例子 [ { "id": "PCS005", "name": "总线管理(内部指令)", "apid": "418", "service": "(1, 2)", "length": 1, "interval": 1000, "subAddr": null, "frameNum": "1|2", "transSer": "DataBlock", "note": "", "rtAddr": 28, "rt": "数据接口单元XIU", "throughBus": true, "burst": true, "transDirect": "发" } ] """ print('总线数据包:') def validation(gen_text): json.loads(gen_text) doc_text = self.get_text_with_entity(['RT地址分配', '分系统源包']) text = self.generate_text_json(_msg, 'out/总线.json', doc_text=doc_text, validation=validation) pkts = json.loads(text) # 筛选经总线的数据包 pkts = list(filter(lambda it: it['throughBus'], pkts)) no_apid_pkts = list(filter(lambda it: not it['apid'], pkts)) # 筛选有apid的数据包 pkts = list(filter(lambda it: it['apid'], pkts)) for pkt in pkts: _pkt = self.gen_pkt_details(pkt['name'], pkt['id']) if _pkt: pkt['children'] = [] pkt['children'].extend(_pkt['datas']) pkt['length'] = _pkt['length'] rt_pkt_map = {} for pkt in pkts: # 根据数据块传输和取数分组 # 逻辑封装包的解析规则ID:RT[rt地址]SUB[子地址]S(S代表取数,方向是AA表示发送;R代表置数,方向是BB表示接受) # 取数:逻辑封装包根据子地址和帧号组合创建,有几个组合就创建几个逻辑封装包 # 数据块:只有一个逻辑封装包 # 处理子地址 if pkt['burst']: # 突发包子地址是18~26 pkt['subAddr'] = 26 elif pkt['subAddr'] == '平铺' or pkt['subAddr'] is None: # 平铺:11~26,没有填写的默认为平铺 pkt['subAddr'] = 26 elif pkt['subAddr'] == '深度': # 深度:11 pkt['subAddr'] = 11 # 处理帧号 if pkt['burst']: # 突发:ALL pkt['frameNum'] = 'ALL' elif not pkt['frameNum']: # 有 pkt['frameNum'] = '' # todo: 处理传输方向 rt_addr = pkt['rtAddr'] sub_addr = pkt['subAddr'] trans_ser = pkt['transSer'] frame_no = pkt['frameNum'].replace('|', ',') if trans_ser == 'GetData': # 取数 pkt_id = f"RT{rt_addr}SUB{sub_addr}" vals = f"{rt_addr}/{sub_addr}/0xAA/{frame_no}/" rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals) elif trans_ser == 'DataBlock': # 数据块 direct = '0xAA' rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, f"RT{rt_addr}SUB{sub_addr}{direct}", f"{rt_addr}/{sub_addr}/{direct}/ALL/") _pkts = [] for k in rt_pkt_map: _pkts.append(rt_pkt_map[k]) bus_items = data_templates.get_bus_datas(_pkts) seq = 1 sub_key_nodes = list(filter(lambda it: 'is_key' in it, bus_items)) has_key = any(sub_key_nodes) rule_pk = rule_enc.C_ENC_PK sub_key = '' key_items = [] self.compute_length_pos(bus_items) for item in bus_items: if item['type'] == 'enc': if has_key: _prop_enc = create_any_pkt(proj_pk, rule_pk, item, seq, name_path, ds, 'ENC', sub_key_nodes, key_items) else: _prop_enc, rule_stream, _ = create_enc_pkt(proj_pk, rule_pk, item, rule_enc.C_ENC_PK, seq, name_path, ds, '001', 'ENC') else: # 参数 _prop_enc = create_prop_enc(proj_pk, rule_pk, item, get_data_ty(item), seq) if item.__contains__('is_key'): sub_key += _prop_enc.C_ENCITEM_PK + '/' key_items.append( {"pk": _prop_enc.C_ENCITEM_PK, 'id': _prop_enc.C_SEGMENT_ID, 'name': _prop_enc.C_NAME, 'val': ''}) seq += 1 if sub_key: rule_enc.C_KEY = sub_key update_rule_enc(rule_enc) def gen_tc(self): # 数据帧格式 frame = self.gen_tc_transfer_frame_format() # 遥控包格式 pkt_format = self.gen_tc_pkt_format() # 遥控包列表 instructions = self.gen_tc_transfer_pkts() for inst in instructions: # 遥控指令数据区内容 self.gen_tc_pkt_details(inst) inst['type'] = 'insUnit' format_text = json.dumps(pkt_format, ensure_ascii=False) format_text = utils.replace_tpl_paras(format_text, inst) pf = json.loads(format_text) pf['name'] = inst['name'] pf['code'] = inst['code'] data_area = next(filter(lambda x: x['name'] == '应用数据区', pf['children'])) data_area['children'].append(inst) frame['subPkts'].append(pf) self.order = 0 def build_def(item: dict): if item['type'] in ['enum', 'sendFlag']: if isinstance(item['enums'], str): enums = json.loads(item['enums']) else: enums = item['enums'] return json.dumps({"EnumItems": enums, "CanInput": True}, ensure_ascii=False) elif item['type'] == 'length': return None elif item['type'] == 'checkSum': return json.dumps({"ChecksumType": item['value']['type']}) elif item['type'] == 'subPkt': return json.dumps({"CanInput": False}) elif item['type'] in ['combPkt', 'insUnitList', 'input']: return None elif item['type'] == 'insUnit': return '{"MinLength":null,"MaxLength":null,"IsSubPackage":false,"InputParams":[],"OutPutParams":[],"MatchItems":[]}' elif item['type'] == 'pkt': return '''{"MaxLength":1024,"IsSplit8":false,"Split8Start":null,"Split8End":null,"PadCode":null,"Alignment":null,"InputParams":[],"OutPutParams":[],"MatchItems":[]}''' elif item['type'] == 'pktSeqCnt': return json.dumps({"FirstPackValue":"PackCount","MiddlePackValue":"PackIndex","LastPackValue":"PackIndex","IndependPackValue":"InsUnitCount"}) elif 'value' in item: return item['value'] def create_tc_format(parent_pk, field, parent_parent_pk=None): """ 创建遥控格式 数据库数据结构: 帧字段 parent_pk=null, pk=pk_001, type=1 匿名字段(子包) parent_pk=pk_001, pk=pk_002, type=22 字段1 parent_pk=pk_002, pk=pk_003, type=15 字段2 parent_pk=pk_002, pk=pk_004, type=15 包字段 parent_pk=pk_001, pk=pk_005, type=1 匿名字段(子包) parent_pk=pk_005, pk=pk_006, type=22 字段3 parent_pk=pk_006, pk=pk_007, type=15 指令单元 parent_pk=pk_005, pk=pk_007, type=4 字段4 parent_pk=pk_007, pk=pk_008, type=15 :param parent_pk: 父级pk :param field: 格式字段 :param parent_parent_pk: 父级的父级pk :return: """ field['order'] = self.order self.order += 1 field['def'] = build_def(field) if 'length' in field: field['bitWidth'] = field['length'] field['bitOrder'] = None field['attr'] = make_attr(field) if field['type'] == 'length' and 'value' in field and field['value']: val = field['value'] field['range'] = val['start'] + "~" + val['end'] field['formula'] = val['formula'] # 即时输入长度为null则是变长字段,需要把类型改为variableLength if field['type'] == 'input' and field['length'] is None: field['type'] = 'variableLength' # 枚举值默认值设置 if field['type'] == 'enum' and len(field['enums']) and not next(filter(lambda x: 'default' in x and x['default'], field['enums']), None): field['enums'][0]['default'] = True # 校验和 if field['type'] == 'checkSum': field['range'] = f'{field["value"]["start"]}~{field["value"]["end"]}' ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field) ins_format_pk = ins_format.C_INS_FORMAT_PK if 'children' in field: autocode = 1 if field['type'] == 'pkt': info = { 'order': self.order, 'type': 'subPkt', 'def': json.dumps({"CanInput": False}) } ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format_pk, info) self.order += 1 for child in field['children']: child['autocode'] = autocode autocode += 1 if field['type'] == 'insUnitList': _parent_pk = parent_parent_pk else: _parent_pk = ins_format.C_INS_FORMAT_PK create_tc_format(_parent_pk, child, ins_format_pk) if 'subPkts' in field: for _pkt in field['subPkts']: create_tc_format(ins_format_pk, _pkt, parent_pk) create_tc_format(None, frame) def gen_tc_transfer_frame_format(self): _msg = ''' # 角色 你是一名资深的软件工程师。 # 指令 分析遥控传送帧格式,提取遥控传送帧格式的字段定义。 # 需求 要提取值的帧格式字段: - 版本号:const,二进制,以B结尾; - 通过标志:const,二进制,以B结尾; - 控制命令标志:const,二进制,以B结尾; - 空闲位:const,二进制,以B结尾; - 航天器标识:const,十六进制,以0x开头,如果是二进制或十进制需要转换为十六进制; - 虚拟信道标识:sendFlag,发送标记,默认为“任务注入帧”,所有的值都要列举出来; # 数据类型 - const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头; - sendFlag:发送标记,类似枚举,定义样例:[{"n":"name","v":"value","c":"code","default":true}],n表示名称,v表示值,c表示code(没有空着),default表示是默认值; - checkSum:校验和,如果是校验和类型还需要分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other) # 约束 - 以JSON格式输出; - 仅输出JSON文本,不要输出任何其他文本。 # 输出例子: { "版本号": "00B", "通过标志": "0", ... } ''' def validation(gen_text): json.loads(gen_text) doc_text = self.get_text_with_entity(['遥控帧格式']) text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', doc_text=doc_text, validation=validation) result: dict = json.loads(text) format_text = utils.read_from_file('tpl/tc_transfer_frame.json') format_text = utils.replace_tpl_paras(format_text, result) frame = json.loads(format_text) return frame def gen_tc_pkt_format(self): _msg = ''' # 角色 你是一名资深的软件工程师。 # 指令 分析遥控包格式,提取遥控包格式的字段定义。 # 需求 要提取值的包格式字段: - 包版本号: const,二进制; - 包类型: const,二进制; - 数据区头标志: const,二进制; - 序列标志: const,二进制; - 包长:length, - 副导头标志: const,二进制; - 遥控包版本号: const,二进制; - 命令正确应答: const,二进制; - 源地址: const,十六进制。 # 数据类型 - 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头; - 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义, - 枚举值:enum, - 校验和:checkSum,如果是校验和类型还需要分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other) - 即时输入:input。 # 长度类型的范围定义描述 {"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"} - start:起始字段code,长度包括起始字段,字段描述中说明了起始字段, - end:结束字段code,长度包括结束字段,字段描述中说明了结束字段, - formula:计算公式,如果没有计算相关描述则表示不需要计算公式。 计算公式定义: - BYTES:按字节计算; - N-x:总字节数减x,例如总字节数减1的公式为N-1。 # 约束 - 以JSON格式输出; - 仅输出JSON文本,不要输出任何其他文本。 # 输出例子: { "包版本号": "00B", "包类型": "1B", ... } ''' def validation(gen_text): json.loads(gen_text) doc_text = self.get_text_with_entity(['遥控包格式']) text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', doc_text=doc_text, validation=validation) result = json.loads(text) format_text = utils.read_from_file('tpl/tc_pkt_format.json') format_text = utils.replace_tpl_paras(format_text, result) pkt_format = json.loads(format_text) return pkt_format def gen_tc_transfer_pkts(self): _msg = ''' # 角色 你是一名资深的软件工程师。 # 指令 分析文档列出所有的遥控指令。 # 约束 - 应用过程标识:应用过程标识就是APID,一般会在名称后的括号中列出来; - code:指令代号,没有就空着; - 应用数据区:提取表格中的应用数据区内容。 # 输出例子: [{ "name": "xxx", "code":"pkt", "应用过程标识符":"0xAA", "服务类型":"0x1", "服务子类型":"0x2", "应用数据区": "" }] ''' def validation(gen_text): json.loads(gen_text) doc_text = self.get_text_with_entity(['APID分配']) text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', doc_text=doc_text, validation=validation) pkts = json.loads(text) return pkts def gen_tc_pkt_details(self, pkt): result = [] tc_name = pkt['name'] tc_code = pkt['code'] pkt['name'] = f'{tc_code} {tc_name}' _msg = f""" # 角色 你是一个资深软件工程师。 # 指令 分析文档,从文档中提取遥控指令名称为“{tc_name}”代号为“{tc_code}”的指令应用数据区定义。 有些文档内容非常简单仅仅包含特定字节的内容描述,如果是这种文档,则每个特定字节的内容描述定义为一个字段,字段类型根据字节内容确定。 """ + """ # 字段类型 - 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头; - 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义, - 枚举值:enum, - 校验和:checkSum,如果是校验和类型还需要分析校验和的算法是什么,并保存在value中, - 即时输入:input,如果是即时输入value的值为空字符串。 # 长度类型的范围定义描述 {"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"} - start:起始字段code,长度包括起始字段,字段描述中说明了起始字段, - end:结束字段code,长度包括结束字段,字段描述中说明了结束字段, - formula:计算公式,如果没有计算相关描述则表示不需要计算公式。 计算公式定义: - BYTES:按字节计算; - N-x:总字节数减x,例如总字节数减1的公式为N-1。 # 字段类型分析方法 - 根据字段描述分析字段的类型; - 字段描述中明确指定了字段值的,类型为const; - 字段描述中没有明确指定字段值,但是罗列了取值范围的,类型为enum; - 字段描述中如果没有明确指定字段值也没有罗列取值范围的,类型为input; - 字段如果是和“长度”有关,类型为length; - 如果和数据域有关,类型为const; - 字段如果和校验和有关,类型为checkSum,分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)。 # 约束 - code 如果没有明确定义则使用名称的英文翻译,尽量简短; - length 自动转换为bit长度,必须是数值、null或范围定义,不能为0; - value 根据字段描述提取字段值,字段值一般为数值类型,需要根据字段类型来分析,如果是length类型value的值为范围定义; - enums 枚举类型的字段必须要有enums,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}; - length类型的范围定义中的start和end必须是生成结果中的字段code; - 输出数据结构为数组,数组元素为字段信息; - 输出内容必须为严格的json,不能输出除json以外的任何内容。 # 输出例子: [ { "name": "para1", "code": "para1", "length": 8, "type": "const", "value": "0xAA" }, { "name": "para2", "code": "para2", "length": 8, "type": "length", "value": {"start": "para1", "end": "data", "formula": "BYTES"} }, { "name": "数据", "code": "data", "length": null, "type": "input", "value": "" } ] """ def validation(gen_text): fields = json.loads(gen_text) for field in fields: if field['type'] == 'length': if field['value'] is None: raise Exception('length类型的value不能为空') if 'start' not in field['value'] or 'end' not in field['value']: raise Exception('length类型的value必须包含start和end') if field['value']['start'] not in [f['code'] for f in fields]: raise Exception('length类型的value的start字段必须在fields中') if field['value']['end'] not in [f['code'] for f in fields]: raise Exception('length类型的value的end字段必须在fields中') elif field['type'] == 'enum': if 'enums' not in field: raise Exception('enum类型的field必须包含enums') if len(field['enums']) == 0: raise Exception('enum类型的field的enums不能为空') for enum in field['enums']: if 'n' not in enum or 'v' not in enum: raise Exception('enum类型的field的enums的元素必须包含n、v、c') if enum['n'] == '' or enum['v'] == '': raise Exception('enum类型的field的enums的元素不能为空') doc_text = self.get_text_with_entity([tc_name]) if doc_text == '': doc_text = pkt['应用数据区'] text = self.generate_tc_text(_msg, f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json', doc_text=doc_text, validation=validation) result = json.loads(text) pkt['children'] = result if __name__ == '__main__': try: os.makedirs("./out/pkts", exist_ok=True) os.makedirs("./out/tmp", exist_ok=True) # 启动大模型处理流程 ret_text = DbStructFlow().run() except KeyboardInterrupt: if g_completion: g_completion.close()