import os import time from datetime import datetime from openai import OpenAI import re import json import data_templates from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \ update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt from knowledgebase.db.models import TProject file_map = { "文档合并": "./doc/文档合并.md", "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md", "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md", "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md", "应用软件用户需求": "./doc/XA-5D无人机软件用户需求(公开).docx.md", "指令格式": "./doc/ZL格式(公开).docx.md" } # file_map = { # "遥测源包设计报告": "./docs/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md", # "遥测大纲": "./docs/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md", # "总线传输通信帧分配": "./docs/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md", # "应用软件用户需求": "./docs/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md" # } # file_map = { # "文档合并": "./doc/文档合并.md", # "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md", # "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md", # "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md" # } BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1' API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e' MODEL_NAME = 'qwen2.5-14b-instruct-1m' # BASE_URL = 'http://10.74.15.164:11434/v1/' # API_KEY = 'ollama' # MODEL_NAME = 'qwen2.5:32b-128k' # BASE_URL = 'http://10.74.15.164:1001/api' # API_KEY = 'sk-a909385bc14d4491a718b6ee264c3227' # MODEL_NAME = 'qwen2.5:32b-128k' USE_CACHE = True assistant_msg = """ # 角色 你是一个专业的文档通信分析师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。 ## 技能 ### 技能 1:文档分析(包括 markdown 文档) 1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。 2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。 3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。 ### 技能 2:通信协议分析 1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。 ## 背景知识 ###软件主要功能与运行机制总结如下: 1. 数据采集和处理: DIU负责根据卫星的工作状态或模式提供遥测数据,包括模拟量(AN)、总线信号(BL)以及温度(TH)和数字量(DS),并将这些信息打包,通过总线发送给SMU。 SMU则收集硬通道上的遥测参数,并通过总线接收DIU采集的信息。 2. 多路复用与数据传输: 遥测源包被组织成E-PDU,进一步复用为M-PDU,并填充到VCDU中构成遥测帧。 利用CCSDS AOS CADU格式进行遥测数据的多路复用和传输。 3. 虚拟信道(VC)调度机制: 通过常规遥测VC、突发数据VC、延时遥测VC、记录数据VC以及回放VC实现不同类型的数据下传。 4. 遥控指令处理: 上行遥控包括直接指令和间接指令,需经过格式验证后转发给相应单机执行。 遥控帧通过特定的虚拟信道(VC)进行传输。 这些知识需要你记住,再后续的处理中可以帮助你理解要处理的数据。 ## 目标导向 1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。 ## 规则 1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析,每次只分析同一个型号的文档。 2. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。 3. 文档都是对于数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布、各个字段的大小和位置等信息,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,长度字段使用 length 表示,位置字段使用 pos 表示,如果为变长使用“"变长"”表示。 4. 如果有层级,使用树形 JSON 输出,如果有子节点,子节点 key 使用children;需保证一次输出的数据结构统一,并且判断每个层级是什么类型,输出类型字段(type),类型字段的 key 使用 type,类型包括:型号(project)、设备(dev)、封装包(enc)、线性包(linear)、参数(para),封装包子级有数据包,所以type为enc,线性包子级只有参数,所以type为linear;每个层级都包含偏移位置(pos),每个层级的偏移位置从0开始。 5. 名称相关的字段的 key 使用name;代号、编号或者唯一标识相关的字段的key使用id,id由数字、英文字母、下划线组成且以英文字母开头,长度尽量简短;序号相关的字段的key使用number;偏移位置相关字段的key使用pos;其他没有举例的字段使用精简的翻译作为字段的key;每个结构必须包含name和id。 6. 遥测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级嵌套传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构。 7. 数据包字段包括:name、id、type、pos、length、children;参数字段包括:name、id、pos、type、length;必须包含pos和length字段。 8. 常用id参考:遥测(TM)、遥控(TC)、总线(BUS)、版本号(Ver)、应用过程标识(APID)。 9. 注意:一定要记得morkdown文档中会将一些特殊字符进行转义,以此来保证文档的正确性,这些转义符号(也就是反斜杠‘\’)不需要在结果中输出。 10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。 ## 限制: - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。 - 不输出任何注释等描述性信息。 """ g_completion = None def read_from_file(cache_file): with open(cache_file, 'r', encoding='utf-8') as f: text = f.read() return text def save_to_file(text, file_path): if USE_CACHE: with open(file_path, 'w', encoding='utf-8') as f: f.write(text) json_pat = re.compile(r'```json(.*?)```', re.DOTALL) def remove_markdown(text): # 使用正则表达式提取json文本 try: return json_pat.findall(text)[0] except IndexError: return text def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals): # 逻辑封装包,数据块传输的只有一个,取数的根据RT地址、子地址和帧号划分 frame_num = pkt['frameNum'] if trans_ser == '数据块传输': # 数据块传输根据RT地址和子地址划分 key = f'{pkt["rt"]}_{pkt["subAddr"]}' name = f'{pkt["rt"]}_{pkt["subAddr"]}_{trans_ser}' else: # 取数根据RT地址、子地址和帧号划分 key = f'{pkt["rt"]}_{pkt["subAddr"]}_{pkt["frameNum"]}' name = f'{pkt["rt"]}_{pkt["subAddr"]}_帧号{frame_num}_{trans_ser}' # if key not in rt_pkt_map: rt_pkt_map[key] = { "name": name, "id": pkt_id, "type": "logic", "pos": 0, "content": "CYCLEBUFFER,Message,28,0xFFFF", "length": "", "vals": vals, "children": [] } frame = f'{pkt["frameNum"]}' interval = f'{pkt["interval"]}'.replace(".", "_") if trans_ser == '取数': _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}' else: # 数据块传输 if pkt['burst']: _key = f'RT{pkt["rtAddr"]}FrameALL' else: _key = f'RT{pkt["rtAddr"]}Frame{frame}Per{interval}' _pkt = next(filter(lambda it: it['name'] == _key, rt_pkt_map[key]['children']), None) if _pkt is None: ext_info = None if trans_ser == '数据块传输' and not pkt['burst']: # 数据块传输且有周期的包需要 ext_info = [{"id": "PeriodTriger", "name": "时分复用总线触发属性", "val": f"{pkt['interval']}"}, {"id": "FrameNumber", "name": "时分复用协议帧号", "val": frame}] _pkt = { "name": _key, "id": _key, "type": "enc", "pos": 0, "content": "1:N;EPDU", "length": "length", "extInfo": ext_info, "children": [ { "id": "C02_ver", "name": "遥测版本", "type": "para", "pos": 0, "length": 3, "dataTy": "INVAR", "content": "0" }, { "id": "C02_type", "name": "类型", "type": "para", "pos": 3, "length": 1, "dataTy": "INVAR", "content": "0" }, { "id": "C02_viceHead", "name": "副导头标识", "type": "para", "pos": 4, "length": 1, "content": "1", "dataTy": "INVAR" }, { "id": "C02_PackSign", "name": "APID", "type": "para", "pos": 5, "length": 11, "is_key": True, "dataTy": "ENUM" }, { "id": "C02_SerCtr_1", "name": "序列标记", "type": "para", "pos": 16, "length": 2, "content": "3" }, { "id": "C02_SerCtr_2", "name": "包序计数", "type": "para", "pos": 18, "length": 14, "content": "0:167772:1", "dataTy": "INCREASE" }, { "id": "C02_PackLen", "name": "包长", "type": "para", "pos": 32, "length": 16, "content": "1Bytes/C02_Data.length+1", "dataTy": "LEN" }, { "id": "C02_Ser", "name": "服务", "type": "para", "pos": 48, "length": 8, "is_key": True, "dataTy": "ENUM" }, { "id": "C02_SubSer", "name": "子服务", "type": "para", "pos": 56, "length": 8, "is_key": True, "dataTy": "ENUM" }, { "id": "C02_Data", "name": "数据区", "type": "linear", "pos": 64, "length": 'length-current', "children": [] }, ] } rt_pkt_map[key]['children'].append(_pkt) # 数据区下面的包 data_area = next(filter(lambda it: it['name'] == '数据区', _pkt['children']), None) ser_sub_ser: str = pkt['service'] ser = '' sub_ser = '' if ser_sub_ser: nums = re.findall(r'\d+', ser_sub_ser) if len(nums) == 2: ser = nums[0] sub_ser = nums[1] if 'children' not in pkt: pkt['children'] = [] p_name = pkt['id'] + '_' + pkt['name'] data_area['children'].append({ "name": p_name, "id": pkt["id"], "type": "linear", "pos": 0, "length": pkt["length"], "vals": f"0x{pkt['apid']}/{ser}/{sub_ser}/", "children": pkt['children'], }) def build_vcid_content(vcs): _vcs = [] for vc in vcs: _vcs.append(vc['name'] + ',' + vc['VCID']) return ' '.join(_vcs) class DbStructFlow: # 工程 proj: TProject = None # 遥测源包列表,仅包名称、包id和hasParams tm_pkts = [] # vc源包 vc_pkts = [] def __init__(self): self.client = OpenAI( api_key=API_KEY, base_url=BASE_URL, # api_key="ollama", # base_url="http://192.168.1.48:11434/v1/", ) def run(self): # 生成型号结构 # 生成设备结构 # 生成数据流结构 CADU # 生成VCDU结构 # 生成遥测数据包结构 self.proj = self.gen_project() devs = self.gen_device(self.proj) # self.gen_tc() return '' def _gen(self, msgs, msg, files=None): if files is None: files = [file_map['文档合并']] messages = [] if msgs is None else msgs doc_text = '' for file in files: doc_text += '\n' + read_from_file(file) if len(messages) == 0: # 如果是第一次提问加入system消息 messages.append({'role': 'system', 'content': assistant_msg}) messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text}) messages.append({'role': 'user', 'content': msg}) completion = self.client.chat.completions.create( model=MODEL_NAME, messages=messages, stream=True, temperature=0.0, top_p=0, timeout=30 * 60000, max_completion_tokens=1000000, seed=0 # stream_options={"include_usage": True} ) g_completion = completion text = '' for chunk in completion: if chunk.choices[0].delta.content is not None: text += chunk.choices[0].delta.content print(chunk.choices[0].delta.content, end="") print("") g_completion = None return text def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5): if msgs is None: msgs = [] if USE_CACHE and os.path.isfile(cache_file): text = read_from_file(cache_file) else: s = time.time() text = self._gen(msgs, msg, files) text = remove_markdown(text) if validation: try: validation(text) except BaseException as e: print(e) if try_cnt <= 0: raise RuntimeError('生成失败,重试次数太多,强制结束!') return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt - 1) save_to_file(text, cache_file) print(f'耗时:{time.time() - s}') return text def generate_tc_text(self, msg, cache_file, messages=None, files=None, validation=None, try_cnt=5): if messages is None: messages = [] doc_text = '' for file in files: doc_text += '\n' + read_from_file(file) if len(messages) == 0: # 如果是第一次提问加入system消息 messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text}) return self.generate_text(msg, cache_file, messages, files, validation, try_cnt) def gen_project(self): # _msg = """ # 根据文档输出型号信息,型号字段包括:名称和代号。仅输出型号这一级。 # 例如:{"name":"xxx","id":"xxx"} # """ # print('型号信息:') # text = self.generate_text(_msg, 'out/型号信息.json', files=[file_map['应用软件用户需求']]) # proj_dict = json.loads(text) # 工程信息从系统获取 proj_dict = { "id": "JB200001", "name": "HY-4A" } code = proj_dict['id'] name = proj_dict['name'] proj = create_project(code, name, code, name, "", datetime.now()) return proj def gen_device(self, proj): """ 设备列表生成规则: 1.如文档中有1553协议描述,加入1553设备 2.如是类SMU软件(遥测遥控包含,BC或者RT),加入对应相关设备,文档只有设备名称和设备ID,设备类型90%是标准类型 3.如是类RTU软件,加入对应相关设备,文档里面有设备名称和设备ID,同上 4.如基于软平台,如是SMU软件,加入SMU工控机设备,待定 设备类型:工控机[0]、1553B[1] :param proj: :return: """ proj_pk = proj.C_PROJECT_PK devices = [] _msg = f""" 输出分系统下的硬件产品(设备)列表,字段包括:名称(name)、代号(code),硬件产品名称一般会包含“管理单元”或者“接口单元”,如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符; 并且给每个硬件产品增加三个字段:第一个字段hasTcTm“是否包含遥控遥测”,判断该硬件产品是否包含遥控遥测的功能、 第二个字段hasTemperatureAnalog“是否包含温度量、模拟量等数据的采集”,判断该硬件产品是否包含温度量等信息的采集功能、 第三个字段hasBus“是否是总线硬件产品”,判断该设备是否属于总线硬件产品,是否有RT地址;每个字段的值都使用true或false来表示。 仅输出JSON,结构最外层为数组,数组元素为设备信息,不要输出JSON以外的任何字符。 """ print('设备列表:') cache_file = 'out/设备列表.json' def validation(gen_text): _devs = json.loads(gen_text) assert isinstance(_devs, list), '数据结构最外层不是数组' assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元' text = self.generate_text(_msg, cache_file, files=[file_map['应用软件用户需求']], validation=validation) devs = json.loads(text) # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元” like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs)) for dev in like_smu_devs: dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) devices.append(dev) # 创建数据流 ds_tmfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMF1', 'TMFL', '1', 'TMF1', '001') self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH) # ds_tcfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL', # '006') hasBus = any(d['hasBus'] for d in devs) if hasBus: # 总线设备 dev = create_device("1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk) create_extend_info(proj_pk, "BusType", "总线类型", "ECSS_Standard", dev.C_DEV_PK) devices.append(dev) # 创建数据流 ds_u153, rs_u153, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '上行总线数据', 'U15E', 'B153', '0', '1553', '001') # 创建总线结构 self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME) ds_d153, rule_stream, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '下行总线数据', 'D15E', 'B153', '1', '1553', '001', rs_u153.C_RULE_PK) create_ref_ds_rule_stream(proj_pk, rule_stream.C_STREAM_PK, rule_stream.C_STREAM_ID, rule_stream.C_STREAM_NAME, rule_stream.C_STREAM_DIR, rs_u153.C_STREAM_PK) # 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元” # like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs)) # for dev in like_rtu_devs: # dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) # for dev in like_rtu_devs: # dev = create_device(dev['code'], dev['name'], '0', '', proj.C_PROJECT_PK) # devices.append(dev) # # 创建数据流 # ds_tmfl = create_data_stream(proj.C_PROJECT_PK, '温度量', 'TMFL', 'TMFL', '1', 'TMFL', '001') # ds_tcfl = create_data_stream(proj.C_PROJECT_PK, '模拟量', 'TCFL', 'TCFL', '0', 'TCFL', '006') return devices def gen_insert_domain_params(self): _msg = """ 分析文档,输出插入域的参数列表,将所有参数全部输出,不要有遗漏。 数据结构最外层为数组,数组元素为参数信息对象,参数信息字段包括:name、id、pos、length、type。 1个字节的长度为8位,使用B0-B7来表示,请认真计算参数长度。 文档中位置描述信息可能存在跨字节的情况,,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。 """ print('插入域参数列表:') files = [file_map['遥测大纲']] def validation(gen_text): params = json.loads(gen_text) assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组' assert len(params), '插入域参数列表不能为空' text = self.generate_text(_msg, './out/插入域参数列表.json', files=files, validation=validation) return json.loads(text) def gen_tm_frame_data(self): _msg = """ """ files = [file_map['遥测大纲']] def validation(gen_text): pass def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path): # 插入域参数列表 insert_domain = self.gen_insert_domain_params() # VC源包格式 vc_pkt_fields = data_templates.vc_pkt_fields # self.gen_pkt_format() # 获取虚拟信道 vc vcs = self.gen_vc() for vc in vcs: vc['children'] = [] vc['VCID'] = str(int(vc['VCID'], 2)) for field in vc_pkt_fields: if field['name'] == '数据域': field['children'] = [] vc['children'].append(dict(field)) # VCID 字段内容 vcid_content = build_vcid_content(vcs) # 遥测帧结构由模板生成,只需提供特定参数 tm_data = { "vcidContent": vcid_content, 'insertDomain': insert_domain, } cadu = data_templates.get_tm_frame(tm_data) # VC源包 self.vc_pkts = self.gen_pkt_vc() # 遥测源包设计中的源包列表 self.tm_pkts = self.gen_pkts() # 处理VC下面的遥测包数据 for vc in vcs: # 此VC下的遥测包过滤 _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts) for _pkt in _vc_pkts: # 判断遥测包是否有详细定义 if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None): continue # 获取包详情 _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id']) epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None) if epdu and _pkt: _pkt['children'] = _pkt['datas'] _last_par = _pkt['children'][len(_pkt['children']) - 1] _pkt['length'] = (_last_par['pos'] + _last_par['length']) _pkt['pos'] = 0 if 'children' not in epdu: epdu['children'] = [] # 添加解析规则后缀防止重复 _pkt['id'] = _pkt['id'] + '_' + vc['VCID'] # 给包名加代号前缀 if not _pkt['name'].startswith(_pkt['id']): _pkt['name'] = _pkt['id'] + '_' + _pkt['name'] epdu['children'].append(_pkt) apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None) ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None) sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None) _pkt['vals'] = \ f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/" # 重新计数起始偏移 self.compute_length_pos(cadu['children']) # 将数据插入数据库 seq = 1 for cadu_it in cadu['children']: if cadu_it['name'] == 'VCDU': # VCDU # 将信道替换到数据域位置 vc_data = next(filter(lambda it: it['name'].__contains__('数据域'), cadu_it['children']), None) if vc_data: idx = cadu_it['children'].index(vc_data) cadu_it['children'].pop(idx) for vc in vcs: # 处理虚拟信道属性 vc['type'] = 'logic' vc['length'] = vc_data['length'] vc['pos'] = vc_data['pos'] vc['content'] = 'CCSDSMPDU' vcid = vc['VCID'] vc['condition'] = f'VCID=={vcid}' # 将虚拟信道插入到VCDU cadu_it['children'].insert(idx, vc) idx += 1 for vc in vcs: self.compute_length_pos(vc['children']) # 设置VCID的content vcid_node = next(filter(lambda it: it['name'].__contains__('VCID'), cadu_it['children']), None) if vcid_node: vcid_node['content'] = vcid_content create_enc_pkt(proj_pk, rule_pk, cadu_it, rule_pk, seq, name_path, ds, '001', 'ENC') else: # 参数 create_prop_enc(proj_pk, rule_pk, cadu_it, get_data_ty(cadu_it), seq) seq += 1 return cadu def gen_vc(self): _msg = """ 请分析文档中的遥测包格式,输出遥测虚拟信道的划分,数据结构最外层为数组,数组元素为虚拟信道信息字典,字典包含以下键值对: id: 虚拟信道代号 name: 虚拟信道名称 VCID: 虚拟信道VCID(二进制) format: 根据虚拟信道类型获取对应的数据包的格式的名称 深入理解文档中描述的关系,例如:文档中描述了常规遥测是常规数据的下传信道,并且还描述了分系统常规遥测参数包就是实时遥测参数包,并且文档中对实时遥测参数包的格式进行了描述,所以常规遥测VC应该输出为:{"id": "1", "name": "常规遥测VC", "VCID": "0", "format": "实时遥测参数包"} """ def validation(gen_text): vcs = json.loads(gen_text) assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制' print('虚拟信道:') text = self.generate_text(_msg, "out/虚拟信道.json", files=[file_map['遥测大纲']], validation=validation) vcs = json.loads(text) return vcs def gen_dev_pkts(self): _msg = f""" 输出文档中遥测源包类型定义描述的设备以及设备下面的遥测包,数据结构:最外层为数组 > 设备 > 遥测包列表(pkts),设备字段包括:名称(name)、代号(id),源包字段包括:名称(name)、代号(id) """ print('设备遥测源包信息:') files = [file_map["遥测源包设计报告"]] text = self.generate_text(_msg, 'out/设备数据包.json', [], files) dev_pkts = json.loads(text) return dev_pkts def pkt_in_tm_pkts(self, pkt_name): cache_file = f'out/数据包-{pkt_name}.json' if os.path.isfile(cache_file): return True files = [file_map['遥测源包设计报告']] print(f'文档中有无“{pkt_name}”的字段描述:', end='') _msg = f""" 文档中有遥测包“{pkt_name}”的字段表描述吗?遥测包名称必须完全匹配。输出:“无”或“有”,不要输出其他任何内容。 注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。 根据文档内容输出。""" text = self.generate_text(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', [], files) return text == '有' def gen_pkt_details(self, pkt_name, pkt_id): cache_file = f'out/数据包-{pkt_name}.json' files = [file_map['遥测源包设计报告']] if not os.path.isfile(cache_file): _msg = f""" 输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包; 遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear; 包头属性字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para; 数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para; 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短; 你需要理解数据包的位置信息,并且将所有输出单位统一转换为 bits,位置字段的输出格式必须为数值类型; 数据结构仅只包含遥测包,仅输出json,不要输出任何其他内容。""" print(f'遥测源包“{pkt_name}”信息:') def validation(gen_text): _pkt = json.loads(gen_text) assert 'headers' in _pkt, '包结构中必须包含headers字段' assert 'datas' in _pkt, '包结构中必须包含datas字段' text = self.generate_text(_msg, cache_file, [], files, validation) pkt = json.loads(text) else: pkt = json.loads(read_from_file(cache_file)) pkt_len = 0 for par in pkt['datas']: par['pos'] = pkt_len pkt_len += par['length'] pkt['length'] = pkt_len return pkt def gen_pkts(self): _msg = f""" 输出文档中描述的遥测包。 遥测包字段包括:名称(name)、代号(id)、hasParams, 名称中不要包含代号, hasParams表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有参数表, 如果没有代号用名称的英文翻译代替,如果没有名称用代号代替, 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。 """ print(f'遥测源包列表:') files = [file_map['遥测源包设计报告']] text = self.generate_text(_msg, 'out/源包列表.json', [], files) pkt = json.loads(text) return pkt def gen_pkt_vc(self): _msg = f""" 根据遥测源包下传时机定义,输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags) """ files = [file_map['遥测大纲']] print('遥测源包所属虚拟信道:') def validation(gen_text): pkts = json.loads(gen_text) assert len(pkts), 'VC源包列表不能为空' text = self.generate_text(_msg, 'out/遥测VC源包.json', files=files, validation=validation) pkt_vcs = json.loads(text) return pkt_vcs def gen_pkt_format(self): _msg = f""" 请仔细分系文档,输出各个数据包的格式,数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。 包格式children包括:版本号(id:Ver)、类型(id:TM_Type)、副导头标志(id:Vice_Head)、应用过程标识符(id:Proc_Sign)、分组标志(id:Group_Sign)、包序列计数(id:Package_Count)、包长(id:Pack_Len)、数据域(id:EPDU_DATA)。 children元素的字段包括:name、id、pos、length、type 注意:生成的JSON语法格式要合法。 """ print('遥测包格式:') text = self.generate_text(_msg, 'out/数据包格式.json', files=[file_map['遥测大纲']]) pkt_formats = json.loads(text) return pkt_formats def compute_length_pos(self, items: list): length = 0 pos = 0 for child in items: if 'children' in child: self.compute_length_pos(child['children']) child['pos'] = pos if 'length' in child and isinstance(child['length'], int): length = length + child['length'] pos = pos + child['length'] # node['length'] = length def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name): _msg = f""" 请析文档,列出总线通信包传输约定中描述的所有数据包列表, 数据包字段包括:id、name、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、 transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向), 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。 通信帧号:使用文档中的文本不要做任何转换。 subAddr:值为“深度”、“平铺”、“数字”或null。 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线。 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输)。 传输方向分:”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发。 是否突发的判断依据:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发。 """ print('总线数据包:') def validation(gen_text): json.loads(gen_text) text = self.generate_text(_msg, 'out/总线.json', files=[file_map['总线传输通信帧分配']], validation=validation) pkts = json.loads(text) # 筛选经总线的数据包 pkts = list(filter(lambda it: it['throughBus'], pkts)) no_apid_pkts = list(filter(lambda it: not it['apid'], pkts)) # 筛选有apid的数据包 pkts = list(filter(lambda it: it['apid'], pkts)) pkts2 = [] for pkt in pkts: if self.pkt_in_tm_pkts(pkt["name"]): pkts2.append(pkt) for pkt in pkts2: _pkt = self.gen_pkt_details(pkt['name'], pkt['id']) if _pkt: pkt['children'] = [] pkt['children'].extend(_pkt['datas']) pkt['length'] = _pkt['length'] rt_pkt_map = {} for pkt in pkts: # 根据数据块传输和取数分组 # 逻辑封装包的解析规则ID:RT[rt地址]SUB[子地址]S(S代表取数,方向是AA表示发送;R代表置数,方向是BB表示接受) # 取数:逻辑封装包根据子地址和帧号组合创建,有几个组合就创建几个逻辑封装包 # 数据块:只有一个逻辑封装包 # 处理子地址 if pkt['burst']: # 突发包子地址是18~26 pkt['subAddr'] = 26 elif pkt['subAddr'] == '平铺' or pkt['subAddr'] is None: # 平铺:11~26,没有填写的默认为平铺 pkt['subAddr'] = 26 elif pkt['subAddr'] == '深度': # 深度:11 pkt['subAddr'] = 11 # 处理帧号 if pkt['burst']: # 突发:ALL pkt['frameNum'] = 'ALL' elif not pkt['frameNum']: # 有 pkt['frameNum'] = '' # todo: 处理传输方向 rt_addr = pkt['rtAddr'] sub_addr = pkt['subAddr'] trans_ser = pkt['transSer'] frame_no = pkt['frameNum'].replace('|', ',') if trans_ser == 'GetData': # 取数 pkt_id = f"RT{rt_addr}SUB{sub_addr}" vals = f"{rt_addr}/{sub_addr}/0xAA/{frame_no}/" rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals) elif trans_ser == 'DataBlock': # 数据块 direct = '0xAA' rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, f"RT{rt_addr}SUB{sub_addr}{direct}", f"{rt_addr}/{sub_addr}/{direct}/ALL/") _pkts = [] for k in rt_pkt_map: _pkts.append(rt_pkt_map[k]) bus_items = data_templates.get_bus_datas(_pkts) seq = 1 sub_key_nodes = list(filter(lambda it: 'is_key' in it, bus_items)) has_key = any(sub_key_nodes) rule_pk = rule_enc.C_ENC_PK sub_key = '' key_items = [] self.compute_length_pos(bus_items) for item in bus_items: if item['type'] == 'enc': if has_key: _prop_enc = create_any_pkt(proj_pk, rule_pk, item, seq, name_path, ds, 'ENC', sub_key_nodes, key_items) else: _prop_enc, rule_stream, _ = create_enc_pkt(proj_pk, rule_pk, item, rule_enc.C_ENC_PK, seq, name_path, ds, '001', 'ENC') else: # 参数 _prop_enc = create_prop_enc(proj_pk, rule_pk, item, get_data_ty(item), seq) if item.__contains__('is_key'): sub_key += _prop_enc.C_ENCITEM_PK + '/' key_items.append( {"pk": _prop_enc.C_ENCITEM_PK, 'id': _prop_enc.C_SEGMENT_ID, 'name': _prop_enc.C_NAME, 'val': ''}) seq += 1 if sub_key: rule_enc.C_KEY = sub_key update_rule_enc(rule_enc) def gen_tc(self): # 数据帧格式 frame = self.gen_tc_transfer_frame() # 数据包格式 pkt_format = self.gen_tc_transfer_pkt() # 数据包列表 pkts = self.gen_tc_transfer_pkts() for pkt in pkts: pf = json.loads(json.dumps(pkt_format)) pf['name'] = pkt['name'] ph = next(filter(lambda x: x['name'] == '主导头', pf['children']), None) apid = next(filter(lambda x: x['name'] == '应用进程标识符(APID)', ph['children']), None) apid['value'] = pkt['apid'] apid['type'] = 'const' sh = next(filter(lambda x: x['name'] == '副导头', pf['children']), None) ser = next(filter(lambda x: x['name'] == '服务类型', sh['children']), None) sub_ser = next(filter(lambda x: x['name'] == '服务子类型', sh['children']), None) ser['value'] = pkt['server'] ser['type'] = 'const' sub_ser['value'] = pkt['subServer'] sub_ser['type'] = 'const' frame['subPkts'].append(pf) self.order = 0 def build_def(item: dict): if item['type'] == 'enum': return json.dumps({"EnumItems": item['enums'], "CanInput": True}) elif item['type'] == 'length': return None elif item['type'] == 'checkSum': return json.dumps({"ChecksumType": "CRC-CCITT"}) elif item['type'] == 'subPkt': return json.dumps({"CanInput": False}) elif item['type'] == 'combPkt': return None elif 'value' in item: return item['value'] def create_tc_format(parent_pk, field): field['order'] = self.order self.order += 1 field['def'] = build_def(field) if 'length' in field: field['bitWidth'] = field['length'] field['bitOrder'] = None field['attr'] = 0 if field['type'] == 'length': val = field['value'] field['range'] = val['start'] + "~" + val['end'] field['formula'] = val['formula'] ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field) if 'children' in field: autocode = 1 if field['type'] == 'pkt': ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK, {'order': self.order, 'type': 'subPkt', 'def': json.dumps({"CanInput": False})}) self.order += 1 for child in field['children']: child['autocode'] = autocode autocode += 1 create_tc_format(ins_format.C_INS_FORMAT_PK, child) # if 'subPkts' in field: # for pkt in field['subPkts']: # ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK, # {'order': self.order, 'type': 'subPkt', # 'def': json.dumps({"CanInput": False})}) # create_tc_format(ins_format.C_INS_FORMAT_PK, pkt) create_tc_format(None, frame) def gen_tc_transfer_frame(self): _msg = ''' 分析YK传送帧格式,提取YK传送帧的数据结构,不包括数据包的数据结构。 ## 经验: 字段类型包括: 1.组合包:combPkt, 2.固定码字:const, 3.长度:length, 4.枚举值:enum, 5.校验和:checkSum, 6.数据区:subPkt。 根据字段描述分析字段的类型,分析方法: 1.字段描述中明确指定了字段值的,类型为const, 2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum, 3.字段描述中如果存在多层级描述则父级字段的类型为combPkt, 4.字段如果是和“长度”有关,类型为length, 5.如果和数据域有关,类型为subPkt, 6.字段如果和校验和有关,类型为checkSum。 字段值提取方法: 1.字段描述中明确指定了字段值, 2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"","end":"","formula":"N-1"},注意:start和end的值为字段code。 ## 限制: - length 自动转换为bit长度。 - value 根据字段描述提取。 - enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。 - 输出内容必须为严格的json,不能输出除json以外的任何内容。 字段数据结构: 主导头 版本号、通过标志、控制命令标志、空闲位、HTQ标识、虚拟信道标识、帧长、帧序列号 传送帧数据域 帧差错控制域。 # 输出内容例子: { "name": "YK帧", "type": "pkt" "children":[ { "name": "主导头", "code": "primaryHeader", "length": 2, "value": "00", "type": "combPkt", "children": [ { "name": "版本号", "code": "verNum" "length": 1, "value": "00" } ] } ], "subPkts":[] } ''' def validation(gen_text): json.loads(gen_text) text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', files=[file_map['指令格式']], validation=validation) frame = json.loads(text) return frame def gen_tc_transfer_pkt(self): _msg = ''' 仅分析YK包格式,提取YK包数据结构。 ## 经验: 字段类型包括: 1.组合包:combPkt, 2.固定码字:const, 3.长度:length, 4.枚举值:enum, 5.校验和:checkSum, 6.数据区:subPkt。 根据字段描述分析字段的类型,分析方法: 1.字段描述中明确指定了字段值的,类型为const, 2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum, 3.字段描述中如果存在多层级描述则父级字段的类型为combPkt, 4.字段如果是和“长度”有关,类型为length, 5.如果和数据域有关,类型为subPkt, 6.字段如果和校验和有关,类型为checkSum。 字段值提取方法: 1.字段描述中明确指定了字段值, 2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"","end":"","formula":"N-1"},注意:start和end的值为字段code。 ## 限制: - length 自动转换为bit长度。 - value 根据字段描述提取。 - enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。 - 输出内容必须为严格的json,不能输出除json以外的任何内容。 字段数据结构: 主导头 包识别 包版本号、包类型、数据区头标志、应用进程标识符(APID) 包序列控制 序列标志 包序列计数 包长 副导头 CCSDS副导头标志 YK包版本号 命令正确应答(Ack) 服务类型 服务子类型 源地址 应用数据区 帧差错控制域。 # 输出内容例子: { "name": "YK包", "type": "pkt" "children":[ { "name": "主导头", "code": "primaryHeader", "length": 2, "value": "00", "type": "combPkt", "children": [ { "name": "版本号", "code": "verNum" "length": 1, "value": "00" } ] } ], "subPkts":[] } ''' def validation(gen_text): json.loads(gen_text) text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', files=[file_map['指令格式']], validation=validation) pkt_format = json.loads(text) return pkt_format def gen_tc_transfer_pkts(self): _msg = ''' 分析文档列出所有的遥控源包。 ## 数据结构如下: [{ "name": "xxx", "code":"pkt", "apid":"0xAA", "server":"0x1", "subServer":"0x2" }] ''' def validation(gen_text): json.loads(gen_text) text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', files=[file_map['指令格式']], validation=validation) pkts = json.loads(text) return pkts if __name__ == '__main__': try: os.makedirs("./out/pkts", exist_ok=True) # 启动大模型处理流程 ret_text = DbStructFlow().run() except KeyboardInterrupt: if g_completion: g_completion.close()