import os from datetime import datetime from openai import OpenAI from pathlib import Path import re import json import copy from datas import pkt_vc, pkt_datas, dev_pkt, proj_data from db.db_generate import create_project, create_device, create_data_stream from db.models import TProject, TDevice BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1' API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e' MODEL_NAME = 'qwen-long' assistant_msg = """ # 角色 你是一个专业的文档通信分析师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。 ## 技能 ### 技能 1:文档分析(包括 markdown 文档) 1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。 2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。 3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。 ### 技能 2:通信协议分析 1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。 ## 目标导向 1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。 2. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性。 ## 规则 1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析。 2. 每次只分析同一个型号。 3. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。 4. 这些文档都是数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布和每个字段的大小,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,统一使用length表示。 5. 如果有层级,使用树形 JSON 输出,子节点 key 使用children;需保证相同类型的数据结构统一,并且判断每个层级是什么类型,输出类型字段,类型字段的 key 使用 type ;例如当前层级为字段时使用:type:"field";当前层级为设备时使用:type:"device" 6.名称相关的字段的 key 使用name;代号或者唯一标识相关的字段的key使用id;序号相关的字段的key使用number;其他没有举例的字段使用精简的翻译作为字段的key; 7.探测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级包含下一层级中传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构 ## 限制: - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。 - 不输出任何注释等描述性信息 """ class DbStructFlow: files = [] file_objects = [] def __init__(self, doc_files): self.client = OpenAI( api_key=API_KEY, base_url=BASE_URL, # api_key="ollama", # base_url="http://192.168.1.48:11434/v1/", ) if doc_files: self.files = doc_files self.load_file_objs() self.delete_all_files() self.upload_files() def load_file_objs(self): file_stk = self.client.files.list() self.file_objects = file_stk.data def delete_all_files(self): for file_object in self.file_objects: self.client.files.delete(file_object.id) def upload_file(self, file_path): file_object = self.client.files.create(file=Path(file_path), purpose="file-extract") return file_object def upload_files(self): self.file_objects = [] for file_path in self.files: file_object = self.upload_file(file_path) self.file_objects.append(file_object) def run(self): # 生成型号结构 # 生成设备结构 # 生成数据流结构 CADU # 生成VCDU结构 # 生成遥测数据包结构 proj = self.gen_project([]) # proj = TProject(C_PROJECT_PK='2e090a487c1a4f7f741be3a437374e2f') devs = self.gen_device([], proj) # with open('datas/设备列表.json', 'w', encoding='utf8') as f: # json.dump(devs, f, ensure_ascii=False, indent=4) # # proj['devices'] = devs # # messages = [] # cadu = self.gen_tm_frame(messages) # with open("datas/探测帧.json", 'w', encoding='utf8') as f: # json.dump(cadu, f, ensure_ascii=False, indent=4) # # messages = [] # vcs = self.gen_vc(messages) # with open('datas/虚拟信道.json', 'w', encoding='utf8') as f: # json.dump(vcs, f, ensure_ascii=False, indent=4) # # messages = [] # pkt_vcs = self.gen_pkt_vc(messages) # with open('datas/VC源包.json', 'w', encoding='utf8') as f: # json.dump(pkt_vcs, f, ensure_ascii=False, indent=4) # # messages = [] # dev_pkts = self.gen_dev_pkts(messages) # with open('datas/设备源包.json', 'w', encoding='utf8') as f: # json.dump(dev_pkts, f, ensure_ascii=False, indent=4) # # messages = [] # _pkts = self.gen_pkts() # pkts = [] # for pkt in _pkts: # _pkt = self.gen_pkt_details(pkt['name']) # pkts.append(_pkt) # with open('datas/源包列表.json', 'w', encoding='utf8') as f: # json.dump(pkts, f, ensure_ascii=False, indent=4) # # for dev in devs: # ds = dev['data_streams'][0] # _cadu = copy.deepcopy(cadu) # ds['cadu'] = _cadu # _vcdu = next(filter(lambda it: it['name'] == '传输帧', _cadu['children'])) # vcdu_data = next(filter(lambda it: it['name'] == '传输帧数据域', _vcdu['children'])) # _vcs = copy.deepcopy(vcs) # vcdu_data['children'] = _vcs # dev_pkt = next(filter(lambda it: it['name'] == dev['name'], dev_pkts), None) # if dev_pkt is None: # continue # for pkt in dev_pkt['pkts']: # for vc in _vcs: # _pkt = next( # filter(lambda it: it['name'] == pkt['name'] and it['vcs'].__contains__(vc['code']), pkt_vcs), # None) # if _pkt: # if vc.__contains__('pkts') is False: # vc['pkts'] = [] # _pkt = next(filter(lambda it: it['name'] == _pkt['name'], pkts), None) # if _pkt: # vc['pkts'].append(_pkt) # # with open("datas/型号.json", 'w', encoding='utf8') as f: # json.dump(proj, f, ensure_ascii=False, indent=4) return '' def _gen(self, msgs, msg): messages = [] if msgs is None else msgs if len(messages) == 0: # 如果是第一次提问加入文档 messages.append({'role': 'system', 'content': assistant_msg}) for file_object in self.file_objects: messages.append({'role': 'system', 'content': 'fileid://' + file_object.id}) messages.append({'role': 'user', 'content': msg}) completion = self.client.chat.completions.create( model=MODEL_NAME, messages=messages, stream=True, temperature=0.0, top_p=0, timeout=30 * 60000, max_completion_tokens=1000000 # stream_options={"include_usage": True} ) text = '' for chunk in completion: if chunk.choices[0].delta.content is not None: text += chunk.choices[0].delta.content print(chunk.choices[0].delta.content, end="") print("") return text def gen_project(self, messages): _msg = f""" 根据文档输出型号信息,型号字段包括:名称和代号,仅输出型号的属性,不输出其他层级数据 """ print('型号信息:') text = self._gen(messages, _msg) messages.append({'role': 'assistant', 'content': text}) text = self.remove_markdown(text) proj_dict = json.loads(text) # return proj_dict code = proj_dict['id'] name = proj_dict['name'] proj = create_project(code, name, code, name, "", datetime.now()) return proj def gen_device(self, messages, proj): """ 设备列表生成规则: 1.如文档中有1553协议描述,加入1553设备 2.如是类SMU软件(遥测遥控包含,BC或者RT),加入对应相关设备,文档只有设备名称和设备ID,设备类型90%是标准类型 3.如是类RTU软件,加入对应相关设备,文档里面有设备名称和设备ID,同上 4.如基于软平台,如是SMU软件,加入SMU工控机设备,待定 设备类型:工控机[0]、1553B[1] :param messages: :return: """ proj_pk = proj.C_PROJECT_PK devices = [] _msg = f""" 输出所有设备列表,设备字段包括名称(name)、代号(code),如果没有代号则使用名称的英文翻译缩写代替且缩写长度不超过5个字符,JSON格式,并且给每个设备增加三个字段,第一个字段hasTcTm“是否包含遥控遥测”,判断该设备是否包含遥控遥测的功能;第二个字段hasTemperatureAnalog“是否包含温度量、模拟量等数据的采集”,判断该设备是否包含温度量等信息的采集功能;第三个字段hasBus“是否是总线设备”,判断该设备是否属于总线设备,是否有RT地址;每个字段的值都使用true或false来表示。 仅输出JSON,不要输出JSON以外的任何字符。 """ print('设备列表:') text = self._gen(messages, _msg) text = self.remove_markdown(text) devs = json.loads(text) hasBus = any(d['hasBus'] for d in devs) if hasBus: # 总线设备 dev = create_device("B1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk) devices.append(dev) # 创建数据流 ds_u153 = create_data_stream(proj_pk, dev.C_DEV_PK, 'ECSS上行总线数据', 'U153', 'B153', '0', 'E153', '001') ds_d153 = create_data_stream(proj_pk, dev.C_DEV_PK, 'ECSS下行总线数据', 'D153', 'B153', '1', 'E153', '001') # 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元” like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs)) for dev in like_smu_devs: dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) devices.append(dev) # 创建数据流 ds_tmfl = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMFL', 'TMFL', '1', 'TMFL', '001') ds_tcfl = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL', '006') # 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元” like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs)) for dev in like_rtu_devs: dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) # for dev in like_rtu_devs: # dev = create_device(dev['code'], dev['name'], '0', '', proj.C_PROJECT_PK) # devices.append(dev) # # 创建数据流 # ds_tmfl = create_data_stream(proj.C_PROJECT_PK, '温度量', 'TMFL', 'TMFL', '1', 'TMFL', '001') # ds_tcfl = create_data_stream(proj.C_PROJECT_PK, '模拟量', 'TCFL', 'TCFL', '0', 'TCFL', '006') print() # 总线设备 # print('是否有总线设备:', end='') # _msg = "文档中描述的有总线相关内容吗?仅回答:“有”或“无”,不要输出其他文本。" # text = self._gen([], _msg) # if text == "有": # _msg = f""" # 文档中描述的总线型号是多少,仅输出总线型号不要输出型号以外的其他任何文本,总线型号由数字和英文字母组成。 # """ # print('设备ID:') # dev_code = self._gen([], _msg) # dev = create_device(dev_code, dev_code, '1', '', proj.C_PROJECT_PK) # devices.append(dev) # 类SMU软件 # print('是否有类SMU设备:', end='') # _msg = "文档中有描述遥测和遥控功能吗?仅回答:“有”或“无”,不要输出其他文本。" # text = self._gen([], _msg) # if text == "有": # # 系统管理单元 # print('是否有系统管理单元(SMU):', end='') # _msg = f"文档中有描述系统管理单元(SMU)吗?仅回答“有”或“无”,不要输出其他文本。" # text = self._gen([], _msg) # if text == "有": # dev = create_device("SMU", "系统管理单元", '0', '', proj.C_PROJECT_PK) # devices.append(dev) # # 中心控制单元(CTU) # print('是否有中心控制单元(CTU):', end='') # _msg = f"文档中有描述中心控制单元(CTU)吗?仅回答“有”或“无”,不要输出其他文本。" # text = self._gen([], _msg) # if text == "有": # dev = create_device("CTU", "中心控制单元", '0', '', proj.C_PROJECT_PK) # devices.append(dev) # # # 类RTU # print('是否有类RTU设备:', end='') # _msg = "文档中有描述模拟量采集和温度量采集功能吗?仅回答:“有”或“无”,不要输出其他文本。" # text = self._gen([], _msg) # if text == "有": # dev = create_device("RTU", "远置单元", '0', '', proj.C_PROJECT_PK) # devices.append(dev) # device_dicts = json.loads(text) # for device_dict in device_dicts: # data_stream = {'name': '数据流', 'code': 'DS'} # device_dict['data_streams'] = [data_stream] # # return device_dicts return devices def gen_tm_frame(self, messages): _msg = f""" 输出探测帧的结构,探测帧字段包括:探测帧代号(id)、探测帧名称(name)、长度(length)、下级数据单元列表(children)。代号如果没有则用名称的英文翻译,包括下级数据单元。 """ print('探测帧信息:') text = self._gen(messages, _msg) messages.append({'role': 'assistant', 'content': text}) text = self.remove_markdown(text) cadu = json.loads(text) return cadu def gen_vc(self, messages): _msg = f""" 输出探测虚拟信道的划分,不需要描述信息,使用一个数组输出,字段包括:代号(code)、vcid、名称(name)。 """ print('虚拟信道:') text = self._gen(messages, _msg) messages.append({'role': 'assistant', 'content': text}) text = self.remove_markdown(text) vcs = json.loads(text) return vcs def gen_dev_pkts(self, messages): _msg = f""" 输出文档中探测源包类型定义描述的设备以及设备下面的探测包,数据结构:最外层为设备列表 > 探测包列表(pkts),设备字段包括:名称(name)、代号(id),源包字段包括:名称(name)、代号(id) """ print('设备探测源包信息:') file = next(filter(lambda it: it.filename == 'XA-5D无人机分系统探测源包设计报告(公开).md', self.file_objects), None) messages = [{'role': 'system', 'content': assistant_msg}, {'role': 'system', 'content': 'fileid://' + file.id}] text = self._gen(messages, _msg) messages.append({'role': 'assistant', 'content': text}) text = self.remove_markdown(text) dev_pkts = json.loads(text) return dev_pkts def gen_pkt_details(self, pkt_name): _msg = f""" 输出文档中描述的“{pkt_name}”探测包。 探测包字段包括:名称(name)、代号(id)、包头属性列表(headers)、数据域参数列表(datas), 包头属性字段包括:位置(pos)、名称(name)、代号(id)、定义(val), 数据域参数字段包括:位置(pos)、名称(name)、代号(id)、字节顺序(byteOrder), 如果没有代号用名称的英文翻译代替,如果没有名称用代号代替, 输出内容仅输出json,不要输出任何其他内容! """ print(f'探测源包“{pkt_name}”信息:') file = next(filter(lambda it: it.filename == 'XA-5D无人机分系统探测源包设计报告(公开).md', self.file_objects), None) messages = [{'role': 'system', 'content': assistant_msg}, {'role': 'system', 'content': 'fileid://' + file.id}] text = self._gen(messages, _msg) messages.append({'role': 'assistant', 'content': text}) text = self.remove_markdown(text) pkt = json.loads(text) return pkt def gen_pkts(self): _msg = f""" 输出文档中描述的探测包。 探测包字段包括:名称(name)、代号(id), 如果没有代号用名称的英文翻译代替,如果没有名称用代号代替, 顶级结构直接从探测包开始,不包括探测包下面的参数。 """ print(f'探测源包列表:') file = next( filter(lambda it: it.filename == 'XA-5D无人机分系统探测源包设计报告(公开).md', self.file_objects), None) messages = [{'role': 'system', 'content': assistant_msg}, {'role': 'system', 'content': 'fileid://' + file.id}] text = self._gen(messages, _msg) messages.append({'role': 'assistant', 'content': text}) text = self.remove_markdown(text) pkt = json.loads(text) return pkt def gen_pkt_vc(self, messages): _msg = f""" 根据探测源包下传时机定义,输出各个探测源包信息列表,顶级结构为数组元素为探测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags) """ print('探测源包所属虚拟信道:') text = self._gen(messages, _msg) messages.append({'role': 'assistant', 'content': text}) text = self.remove_markdown(text) pkt_vcs = json.loads(text) return pkt_vcs def remove_markdown(self, text): # 去掉开头的```json text = re.sub(r'^```json', '', text) # 去掉结尾的```json text = re.sub(r'```$', '', text) return text if __name__ == '__main__': md_file = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\文档合并.md' md_file2 = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\XA-5D无人机分系统探测源包设计报告(公开).md' # 启动大模型处理流程 ret_text = DbStructFlow([md_file, md_file2]).run()