| | |
| | | import asyncio |
| | | import math |
| | | import os |
| | | import subprocess |
| | | import time |
| | | from datetime import datetime |
| | | |
| | |
| | | import re |
| | | import json |
| | | |
| | | from langchain_community.chat_models import ChatOpenAI |
| | | from langchain_core.prompts import HumanMessagePromptTemplate, SystemMessagePromptTemplate |
| | | import textwrap |
| | | import data_templates |
| | | from knowledgebase import utils |
| | | from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \ |
| | | update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format |
| | | update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format, make_attr, init_db_helper |
| | | from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt |
| | | |
| | | from knowledgebase.db.models import TProject |
| | | from knowledgebase.db.models import TProject, init_base_db |
| | | |
| | | file_map = { |
| | | "文档合并": "./doc/文档合并.md", |
| | | "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md", |
| | | "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md", |
| | | "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md", |
| | | "应用软件用户需求": "./doc/XA-5D无人机软件用户需求(公开).docx.md", |
| | | "指令格式": "./doc/ZL格式(公开).docx.md" |
| | | } |
| | | # file_map = { |
| | | # "遥测源包设计报告": "./docs/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md", |
| | | # "遥测大纲": "./docs/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md", |
| | | # "总线传输通信帧分配": "./docs/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md", |
| | | # "应用软件用户需求": "./docs/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md" |
| | | # } |
| | | # file_map = { |
| | | # "文档合并": "./doc/文档合并.md", |
| | | # "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md", |
| | | # "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md", |
| | | # "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md" |
| | | # } |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | | from knowledgebase.llm import llm |
| | | |
| | | BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1' |
| | | API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e' |
| | | MODEL_NAME = 'qwen2.5-14b-instruct-1m' |
| | | # BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1' |
| | | # API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e' |
| | | # MODEL_NAME = 'qwen2.5-72b-instruct' |
| | | |
| | | # BASE_URL = 'http://10.74.15.164:11434/v1/' |
| | | # BASE_URL = 'http://10.74.15.171:11434/v1/' |
| | | # API_KEY = 'ollama' |
| | | # MODEL_NAME = 'qwen2.5:32b-128k' |
| | | # MODEL_NAME = 'qwen2.5:72b-instruct' |
| | | |
| | | # BASE_URL = 'http://10.74.15.164:1001/api' |
| | | # API_KEY = 'sk-a909385bc14d4491a718b6ee264c3227' |
| | | # MODEL_NAME = 'qwen2.5:32b-128k' |
| | | # BASE_URL = 'http://chat.com/api' |
| | | # API_KEY = 'sk-49457e83f734475cb4cf7066c649d563' |
| | | # MODEL_NAME = 'qwen2.5:72b-120k' |
| | | |
| | | BASE_URL = 'http://10.74.15.171:8000/v1' |
| | | API_KEY = 'EMPTY' |
| | | # MODEL_NAME = 'QwQ:32b' |
| | | MODEL_NAME = 'Qwen2.5-72B-Instruct-AWQ' |
| | | # MODEL_NAME = 'qwen2.5:72b-instruct' |
| | | |
| | | USE_CACHE = True |
| | | assistant_msg = """ |
| | | 你是一名资深的软件工程师。 |
| | | """ |
| | | # |
| | | # ## 技能 |
| | | # ### 技能 1:通信协议分析 |
| | | # 1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。 |
| | | # |
| | | # ## 目标导向 |
| | | # 1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。 |
| | | # |
| | | # ## 限制: |
| | | # - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。 |
| | | # - 不输出任何注释等描述性信息。 |
| | | tc_system_msg = """ |
| | | # 角色 |
| | | 你是一个专业的文档通信分析师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。 |
| | | |
| | | ## 技能 |
| | | ### 技能 1:文档分析(包括 markdown 文档) |
| | | 1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。 |
| | | 2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。 |
| | | 3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。 |
| | | |
| | | ### 技能 2:通信协议分析 |
| | | 1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。 |
| | | |
| | | ## 背景知识 |
| | | ###软件主要功能与运行机制总结如下: |
| | | 1. 数据采集和处理: |
| | | DIU负责根据卫星的工作状态或模式提供遥测数据,包括模拟量(AN)、总线信号(BL)以及温度(TH)和数字量(DS),并将这些信息打包,通过总线发送给SMU。 |
| | | SMU则收集硬通道上的遥测参数,并通过总线接收DIU采集的信息。 |
| | | 2. 多路复用与数据传输: |
| | | 遥测源包被组织成E-PDU,进一步复用为M-PDU,并填充到VCDU中构成遥测帧。 |
| | | 利用CCSDS AOS CADU格式进行遥测数据的多路复用和传输。 |
| | | 3. 虚拟信道(VC)调度机制: |
| | | 通过常规遥测VC、突发数据VC、延时遥测VC、记录数据VC以及回放VC实现不同类型的数据下传。 |
| | | 4. 遥控指令处理: |
| | | 上行遥控包括直接指令和间接指令,需经过格式验证后转发给相应单机执行。 |
| | | 遥控帧通过特定的虚拟信道(VC)进行传输。 |
| | | 这些知识需要你记住,再后续的处理中可以帮助你理解要处理的数据。 |
| | | |
| | | ## 目标导向 |
| | | 1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。 |
| | | |
| | | ## 规则 |
| | | 1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析,每次只分析同一个型号的文档。 |
| | | 2. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。 |
| | | 3. 文档都是对于数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布、各个字段的大小和位置等信息,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,长度字段使用 length 表示,位置字段使用 pos 表示,如果为变长使用“"变长"”表示。 |
| | | 4. 如果有层级,使用树形 JSON 输出,如果有子节点,子节点 key 使用children;需保证一次输出的数据结构统一,并且判断每个层级是什么类型,输出类型字段(type),类型字段的 key 使用 type,类型包括:型号(project)、设备(dev)、封装包(enc)、线性包(linear)、参数(para),封装包子级有数据包,所以type为enc,线性包子级只有参数,所以type为linear;每个层级都包含偏移位置(pos),每个层级的偏移位置从0开始。 |
| | | 5. 名称相关的字段的 key 使用name;代号、编号或者唯一标识相关的字段的key使用id,id由数字、英文字母、下划线组成且以英文字母开头,长度尽量简短;序号相关的字段的key使用number;偏移位置相关字段的key使用pos;其他没有举例的字段使用精简的翻译作为字段的key;每个结构必须包含name和id。 |
| | | 6. 遥测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级嵌套传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构。 |
| | | 7. 数据包字段包括:name、id、type、pos、length、children;参数字段包括:name、id、pos、type、length;必须包含pos和length字段。 |
| | | 8. 常用id参考:遥测(TM)、遥控(TC)、总线(BUS)、版本号(Ver)、应用过程标识(APID)。 |
| | | 9. 注意:一定要记得morkdown文档中会将一些特殊字符进行转义,以此来保证文档的正确性,这些转义符号(也就是反斜杠‘\’)不需要在结果中输出。 |
| | | 10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。 |
| | | |
| | | ## 限制: |
| | | - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。 |
| | | - 不输出任何注释等描述性信息。 |
| | | 你是一个资深软件工程师。 |
| | | # 约束 |
| | | - 输出内容根据文档内容输出。 |
| | | """ |
| | | |
| | | g_completion = None |
| | |
| | | f.write(text) |
| | | |
| | | |
| | | def remove_think_tag(text): |
| | | pattern = r'<think>(.|\n)*?</think>' |
| | | result = re.sub(pattern, '', text) |
| | | return result |
| | | |
| | | |
| | | json_pat = re.compile(r'```json(.*?)```', re.DOTALL) |
| | | |
| | | |
| | | def remove_markdown(text): |
| | | def get_json_text(text): |
| | | # 使用正则表达式提取json文本 |
| | | try: |
| | | return json_pat.findall(text)[0] |
| | |
| | | return text |
| | | |
| | | |
| | | def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals): |
| | | def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals, pkts: list): |
| | | # 逻辑封装包,数据块传输的只有一个,取数的根据RT地址、子地址和帧号划分 |
| | | frame_num = pkt['frameNum'] |
| | | if trans_ser == '数据块传输': |
| | |
| | | |
| | | interval = f'{pkt["interval"]}'.replace(".", "_") |
| | | if trans_ser == '取数': |
| | | _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}' |
| | | # 取数忽略周期 |
| | | # _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}' |
| | | _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}' |
| | | else: |
| | | # 数据块传输 |
| | | if pkt['burst']: |
| | |
| | | |
| | | |
| | | class DbStructFlow: |
| | | json_path = '' |
| | | # 工程 |
| | | proj: TProject = None |
| | | # 遥测源包列表,仅包名称、包id和hasParams |
| | |
| | | # vc源包 |
| | | vc_pkts = [] |
| | | |
| | | def __init__(self): |
| | | def __init__(self, project_path: str): |
| | | self.client = OpenAI( |
| | | api_key=API_KEY, |
| | | base_url=BASE_URL, |
| | | # api_key="ollama", |
| | | # base_url="http://192.168.1.48:11434/v1/", |
| | | ) |
| | | self.json_path = f'{project_path}/json' |
| | | self.db_dir = f'{project_path}/db' |
| | | os.makedirs(f"{self.json_path}", exist_ok=True) |
| | | os.makedirs(f"{self.json_path}/pkts", exist_ok=True) |
| | | os.makedirs(f"{self.db_dir}", exist_ok=True) |
| | | init_base_db(f'{self.db_dir}/db.db') |
| | | init_db_helper() |
| | | |
| | | def run(self): |
| | | # self.llm = ChatOpenAI(model=MODEL_NAME, temperature=0, api_key=API_KEY, base_url=BASE_URL) |
| | | |
| | | async def run(self): |
| | | # 生成型号结构 |
| | | # 生成设备结构 |
| | | # 生成数据流结构 CADU |
| | | # 生成VCDU结构 |
| | | # 生成遥测数据包结构 |
| | | self.proj = self.gen_project() |
| | | tasks = [] |
| | | tasks.append(self.gen_device(self.proj)) |
| | | |
| | | devs = self.gen_device(self.proj) |
| | | tasks.append(self.gen_tc()) |
| | | |
| | | # self.gen_tc() |
| | | # 测试位置计算 |
| | | # print(self.handle_pos("Byte1_B0~Byte1_B0")) |
| | | # print(self.handle_pos("Byte0_B0~Byte0_B7")) |
| | | # print(self.handle_pos("Byte9_B0~Byte9_B7")) |
| | | |
| | | await asyncio.gather(*tasks) |
| | | return '' |
| | | |
| | | def _gen(self, msgs, msg, files=None): |
| | | if files is None: |
| | | files = [file_map['文档合并']] |
| | | def handle_pos(self, srt): |
| | | pos_data = { |
| | | "start": 0, |
| | | "end": 0 |
| | | } |
| | | pos = srt.split("~") |
| | | for index, p in enumerate(pos): |
| | | byte = p.split('_') |
| | | for b in byte: |
| | | if b.find("Byte") > -1: |
| | | value = b.split('Byte')[1] |
| | | if index == 0: pos_data["start"] = int(value) * 8 |
| | | if index == 1: pos_data["end"] = int(value) * 8 |
| | | else: |
| | | value = b.split('B')[1] |
| | | if index == 0: pos_data["start"] += int(value) |
| | | if index == 1: pos_data["end"] += int(value) |
| | | |
| | | return { |
| | | "pos": pos_data["start"], |
| | | "length": pos_data["end"] - pos_data["start"] + 1, |
| | | } |
| | | |
| | | def get_text_with_entity(self, entity_names: list[str]) -> str: |
| | | """ |
| | | 根据实体词获取文档文本 |
| | | :param entity_names: str - 实体词名称 |
| | | :return: str - 文本内容 |
| | | """ |
| | | return doc_dbh.get_text_with_entities(entity_names) |
| | | |
| | | def get_text_list_with_entity(self, entity_names: list[str]) -> str: |
| | | """ |
| | | 根据实体词获取文档文本列表 |
| | | :param entity_names: 实体词列表 |
| | | :return: [str] - 文本列表 |
| | | """ |
| | | return doc_dbh.get_texts_with_entities(entity_names) |
| | | |
| | | def _gen(self, msgs, msg, doc_text): |
| | | # if files is None: |
| | | # files = [file_map['文档合并']] |
| | | messages = [] if msgs is None else msgs |
| | | doc_text = '' |
| | | for file in files: |
| | | doc_text += '\n' + read_from_file(file) |
| | | # doc_text = '' |
| | | # for file in files: |
| | | # doc_text += '\n' + read_from_file(file) |
| | | # 去除多余的缩进 |
| | | msg = textwrap.dedent(msg).strip() |
| | | if len(messages) == 0: |
| | | # 如果是第一次提问加入system消息 |
| | | messages.append({'role': 'system', 'content': assistant_msg}) |
| | | messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text}) |
| | | messages.append({'role': 'user', 'content': msg}) |
| | | |
| | | completion = self.client.chat.completions.create( |
| | | model=MODEL_NAME, |
| | | messages=messages, |
| | | stream=True, |
| | | temperature=0.0, |
| | | top_p=0, |
| | | timeout=30 * 60000, |
| | | max_completion_tokens=1000000, |
| | | seed=0 |
| | | # stream_options={"include_usage": True} |
| | | ) |
| | | g_completion = completion |
| | | text = '' |
| | | for chunk in completion: |
| | | if chunk.choices[0].delta.content is not None: |
| | | text += chunk.choices[0].delta.content |
| | | print(chunk.choices[0].delta.content, end="") |
| | | print("") |
| | | g_completion = None |
| | | for ai_msg in llm.stream(messages): |
| | | text += ai_msg.content |
| | | print(ai_msg.content, end='') |
| | | print('') |
| | | |
| | | # completion = self.client.chat.completions.create( |
| | | # model=MODEL_NAME, |
| | | # messages=messages, |
| | | # stream=True, |
| | | # temperature=0, |
| | | # # top_p=0, |
| | | # timeout=30 * 60000, |
| | | # max_completion_tokens=32000, |
| | | # seed=0 |
| | | # # stream_options={"include_usage": True} |
| | | # ) |
| | | # g_completion = completion |
| | | # text = '' |
| | | # for chunk in completion: |
| | | # if chunk.choices[0].delta.content is not None: |
| | | # text += chunk.choices[0].delta.content |
| | | # print(chunk.choices[0].delta.content, end="") |
| | | # print("") |
| | | # g_completion = None |
| | | return text |
| | | |
| | | def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5): |
| | | def generate_text(self, msg, cache_file, msgs=None, doc_text="", validation=None, try_cnt=5, json_text=False): |
| | | if msgs is None: |
| | | msgs = [] |
| | | if USE_CACHE and os.path.isfile(cache_file): |
| | | text = read_from_file(cache_file) |
| | | else: |
| | | s = time.time() |
| | | text = self._gen(msgs, msg, files) |
| | | text = remove_markdown(text) |
| | | text = self._gen(msgs, msg, doc_text) |
| | | text = remove_think_tag(text) |
| | | if json_text: |
| | | text = get_json_text(text) |
| | | if validation: |
| | | try: |
| | | validation(text) |
| | |
| | | print(e) |
| | | if try_cnt <= 0: |
| | | raise RuntimeError('生成失败,重试次数太多,强制结束!') |
| | | return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt - 1) |
| | | save_to_file(text, cache_file) |
| | | return self.generate_text_json(msg, cache_file, msgs, doc_text, validation, try_cnt - 1) |
| | | if cache_file: |
| | | save_to_file(text, cache_file) |
| | | print(f'耗时:{time.time() - s}') |
| | | return text |
| | | |
| | | def generate_tc_text(self, msg, cache_file, messages=None, files=None, validation=None, try_cnt=5): |
| | | if messages is None: |
| | | messages = [] |
| | | doc_text = '' |
| | | for file in files: |
| | | doc_text += '\n' + read_from_file(file) |
| | | if len(messages) == 0: |
| | | # 如果是第一次提问加入system消息 |
| | | messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text}) |
| | | return self.generate_text(msg, cache_file, messages, files, validation, try_cnt) |
| | | def generate_text_json(self, msg, cache_file, msgs=None, doc_text="", validation=None, try_cnt=5): |
| | | return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True) |
| | | |
| | | def generate_tc_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5): |
| | | msgs = [ |
| | | {'role': 'system', 'content': tc_system_msg}, |
| | | {'role': 'user', 'content': "以下是文档内容:\n" + doc_text}] |
| | | return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True) |
| | | |
| | | def gen_project(self): |
| | | # _msg = """ |
| | | # 根据文档输出型号信息,型号字段包括:名称和代号。仅输出型号这一级。 |
| | | # 例如:{"name":"xxx","id":"xxx"} |
| | | # """ |
| | | # print('型号信息:') |
| | | # text = self.generate_text(_msg, 'out/型号信息.json', files=[file_map['应用软件用户需求']]) |
| | | # proj_dict = json.loads(text) |
| | | # 工程信息从系统获取 |
| | | proj_dict = { |
| | | "id": "JB200001", |
| | | "name": "HY-4A" |
| | | } |
| | | _msg = """ |
| | | 根据文档内容输出卫星的型号信息,输出字段包括:卫星的型号名称和卫星的型号代号。注意:如果没有单独描述型号名称或者型号代号,那么型号名称和型号代号是相同的,并且只输出一个层级。如果型号代号中有符号也要输出,保证输出完整。 |
| | | 例如:{"name":"xxx","id":"xxx"} |
| | | """ |
| | | print('型号信息:') |
| | | doc_text = self.get_text_with_entity(['系统概述']) |
| | | text = self.generate_text_json(_msg, f'{self.json_path}/型号信息.json', doc_text=doc_text) |
| | | proj_dict = json.loads(text) |
| | | code = proj_dict['id'] |
| | | name = proj_dict['name'] |
| | | proj = create_project(code, name, code, name, "", datetime.now()) |
| | | return proj |
| | | |
| | | def gen_device(self, proj): |
| | | async def gen_device(self, proj): |
| | | """ |
| | | 设备列表生成规则: |
| | | 1.如文档中有1553协议描述,加入1553设备 |
| | |
| | | proj_pk = proj.C_PROJECT_PK |
| | | devices = [] |
| | | |
| | | _msg = f""" |
| | | 输出分系统下的硬件产品(设备)列表,字段包括:名称(name)、代号(code),硬件产品名称一般会包含“管理单元”或者“接口单元”,如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符; |
| | | 并且给每个硬件产品增加三个字段:第一个字段hasTcTm“是否包含遥控遥测”,判断该硬件产品是否包含遥控遥测的功能、 |
| | | 第二个字段hasTemperatureAnalog“是否包含温度量、模拟量等数据的采集”,判断该硬件产品是否包含温度量等信息的采集功能、 |
| | | 第三个字段hasBus“是否是总线硬件产品”,判断该设备是否属于总线硬件产品,是否有RT地址;每个字段的值都使用true或false来表示。 |
| | | 仅输出JSON,结构最外层为数组,数组元素为设备信息,不要输出JSON以外的任何字符。 |
| | | """ |
| | | _msg = """ |
| | | # 角色 |
| | | 你是一名资深软件工程师。 |
| | | # 指令 |
| | | 我需要从文档提取设备列表信息,你要帮助我完成设备列表信息提取。 |
| | | # 需求 |
| | | 输出分系统下的硬件产品(设备)列表,硬件产品名称一般会包含“管理单元”或者“接口单元”; |
| | | # 字段包括: |
| | | - 名称(name):设备名称; |
| | | - 代号(code):设备代号; |
| | | - 是否包含遥控遥测(hasTcTm):标识该硬件产品是否包含遥控遥测的功能,布尔值true或false; |
| | | - 是否包含温度量模拟量等数据的采集(hasTemperatureAnalog):标识该硬件产品是否包含温度量等信息的采集功能,布尔值true或false; |
| | | - 是否有总线硬件产品(hasBus):标识该设备是否属于总线硬件产品,是否有RT地址,布尔值true或false; |
| | | # 约束 |
| | | - 如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符; |
| | | - 数据结构最外层为数组,数组元素为设备信息 |
| | | - 仅输出JSON,不要输出JSON以外的任何字符。 |
| | | # 例子 |
| | | [ |
| | | { |
| | | "name": "系统管理单元", |
| | | "code": "SMU", |
| | | "hasTcTm": true, |
| | | "hasTemperatureAnalog": false, |
| | | "hasBus": true |
| | | }, |
| | | { |
| | | "name": "1553B总线", |
| | | "code": "1553", |
| | | "hasTcTm": true, |
| | | "hasTemperatureAnalog": true, |
| | | "hasBus": true |
| | | } |
| | | ] |
| | | """ |
| | | print('设备列表:') |
| | | cache_file = 'out/设备列表.json' |
| | | cache_file = f'{self.json_path}/设备列表.json' |
| | | |
| | | def validation(gen_text): |
| | | _devs = json.loads(gen_text) |
| | | assert isinstance(_devs, list), '数据结构最外层不是数组' |
| | | assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元' |
| | | |
| | | text = self.generate_text(_msg, cache_file, files=[file_map['应用软件用户需求']], validation=validation) |
| | | doc_text = self.get_text_with_entity(['系统概述', '总线管理']) |
| | | text = self.generate_text_json(_msg, cache_file, doc_text=doc_text, validation=validation) |
| | | devs = json.loads(text) |
| | | |
| | | # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元” |
| | | like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs)) |
| | | tasks = [] |
| | | for dev in like_smu_devs: |
| | | dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) |
| | | devices.append(dev) |
| | | # 创建数据流 |
| | | ds_tmfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMF1', 'TMFL', '1', 'TMF1', |
| | | '001') |
| | | self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH) |
| | | task = self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH) |
| | | tasks.append(task) |
| | | # ds_tcfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL', |
| | | # '006') |
| | | |
| | | hasBus = any(d['hasBus'] for d in devs) |
| | | if hasBus: |
| | | has_bus = any(d['hasBus'] for d in devs) |
| | | if has_bus: |
| | | # 总线设备 |
| | | dev = create_device("1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk) |
| | | create_extend_info(proj_pk, "BusType", "总线类型", "ECSS_Standard", dev.C_DEV_PK) |
| | |
| | | ds_u153, rs_u153, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '上行总线数据', 'U15E', 'B153', |
| | | '0', '1553', '001') |
| | | # 创建总线结构 |
| | | self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME) |
| | | task = self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME) |
| | | tasks.append(task) |
| | | await asyncio.gather(*tasks) |
| | | ds_d153, rule_stream, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '下行总线数据', 'D15E', 'B153', |
| | | '1', '1553', '001', rs_u153.C_RULE_PK) |
| | | create_ref_ds_rule_stream(proj_pk, rule_stream.C_STREAM_PK, rule_stream.C_STREAM_ID, |
| | | rule_stream.C_STREAM_NAME, rule_stream.C_STREAM_DIR, rs_u153.C_STREAM_PK) |
| | | else: |
| | | await asyncio.gather(*tasks) |
| | | # 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元” |
| | | # like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs)) |
| | | # for dev in like_rtu_devs: |
| | |
| | | |
| | | def gen_insert_domain_params(self): |
| | | _msg = """ |
| | | 分析文档,输出插入域的参数列表,将所有参数全部输出,不要有遗漏。 |
| | | 数据结构最外层为数组,数组元素为参数信息对象,参数信息字段包括:name、id、pos、length、type。 |
| | | 1个字节的长度为8位,使用B0-B7来表示,请认真计算参数长度。 |
| | | 文档中位置描述信息可能存在跨字节的情况,,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。 |
| | | # 指令 |
| | | 我需要从文档中提取插入域的参数列表,你要帮助我完成插入域参数列表的提取。 |
| | | # 需求 |
| | | 参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数位置)、type(类型:para)。 |
| | | # 要求 |
| | | 1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。 |
| | | 位置信息转换为通用格式"Byte1_B6~Byte2_B0"进行输出,如果缺少内容要进行补全,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。 |
| | | # 输出示例 |
| | | [ |
| | | { |
| | | "name": "遥测模式字", |
| | | "id": "TMS215", |
| | | "pos": Byte0_B0~Byte0_B7, |
| | | "type": "para" |
| | | } |
| | | ] |
| | | """ |
| | | print('插入域参数列表:') |
| | | files = [file_map['遥测大纲']] |
| | | |
| | | def validation(gen_text): |
| | | params = json.loads(gen_text) |
| | | assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组' |
| | | assert len(params), '插入域参数列表不能为空' |
| | | |
| | | text = self.generate_text(_msg, './out/插入域参数列表.json', files=files, validation=validation) |
| | | return json.loads(text) |
| | | doc_text = self.get_text_with_entity(['插入域']) |
| | | text = self.generate_text_json(_msg, f'{self.json_path}/插入域参数列表.json', doc_text=doc_text, validation=validation) |
| | | json_list = json.loads(text) |
| | | for j in json_list: |
| | | if j['pos'] is not None: |
| | | pos_data = self.handle_pos(j['pos']) |
| | | j['pos'] = pos_data['pos'] |
| | | j['length'] = pos_data['length'] |
| | | return json_list |
| | | |
| | | def gen_tm_frame_data(self): |
| | | _msg = """ |
| | | """ |
| | | files = [file_map['遥测大纲']] |
| | | async def get_pkt_details(self, _pkt, vc): |
| | | _pkt = await self.gen_pkt_details(_pkt['name'], _pkt['id']) |
| | | epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None) |
| | | if epdu and _pkt: |
| | | _pkt['children'] = _pkt['datas'] |
| | | # todo 当数据包获取到东西但不是参数时,获取到的包结构有问题,需要过滤 |
| | | _pkt['length'] = 0 |
| | | _pkt['pos'] = 0 |
| | | if len(_pkt['children']) > 0: |
| | | _last_par = _pkt['children'][len(_pkt['children']) - 1] |
| | | _pkt['length'] = (_last_par['pos'] + _last_par['length']) |
| | | if 'children' not in epdu: |
| | | epdu['children'] = [] |
| | | # 添加解析规则后缀防止重复 |
| | | _pkt['id'] = _pkt['id'] + '_' + vc['VCID'] |
| | | # 给包名加代号前缀 |
| | | if not _pkt['name'].startswith(_pkt['id']): |
| | | _pkt['name'] = _pkt['id'] + '_' + _pkt['name'] |
| | | epdu['children'].append(_pkt) |
| | | apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None) |
| | | ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None) |
| | | sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None) |
| | | apid = '' |
| | | service = '' |
| | | sub_service = '' |
| | | if apid_node and apid_node['content']: |
| | | apid = apid_node['content'] |
| | | if ser_node and ser_node['content']: |
| | | service = f"{int(ser_node['content'], 16)}" |
| | | if sub_ser_node and sub_ser_node['content']: |
| | | sub_service = f"{int(sub_ser_node['content'], 16)}" |
| | | _pkt['vals'] = \ |
| | | f"{apid}/{service}/{sub_service}/" |
| | | |
| | | def validation(gen_text): |
| | | pass |
| | | |
| | | def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path): |
| | | async def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path): |
| | | # 插入域参数列表 |
| | | insert_domain = self.gen_insert_domain_params() |
| | | |
| | |
| | | 'insertDomain': insert_domain, |
| | | } |
| | | cadu = data_templates.get_tm_frame(tm_data) |
| | | |
| | | # VC源包 |
| | | self.vc_pkts = self.gen_pkt_vc() |
| | | # 遥测源包设计中的源包列表 |
| | | self.tm_pkts = self.gen_pkts() |
| | | self.vc_pkts = await self.gen_pkt_vc() # ,self.tm_pkts = self.gen_pkts() |
| | | |
| | | # 处理VC下面的遥测包数据 |
| | | tasks = [] |
| | | for vc in vcs: |
| | | # 此VC下的遥测包过滤 |
| | | _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts) |
| | | _vc_pkts = list(filter(lambda it: vc['id'] in it['vcs'], self.vc_pkts)) |
| | | for _pkt in _vc_pkts: |
| | | # 判断遥测包是否有详细定义 |
| | | if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None): |
| | | continue |
| | | # if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None): |
| | | # continue |
| | | # 获取包详情 |
| | | _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id']) |
| | | epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None) |
| | | if epdu and _pkt: |
| | | _pkt['children'] = _pkt['datas'] |
| | | _last_par = _pkt['children'][len(_pkt['children']) - 1] |
| | | _pkt['length'] = (_last_par['pos'] + _last_par['length']) |
| | | _pkt['pos'] = 0 |
| | | if 'children' not in epdu: |
| | | epdu['children'] = [] |
| | | # 添加解析规则后缀防止重复 |
| | | _pkt['id'] = _pkt['id'] + '_' + vc['VCID'] |
| | | # 给包名加代号前缀 |
| | | if not _pkt['name'].startswith(_pkt['id']): |
| | | _pkt['name'] = _pkt['id'] + '_' + _pkt['name'] |
| | | epdu['children'].append(_pkt) |
| | | apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None) |
| | | ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None) |
| | | sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None) |
| | | _pkt['vals'] = \ |
| | | f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/" |
| | | ret = self.get_pkt_details(_pkt, vc) |
| | | tasks.append(ret) |
| | | if len(tasks): |
| | | await asyncio.gather(*tasks) |
| | | |
| | | # 重新计数起始偏移 |
| | | self.compute_length_pos(cadu['children']) |
| | |
| | | |
| | | def gen_vc(self): |
| | | _msg = """ |
| | | 请分析文档中的遥测包格式,输出遥测虚拟信道的划分,数据结构最外层为数组,数组元素为虚拟信道信息字典,字典包含以下键值对: |
| | | id: 虚拟信道代号 |
| | | name: 虚拟信道名称 |
| | | VCID: 虚拟信道VCID(二进制) |
| | | format: 根据虚拟信道类型获取对应的数据包的格式的名称 |
| | | #角色 |
| | | 你是一名资深的软件工程师。 |
| | | #指令 |
| | | 我需要从文档中提取虚拟信道列表,你要帮助我完成虚拟信道列表的提取。 |
| | | #需求 |
| | | 请分析文档中的遥测包格式以及遥测虚拟信道,输出遥测虚拟信道列表。 |
| | | 字段包括:id(虚拟信道代号)、name(虚拟信道名称)、VCID(虚拟信道VCID,二进制)、format(根据虚拟信道类型获取对应的数据包的格式的名称) |
| | | #上下文 |
| | | 深入理解文档中描述的关系,例如:文档中描述了常规遥测是常规数据的下传信道,并且还描述了分系统常规遥测参数包就是实时遥测参数包,并且文档中对实时遥测参数包的格式进行了描述,所以常规遥测VC应该输出为:{"id": "1", "name": "常规遥测VC", "VCID": "0", "format": "实时遥测参数包"} |
| | | #约束 |
| | | - 数据结构最外层为数组,数组元素为虚拟信道信息; |
| | | - format:必须是数据包格式的名称; |
| | | - 仅输出JSON文本。 |
| | | #例子: |
| | | [ |
| | | { |
| | | "id": "VC0", |
| | | "name": "空闲信道", |
| | | "VCID": "111111", |
| | | "format": "空闲包" |
| | | } |
| | | ] |
| | | """ |
| | | |
| | | def validation(gen_text): |
| | |
| | | assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制' |
| | | |
| | | print('虚拟信道:') |
| | | text = self.generate_text(_msg, "out/虚拟信道.json", files=[file_map['遥测大纲']], validation=validation) |
| | | doc_text = self.get_text_with_entity(['虚拟信道定义']) |
| | | text = self.generate_text_json(_msg, f"{self.json_path}/虚拟信道.json", doc_text=doc_text, |
| | | validation=validation) |
| | | vcs = json.loads(text) |
| | | return vcs |
| | | |
| | | def gen_dev_pkts(self): |
| | | _msg = f""" |
| | | 输出文档中遥测源包类型定义描述的设备以及设备下面的遥测包,数据结构:最外层为数组 > 设备 > 遥测包列表(pkts),设备字段包括:名称(name)、代号(id),源包字段包括:名称(name)、代号(id) |
| | | """ |
| | | print('设备遥测源包信息:') |
| | | files = [file_map["遥测源包设计报告"]] |
| | | text = self.generate_text(_msg, 'out/设备数据包.json', [], files) |
| | | dev_pkts = json.loads(text) |
| | | return dev_pkts |
| | | async def gen_pkt_details(self, pkt_name, pkt_id): |
| | | cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}.json" |
| | | |
| | | def pkt_in_tm_pkts(self, pkt_name): |
| | | cache_file = f'out/数据包-{pkt_name}.json' |
| | | if os.path.isfile(cache_file): |
| | | return True |
| | | files = [file_map['遥测源包设计报告']] |
| | | print(f'文档中有无“{pkt_name}”的字段描述:', end='') |
| | | _msg = f""" |
| | | 文档中有遥测包“{pkt_name}”的字段表描述吗?遥测包名称必须完全匹配。输出:“无”或“有”,不要输出其他任何内容。 |
| | | 注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。 |
| | | 根据文档内容输出。""" |
| | | text = self.generate_text(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', [], files) |
| | | return text == '有' |
| | | doc_text = self.get_text_with_entity([pkt_id]) |
| | | pkt = { |
| | | "name": pkt_name, |
| | | "id": pkt_id, |
| | | "type": "linear", |
| | | "headers": [], |
| | | "datas": [], |
| | | } |
| | | if doc_text == '': |
| | | return pkt |
| | | print(f'遥测源包“{pkt_name}”信息:') |
| | | |
| | | def gen_pkt_details(self, pkt_name, pkt_id): |
| | | cache_file = f'out/数据包-{pkt_name}.json' |
| | | files = [file_map['遥测源包设计报告']] |
| | | if not os.path.isfile(cache_file): |
| | | # 1. 获取包头和参数列表 |
| | | # 2. 遍历包头和参数列表,获取bit位置和长度,规范代号并生成,生成byteOrder |
| | | async def get_header_params(_pkt_name, _doc_text: str): |
| | | _msg = (""" |
| | | # 需求 |
| | | 提取文档中描述的遥测包包头信息。 |
| | | 包头信息包括:包版本号(Ver)、包类型(Type)、副导头标识(Subheader)、应用过程标识(apid)、序列标记(SequenceFlag)、包序列计数(SequenceCount)、包长(PacketLength)、服务(Service)、子服务(SubService)信息。 |
| | | 服务、子服务:一般在表格中的包头区域提取,如果表格中没有包头区域只有数据域则在标题中提取,例如:“在轨维护遥测包(APID=0x384) (3,255)”其中服务是3子服务是255; |
| | | 表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析; |
| | | 输出json,不要有注释。 |
| | | # 输出例子 |
| | | ```json |
| | | { |
| | | "Ver": "000", |
| | | "Type": "0", |
| | | "Subheader": "1", |
| | | "apid": "0", |
| | | "SequenceFlag": "11", |
| | | "SequenceCount": "00000000000000", |
| | | "PacketLength": "1", |
| | | "Service": "03", |
| | | "SubService": "FF" |
| | | } |
| | | ```""") |
| | | # 截取前70行 |
| | | _doc_text = '\n'.join(_doc_text.splitlines()[0:100]) |
| | | tpl = os.path.dirname(__file__) + "/tpl/tm_pkt_headers_yg.json" |
| | | tpl_text = utils.read_from_file(tpl) |
| | | _cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}-包头参数.json" |
| | | _text = await asyncio.to_thread(self.generate_text_json, _msg, _cache_file, [], _doc_text, None) |
| | | result = json.loads(_text) |
| | | if re.match(r'^(0x)?[01]{11}$', result['apid']): |
| | | result['apid'] = hex(int(re.sub('0x', '', result['apid']), 2)) |
| | | for k in result: |
| | | tpl_text = tpl_text.replace("{{" + k + "}}", result[k]) |
| | | return json.loads(tpl_text) |
| | | |
| | | async def get_data_area_params(_pkt_name, _doc_text: str): |
| | | _msg = (""" |
| | | # 指令 |
| | | 我需要从文档中提取遥测源包的参数信息列表,你要帮我完成遥测源包的参数信息的提取。 |
| | | # 需求 |
| | | 提取文档中描述的遥测包数据域中的所有参数,以及参数的位置、名称、代号信息,输出的信息要与文档中的文本要一致,不要遗漏任何参数。 |
| | | 如果文档中没有参数表则输出空数组。 |
| | | 严格按照输出示例中的格式输出,仅输出json。 |
| | | # 要求 |
| | | 1个字节的长度为8位,使用B0-B7来表示。 |
| | | 所有位置信息需要转换为要求格式"Byte1_B6~Byte2_B0"进行输出,如果与要求格式不同的要进行补全或转换,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。 |
| | | |
| | | # 输出示例 |
| | | ```json |
| | | [ |
| | | { |
| | | "posText": "Byte1_B6~Byte2_B0", |
| | | "name": "xxx", |
| | | "id": "xxxxxx" |
| | | } |
| | | ] |
| | | ``` |
| | | # 没有参数时的输出示例 |
| | | ```json |
| | | [] |
| | | ```""") |
| | | _cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}-参数列表.json" |
| | | if utils.file_exists(_cache_file): |
| | | return json.loads(utils.read_from_file(_cache_file)) |
| | | title_line = _doc_text.splitlines()[0] |
| | | tables = re.findall(r"```json(.*)```", _doc_text, re.DOTALL) |
| | | if tables: |
| | | table_text = tables[0] |
| | | table_blocks = [] |
| | | if len(table_text)>50000: |
| | | table = json.loads(table_text) |
| | | header:list = table[0:20] |
| | | for i in range(math.ceil((len(table)-20)/200)): |
| | | body = table[20 + i * 200:20 + (i + 1) * 200] |
| | | block = [] |
| | | block.extend(header) |
| | | block.extend(body) |
| | | table_blocks.append(f"{title_line}\n```json\n{json.dumps(block,indent=2,ensure_ascii=False)}\n```") |
| | | else: |
| | | table_blocks.append(table_text) |
| | | param_list = [] |
| | | block_idx = 0 |
| | | for tb_block in table_blocks: |
| | | _block_cache_file = f"{self.json_path}/pkts/数据包-{utils.to_file_name(pkt_name)}-参数列表-{block_idx}.json" |
| | | block_idx += 1 |
| | | text = await asyncio.to_thread(self.generate_text_json, _msg, _block_cache_file, [], tb_block, None) |
| | | json_list = json.loads(text) |
| | | for par in json_list: |
| | | if not re.match('^Byte\d+_B[0-7]~Byte\d+_B[0-7]$', par['posText']): |
| | | par['posText'] = get_single_pos(par['posText']) |
| | | if not any(filter(lambda p: p['posText']==par['posText'], param_list)): |
| | | param_list.append(par) |
| | | for par in param_list: |
| | | if par['posText'] is not None: |
| | | par['id'] = re.sub('[^_a-zA-Z0-9]', '_', par['id']) |
| | | pos_data = self.handle_pos(par['posText']) |
| | | par['pos'] = pos_data['pos'] |
| | | par['length'] = pos_data['length'] |
| | | save_to_file(json.dumps(param_list, ensure_ascii=False, indent=2), _cache_file) |
| | | return param_list |
| | | else: |
| | | return [] |
| | | # 单独处理未正确获取的位置信息 |
| | | def get_single_pos(txt): |
| | | _msg = f""" |
| | | 输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包; |
| | | 遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear; |
| | | 包头属性字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para; |
| | | 数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para; |
| | | 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短; |
| | | 你需要理解数据包的位置信息,并且将所有输出单位统一转换为 bits,位置字段的输出格式必须为数值类型; |
| | | 数据结构仅只包含遥测包,仅输出json,不要输出任何其他内容。""" |
| | | print(f'遥测源包“{pkt_name}”信息:') |
| | | 1个字节的长度为8位,使用B0-B7来表示。 |
| | | 将“{txt}”转换为要求格式"Byte1_B6~Byte2_B0"进行输出,如果与要求格式不同的要进行补全或转换,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。 |
| | | 输出示例:Byte1_B6~Byte2_B0 |
| | | 仅输出结果,不输出其他文字 |
| | | """ |
| | | |
| | | def validation(gen_text): |
| | | _pkt = json.loads(gen_text) |
| | | assert 'headers' in _pkt, '包结构中必须包含headers字段' |
| | | assert 'datas' in _pkt, '包结构中必须包含datas字段' |
| | | def validation(return_txt): |
| | | assert re.match('^Byte\d+_B[0-7]~Byte\d+_B[0-7]$', return_txt), '格式不正确' |
| | | |
| | | text = self.generate_text(_msg, cache_file, [], files, validation) |
| | | pkt = json.loads(text) |
| | | else: |
| | | pkt = json.loads(read_from_file(cache_file)) |
| | | pkt_len = 0 |
| | | for par in pkt['datas']: |
| | | par['pos'] = pkt_len |
| | | pkt_len += par['length'] |
| | | pkt['length'] = pkt_len |
| | | text = self.generate_text_json(_msg, "", doc_text="", validation=validation) |
| | | return text |
| | | |
| | | params = [] |
| | | header_params, data_area_params = ( |
| | | await asyncio.gather(get_header_params(pkt_name, doc_text), |
| | | get_data_area_params(pkt_name, doc_text))) |
| | | |
| | | params.extend(data_area_params) |
| | | |
| | | pkt['headers'] = header_params |
| | | pkt['datas'] = data_area_params |
| | | |
| | | # async def get_param_info(para): |
| | | # _msg2 = """ |
| | | # # 需求 |
| | | # 从文本中提取区间起始偏移位置和区间长度,单位为比特。文本中的内容为区间描述,其中:Byte<N> 表示第 N 个字节,N 从 1 开始,B<X> 表示第 X 个比特,X 从 0 - 7 ,区间为闭区间。 |
| | | # 所有数学计算是需要算数表达式,不需要计算结果。计算公式如下: |
| | | # - ByteN_BX起始和结束位置:(N - 1)*8 + X |
| | | # - ByteN 起始位置:(N - 1)*8 |
| | | # - ByteN 结束位置:(N - 1) * 8 + 7 |
| | | # - 长度:结束偏移位置 + 1 - 起始偏移位置,闭区间的长度需要结束位置加1再减去起始位置 |
| | | # # 生成模板 |
| | | # 推理过程:简要说明提取信息及调用 tool 计算的过程。输出结果:按 JSON 格式输出,格式如下: |
| | | # { |
| | | # "offset": "起始偏移位置表达式", |
| | | # "length": "长度计算表达式" |
| | | # } |
| | | # 文本: |
| | | # """+f""" |
| | | # {para['posText']} |
| | | # """ |
| | | # text2 = await asyncio.to_thread(self.generate_text_json, _msg2, '', [], '') |
| | | # try: |
| | | # out = json.loads(text2) |
| | | # para['pos'] = eval(out['offset']) |
| | | # para['posRet'] = text2 |
| | | # para['length'] = eval(out['length']) |
| | | # para['id'] = re.sub(r"[^0-9a-zA-Z_]", "_", para['code']) |
| | | # para['type'] = 'para' |
| | | # except Exception as e: |
| | | # print(e) |
| | | # tasks = [] |
| | | # for param in params: |
| | | # tasks.append(get_param_info(param)) |
| | | # |
| | | # s = time.time() |
| | | # await asyncio.gather(*tasks) |
| | | # e = time.time() |
| | | if params: |
| | | offset = params[0]['pos'] |
| | | for para in params: |
| | | para['pos'] -= offset |
| | | # print(f'======参数数量:{len(params)},耗时:{e - s}') |
| | | utils.save_text_to_file(json.dumps(pkt, ensure_ascii=False, indent=4), cache_file) |
| | | return pkt |
| | | |
| | | def gen_pkts(self): |
| | | _msg = f""" |
| | | 输出文档中描述的遥测包。 |
| | | 遥测包字段包括:名称(name)、代号(id)、hasParams, |
| | | 名称中不要包含代号, |
| | | hasParams表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有参数表, |
| | | 如果没有代号用名称的英文翻译代替,如果没有名称用代号代替, |
| | | 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。 |
| | | async def gen_pkts(self): |
| | | _msg = """ |
| | | # 角色 |
| | | 你是一名资深软件工程师。 |
| | | # 指令 |
| | | 我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。 |
| | | # 需求 |
| | | 输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)。 |
| | | 字段描述: |
| | | 1.名称:遥测包的名称; |
| | | 2.代号:遥测包的代号; |
| | | # 约束 |
| | | - name:名称中不要包含代号,仅从文档中提取源包名称; |
| | | - 如果没有代号,使用遥测包名称的英文翻译代替; |
| | | - 如果没有名称用代号代替; |
| | | - 注意,一定要输出所有的遥测包,不要漏掉任何一个遥测包; |
| | | - 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。 |
| | | # 例子 |
| | | [ |
| | | { |
| | | "name": "数管数字量快速源包", |
| | | "id": "PMS001", |
| | | } |
| | | ] |
| | | """ |
| | | print(f'遥测源包列表:') |
| | | files = [file_map['遥测源包设计报告']] |
| | | text = self.generate_text(_msg, 'out/源包列表.json', [], files) |
| | | doc_text = self.get_text_with_entity(['源包列表']) |
| | | text = await asyncio.to_thread(self.generate_text_json, _msg, f'{self.json_path}/源包列表.json', doc_text=doc_text) |
| | | pkt = json.loads(text) |
| | | return pkt |
| | | |
| | | def gen_pkt_vc(self): |
| | | _msg = f""" |
| | | 根据遥测源包下传时机定义,输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags) |
| | | async def gen_pkt_vc(self): |
| | | _msg = """ |
| | | # 需求 |
| | | 根据文档内容输出遥测源包信息,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs)。 |
| | | 所有字段仅使用文档内容输出。 |
| | | 表格中遥测源包不是按名称来排序的,按照文档中的表格中的遥测源包顺序进行输出。 |
| | | 每个包都要输出。 |
| | | 所属虚拟信道:通过表格中描述的下传时机和虚拟信道的划分,获取下传时机对应的虚拟信道代号(序号),并组织为一个数据进行输出,例如:下传时机为实时和延时,那么就表示该包的所属虚拟信道为VC1和VC3。如果没有匹配下传时机,就填入空数组。 |
| | | # 输出示例: |
| | | [ |
| | | { |
| | | "id": "PMS001", |
| | | "name": "数管数字量快速源包", |
| | | "vcs": ["VC1",'VC2'] |
| | | }, |
| | | ] |
| | | """ |
| | | files = [file_map['遥测大纲']] |
| | | print('遥测源包所属虚拟信道:') |
| | | |
| | | def validation(gen_text): |
| | | pkts = json.loads(gen_text) |
| | | assert len(pkts), 'VC源包列表不能为空' |
| | | |
| | | text = self.generate_text(_msg, 'out/遥测VC源包.json', files=files, validation=validation) |
| | | doc_text = self.get_text_with_entity(['虚拟信道定义', '遥测源包下传时机']) |
| | | text = await asyncio.to_thread( |
| | | lambda: self.generate_text_json(_msg, f'{self.json_path}/遥测VC源包.json', doc_text=doc_text, validation=validation)) |
| | | pkt_vcs = json.loads(text) |
| | | return pkt_vcs |
| | | |
| | | def gen_pkt_format(self): |
| | | _msg = f""" |
| | | 请仔细分系文档,输出各个数据包的格式,数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。 |
| | | 包格式children包括:版本号(id:Ver)、类型(id:TM_Type)、副导头标志(id:Vice_Head)、应用过程标识符(id:Proc_Sign)、分组标志(id:Group_Sign)、包序列计数(id:Package_Count)、包长(id:Pack_Len)、数据域(id:EPDU_DATA)。 |
| | | children元素的字段包括:name、id、pos、length、type |
| | | 注意:生成的JSON语法格式要合法。 |
| | | _msg = """ |
| | | # 角色 |
| | | 你是一名资深软件工程师。 |
| | | # 指令 |
| | | 我需要从文档中提取数据包的格式,你要帮助我完成数据包格式的提取。 |
| | | # 需求 |
| | | 请仔细分系文档,输出各个数据包的格式。 |
| | | 数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。 |
| | | 包格式字段包括:名称(name)、代号(id)、类型(type)、子级(children)。 |
| | | children元素的字段包括:name、id、pos、length、type。 |
| | | children元素包括:版本号(Ver)、类型(TM_Type)、副导头标志(Vice_Head)、应用过程标识符(Proc_Sign)、分组标志(Group_Sign)、包序列计数(Package_Count)、包长(Pack_Len)、数据域(EPDU_DATA)。 |
| | | # 约束 |
| | | - 生成的JSON语法格式要合法。 |
| | | # 例子 |
| | | { |
| | | "name": "实时遥测参数包", |
| | | "id": "EPDU", |
| | | "type": "logic", |
| | | "children": [ |
| | | { |
| | | "name": "版本号", |
| | | "id": "Ver", |
| | | "pos": 0, |
| | | "length": 3, |
| | | "type": "para", |
| | | "content": "0", |
| | | "dataTy": "INVAR" |
| | | }, |
| | | { |
| | | "name": "数据域", |
| | | "id": "EPDU_DATA", |
| | | "pos": 3, |
| | | "length": "变长", |
| | | "type": "any" |
| | | } |
| | | ] |
| | | } |
| | | """ |
| | | print('遥测包格式:') |
| | | text = self.generate_text(_msg, 'out/数据包格式.json', files=[file_map['遥测大纲']]) |
| | | text = self.generate_text_json(_msg, f'{self.json_path}/数据包格式.json', files=[file_map['遥测大纲']]) |
| | | pkt_formats = json.loads(text) |
| | | return pkt_formats |
| | | |
| | | def compute_length_pos(self, items: list): |
| | | length = 0 |
| | | pos = 0 |
| | | for child in items: |
| | | if 'children' in child: |
| | | self.compute_length_pos(child['children']) |
| | | child['pos'] = pos |
| | | if 'length' in child and isinstance(child['length'], int): |
| | | length = length + child['length'] |
| | | pos = pos + child['length'] |
| | | items.sort(key=lambda x: x['pos']) |
| | | # for child in items: |
| | | # if 'children' in child: |
| | | # self.compute_length_pos(child['children']) |
| | | # if 'length' in child and isinstance(child['length'], int): |
| | | # length = length + child['length'] |
| | | # pos = pos + child['length'] |
| | | # node['length'] = length |
| | | |
| | | def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name): |
| | | _msg = f""" |
| | | 请析文档,列出总线通信包传输约定中描述的所有数据包列表, |
| | | 数据包字段包括:id、name、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、 |
| | | transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向), |
| | | 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。 |
| | | 通信帧号:使用文档中的文本不要做任何转换。 |
| | | subAddr:值为“深度”、“平铺”、“数字”或null。 |
| | | 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线。 |
| | | 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输)。 |
| | | 传输方向分:”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发。 |
| | | 是否突发的判断依据:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发。 |
| | | async def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name): |
| | | _msg = """ |
| | | # 角色 |
| | | 你是一名资深的软件工程师。 |
| | | # 需求 |
| | | 请分析文档中的表格,按表格顺序输出表格中的所有源包信息; |
| | | 数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、 |
| | | transSer(传输服务)、note(备注)、throughBus(是否经过总线)、transDirect(传输方向)。 |
| | | 文档中如果没有数据包表则输出:[]。 |
| | | |
| | | # 数据包字段说明 |
| | | - frameNum(通信帧号):文档中通信帧号列的内容; |
| | | - subAddr(子地址/模式):值只能是:“深度”、“平铺”、数字或null,如果是“/”则是null; |
| | | - throughBus(是否经过总线)的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线; |
| | | - transSer(传输服务分三种):置数(SetData)、取数(GetData)、数据块传输(DataBlock),根据表格中的“传输服务”列进行判断; |
| | | |
| | | # 约束 |
| | | - 仅输出json。 |
| | | - 按照表格中的顺序进行输出。 |
| | | - 不要漏包。 |
| | | # 例子 |
| | | [ |
| | | { |
| | | "id": "P001", |
| | | "name": "xxx", |
| | | "apid": "418", |
| | | "service": "(1, 2)", |
| | | "length": 1, |
| | | "interval": 1000, |
| | | "subAddr": null, |
| | | "frameNum": "1|2", |
| | | "transSer": "DataBlock", |
| | | "note": "", |
| | | "throughBus": true, |
| | | "burst": true, |
| | | "transDirect": "" |
| | | } |
| | | ] |
| | | """ |
| | | print('总线数据包:') |
| | | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | pkts2 = json.loads(gen_text) |
| | | assert not next(filter(lambda pkt2: 'transSer' not in pkt2, pkts2), None), '总线包属性生成不完整,缺少transSer。' |
| | | |
| | | text = self.generate_text(_msg, 'out/总线.json', files=[file_map['总线传输通信帧分配']], validation=validation) |
| | | pkts = json.loads(text) |
| | | rt_doc_text = self.get_text_with_entity(['RT地址分配']) |
| | | subsys_pkt_texts = self.get_text_list_with_entity(['分系统源包']) |
| | | tasks = [] |
| | | rt_adds = [] |
| | | for subsys_pkt_text in subsys_pkt_texts: |
| | | doc_text = f'{rt_doc_text}\n{subsys_pkt_text}' |
| | | subsys = subsys_pkt_text[:subsys_pkt_text.index("\n")] |
| | | # 单独获取RT地址,并应用到章节下所有包 |
| | | get_rt_msg = f"""返回{subsys}的RT地址,仅输出十进制的结果,不要输出其他内容,如果是系统管理单元(SMU)则返回0。""" |
| | | rt_info = self.generate_text_json(get_rt_msg, "", doc_text=rt_doc_text) |
| | | if rt_info == '0': |
| | | continue |
| | | rt_adds.append({ |
| | | "rt": subsys, |
| | | "rt_addr": rt_info |
| | | }) |
| | | # md5 = utils.generate_text_md5(subsys_pkt_text) |
| | | task = asyncio.to_thread(self.generate_text_json, _msg, |
| | | f"{self.json_path}/总线-{utils.to_file_name(subsys)}.json", doc_text=doc_text, |
| | | validation=validation) |
| | | tasks.append(task) |
| | | results = await asyncio.gather(*tasks) |
| | | pkts = [] |
| | | # 判断是否存在总线数据包.json |
| | | if os.path.isfile(f"{self.json_path}/总线数据包列表.json"): |
| | | pkts = read_from_file(f"{self.json_path}/总线数据包列表.json") |
| | | pkts = json.loads(pkts) |
| | | else: |
| | | pktid_apid_map = {} |
| | | for index, result in enumerate(results): |
| | | pkts_diretions = [] |
| | | # 全角空格去除 |
| | | result = re.sub(r' ', '', result) |
| | | _pkts = json.loads(result) |
| | | rt_name = rt_adds[index]["rt"] |
| | | for _pkt in _pkts: |
| | | # 应用RT地址 |
| | | _pkt['rt'] = rt_name |
| | | _pkt['rtAddr'] = rt_adds[index]["rt_addr"] |
| | | _pkt['burst'] = "突发" in f"{_pkt['interval']}" |
| | | if _pkt['apid'] is None or not re.match(r'[0-9A-Fa-f]+', _pkt['apid']): |
| | | _pkt['apid'] = '' |
| | | if _pkt['id'] in pktid_apid_map: |
| | | if _pkt['apid']: |
| | | pktid_apid_map[_pkt['id']] = _pkt['apid'] |
| | | else: |
| | | _pkt['apid'] = pktid_apid_map[_pkt['id']] |
| | | else: |
| | | pktid_apid_map[_pkt['id']] = _pkt['apid'] |
| | | # 转为bit长度 |
| | | if _pkt['length']: |
| | | if isinstance(_pkt['length'], str) and re.match(r'^\d+$', _pkt['length']): |
| | | _pkt['length'] = int(_pkt['len']) * 8 |
| | | elif isinstance(_pkt['length'], int): |
| | | _pkt['length'] = _pkt['length'] * 8 |
| | | pkts.append(_pkt) |
| | | # 获取待处理的传输方向信息 |
| | | pkts_diretions.append({ |
| | | 'id': _pkt['id'], |
| | | 'rt': _pkt['rt'], |
| | | 'transDirect': _pkt['transDirect'], |
| | | }) |
| | | # 处理传输方向 |
| | | _msg = """ |
| | | 处理传入的json数组,每个数组对象中包含字段:rt(自身设备)、transDirect(传输方向)。 |
| | | 需要你给数组对象中多加一个字段,输出数组中单个对象的传输类型(transType),传输类型有两种值“收”和“发”,判断依据是根据传输方向的内容进行判断,由rt发送给SMU的传输类型是“收”,由SMU发送给rt的传输类型是“发” |
| | | rt字段为空的数据不用处理。 |
| | | 在transDirect字段中rt可能为缩写,缩写对应的rt名称可以从文档中进行读取。 |
| | | 输出结果将增加了字段的json数组直接输出,不用输出其他内容。 |
| | | 输出示例:[{"id": "PMK013", "rt": "中心控制单元CCU", "transDirect": "CCU→SMU→地面", "transType": "收"},{"id": "PMK055", "rt": "中心控制单元CCU", "transDirect": "SMU→CUU", "transType": "发"}] |
| | | """ + f""" |
| | | JSON:{pkts_diretions} |
| | | """ |
| | | result_json = self.generate_text_json(_msg, f"{self.json_path}/总线-rt-{utils.to_file_name(rt_name)}.json", doc_text=rt_doc_text) |
| | | # 将处理结果同步修改到result |
| | | for pkt in _pkts: |
| | | for data in json.loads(result_json): |
| | | if "transType" in data: |
| | | if data['id'] == pkt['id']: |
| | | pkt['transType'] = data['transType'] |
| | | break |
| | | print(f"总线源包个数:{len(pkts)}") |
| | | # 筛选经总线的数据包 |
| | | pkts = list(filter(lambda it: it['throughBus'], pkts)) |
| | | no_apid_pkts = list(filter(lambda it: not it['apid'], pkts)) |
| | | # 筛选有apid的数据包 |
| | | pkts = list(filter(lambda it: it['apid'], pkts)) |
| | | # 筛选rtAddr不为0的数据包,SMU的 |
| | | pkts = list(filter(lambda it: it['rtAddr'] != '0', pkts)) |
| | | |
| | | pkts2 = [] |
| | | # 储存所有总线包 |
| | | save_to_file(json.dumps(pkts, ensure_ascii=False, indent=2), f"{self.json_path}/总线数据包列表.json") |
| | | |
| | | tasks = [] |
| | | |
| | | def _run(gen_pkt_details, pkt2): |
| | | _pkt2 = asyncio.run(gen_pkt_details(pkt2['name'], pkt2['id'])) |
| | | if _pkt2 is not None: |
| | | pkt2['children'] = [] |
| | | pkt2['children'].extend(_pkt2['datas']) |
| | | |
| | | for pkt in pkts: |
| | | if self.pkt_in_tm_pkts(pkt["name"]): |
| | | pkts2.append(pkt) |
| | | for pkt in pkts2: |
| | | _pkt = self.gen_pkt_details(pkt['name'], pkt['id']) |
| | | if _pkt: |
| | | pkt['children'] = [] |
| | | pkt['children'].extend(_pkt['datas']) |
| | | pkt['length'] = _pkt['length'] |
| | | pkt_task = asyncio.to_thread(_run, self.gen_pkt_details, pkt) |
| | | tasks.append(pkt_task) |
| | | |
| | | await asyncio.gather(*tasks) |
| | | |
| | | rt_pkt_map = {} |
| | | for pkt in pkts: |
| | | # 根据数据块传输和取数分组 |
| | | # 逻辑封装包的解析规则ID:RT[rt地址]SUB[子地址]S(S代表取数,方向是AA表示发送;R代表置数,方向是BB表示接受) |
| | | # 取数:逻辑封装包根据子地址和帧号组合创建,有几个组合就创建几个逻辑封装包 |
| | | # 数据块:只有一个逻辑封装包 |
| | | |
| | | if pkt['subAddr'] is not None and not isinstance(pkt['subAddr'], int) and pkt['subAddr'].find("/") > -1: |
| | | pkt['subAddr'] = pkt['subAddr'].split("/")[0] |
| | | # 处理子地址 |
| | | if pkt['burst']: |
| | | # 突发包子地址是18~26 |
| | | pkt['subAddr'] = 26 |
| | | elif pkt['subAddr'] == '平铺' or pkt['subAddr'] is None: |
| | | if pkt['subAddr'] == '平铺' or not pkt['subAddr']: |
| | | # 平铺:11~26,没有填写的默认为平铺 |
| | | pkt['subAddr'] = 26 |
| | | pkt['subAddr'] = '11~26' |
| | | elif pkt['subAddr'] == '深度': |
| | | # 深度:11 |
| | | pkt['subAddr'] = 11 |
| | | pkt['subAddr'] = '11' |
| | | |
| | | pkt['burst'] = "突发" in f"{pkt['interval']}" |
| | | # 处理帧号 |
| | | if pkt['burst']: |
| | | # 突发:ALL |
| | |
| | | # 取数 |
| | | pkt_id = f"RT{rt_addr}SUB{sub_addr}" |
| | | vals = f"{rt_addr}/{sub_addr}/0xAA/{frame_no}/" |
| | | rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals) |
| | | rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals, pkts) |
| | | elif trans_ser == 'DataBlock': |
| | | # 数据块 |
| | | direct = '0xAA' |
| | | rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, f"RT{rt_addr}SUB{sub_addr}{direct}", |
| | | f"{rt_addr}/{sub_addr}/{direct}/ALL/") |
| | | if pkt['transDirect'] == '发': |
| | | direct = '0xBB' |
| | | pkt_id = f"RT{rt_addr}SUB{sub_addr}{direct}" |
| | | vals = f"{rt_addr}/{sub_addr}/{direct}/ALL/" |
| | | rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, pkt_id, vals, pkts) |
| | | _pkts = [] |
| | | for k in rt_pkt_map: |
| | | _pkts.append(rt_pkt_map[k]) |
| | |
| | | rule_enc.C_KEY = sub_key |
| | | update_rule_enc(rule_enc) |
| | | |
| | | def gen_tc(self): |
| | | async def gen_tc(self): |
| | | # 数据帧格式 |
| | | frame = self.gen_tc_transfer_frame() |
| | | # 数据包格式 |
| | | pkt_format = self.gen_tc_transfer_pkt() |
| | | # 数据包列表 |
| | | pkts = self.gen_tc_transfer_pkts() |
| | | for pkt in pkts: |
| | | pf = json.loads(json.dumps(pkt_format)) |
| | | pf['name'] = pkt['name'] |
| | | ph = next(filter(lambda x: x['name'] == '主导头', pf['children']), None) |
| | | apid = next(filter(lambda x: x['name'] == '应用进程标识符(APID)', ph['children']), None) |
| | | apid['value'] = pkt['apid'] |
| | | apid['type'] = 'const' |
| | | sh = next(filter(lambda x: x['name'] == '副导头', pf['children']), None) |
| | | ser = next(filter(lambda x: x['name'] == '服务类型', sh['children']), None) |
| | | sub_ser = next(filter(lambda x: x['name'] == '服务子类型', sh['children']), None) |
| | | ser['value'] = pkt['server'] |
| | | ser['type'] = 'const' |
| | | sub_ser['value'] = pkt['subServer'] |
| | | sub_ser['type'] = 'const' |
| | | frame_task = self.gen_tc_transfer_frame_format() |
| | | # 遥控包格式 |
| | | pkt_format_task = self.gen_tc_pkt_format() |
| | | # 遥控包列表 |
| | | instructions_task = self.gen_tc_transfer_pkts() |
| | | result = await asyncio.gather(frame_task, pkt_format_task, instructions_task) |
| | | frame = result[0] |
| | | pkt_format = result[1] |
| | | instructions = result[2] |
| | | |
| | | tasks = [] |
| | | for inst in instructions: |
| | | # 遥控指令数据区内容 |
| | | tasks.append(self.gen_tc_details(inst)) |
| | | |
| | | await asyncio.gather(*tasks) |
| | | |
| | | for inst in instructions: |
| | | inst['type'] = 'insUnit' |
| | | format_text = json.dumps(pkt_format, ensure_ascii=False) |
| | | format_text = utils.replace_tpl_paras(format_text, inst) |
| | | pf = json.loads(format_text) |
| | | pf['name'] = inst['name'] |
| | | pf['code'] = inst['code'] |
| | | data_area = next(filter(lambda x: x['name'] == '应用数据区', pf['children'])) |
| | | data_area['children'].append(inst) |
| | | frame['subPkts'].append(pf) |
| | | self.order = 0 |
| | | |
| | | def build_def(item: dict): |
| | | if item['type'] == 'enum': |
| | | return json.dumps({"EnumItems": item['enums'], "CanInput": True}) |
| | | if item['type'] in ['enum', 'sendFlag']: |
| | | if isinstance(item['enums'], str): |
| | | enums = json.loads(item['enums']) |
| | | else: |
| | | enums = item['enums'] |
| | | return json.dumps({"EnumItems": enums, "CanInput": True}, ensure_ascii=False) |
| | | elif item['type'] == 'length': |
| | | return None |
| | | elif item['type'] == 'checkSum': |
| | | return json.dumps({"ChecksumType": "CRC-CCITT"}) |
| | | return json.dumps({"ChecksumType": item['value']['type']}) |
| | | elif item['type'] == 'subPkt': |
| | | return json.dumps({"CanInput": False}) |
| | | elif item['type'] == 'combPkt': |
| | | elif item['type'] in ['combPkt', 'insUnitList', 'input']: |
| | | return None |
| | | elif item['type'] == 'insUnit': |
| | | return '{"MinLength":null,"MaxLength":null,"IsSubPackage":false,"InputParams":[],"OutPutParams":[],"MatchItems":[]}' |
| | | elif item['type'] == 'pkt': |
| | | return '''{"MaxLength":1024,"IsSplit8":false,"Split8Start":null,"Split8End":null,"PadCode":null,"Alignment":null,"InputParams":[],"OutPutParams":[],"MatchItems":[]}''' |
| | | elif item['type'] == 'pktSeqCnt': |
| | | return json.dumps( |
| | | {"FirstPackValue": "PackCount", "MiddlePackValue": "PackIndex", "LastPackValue": "PackIndex", |
| | | "IndependPackValue": "InsUnitCount"}) |
| | | elif 'value' in item: |
| | | return item['value'] |
| | | |
| | | def create_tc_format(parent_pk, field): |
| | | def create_tc_format(parent_pk, field, parent_parent_pk=None): |
| | | """ |
| | | 创建遥控格式 |
| | | |
| | | 数据库数据结构: |
| | | 帧字段 parent_pk=null, pk=pk_001, type=1 |
| | | 匿名字段(子包) parent_pk=pk_001, pk=pk_002, type=22 |
| | | 字段1 parent_pk=pk_002, pk=pk_003, type=15 |
| | | 字段2 parent_pk=pk_002, pk=pk_004, type=15 |
| | | 包字段 parent_pk=pk_001, pk=pk_005, type=1 |
| | | 匿名字段(子包) parent_pk=pk_005, pk=pk_006, type=22 |
| | | 字段3 parent_pk=pk_006, pk=pk_007, type=15 |
| | | 指令单元 parent_pk=pk_005, pk=pk_007, type=4 |
| | | 字段4 parent_pk=pk_007, pk=pk_008, type=15 |
| | | |
| | | :param parent_pk: 父级pk |
| | | :param field: 格式字段 |
| | | :param parent_parent_pk: 父级的父级pk |
| | | :return: |
| | | """ |
| | | field['order'] = self.order |
| | | self.order += 1 |
| | | field['def'] = build_def(field) |
| | | if 'length' in field: |
| | | field['bitWidth'] = field['length'] |
| | | field['bitOrder'] = None |
| | | field['attr'] = 0 |
| | | if field['type'] == 'length': |
| | | field['attr'] = make_attr(field) |
| | | if field['type'] == 'length' and 'value' in field and field['value']: |
| | | val = field['value'] |
| | | field['range'] = val['start'] + "~" + val['end'] |
| | | field['formula'] = val['formula'] |
| | | # 即时输入长度为null则是变长字段,需要把类型改为variableLength |
| | | if field['type'] == 'input' and field['length'] is None: |
| | | field['type'] = 'variableLength' |
| | | if isinstance(field['value'], dict): |
| | | field['range'] = f'{field["value"]["minLength"]}~{field["value"]["maxLength"]}' |
| | | # 枚举值默认值设置 |
| | | if field['type'] == 'enum' and len(field['enums']) and not next( |
| | | filter(lambda x: 'default' in x and x['default'], field['enums']), None): |
| | | field['enums'][0]['default'] = True |
| | | # 校验和 |
| | | if field['type'] == 'checkSum': |
| | | field['range'] = f'{field["value"]["start"]}~{field["value"]["end"]}' |
| | | ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field) |
| | | ins_format_pk = ins_format.C_INS_FORMAT_PK |
| | | if 'children' in field: |
| | | autocode = 1 |
| | | if field['type'] == 'pkt': |
| | | ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK, |
| | | {'order': self.order, 'type': 'subPkt', |
| | | 'def': json.dumps({"CanInput": False})}) |
| | | info = { |
| | | 'order': self.order, |
| | | 'type': 'subPkt', |
| | | 'def': json.dumps({"CanInput": False}) |
| | | } |
| | | ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format_pk, info) |
| | | self.order += 1 |
| | | for child in field['children']: |
| | | child['autocode'] = autocode |
| | | autocode += 1 |
| | | create_tc_format(ins_format.C_INS_FORMAT_PK, child) |
| | | # if 'subPkts' in field: |
| | | # for pkt in field['subPkts']: |
| | | # ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK, |
| | | # {'order': self.order, 'type': 'subPkt', |
| | | # 'def': json.dumps({"CanInput": False})}) |
| | | # create_tc_format(ins_format.C_INS_FORMAT_PK, pkt) |
| | | if field['type'] == 'insUnitList': |
| | | _parent_pk = parent_parent_pk |
| | | else: |
| | | _parent_pk = ins_format.C_INS_FORMAT_PK |
| | | create_tc_format(_parent_pk, child, ins_format_pk) |
| | | if 'subPkts' in field: |
| | | for _pkt in field['subPkts']: |
| | | create_tc_format(ins_format_pk, _pkt, parent_pk) |
| | | |
| | | create_tc_format(None, frame) |
| | | |
| | | def gen_tc_transfer_frame(self): |
| | | async def gen_tc_transfer_frame_format(self): |
| | | _msg = ''' |
| | | 分析YK传送帧格式,提取YK传送帧的数据结构,不包括数据包的数据结构。 |
| | | ## 经验: |
| | | 字段类型包括: |
| | | 1.组合包:combPkt, |
| | | 2.固定码字:const, |
| | | 3.长度:length, |
| | | 4.枚举值:enum, |
| | | 5.校验和:checkSum, |
| | | 6.数据区:subPkt。 |
| | | |
| | | 根据字段描述分析字段的类型,分析方法: |
| | | 1.字段描述中明确指定了字段值的,类型为const, |
| | | 2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum, |
| | | 3.字段描述中如果存在多层级描述则父级字段的类型为combPkt, |
| | | 4.字段如果是和“长度”有关,类型为length, |
| | | 5.如果和数据域有关,类型为subPkt, |
| | | 6.字段如果和校验和有关,类型为checkSum。 |
| | | |
| | | 字段值提取方法: |
| | | 1.字段描述中明确指定了字段值, |
| | | 2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"<code>","end":"<code>","formula":"N-1"},注意:start和end的值为字段code。 |
| | | |
| | | ## 限制: |
| | | - length 自动转换为bit长度。 |
| | | - value 根据字段描述提取。 |
| | | - enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。 |
| | | - 输出内容必须为严格的json,不能输出除json以外的任何内容。 |
| | | |
| | | 字段数据结构: |
| | | 主导头 |
| | | 版本号、通过标志、控制命令标志、空闲位、HTQ标识、虚拟信道标识、帧长、帧序列号 |
| | | 传送帧数据域 |
| | | 帧差错控制域。 |
| | | |
| | | # 输出内容例子: |
| | | # 角色 |
| | | 你是一名资深的软件工程师。 |
| | | # 指令 |
| | | 分析遥控传送帧格式,提取遥控传送帧格式的字段定义。 |
| | | # 需求 |
| | | 要提取值的帧格式字段: |
| | | - 版本号:const,二进制,以B结尾; |
| | | - 通过标志:const,二进制,以B结尾; |
| | | - 控制命令标志:const,二进制,以B结尾; |
| | | - 空闲位:const,二进制,以B结尾; |
| | | - 航天器标识:const,十六进制,以0x开头,如果是二进制或十进制需要转换为十六进制; |
| | | - 虚拟信道标识:sendFlag,发送标记,默认为“任务注入帧”,所有的值都要列举出来; |
| | | # 数据类型 |
| | | - const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头; |
| | | - sendFlag:发送标记,类似枚举,定义样例:[{"n":"name","v":"value","c":"code","default":true}],n表示名称,v表示值,c表示code(没有空着),default表示是默认值; |
| | | - checkSum:校验和,如果是校验和类型还需要分析校验和的算法,并保存在value的type中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other) |
| | | # 约束 |
| | | - 以JSON格式输出; |
| | | - 仅输出JSON文本,不要输出任何其他文本。 |
| | | # 输出例子: |
| | | { |
| | | "name": "YK帧", |
| | | "type": "pkt" |
| | | "children":[ |
| | | { |
| | | "name": "主导头", |
| | | "code": "primaryHeader", |
| | | "length": 2, |
| | | "value": "00", |
| | | "type": "combPkt", |
| | | "children": [ |
| | | { |
| | | "name": "版本号", |
| | | "code": "verNum" |
| | | "length": 1, |
| | | "value": "00" |
| | | } |
| | | ] |
| | | } |
| | | ], |
| | | "subPkts":[] |
| | | "版本号": "00B", |
| | | "通过标志": "0", |
| | | ... |
| | | } |
| | | ''' |
| | | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | |
| | | text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', files=[file_map['指令格式']], |
| | | validation=validation) |
| | | frame = json.loads(text) |
| | | doc_text = self.get_text_with_entity(['遥控帧格式']) |
| | | text = await asyncio.to_thread( |
| | | lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_frame.json', doc_text=doc_text, |
| | | validation=validation)) |
| | | result: dict = json.loads(text) |
| | | format_text = utils.read_from_file('tpl/tc_transfer_frame.json') |
| | | format_text = utils.replace_tpl_paras(format_text, result) |
| | | frame = json.loads(format_text) |
| | | return frame |
| | | |
| | | def gen_tc_transfer_pkt(self): |
| | | async def gen_tc_pkt_format(self): |
| | | _msg = ''' |
| | | 仅分析YK包格式,提取YK包数据结构。 |
| | | ## 经验: |
| | | |
| | | 字段类型包括: |
| | | 1.组合包:combPkt, |
| | | 2.固定码字:const, |
| | | 3.长度:length, |
| | | 4.枚举值:enum, |
| | | 5.校验和:checkSum, |
| | | 6.数据区:subPkt。 |
| | | |
| | | 根据字段描述分析字段的类型,分析方法: |
| | | 1.字段描述中明确指定了字段值的,类型为const, |
| | | 2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum, |
| | | 3.字段描述中如果存在多层级描述则父级字段的类型为combPkt, |
| | | 4.字段如果是和“长度”有关,类型为length, |
| | | 5.如果和数据域有关,类型为subPkt, |
| | | 6.字段如果和校验和有关,类型为checkSum。 |
| | | |
| | | 字段值提取方法: |
| | | 1.字段描述中明确指定了字段值, |
| | | 2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"<code>","end":"<code>","formula":"N-1"},注意:start和end的值为字段code。 |
| | | |
| | | ## 限制: |
| | | - length 自动转换为bit长度。 |
| | | - value 根据字段描述提取。 |
| | | - enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。 |
| | | - 输出内容必须为严格的json,不能输出除json以外的任何内容。 |
| | | |
| | | 字段数据结构: |
| | | 主导头 |
| | | 包识别 |
| | | 包版本号、包类型、数据区头标志、应用进程标识符(APID) |
| | | 包序列控制 |
| | | 序列标志 |
| | | 包序列计数 |
| | | 包长 |
| | | 副导头 |
| | | CCSDS副导头标志 |
| | | YK包版本号 |
| | | 命令正确应答(Ack) |
| | | 服务类型 |
| | | 服务子类型 |
| | | 源地址 |
| | | 应用数据区 |
| | | 帧差错控制域。 |
| | | |
| | | # 输出内容例子: |
| | | # 角色 |
| | | 你是一名资深的软件工程师。 |
| | | # 指令 |
| | | 分析遥控包格式,提取遥控包格式的字段定义。 |
| | | # 需求 |
| | | 要提取值的包格式字段: |
| | | - packetVersionNumber:包版本号,const,二进制; |
| | | - packetType:包类型,const,二进制; |
| | | - dataFieldHeaderFlag:数据区头标志,const,二进制; |
| | | - sequenceFlags:序列标志,const,二进制; |
| | | - ccsdsSecondaryHeaderFlag:副导头标志,const,二进制; |
| | | - tcPktVersionNumber:遥控包版本号,const,二进制; |
| | | - acknowledgmentFlag:命令正确应答,const,二进制; |
| | | - sourceAddr:源地址,const,十六进制。 |
| | | # 数据类型 |
| | | - 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头; |
| | | - 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义, |
| | | - 枚举值:enum, |
| | | - 校验和:checkSum,如果是校验和类型还需要分析校验和的算法,并保存在value的type中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other) |
| | | - 即时输入:input,如果是即时输入value的值为变长定义。 |
| | | ## 长度类型的范围定义描述 |
| | | {"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"} |
| | | - start:起始字段code,长度包括起始字段,字段描述中说明了起始字段, |
| | | - end:结束字段code,长度包括结束字段,字段描述中说明了结束字段, |
| | | - formula:计算公式,如果没有计算相关描述则表示不需要计算公式。 |
| | | ## 即使输入类型的变长定义描述 |
| | | {"minLength": "最小长度", "maxLength": "最大长度", "variableLength": true} |
| | | - minLength:最小长度, |
| | | - maxLength:最大长度, |
| | | - variableLength:是否是变长。 |
| | | 计算公式定义: |
| | | - BYTES:按字节计算; |
| | | - N-x:总字节数减x,例如总字节数减1的公式为N-1。 |
| | | # 约束 |
| | | - 以JSON格式输出; |
| | | - 仅输出JSON文本,不要输出任何其他文本。 |
| | | # 输出例子: |
| | | { |
| | | "name": "YK包", |
| | | "type": "pkt" |
| | | "children":[ |
| | | { |
| | | "name": "主导头", |
| | | "code": "primaryHeader", |
| | | "length": 2, |
| | | "value": "00", |
| | | "type": "combPkt", |
| | | "children": [ |
| | | { |
| | | "name": "版本号", |
| | | "code": "verNum" |
| | | "length": 1, |
| | | "value": "00" |
| | | } |
| | | ] |
| | | } |
| | | ], |
| | | "subPkts":[] |
| | | "packetVersionNumber": "00B", |
| | | "packetType": "1B", |
| | | ... |
| | | } |
| | | ''' |
| | | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | |
| | | text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', files=[file_map['指令格式']], |
| | | validation=validation) |
| | | pkt_format = json.loads(text) |
| | | doc_text = self.get_text_with_entity(['遥控包格式']) |
| | | text = await asyncio.to_thread( |
| | | lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_pkt.json', doc_text=doc_text, |
| | | validation=validation)) |
| | | result = json.loads(text) |
| | | |
| | | format_text = utils.read_from_file('tpl/tc_pkt_format.json') |
| | | format_text = utils.replace_tpl_paras(format_text, result) |
| | | pkt_format = json.loads(format_text) |
| | | return pkt_format |
| | | |
| | | def gen_tc_transfer_pkts(self): |
| | | async def gen_tc_transfer_pkts(self): |
| | | _msg = ''' |
| | | 分析文档列出所有的遥控源包。 |
| | | ## 数据结构如下: |
| | | # 角色 |
| | | 你是一名资深的软件工程师。 |
| | | # 指令 |
| | | 分析文档列出所有的遥控指令。 |
| | | # 约束 |
| | | - 应用过程标识:应用过程标识就是APID,一般会在名称后的括号中列出来; |
| | | - code:指令代号,如果没有填写或者是“/”则使用空字符串代替; |
| | | - name:指令名称,根据表格行内容提取,注意是行内容,注意名称需要提取完整,如果有多列则合并用-分割; |
| | | - shortName:指令名称,根据表格内容提取; |
| | | - apid: 应用过程标识符; |
| | | - serviceType:服务类型; |
| | | - serviceSubtype:服务子类型; |
| | | - dataArea:应用数据区,提取表格中的应用数据区内容。 |
| | | # 输出例子: |
| | | [{ |
| | | "name": "xxx", |
| | | "name": "aaa-xxx", |
| | | "shortName": "xxx" |
| | | "code":"pkt", |
| | | "apid":"0xAA", |
| | | "server":"0x1", |
| | | "subServer":"0x2" |
| | | "serviceType":"0x1", |
| | | "serviceSubtype":"0x2", |
| | | "dataArea": "" |
| | | }] |
| | | ''' |
| | | |
| | | def validation(gen_text): |
| | | json.loads(gen_text) |
| | | |
| | | text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', files=[file_map['指令格式']], |
| | | validation=validation) |
| | | doc_text = self.get_text_with_entity(['APID分配']) |
| | | text = await asyncio.to_thread( |
| | | lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_pkts.json', doc_text=doc_text, |
| | | validation=validation)) |
| | | pkts = json.loads(text) |
| | | return pkts |
| | | |
| | | async def gen_tc_details(self, pkt): |
| | | result = [] |
| | | tc_name = pkt['shortName'] |
| | | tc_code = pkt['code'] |
| | | pkt['name'] = f'{tc_code} {tc_name}' |
| | | _msg = f""" |
| | | # 角色 |
| | | 你是一个资深软件工程师。 |
| | | |
| | | if __name__ == '__main__': |
| | | # 指令 |
| | | 分析文档,从文档中提取遥控指令名称为“{tc_name}”代号为“{tc_code}”的指令应用数据区定义。 |
| | | |
| | | 有些文档内容非常简单仅仅包含特定字节的内容描述,如果是这种文档,则每个特定字节的内容描述定义为一个字段,字段类型根据字节内容确定。 |
| | | """ + """ |
| | | |
| | | # 字段类型 |
| | | - 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头; |
| | | - 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义, |
| | | - 枚举值:enum, |
| | | - 校验和:checkSum,如果是校验和类型还需要分析校验和的算法是什么以及校验数据域范围,并保存在value中,例如:{ "type":"CRC-CCITT", "start": "START", "end":"END" }, |
| | | - 即时输入:input,如果是即时输入value的值为变长定义。 |
| | | |
| | | ## 长度类型的范围定义描述 |
| | | {"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"} |
| | | - start:起始字段code,长度包括起始字段,字段描述中说明了起始字段, |
| | | - end:结束字段code,长度包括结束字段,字段描述中说明了结束字段, |
| | | - formula:计算公式,如果没有长度特殊计算相关描述则使用BYTES。 |
| | | 计算公式定义: |
| | | - BYTES:按字节计算,字节数; |
| | | - N-x:总字节数减x,例如总字节数减1的公式为N-1。 |
| | | ## 即使输入类型的变长定义描述 |
| | | {"minLength": "最小长度", "maxLength": "最大长度", "variableLength": true} |
| | | - minLength:最小长度, |
| | | - maxLength:最大长度, |
| | | - variableLength:是否是变长。 |
| | | |
| | | # 字段类型分析方法 |
| | | - 根据字段描述分析字段的类型; |
| | | - 字段描述中明确指定了字段值的,类型为const; |
| | | - 字段描述中没有明确指定字段值,但是罗列了取值范围的,类型为enum; |
| | | - 字段描述中如果没有明确指定字段值也没有罗列取值范围的,类型为input; |
| | | - 字段如果描述了当前指令中的数据域长度以及长度范围则是长度类型length,否则不是长度类型; |
| | | - 如果和数据域有关,类型为const; |
| | | - 校验和类型:字段如果与当前指令数据区的校验和有关则为校验和类型否则不是校验和类型,分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)。 |
| | | |
| | | # 约束 |
| | | ## 字段属性 |
| | | - code 如果没有明确定义则使用名称的英文翻译,尽量简短,如果没有填写或者为斜线表示没有明确定义; |
| | | - length 自动转换为bit长度,必须是数值、null或范围定义,不能为0; |
| | | - value 根据字段描述提取字段值,字段值一般为数值类型,需要根据字段类型来分析,如果是length类型value的值为范围定义; |
| | | - enums 枚举类型的字段必须要有enums,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}; |
| | | ## 字段类型 |
| | | - 长度类型字段的范围定义中的start和end必须是生成结果中的字段code,长度范围包括start和end,必须使用长度描述中的字段; |
| | | - 如果没有长度范围描述则不是长度类型; |
| | | - 校验和类型字段必须描述的是当前指令数据域校验和,如果描述的不是当前指令的数据域校验和则不是校验和类型; |
| | | - 输出数据结构为数组,数组元素为字段信息; |
| | | - 输出内容必须为严格的json,不能输出除json以外的任何内容。 |
| | | |
| | | # 输出例子: |
| | | [ |
| | | { |
| | | "name": "para1", |
| | | "code": "para1", |
| | | "length": 8, |
| | | "type": "const", |
| | | "value": "0xAA" |
| | | }, |
| | | { |
| | | "name": "para2", |
| | | "code": "para2", |
| | | "length": 8, |
| | | "type": "length", |
| | | "value": {"start": "data", "end": "data", "formula": "BYTES"} |
| | | }, |
| | | { |
| | | "name": "para3", |
| | | "code": "para3", |
| | | "length": 8, |
| | | "type": "enum", |
| | | "value": "", |
| | | "enums": [{"n":"参数1","v":"0x0A","c":"Para1"}] |
| | | }, |
| | | { |
| | | "name": "数据", |
| | | "code": "data", |
| | | "length": null, |
| | | "type": "input", |
| | | "value": "" |
| | | }, |
| | | { |
| | | "name": "校验和", |
| | | "code": "checksum", |
| | | "length": 2, |
| | | "type": "checkSum", |
| | | "value": { "type": "CRC-CCITT", "start":"para1", "end":"data" } |
| | | } |
| | | ] |
| | | """ |
| | | |
| | | def validation(gen_text): |
| | | fields = json.loads(gen_text) |
| | | for field in fields: |
| | | if field['type'] == 'length': |
| | | if field['value'] is None: |
| | | raise Exception('length类型的value不能为空') |
| | | if 'start' not in field['value'] or 'end' not in field['value']: |
| | | raise Exception('length类型的value必须包含start和end') |
| | | if field['value']['start'] not in [f['code'] for f in fields]: |
| | | raise Exception('length类型的value的start字段必须在fields中') |
| | | if field['value']['end'] not in [f['code'] for f in fields]: |
| | | raise Exception('length类型的value的end字段必须在fields中') |
| | | elif field['type'] == 'enum': |
| | | if 'enums' not in field: |
| | | raise Exception('enum类型的field必须包含enums') |
| | | if len(field['enums']) == 0: |
| | | raise Exception('enum类型的field的enums不能为空') |
| | | for enum in field['enums']: |
| | | if 'n' not in enum or 'v' not in enum: |
| | | raise Exception('enum类型的field的enums的元素必须包含n、v、c') |
| | | if enum['n'] == '' or enum['v'] == '': |
| | | raise Exception('enum类型的field的enums的元素不能为空') |
| | | |
| | | doc_text = self.get_text_with_entity([tc_name]) |
| | | if doc_text == '': |
| | | doc_text = self.get_text_with_tc_name(tc_name) |
| | | if doc_text == '': |
| | | doc_text = pkt['dataArea'] |
| | | text = await asyncio.to_thread(self.generate_tc_text, _msg, |
| | | f"{self.json_path}/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json", |
| | | doc_text=doc_text, validation=validation) |
| | | result = json.loads(text) |
| | | pkt['children'] = result |
| | | |
| | | def get_text_with_tc_name(self, tc_name: str): |
| | | entities = doc_dbh.get_entities_by_type('指令格式配置') |
| | | entity_names = '\n'.join([f'- {e.name}' for e in entities]) |
| | | msg = f""" |
| | | # 需求 |
| | | 请从下列指令名称中匹配一个与“{tc_name}”相似度最高的指令名称。 |
| | | 指令名称列表: |
| | | {entity_names} |
| | | """ |
| | | name = self.generate_text(msg,None) |
| | | entity = next(filter(lambda e: e.name == name, entities,None)) |
| | | if entity: |
| | | return self.get_text_with_entity([entity.name]) |
| | | else: |
| | | return '' |
| | | |
| | | |
| | | def tc_data_generate(): |
| | | exe_path = os.path.dirname(__file__) + "/db_tc_generator/InstructionGenerator.exe" |
| | | db_path = os.path.dirname(__file__) + "/db.db" |
| | | try: |
| | | os.makedirs("./out/pkts", exist_ok=True) |
| | | # 超时时间240秒 |
| | | result = subprocess.run([exe_path, db_path], timeout=240) |
| | | print(result.stdout) |
| | | print(result.returncode) |
| | | except subprocess.TimeoutExpired: |
| | | print("警告:指令数据生成失败!") |
| | | |
| | | |
| | | def main(): |
| | | try: |
| | | project_path = r'D:\projects\KnowledgeBase' |
| | | doc_dbh.set_project_path(project_path) |
| | | # 启动大模型处理流程 |
| | | ret_text = DbStructFlow().run() |
| | | asyncio.run(DbStructFlow(f'{project_path}').run()) |
| | | # 生成指令数据表 |
| | | tc_data_generate() |
| | | except KeyboardInterrupt: |
| | | if g_completion: |
| | | g_completion.close() |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | main() |