import os
|
from datetime import datetime
|
|
from openai import OpenAI
|
from pathlib import Path
|
import re
|
import json
|
import copy
|
|
from datas import pkt_vc, pkt_datas, dev_pkt, proj_data
|
from db.db_generate import create_project, create_device, create_data_stream
|
from db.models import TProject, TDevice
|
|
BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'
|
API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e'
|
MODEL_NAME = 'qwen-long'
|
|
assistant_msg = """
|
# 角色
|
你是一个专业的文档通信分析师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。
|
|
## 技能
|
### 技能 1:文档分析(包括 markdown 文档)
|
1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。
|
2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。
|
3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。
|
|
|
### 技能 2:通信协议分析
|
1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。
|
|
## 目标导向
|
1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。
|
2. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性。
|
|
## 规则
|
1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析。
|
2. 每次只分析同一个型号。
|
3. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。
|
4. 这些文档都是数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布和每个字段的大小,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,统一使用length表示。
|
5. 如果有层级,使用树形 JSON 输出,子节点 key 使用children;需保证相同类型的数据结构统一,并且判断每个层级是什么类型,输出类型字段,类型字段的 key 使用 type ;例如当前层级为字段时使用:type:"field";当前层级为设备时使用:type:"device"
|
6.名称相关的字段的 key 使用name;代号或者唯一标识相关的字段的key使用id;序号相关的字段的key使用number;其他没有举例的字段使用精简的翻译作为字段的key;
|
7.探测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级包含下一层级中传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构
|
|
## 限制:
|
- 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
|
- 不输出任何注释等描述性信息
|
|
"""
|
|
|
class DbStructFlow:
|
files = []
|
file_objects = []
|
|
def __init__(self, doc_files):
|
self.client = OpenAI(
|
api_key=API_KEY,
|
base_url=BASE_URL,
|
# api_key="ollama",
|
# base_url="http://192.168.1.48:11434/v1/",
|
)
|
if doc_files:
|
self.files = doc_files
|
self.load_file_objs()
|
self.delete_all_files()
|
self.upload_files()
|
|
def load_file_objs(self):
|
file_stk = self.client.files.list()
|
self.file_objects = file_stk.data
|
|
def delete_all_files(self):
|
for file_object in self.file_objects:
|
self.client.files.delete(file_object.id)
|
|
def upload_file(self, file_path):
|
file_object = self.client.files.create(file=Path(file_path), purpose="file-extract")
|
return file_object
|
|
def upload_files(self):
|
self.file_objects = []
|
for file_path in self.files:
|
file_object = self.upload_file(file_path)
|
self.file_objects.append(file_object)
|
|
def run(self):
|
# 生成型号结构
|
# 生成设备结构
|
# 生成数据流结构 CADU
|
# 生成VCDU结构
|
# 生成遥测数据包结构
|
proj = self.gen_project([])
|
# proj = TProject(C_PROJECT_PK='2e090a487c1a4f7f741be3a437374e2f')
|
|
devs = self.gen_device([], proj)
|
# with open('datas/设备列表.json', 'w', encoding='utf8') as f:
|
# json.dump(devs, f, ensure_ascii=False, indent=4)
|
#
|
# proj['devices'] = devs
|
#
|
# messages = []
|
# cadu = self.gen_tm_frame(messages)
|
# with open("datas/探测帧.json", 'w', encoding='utf8') as f:
|
# json.dump(cadu, f, ensure_ascii=False, indent=4)
|
#
|
# messages = []
|
# vcs = self.gen_vc(messages)
|
# with open('datas/虚拟信道.json', 'w', encoding='utf8') as f:
|
# json.dump(vcs, f, ensure_ascii=False, indent=4)
|
#
|
# messages = []
|
# pkt_vcs = self.gen_pkt_vc(messages)
|
# with open('datas/VC源包.json', 'w', encoding='utf8') as f:
|
# json.dump(pkt_vcs, f, ensure_ascii=False, indent=4)
|
#
|
# messages = []
|
# dev_pkts = self.gen_dev_pkts(messages)
|
# with open('datas/设备源包.json', 'w', encoding='utf8') as f:
|
# json.dump(dev_pkts, f, ensure_ascii=False, indent=4)
|
#
|
# messages = []
|
# _pkts = self.gen_pkts()
|
# pkts = []
|
# for pkt in _pkts:
|
# _pkt = self.gen_pkt_details(pkt['name'])
|
# pkts.append(_pkt)
|
# with open('datas/源包列表.json', 'w', encoding='utf8') as f:
|
# json.dump(pkts, f, ensure_ascii=False, indent=4)
|
#
|
# for dev in devs:
|
# ds = dev['data_streams'][0]
|
# _cadu = copy.deepcopy(cadu)
|
# ds['cadu'] = _cadu
|
# _vcdu = next(filter(lambda it: it['name'] == '传输帧', _cadu['children']))
|
# vcdu_data = next(filter(lambda it: it['name'] == '传输帧数据域', _vcdu['children']))
|
# _vcs = copy.deepcopy(vcs)
|
# vcdu_data['children'] = _vcs
|
# dev_pkt = next(filter(lambda it: it['name'] == dev['name'], dev_pkts), None)
|
# if dev_pkt is None:
|
# continue
|
# for pkt in dev_pkt['pkts']:
|
# for vc in _vcs:
|
# _pkt = next(
|
# filter(lambda it: it['name'] == pkt['name'] and it['vcs'].__contains__(vc['code']), pkt_vcs),
|
# None)
|
# if _pkt:
|
# if vc.__contains__('pkts') is False:
|
# vc['pkts'] = []
|
# _pkt = next(filter(lambda it: it['name'] == _pkt['name'], pkts), None)
|
# if _pkt:
|
# vc['pkts'].append(_pkt)
|
#
|
# with open("datas/型号.json", 'w', encoding='utf8') as f:
|
# json.dump(proj, f, ensure_ascii=False, indent=4)
|
return ''
|
|
def _gen(self, msgs, msg):
|
messages = [] if msgs is None else msgs
|
if len(messages) == 0:
|
# 如果是第一次提问加入文档
|
messages.append({'role': 'system', 'content': assistant_msg})
|
for file_object in self.file_objects:
|
messages.append({'role': 'system', 'content': 'fileid://' + file_object.id})
|
messages.append({'role': 'user', 'content': msg})
|
|
completion = self.client.chat.completions.create(
|
model=MODEL_NAME,
|
messages=messages,
|
stream=True,
|
temperature=0.0,
|
top_p=0,
|
timeout=30 * 60000,
|
max_completion_tokens=1000000
|
# stream_options={"include_usage": True}
|
)
|
|
text = ''
|
for chunk in completion:
|
if chunk.choices[0].delta.content is not None:
|
text += chunk.choices[0].delta.content
|
print(chunk.choices[0].delta.content, end="")
|
print("")
|
return text
|
|
def gen_project(self, messages):
|
_msg = f"""
|
根据文档输出型号信息,型号字段包括:名称和代号,仅输出型号的属性,不输出其他层级数据
|
"""
|
print('型号信息:')
|
text = self._gen(messages, _msg)
|
messages.append({'role': 'assistant', 'content': text})
|
text = self.remove_markdown(text)
|
proj_dict = json.loads(text)
|
# return proj_dict
|
code = proj_dict['id']
|
name = proj_dict['name']
|
proj = create_project(code, name, code, name, "", datetime.now())
|
return proj
|
|
def gen_device(self, messages, proj):
|
"""
|
设备列表生成规则:
|
1.如文档中有1553协议描述,加入1553设备
|
2.如是类SMU软件(遥测遥控包含,BC或者RT),加入对应相关设备,文档只有设备名称和设备ID,设备类型90%是标准类型
|
3.如是类RTU软件,加入对应相关设备,文档里面有设备名称和设备ID,同上
|
4.如基于软平台,如是SMU软件,加入SMU工控机设备,待定
|
|
设备类型:工控机[0]、1553B[1]
|
|
:param messages:
|
:return:
|
"""
|
proj_pk = proj.C_PROJECT_PK
|
devices = []
|
|
_msg = f"""
|
输出所有设备列表,设备字段包括名称(name)、代号(code),如果没有代号则使用名称的英文翻译缩写代替且缩写长度不超过5个字符,JSON格式,并且给每个设备增加三个字段,第一个字段hasTcTm“是否包含遥控遥测”,判断该设备是否包含遥控遥测的功能;第二个字段hasTemperatureAnalog“是否包含温度量、模拟量等数据的采集”,判断该设备是否包含温度量等信息的采集功能;第三个字段hasBus“是否是总线设备”,判断该设备是否属于总线设备,是否有RT地址;每个字段的值都使用true或false来表示。
|
仅输出JSON,不要输出JSON以外的任何字符。
|
"""
|
print('设备列表:')
|
text = self._gen(messages, _msg)
|
text = self.remove_markdown(text)
|
devs = json.loads(text)
|
hasBus = any(d['hasBus'] for d in devs)
|
if hasBus:
|
# 总线设备
|
dev = create_device("B1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk)
|
devices.append(dev)
|
# 创建数据流
|
ds_u153 = create_data_stream(proj_pk, dev.C_DEV_PK, 'ECSS上行总线数据', 'U153', 'B153', '0', 'E153', '001')
|
ds_d153 = create_data_stream(proj_pk, dev.C_DEV_PK, 'ECSS下行总线数据', 'D153', 'B153', '1', 'E153', '001')
|
|
# 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
|
like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs))
|
for dev in like_smu_devs:
|
dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK)
|
devices.append(dev)
|
# 创建数据流
|
ds_tmfl = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMFL', 'TMFL', '1', 'TMFL', '001')
|
ds_tcfl = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL', '006')
|
|
# 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元”
|
like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs))
|
for dev in like_rtu_devs:
|
dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK)
|
|
# for dev in like_rtu_devs:
|
# dev = create_device(dev['code'], dev['name'], '0', '', proj.C_PROJECT_PK)
|
# devices.append(dev)
|
# # 创建数据流
|
# ds_tmfl = create_data_stream(proj.C_PROJECT_PK, '温度量', 'TMFL', 'TMFL', '1', 'TMFL', '001')
|
# ds_tcfl = create_data_stream(proj.C_PROJECT_PK, '模拟量', 'TCFL', 'TCFL', '0', 'TCFL', '006')
|
|
print()
|
# 总线设备
|
# print('是否有总线设备:', end='')
|
# _msg = "文档中描述的有总线相关内容吗?仅回答:“有”或“无”,不要输出其他文本。"
|
# text = self._gen([], _msg)
|
# if text == "有":
|
# _msg = f"""
|
# 文档中描述的总线型号是多少,仅输出总线型号不要输出型号以外的其他任何文本,总线型号由数字和英文字母组成。
|
# """
|
# print('设备ID:')
|
# dev_code = self._gen([], _msg)
|
# dev = create_device(dev_code, dev_code, '1', '', proj.C_PROJECT_PK)
|
# devices.append(dev)
|
|
# 类SMU软件
|
# print('是否有类SMU设备:', end='')
|
# _msg = "文档中有描述遥测和遥控功能吗?仅回答:“有”或“无”,不要输出其他文本。"
|
# text = self._gen([], _msg)
|
# if text == "有":
|
# # 系统管理单元
|
# print('是否有系统管理单元(SMU):', end='')
|
# _msg = f"文档中有描述系统管理单元(SMU)吗?仅回答“有”或“无”,不要输出其他文本。"
|
# text = self._gen([], _msg)
|
# if text == "有":
|
# dev = create_device("SMU", "系统管理单元", '0', '', proj.C_PROJECT_PK)
|
# devices.append(dev)
|
# # 中心控制单元(CTU)
|
# print('是否有中心控制单元(CTU):', end='')
|
# _msg = f"文档中有描述中心控制单元(CTU)吗?仅回答“有”或“无”,不要输出其他文本。"
|
# text = self._gen([], _msg)
|
# if text == "有":
|
# dev = create_device("CTU", "中心控制单元", '0', '', proj.C_PROJECT_PK)
|
# devices.append(dev)
|
#
|
# # 类RTU
|
# print('是否有类RTU设备:', end='')
|
# _msg = "文档中有描述模拟量采集和温度量采集功能吗?仅回答:“有”或“无”,不要输出其他文本。"
|
# text = self._gen([], _msg)
|
# if text == "有":
|
# dev = create_device("RTU", "远置单元", '0', '', proj.C_PROJECT_PK)
|
# devices.append(dev)
|
# device_dicts = json.loads(text)
|
# for device_dict in device_dicts:
|
# data_stream = {'name': '数据流', 'code': 'DS'}
|
# device_dict['data_streams'] = [data_stream]
|
#
|
# return device_dicts
|
return devices
|
|
def gen_tm_frame(self, messages):
|
_msg = f"""
|
输出探测帧的结构,探测帧字段包括:探测帧代号(id)、探测帧名称(name)、长度(length)、下级数据单元列表(children)。代号如果没有则用名称的英文翻译,包括下级数据单元。
|
"""
|
print('探测帧信息:')
|
text = self._gen(messages, _msg)
|
messages.append({'role': 'assistant', 'content': text})
|
text = self.remove_markdown(text)
|
cadu = json.loads(text)
|
|
return cadu
|
|
def gen_vc(self, messages):
|
_msg = f"""
|
输出探测虚拟信道的划分,不需要描述信息,使用一个数组输出,字段包括:代号(code)、vcid、名称(name)。
|
"""
|
print('虚拟信道:')
|
text = self._gen(messages, _msg)
|
messages.append({'role': 'assistant', 'content': text})
|
text = self.remove_markdown(text)
|
vcs = json.loads(text)
|
return vcs
|
|
def gen_dev_pkts(self, messages):
|
_msg = f"""
|
输出文档中探测源包类型定义描述的设备以及设备下面的探测包,数据结构:最外层为设备列表 > 探测包列表(pkts),设备字段包括:名称(name)、代号(id),源包字段包括:名称(name)、代号(id)
|
"""
|
print('设备探测源包信息:')
|
file = next(filter(lambda it: it.filename == 'XA-5D无人机分系统探测源包设计报告(公开).md', self.file_objects),
|
None)
|
messages = [{'role': 'system', 'content': assistant_msg}, {'role': 'system', 'content': 'fileid://' + file.id}]
|
text = self._gen(messages, _msg)
|
messages.append({'role': 'assistant', 'content': text})
|
text = self.remove_markdown(text)
|
dev_pkts = json.loads(text)
|
return dev_pkts
|
|
def gen_pkt_details(self, pkt_name):
|
_msg = f"""
|
输出文档中描述的“{pkt_name}”探测包。
|
探测包字段包括:名称(name)、代号(id)、包头属性列表(headers)、数据域参数列表(datas),
|
包头属性字段包括:位置(pos)、名称(name)、代号(id)、定义(val),
|
数据域参数字段包括:位置(pos)、名称(name)、代号(id)、字节顺序(byteOrder),
|
如果没有代号用名称的英文翻译代替,如果没有名称用代号代替,
|
输出内容仅输出json,不要输出任何其他内容!
|
"""
|
print(f'探测源包“{pkt_name}”信息:')
|
file = next(filter(lambda it: it.filename == 'XA-5D无人机分系统探测源包设计报告(公开).md', self.file_objects),
|
None)
|
messages = [{'role': 'system', 'content': assistant_msg}, {'role': 'system', 'content': 'fileid://' + file.id}]
|
text = self._gen(messages, _msg)
|
messages.append({'role': 'assistant', 'content': text})
|
text = self.remove_markdown(text)
|
pkt = json.loads(text)
|
return pkt
|
|
def gen_pkts(self):
|
_msg = f"""
|
输出文档中描述的探测包。
|
探测包字段包括:名称(name)、代号(id),
|
如果没有代号用名称的英文翻译代替,如果没有名称用代号代替,
|
顶级结构直接从探测包开始,不包括探测包下面的参数。
|
"""
|
print(f'探测源包列表:')
|
file = next(
|
filter(lambda it: it.filename == 'XA-5D无人机分系统探测源包设计报告(公开).md', self.file_objects),
|
None)
|
messages = [{'role': 'system', 'content': assistant_msg},
|
{'role': 'system', 'content': 'fileid://' + file.id}]
|
text = self._gen(messages, _msg)
|
messages.append({'role': 'assistant', 'content': text})
|
text = self.remove_markdown(text)
|
pkt = json.loads(text)
|
return pkt
|
|
def gen_pkt_vc(self, messages):
|
_msg = f"""
|
根据探测源包下传时机定义,输出各个探测源包信息列表,顶级结构为数组元素为探测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)
|
"""
|
print('探测源包所属虚拟信道:')
|
text = self._gen(messages, _msg)
|
messages.append({'role': 'assistant', 'content': text})
|
text = self.remove_markdown(text)
|
pkt_vcs = json.loads(text)
|
return pkt_vcs
|
|
def remove_markdown(self, text):
|
# 去掉开头的```json
|
text = re.sub(r'^```json', '', text)
|
# 去掉结尾的```json
|
text = re.sub(r'```$', '', text)
|
return text
|
|
|
if __name__ == '__main__':
|
md_file = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\文档合并.md'
|
md_file2 = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\XA-5D无人机分系统探测源包设计报告(公开).md'
|
# 启动大模型处理流程
|
ret_text = DbStructFlow([md_file, md_file2]).run()
|