import asyncio
import math
import os
import subprocess
import time
from datetime import datetime
from openai import OpenAI
import re
import json
from langchain_community.chat_models import ChatOpenAI
from langchain_core.prompts import HumanMessagePromptTemplate, SystemMessagePromptTemplate
import textwrap
import data_templates
from knowledgebase import utils
from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \
update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format, make_attr, init_db_helper
from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt
from knowledgebase.db.models import TProject, init_base_db
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.llm import llm
# BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'
# API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e'
# MODEL_NAME = 'qwen2.5-72b-instruct'
# BASE_URL = 'http://10.74.15.171:11434/v1/'
# API_KEY = 'ollama'
# MODEL_NAME = 'qwen2.5:72b-instruct'
# BASE_URL = 'http://chat.com/api'
# API_KEY = 'sk-49457e83f734475cb4cf7066c649d563'
# MODEL_NAME = 'qwen2.5:72b-120k'
BASE_URL = 'http://10.74.15.171:8000/v1'
API_KEY = 'EMPTY'
# MODEL_NAME = 'QwQ:32b'
MODEL_NAME = 'Qwen2.5-72B-Instruct-AWQ'
# MODEL_NAME = 'qwen2.5:72b-instruct'
USE_CACHE = True
assistant_msg = """
你是一名资深的软件工程师。
"""
#
# ## 技能
# ### 技能 1:通信协议分析
# 1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。
#
# ## 目标导向
# 1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。
#
# ## 限制:
# - 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
# - 不输出任何注释等描述性信息。
tc_system_msg = """
# 角色
你是一个资深软件工程师。
# 约束
- 输出内容根据文档内容输出。
"""
g_completion = None
def read_from_file(cache_file):
with open(cache_file, 'r', encoding='utf-8') as f:
text = f.read()
return text
def save_to_file(text, file_path):
if USE_CACHE:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(text)
def remove_think_tag(text):
pattern = r'(.|\n)*?'
result = re.sub(pattern, '', text)
return result
json_pat = re.compile(r'```json(.*?)```', re.DOTALL)
def get_json_text(text):
# 使用正则表达式提取json文本
try:
return json_pat.findall(text)[0]
except IndexError:
return text
def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals, pkts: list):
# 逻辑封装包,数据块传输的只有一个,取数的根据RT地址、子地址和帧号划分
frame_num = pkt['frameNum']
if trans_ser == '数据块传输':
# 数据块传输根据RT地址和子地址划分
key = f'{pkt["rt"]}_{pkt["subAddr"]}'
name = f'{pkt["rt"]}_{pkt["subAddr"]}_{trans_ser}'
else:
# 取数根据RT地址、子地址和帧号划分
key = f'{pkt["rt"]}_{pkt["subAddr"]}_{pkt["frameNum"]}'
name = f'{pkt["rt"]}_{pkt["subAddr"]}_帧号{frame_num}_{trans_ser}'
#
if key not in rt_pkt_map:
rt_pkt_map[key] = {
"name": name,
"id": pkt_id,
"type": "logic",
"pos": 0,
"content": "CYCLEBUFFER,Message,28,0xFFFF",
"length": "",
"vals": vals,
"children": []
}
frame = f'{pkt["frameNum"]}'
interval = f'{pkt["interval"]}'.replace(".", "_")
if trans_ser == '取数':
# 取数忽略周期
# _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}'
_key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}'
else:
# 数据块传输
if pkt['burst']:
_key = f'RT{pkt["rtAddr"]}FrameALL'
else:
_key = f'RT{pkt["rtAddr"]}Frame{frame}Per{interval}'
_pkt = next(filter(lambda it: it['name'] == _key, rt_pkt_map[key]['children']), None)
if _pkt is None:
ext_info = None
if trans_ser == '数据块传输' and not pkt['burst']:
# 数据块传输且有周期的包需要
ext_info = [{"id": "PeriodTriger", "name": "时分复用总线触发属性", "val": f"{pkt['interval']}"},
{"id": "FrameNumber", "name": "时分复用协议帧号", "val": frame}]
_pkt = {
"name": _key,
"id": _key,
"type": "enc",
"pos": 0,
"content": "1:N;EPDU",
"length": "length",
"extInfo": ext_info,
"children": [
{
"id": "C02_ver",
"name": "遥测版本",
"type": "para",
"pos": 0,
"length": 3,
"dataTy": "INVAR",
"content": "0"
},
{
"id": "C02_type",
"name": "类型",
"type": "para",
"pos": 3,
"length": 1,
"dataTy": "INVAR",
"content": "0"
},
{
"id": "C02_viceHead",
"name": "副导头标识",
"type": "para",
"pos": 4,
"length": 1,
"content": "1",
"dataTy": "INVAR"
},
{
"id": "C02_PackSign",
"name": "APID",
"type": "para",
"pos": 5,
"length": 11,
"is_key": True,
"dataTy": "ENUM"
},
{
"id": "C02_SerCtr_1",
"name": "序列标记",
"type": "para",
"pos": 16,
"length": 2,
"content": "3"
},
{
"id": "C02_SerCtr_2",
"name": "包序计数",
"type": "para",
"pos": 18,
"length": 14,
"content": "0:167772:1",
"dataTy": "INCREASE"
},
{
"id": "C02_PackLen",
"name": "包长",
"type": "para",
"pos": 32,
"length": 16,
"content": "1Bytes/C02_Data.length+1",
"dataTy": "LEN"
},
{
"id": "C02_Ser",
"name": "服务",
"type": "para",
"pos": 48,
"length": 8,
"is_key": True,
"dataTy": "ENUM"
},
{
"id": "C02_SubSer",
"name": "子服务",
"type": "para",
"pos": 56,
"length": 8,
"is_key": True,
"dataTy": "ENUM"
},
{
"id": "C02_Data",
"name": "数据区",
"type": "linear",
"pos": 64,
"length": 'length-current',
"children": []
},
]
}
rt_pkt_map[key]['children'].append(_pkt)
# 数据区下面的包
data_area = next(filter(lambda it: it['name'] == '数据区', _pkt['children']), None)
ser_sub_ser: str = pkt['service']
ser = ''
sub_ser = ''
if ser_sub_ser:
nums = re.findall(r'\d+', ser_sub_ser)
if len(nums) == 2:
ser = nums[0]
sub_ser = nums[1]
if 'children' not in pkt:
pkt['children'] = []
p_name = pkt['id'] + '_' + pkt['name']
data_area['children'].append({
"name": p_name,
"id": pkt["id"],
"type": "linear",
"pos": 0,
"length": pkt["length"],
"vals": f"0x{pkt['apid']}/{ser}/{sub_ser}/",
"children": pkt['children'],
})
def build_vcid_content(vcs):
_vcs = []
for vc in vcs:
_vcs.append(vc['name'] + ',' + vc['VCID'])
return ' '.join(_vcs)
class DbStructFlow:
json_path = ''
# 工程
proj: TProject = None
# 遥测源包列表,仅包名称、包id和hasParams
tm_pkts = []
# vc源包
vc_pkts = []
def __init__(self, project_path: str):
self.client = OpenAI(
api_key=API_KEY,
base_url=BASE_URL,
# api_key="ollama",
# base_url="http://192.168.1.48:11434/v1/",
)
self.json_path = f'{project_path}/json'
self.db_dir = f'{project_path}/db'
os.makedirs(f"{self.json_path}", exist_ok=True)
os.makedirs(f"{self.json_path}/pkts", exist_ok=True)
os.makedirs(f"{self.db_dir}", exist_ok=True)
init_base_db(f'{self.db_dir}/db.db')
init_db_helper()
# self.llm = ChatOpenAI(model=MODEL_NAME, temperature=0, api_key=API_KEY, base_url=BASE_URL)
async def run(self):
# 生成型号结构
# 生成设备结构
# 生成数据流结构 CADU
# 生成VCDU结构
# 生成遥测数据包结构
self.proj = self.gen_project()
tasks = []
tasks.append(self.gen_device(self.proj))
tasks.append(self.gen_tc())
# 测试位置计算
# print(self.handle_pos("Byte1_B0~Byte1_B0"))
# print(self.handle_pos("Byte0_B0~Byte0_B7"))
# print(self.handle_pos("Byte9_B0~Byte9_B7"))
await asyncio.gather(*tasks)
return ''
def handle_pos(self, srt):
pos_data = {
"start": 0,
"end": 0
}
pos = srt.split("~")
for index, p in enumerate(pos):
byte = p.split('_')
for b in byte:
if b.find("Byte") > -1:
value = b.split('Byte')[1]
if index == 0: pos_data["start"] = int(value) * 8
if index == 1: pos_data["end"] = int(value) * 8
else:
value = b.split('B')[1]
if index == 0: pos_data["start"] += int(value)
if index == 1: pos_data["end"] += int(value)
return {
"pos": pos_data["start"],
"length": pos_data["end"] - pos_data["start"] + 1,
}
def get_text_with_entity(self, entity_names: list[str]) -> str:
"""
根据实体词获取文档文本
:param entity_names: str - 实体词名称
:return: str - 文本内容
"""
return doc_dbh.get_text_with_entities(entity_names)
def get_text_list_with_entity(self, entity_names: list[str]) -> str:
"""
根据实体词获取文档文本列表
:param entity_names: 实体词列表
:return: [str] - 文本列表
"""
return doc_dbh.get_texts_with_entities(entity_names)
def _gen(self, msgs, msg, doc_text):
# if files is None:
# files = [file_map['文档合并']]
messages = [] if msgs is None else msgs
# doc_text = ''
# for file in files:
# doc_text += '\n' + read_from_file(file)
# 去除多余的缩进
msg = textwrap.dedent(msg).strip()
if len(messages) == 0:
# 如果是第一次提问加入system消息
messages.append({'role': 'system', 'content': assistant_msg})
messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text})
messages.append({'role': 'user', 'content': msg})
text = ''
for ai_msg in llm.stream(messages):
text += ai_msg.content
print(ai_msg.content, end='')
print('')
# completion = self.client.chat.completions.create(
# model=MODEL_NAME,
# messages=messages,
# stream=True,
# temperature=0,
# # top_p=0,
# timeout=30 * 60000,
# max_completion_tokens=32000,
# seed=0
# # stream_options={"include_usage": True}
# )
# g_completion = completion
# text = ''
# for chunk in completion:
# if chunk.choices[0].delta.content is not None:
# text += chunk.choices[0].delta.content
# print(chunk.choices[0].delta.content, end="")
# print("")
# g_completion = None
return text
def generate_text(self, msg, cache_file, msgs=None, doc_text="", validation=None, try_cnt=5, json_text=False):
if msgs is None:
msgs = []
if USE_CACHE and os.path.isfile(cache_file):
text = read_from_file(cache_file)
else:
s = time.time()
text = self._gen(msgs, msg, doc_text)
text = remove_think_tag(text)
if json_text:
text = get_json_text(text)
if validation:
try:
validation(text)
except BaseException as e:
print(e)
if try_cnt <= 0:
raise RuntimeError('生成失败,重试次数太多,强制结束!')
return self.generate_text_json(msg, cache_file, msgs, doc_text, validation, try_cnt - 1)
if cache_file:
save_to_file(text, cache_file)
print(f'耗时:{time.time() - s}')
return text
def generate_text_json(self, msg, cache_file, msgs=None, doc_text="", validation=None, try_cnt=5):
return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True)
def generate_tc_text(self, msg, cache_file, msgs=None, doc_text=None, validation=None, try_cnt=5):
msgs = [
{'role': 'system', 'content': tc_system_msg},
{'role': 'user', 'content': "以下是文档内容:\n" + doc_text}]
return self.generate_text(msg, cache_file, msgs, doc_text, validation, try_cnt, True)
def gen_project(self):
_msg = """
根据文档内容输出卫星的型号信息,输出字段包括:卫星的型号名称和卫星的型号代号。注意:如果没有单独描述型号名称或者型号代号,那么型号名称和型号代号是相同的,并且只输出一个层级。如果型号代号中有符号也要输出,保证输出完整。
例如:{"name":"xxx","id":"xxx"}
"""
print('型号信息:')
doc_text = self.get_text_with_entity(['系统概述'])
text = self.generate_text_json(_msg, f'{self.json_path}/型号信息.json', doc_text=doc_text)
proj_dict = json.loads(text)
code = proj_dict['id']
name = proj_dict['name']
proj = create_project(code, name, code, name, "", datetime.now())
return proj
async def gen_device(self, proj):
"""
设备列表生成规则:
1.如文档中有1553协议描述,加入1553设备
2.如是类SMU软件(遥测遥控包含,BC或者RT),加入对应相关设备,文档只有设备名称和设备ID,设备类型90%是标准类型
3.如是类RTU软件,加入对应相关设备,文档里面有设备名称和设备ID,同上
4.如基于软平台,如是SMU软件,加入SMU工控机设备,待定
设备类型:工控机[0]、1553B[1]
:param proj:
:return:
"""
proj_pk = proj.C_PROJECT_PK
devices = []
_msg = """
# 角色
你是一名资深软件工程师。
# 指令
我需要从文档提取设备列表信息,你要帮助我完成设备列表信息提取。
# 需求
输出分系统下的硬件产品(设备)列表,硬件产品名称一般会包含“管理单元”或者“接口单元”;
# 字段包括:
- 名称(name):设备名称;
- 代号(code):设备代号;
- 是否包含遥控遥测(hasTcTm):标识该硬件产品是否包含遥控遥测的功能,布尔值true或false;
- 是否包含温度量模拟量等数据的采集(hasTemperatureAnalog):标识该硬件产品是否包含温度量等信息的采集功能,布尔值true或false;
- 是否有总线硬件产品(hasBus):标识该设备是否属于总线硬件产品,是否有RT地址,布尔值true或false;
# 约束
- 如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符;
- 数据结构最外层为数组,数组元素为设备信息
- 仅输出JSON,不要输出JSON以外的任何字符。
# 例子
[
{
"name": "系统管理单元",
"code": "SMU",
"hasTcTm": true,
"hasTemperatureAnalog": false,
"hasBus": true
},
{
"name": "1553B总线",
"code": "1553",
"hasTcTm": true,
"hasTemperatureAnalog": true,
"hasBus": true
}
]
"""
print('设备列表:')
cache_file = f'{self.json_path}/设备列表.json'
def validation(gen_text):
_devs = json.loads(gen_text)
assert isinstance(_devs, list), '数据结构最外层不是数组'
assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元'
doc_text = self.get_text_with_entity(['系统概述', '总线管理'])
text = self.generate_text_json(_msg, cache_file, doc_text=doc_text, validation=validation)
devs = json.loads(text)
# 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs))
tasks = []
for dev in like_smu_devs:
dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK)
devices.append(dev)
# 创建数据流
ds_tmfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMF1', 'TMFL', '1', 'TMF1',
'001')
task = self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH)
tasks.append(task)
# ds_tcfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL',
# '006')
has_bus = any(d['hasBus'] for d in devs)
if has_bus:
# 总线设备
dev = create_device("1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk)
create_extend_info(proj_pk, "BusType", "总线类型", "ECSS_Standard", dev.C_DEV_PK)
devices.append(dev)
# 创建数据流
ds_u153, rs_u153, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '上行总线数据', 'U15E', 'B153',
'0', '1553', '001')
# 创建总线结构
task = self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME)
tasks.append(task)
await asyncio.gather(*tasks)
ds_d153, rule_stream, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '下行总线数据', 'D15E', 'B153',
'1', '1553', '001', rs_u153.C_RULE_PK)
create_ref_ds_rule_stream(proj_pk, rule_stream.C_STREAM_PK, rule_stream.C_STREAM_ID,
rule_stream.C_STREAM_NAME, rule_stream.C_STREAM_DIR, rs_u153.C_STREAM_PK)
else:
await asyncio.gather(*tasks)
# 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元”
# like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs))
# for dev in like_rtu_devs:
# dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK)
# for dev in like_rtu_devs:
# dev = create_device(dev['code'], dev['name'], '0', '', proj.C_PROJECT_PK)
# devices.append(dev)
# # 创建数据流
# ds_tmfl = create_data_stream(proj.C_PROJECT_PK, '温度量', 'TMFL', 'TMFL', '1', 'TMFL', '001')
# ds_tcfl = create_data_stream(proj.C_PROJECT_PK, '模拟量', 'TCFL', 'TCFL', '0', 'TCFL', '006')
return devices
def gen_insert_domain_params(self):
_msg = """
# 指令
我需要从文档中提取插入域的参数列表,你要帮助我完成插入域参数列表的提取。
# 需求
参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数位置)、type(类型:para)。
# 要求
1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。
位置信息转换为通用格式"Byte1_B6~Byte2_B0"进行输出,如果缺少内容要进行补全,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。
# 输出示例
[
{
"name": "遥测模式字",
"id": "TMS215",
"pos": Byte0_B0~Byte0_B7,
"type": "para"
}
]
"""
print('插入域参数列表:')
def validation(gen_text):
params = json.loads(gen_text)
assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组'
assert len(params), '插入域参数列表不能为空'
doc_text = self.get_text_with_entity(['插入域'])
text = self.generate_text_json(_msg, f'{self.json_path}/插入域参数列表.json', doc_text=doc_text, validation=validation)
json_list = json.loads(text)
for j in json_list:
if j['pos'] is not None:
pos_data = self.handle_pos(j['pos'])
j['pos'] = pos_data['pos']
j['length'] = pos_data['length']
return json_list
async def get_pkt_details(self, _pkt, vc):
_pkt = await self.gen_pkt_details(_pkt['name'], _pkt['id'])
epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
if epdu and _pkt:
_pkt['children'] = _pkt['datas']
# todo 当数据包获取到东西但不是参数时,获取到的包结构有问题,需要过滤
_pkt['length'] = 0
_pkt['pos'] = 0
if len(_pkt['children']) > 0:
_last_par = _pkt['children'][len(_pkt['children']) - 1]
_pkt['length'] = (_last_par['pos'] + _last_par['length'])
if 'children' not in epdu:
epdu['children'] = []
# 添加解析规则后缀防止重复
_pkt['id'] = _pkt['id'] + '_' + vc['VCID']
# 给包名加代号前缀
if not _pkt['name'].startswith(_pkt['id']):
_pkt['name'] = _pkt['id'] + '_' + _pkt['name']
epdu['children'].append(_pkt)
apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
apid = ''
service = ''
sub_service = ''
if apid_node and apid_node['content']:
apid = apid_node['content']
if ser_node and ser_node['content']:
service = f"{int(ser_node['content'], 16)}"
if sub_ser_node and sub_ser_node['content']:
sub_service = f"{int(sub_ser_node['content'], 16)}"
_pkt['vals'] = \
f"{apid}/{service}/{sub_service}/"
async def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path):
# 插入域参数列表
insert_domain = self.gen_insert_domain_params()
# VC源包格式
vc_pkt_fields = data_templates.vc_pkt_fields # self.gen_pkt_format()
# 获取虚拟信道 vc
vcs = self.gen_vc()
for vc in vcs:
vc['children'] = []
vc['VCID'] = str(int(vc['VCID'], 2))
for field in vc_pkt_fields:
if field['name'] == '数据域':
field['children'] = []
vc['children'].append(dict(field))
# VCID 字段内容
vcid_content = build_vcid_content(vcs)
# 遥测帧结构由模板生成,只需提供特定参数
tm_data = {
"vcidContent": vcid_content,
'insertDomain': insert_domain,
}
cadu = data_templates.get_tm_frame(tm_data)
# VC源包
# 遥测源包设计中的源包列表
self.vc_pkts = await self.gen_pkt_vc() # ,self.tm_pkts = self.gen_pkts()
# 处理VC下面的遥测包数据
tasks = []
for vc in vcs:
# 此VC下的遥测包过滤
_vc_pkts = list(filter(lambda it: vc['id'] in it['vcs'], self.vc_pkts))
for _pkt in _vc_pkts:
# 判断遥测包是否有详细定义
# if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
# continue
# 获取包详情
ret = self.get_pkt_details(_pkt, vc)
tasks.append(ret)
if len(tasks):
await asyncio.gather(*tasks)
# 重新计数起始偏移
self.compute_length_pos(cadu['children'])
# 将数据插入数据库
seq = 1
for cadu_it in cadu['children']:
if cadu_it['name'] == 'VCDU':
# VCDU
# 将信道替换到数据域位置
vc_data = next(filter(lambda it: it['name'].__contains__('数据域'), cadu_it['children']), None)
if vc_data:
idx = cadu_it['children'].index(vc_data)
cadu_it['children'].pop(idx)
for vc in vcs:
# 处理虚拟信道属性
vc['type'] = 'logic'
vc['length'] = vc_data['length']
vc['pos'] = vc_data['pos']
vc['content'] = 'CCSDSMPDU'
vcid = vc['VCID']
vc['condition'] = f'VCID=={vcid}'
# 将虚拟信道插入到VCDU
cadu_it['children'].insert(idx, vc)
idx += 1
for vc in vcs:
self.compute_length_pos(vc['children'])
# 设置VCID的content
vcid_node = next(filter(lambda it: it['name'].__contains__('VCID'), cadu_it['children']), None)
if vcid_node:
vcid_node['content'] = vcid_content
create_enc_pkt(proj_pk, rule_pk, cadu_it, rule_pk, seq, name_path, ds, '001', 'ENC')
else:
# 参数
create_prop_enc(proj_pk, rule_pk, cadu_it, get_data_ty(cadu_it), seq)
seq += 1
return cadu
def gen_vc(self):
_msg = """
#角色
你是一名资深的软件工程师。
#指令
我需要从文档中提取虚拟信道列表,你要帮助我完成虚拟信道列表的提取。
#需求
请分析文档中的遥测包格式以及遥测虚拟信道,输出遥测虚拟信道列表。
字段包括:id(虚拟信道代号)、name(虚拟信道名称)、VCID(虚拟信道VCID,二进制)、format(根据虚拟信道类型获取对应的数据包的格式的名称)
#上下文
深入理解文档中描述的关系,例如:文档中描述了常规遥测是常规数据的下传信道,并且还描述了分系统常规遥测参数包就是实时遥测参数包,并且文档中对实时遥测参数包的格式进行了描述,所以常规遥测VC应该输出为:{"id": "1", "name": "常规遥测VC", "VCID": "0", "format": "实时遥测参数包"}
#约束
- 数据结构最外层为数组,数组元素为虚拟信道信息;
- format:必须是数据包格式的名称;
- 仅输出JSON文本。
#例子:
[
{
"id": "VC0",
"name": "空闲信道",
"VCID": "111111",
"format": "空闲包"
}
]
"""
def validation(gen_text):
vcs = json.loads(gen_text)
assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
print('虚拟信道:')
doc_text = self.get_text_with_entity(['虚拟信道定义'])
text = self.generate_text_json(_msg, f"{self.json_path}/虚拟信道.json", doc_text=doc_text,
validation=validation)
vcs = json.loads(text)
return vcs
async def gen_pkt_details(self, pkt_name, pkt_id):
cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}.json"
doc_text = self.get_text_with_entity([pkt_id])
pkt = {
"name": pkt_name,
"id": pkt_id,
"type": "linear",
"headers": [],
"datas": [],
}
if doc_text == '':
return pkt
print(f'遥测源包“{pkt_name}”信息:')
# 1. 获取包头和参数列表
# 2. 遍历包头和参数列表,获取bit位置和长度,规范代号并生成,生成byteOrder
async def get_header_params(_pkt_name, _doc_text: str):
_msg = ("""
# 需求
提取文档中描述的遥测包包头信息。
包头信息包括:包版本号(Ver)、包类型(Type)、副导头标识(Subheader)、应用过程标识(apid)、序列标记(SequenceFlag)、包序列计数(SequenceCount)、包长(PacketLength)、服务(Service)、子服务(SubService)信息。
服务、子服务:一般在表格中的包头区域提取,如果表格中没有包头区域只有数据域则在标题中提取,例如:“在轨维护遥测包(APID=0x384) (3,255)”其中服务是3子服务是255;
表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析;
输出json,不要有注释。
# 输出例子
```json
{
"Ver": "000",
"Type": "0",
"Subheader": "1",
"apid": "0",
"SequenceFlag": "11",
"SequenceCount": "00000000000000",
"PacketLength": "1",
"Service": "03",
"SubService": "FF"
}
```""")
# 截取前70行
_doc_text = '\n'.join(_doc_text.splitlines()[0:100])
tpl = os.path.dirname(__file__) + "/tpl/tm_pkt_headers_yg.json"
tpl_text = utils.read_from_file(tpl)
_cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}-包头参数.json"
_text = await asyncio.to_thread(self.generate_text_json, _msg, _cache_file, [], _doc_text, None)
result = json.loads(_text)
if re.match(r'^(0x)?[01]{11}$', result['apid']):
result['apid'] = hex(int(re.sub('0x', '', result['apid']), 2))
for k in result:
tpl_text = tpl_text.replace("{{" + k + "}}", result[k])
return json.loads(tpl_text)
async def get_data_area_params(_pkt_name, _doc_text: str):
_msg = ("""
# 指令
我需要从文档中提取遥测源包的参数信息列表,你要帮我完成遥测源包的参数信息的提取。
# 需求
提取文档中描述的遥测包数据域中的所有参数,以及参数的位置、名称、代号信息,输出的信息要与文档中的文本要一致,不要遗漏任何参数。
如果文档中没有参数表则输出空数组。
严格按照输出示例中的格式输出,仅输出json。
# 要求
1个字节的长度为8位,使用B0-B7来表示。
所有位置信息需要转换为要求格式"Byte1_B6~Byte2_B0"进行输出,如果与要求格式不同的要进行补全或转换,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。
# 输出示例
```json
[
{
"posText": "Byte1_B6~Byte2_B0",
"name": "xxx",
"id": "xxxxxx"
}
]
```
# 没有参数时的输出示例
```json
[]
```""")
_cache_file = f"{self.json_path}/数据包-{utils.to_file_name(pkt_name)}-参数列表.json"
if utils.file_exists(_cache_file):
return json.loads(utils.read_from_file(_cache_file))
title_line = _doc_text.splitlines()[0]
tables = re.findall(r"```json(.*)```", _doc_text, re.DOTALL)
if tables:
table_text = tables[0]
table_blocks = []
if len(table_text)>50000:
table = json.loads(table_text)
header:list = table[0:20]
for i in range(math.ceil((len(table)-20)/200)):
body = table[20 + i * 200:20 + (i + 1) * 200]
block = []
block.extend(header)
block.extend(body)
table_blocks.append(f"{title_line}\n```json\n{json.dumps(block,indent=2,ensure_ascii=False)}\n```")
else:
table_blocks.append(table_text)
param_list = []
block_idx = 0
for tb_block in table_blocks:
_block_cache_file = f"{self.json_path}/pkts/数据包-{utils.to_file_name(pkt_name)}-参数列表-{block_idx}.json"
block_idx += 1
text = await asyncio.to_thread(self.generate_text_json, _msg, _block_cache_file, [], tb_block, None)
json_list = json.loads(text)
for par in json_list:
if not re.match('^Byte\d+_B[0-7]~Byte\d+_B[0-7]$', par['posText']):
par['posText'] = get_single_pos(par['posText'])
if not any(filter(lambda p: p['posText']==par['posText'], param_list)):
param_list.append(par)
for par in param_list:
if par['posText'] is not None:
par['id'] = re.sub('[^_a-zA-Z0-9]', '_', par['id'])
pos_data = self.handle_pos(par['posText'])
par['pos'] = pos_data['pos']
par['length'] = pos_data['length']
save_to_file(json.dumps(param_list, ensure_ascii=False, indent=2), _cache_file)
return param_list
else:
return []
# 单独处理未正确获取的位置信息
def get_single_pos(txt):
_msg = f"""
1个字节的长度为8位,使用B0-B7来表示。
将“{txt}”转换为要求格式"Byte1_B6~Byte2_B0"进行输出,如果与要求格式不同的要进行补全或转换,例如:"Byte1_B0~B2" 转换为 "Byte1_B0~Byte1_B2"。例如:"Byte1~Byte2" 转换为 "Byte1_B0~Byte2_B7"。例如:"Byte1_B5" 转换为 "Byte1_B5~Byte1_B5"。
输出示例:Byte1_B6~Byte2_B0
仅输出结果,不输出其他文字
"""
def validation(return_txt):
assert re.match('^Byte\d+_B[0-7]~Byte\d+_B[0-7]$', return_txt), '格式不正确'
text = self.generate_text_json(_msg, "", doc_text="", validation=validation)
return text
params = []
header_params, data_area_params = (
await asyncio.gather(get_header_params(pkt_name, doc_text),
get_data_area_params(pkt_name, doc_text)))
params.extend(data_area_params)
pkt['headers'] = header_params
pkt['datas'] = data_area_params
# async def get_param_info(para):
# _msg2 = """
# # 需求
# 从文本中提取区间起始偏移位置和区间长度,单位为比特。文本中的内容为区间描述,其中:Byte 表示第 N 个字节,N 从 1 开始,B 表示第 X 个比特,X 从 0 - 7 ,区间为闭区间。
# 所有数学计算是需要算数表达式,不需要计算结果。计算公式如下:
# - ByteN_BX起始和结束位置:(N - 1)*8 + X
# - ByteN 起始位置:(N - 1)*8
# - ByteN 结束位置:(N - 1) * 8 + 7
# - 长度:结束偏移位置 + 1 - 起始偏移位置,闭区间的长度需要结束位置加1再减去起始位置
# # 生成模板
# 推理过程:简要说明提取信息及调用 tool 计算的过程。输出结果:按 JSON 格式输出,格式如下:
# {
# "offset": "起始偏移位置表达式",
# "length": "长度计算表达式"
# }
# 文本:
# """+f"""
# {para['posText']}
# """
# text2 = await asyncio.to_thread(self.generate_text_json, _msg2, '', [], '')
# try:
# out = json.loads(text2)
# para['pos'] = eval(out['offset'])
# para['posRet'] = text2
# para['length'] = eval(out['length'])
# para['id'] = re.sub(r"[^0-9a-zA-Z_]", "_", para['code'])
# para['type'] = 'para'
# except Exception as e:
# print(e)
# tasks = []
# for param in params:
# tasks.append(get_param_info(param))
#
# s = time.time()
# await asyncio.gather(*tasks)
# e = time.time()
if params:
offset = params[0]['pos']
for para in params:
para['pos'] -= offset
# print(f'======参数数量:{len(params)},耗时:{e - s}')
utils.save_text_to_file(json.dumps(pkt, ensure_ascii=False, indent=4), cache_file)
return pkt
async def gen_pkts(self):
_msg = """
# 角色
你是一名资深软件工程师。
# 指令
我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。
# 需求
输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)。
字段描述:
1.名称:遥测包的名称;
2.代号:遥测包的代号;
# 约束
- name:名称中不要包含代号,仅从文档中提取源包名称;
- 如果没有代号,使用遥测包名称的英文翻译代替;
- 如果没有名称用代号代替;
- 注意,一定要输出所有的遥测包,不要漏掉任何一个遥测包;
- 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。
# 例子
[
{
"name": "数管数字量快速源包",
"id": "PMS001",
}
]
"""
print(f'遥测源包列表:')
doc_text = self.get_text_with_entity(['源包列表'])
text = await asyncio.to_thread(self.generate_text_json, _msg, f'{self.json_path}/源包列表.json', doc_text=doc_text)
pkt = json.loads(text)
return pkt
async def gen_pkt_vc(self):
_msg = """
# 需求
根据文档内容输出遥测源包信息,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs)。
所有字段仅使用文档内容输出。
表格中遥测源包不是按名称来排序的,按照文档中的表格中的遥测源包顺序进行输出。
每个包都要输出。
所属虚拟信道:通过表格中描述的下传时机和虚拟信道的划分,获取下传时机对应的虚拟信道代号(序号),并组织为一个数据进行输出,例如:下传时机为实时和延时,那么就表示该包的所属虚拟信道为VC1和VC3。如果没有匹配下传时机,就填入空数组。
# 输出示例:
[
{
"id": "PMS001",
"name": "数管数字量快速源包",
"vcs": ["VC1",'VC2']
},
]
"""
print('遥测源包所属虚拟信道:')
def validation(gen_text):
pkts = json.loads(gen_text)
assert len(pkts), 'VC源包列表不能为空'
doc_text = self.get_text_with_entity(['虚拟信道定义', '遥测源包下传时机'])
text = await asyncio.to_thread(
lambda: self.generate_text_json(_msg, f'{self.json_path}/遥测VC源包.json', doc_text=doc_text, validation=validation))
pkt_vcs = json.loads(text)
return pkt_vcs
def gen_pkt_format(self):
_msg = """
# 角色
你是一名资深软件工程师。
# 指令
我需要从文档中提取数据包的格式,你要帮助我完成数据包格式的提取。
# 需求
请仔细分系文档,输出各个数据包的格式。
数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。
包格式字段包括:名称(name)、代号(id)、类型(type)、子级(children)。
children元素的字段包括:name、id、pos、length、type。
children元素包括:版本号(Ver)、类型(TM_Type)、副导头标志(Vice_Head)、应用过程标识符(Proc_Sign)、分组标志(Group_Sign)、包序列计数(Package_Count)、包长(Pack_Len)、数据域(EPDU_DATA)。
# 约束
- 生成的JSON语法格式要合法。
# 例子
{
"name": "实时遥测参数包",
"id": "EPDU",
"type": "logic",
"children": [
{
"name": "版本号",
"id": "Ver",
"pos": 0,
"length": 3,
"type": "para",
"content": "0",
"dataTy": "INVAR"
},
{
"name": "数据域",
"id": "EPDU_DATA",
"pos": 3,
"length": "变长",
"type": "any"
}
]
}
"""
print('遥测包格式:')
text = self.generate_text_json(_msg, f'{self.json_path}/数据包格式.json', files=[file_map['遥测大纲']])
pkt_formats = json.loads(text)
return pkt_formats
def compute_length_pos(self, items: list):
items.sort(key=lambda x: x['pos'])
# for child in items:
# if 'children' in child:
# self.compute_length_pos(child['children'])
# if 'length' in child and isinstance(child['length'], int):
# length = length + child['length']
# pos = pos + child['length']
# node['length'] = length
async def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name):
_msg = """
# 角色
你是一名资深的软件工程师。
# 需求
请分析文档中的表格,按表格顺序输出表格中的所有源包信息;
数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
transSer(传输服务)、note(备注)、throughBus(是否经过总线)、transDirect(传输方向)。
文档中如果没有数据包表则输出:[]。
# 数据包字段说明
- frameNum(通信帧号):文档中通信帧号列的内容;
- subAddr(子地址/模式):值只能是:“深度”、“平铺”、数字或null,如果是“/”则是null;
- throughBus(是否经过总线)的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
- transSer(传输服务分三种):置数(SetData)、取数(GetData)、数据块传输(DataBlock),根据表格中的“传输服务”列进行判断;
# 约束
- 仅输出json。
- 按照表格中的顺序进行输出。
- 不要漏包。
# 例子
[
{
"id": "P001",
"name": "xxx",
"apid": "418",
"service": "(1, 2)",
"length": 1,
"interval": 1000,
"subAddr": null,
"frameNum": "1|2",
"transSer": "DataBlock",
"note": "",
"throughBus": true,
"burst": true,
"transDirect": ""
}
]
"""
print('总线数据包:')
def validation(gen_text):
pkts2 = json.loads(gen_text)
assert not next(filter(lambda pkt2: 'transSer' not in pkt2, pkts2), None), '总线包属性生成不完整,缺少transSer。'
rt_doc_text = self.get_text_with_entity(['RT地址分配'])
subsys_pkt_texts = self.get_text_list_with_entity(['分系统源包'])
tasks = []
rt_adds = []
for subsys_pkt_text in subsys_pkt_texts:
doc_text = f'{rt_doc_text}\n{subsys_pkt_text}'
subsys = subsys_pkt_text[:subsys_pkt_text.index("\n")]
# 单独获取RT地址,并应用到章节下所有包
get_rt_msg = f"""返回{subsys}的RT地址,仅输出十进制的结果,不要输出其他内容,如果是系统管理单元(SMU)则返回0。"""
rt_info = self.generate_text_json(get_rt_msg, "", doc_text=rt_doc_text)
if rt_info == '0':
continue
rt_adds.append({
"rt": subsys,
"rt_addr": rt_info
})
# md5 = utils.generate_text_md5(subsys_pkt_text)
task = asyncio.to_thread(self.generate_text_json, _msg,
f"{self.json_path}/总线-{utils.to_file_name(subsys)}.json", doc_text=doc_text,
validation=validation)
tasks.append(task)
results = await asyncio.gather(*tasks)
pkts = []
# 判断是否存在总线数据包.json
if os.path.isfile(f"{self.json_path}/总线数据包列表.json"):
pkts = read_from_file(f"{self.json_path}/总线数据包列表.json")
pkts = json.loads(pkts)
else:
pktid_apid_map = {}
for index, result in enumerate(results):
pkts_diretions = []
# 全角空格去除
result = re.sub(r' ', '', result)
_pkts = json.loads(result)
rt_name = rt_adds[index]["rt"]
for _pkt in _pkts:
# 应用RT地址
_pkt['rt'] = rt_name
_pkt['rtAddr'] = rt_adds[index]["rt_addr"]
_pkt['burst'] = "突发" in f"{_pkt['interval']}"
if _pkt['apid'] is None or not re.match(r'[0-9A-Fa-f]+', _pkt['apid']):
_pkt['apid'] = ''
if _pkt['id'] in pktid_apid_map:
if _pkt['apid']:
pktid_apid_map[_pkt['id']] = _pkt['apid']
else:
_pkt['apid'] = pktid_apid_map[_pkt['id']]
else:
pktid_apid_map[_pkt['id']] = _pkt['apid']
# 转为bit长度
if _pkt['length']:
if isinstance(_pkt['length'], str) and re.match(r'^\d+$', _pkt['length']):
_pkt['length'] = int(_pkt['len']) * 8
elif isinstance(_pkt['length'], int):
_pkt['length'] = _pkt['length'] * 8
pkts.append(_pkt)
# 获取待处理的传输方向信息
pkts_diretions.append({
'id': _pkt['id'],
'rt': _pkt['rt'],
'transDirect': _pkt['transDirect'],
})
# 处理传输方向
_msg = """
处理传入的json数组,每个数组对象中包含字段:rt(自身设备)、transDirect(传输方向)。
需要你给数组对象中多加一个字段,输出数组中单个对象的传输类型(transType),传输类型有两种值“收”和“发”,判断依据是根据传输方向的内容进行判断,由rt发送给SMU的传输类型是“收”,由SMU发送给rt的传输类型是“发”
rt字段为空的数据不用处理。
在transDirect字段中rt可能为缩写,缩写对应的rt名称可以从文档中进行读取。
输出结果将增加了字段的json数组直接输出,不用输出其他内容。
输出示例:[{"id": "PMK013", "rt": "中心控制单元CCU", "transDirect": "CCU→SMU→地面", "transType": "收"},{"id": "PMK055", "rt": "中心控制单元CCU", "transDirect": "SMU→CUU", "transType": "发"}]
""" + f"""
JSON:{pkts_diretions}
"""
result_json = self.generate_text_json(_msg, f"{self.json_path}/总线-rt-{utils.to_file_name(rt_name)}.json", doc_text=rt_doc_text)
# 将处理结果同步修改到result
for pkt in _pkts:
for data in json.loads(result_json):
if "transType" in data:
if data['id'] == pkt['id']:
pkt['transType'] = data['transType']
break
print(f"总线源包个数:{len(pkts)}")
# 筛选经总线的数据包
pkts = list(filter(lambda it: it['throughBus'], pkts))
no_apid_pkts = list(filter(lambda it: not it['apid'], pkts))
# 筛选有apid的数据包
pkts = list(filter(lambda it: it['apid'], pkts))
# 筛选rtAddr不为0的数据包,SMU的
pkts = list(filter(lambda it: it['rtAddr'] != '0', pkts))
# 储存所有总线包
save_to_file(json.dumps(pkts, ensure_ascii=False, indent=2), f"{self.json_path}/总线数据包列表.json")
tasks = []
def _run(gen_pkt_details, pkt2):
_pkt2 = asyncio.run(gen_pkt_details(pkt2['name'], pkt2['id']))
if _pkt2 is not None:
pkt2['children'] = []
pkt2['children'].extend(_pkt2['datas'])
for pkt in pkts:
pkt_task = asyncio.to_thread(_run, self.gen_pkt_details, pkt)
tasks.append(pkt_task)
await asyncio.gather(*tasks)
rt_pkt_map = {}
for pkt in pkts:
# 根据数据块传输和取数分组
# 逻辑封装包的解析规则ID:RT[rt地址]SUB[子地址]S(S代表取数,方向是AA表示发送;R代表置数,方向是BB表示接受)
# 取数:逻辑封装包根据子地址和帧号组合创建,有几个组合就创建几个逻辑封装包
# 数据块:只有一个逻辑封装包
if pkt['subAddr'] is not None and not isinstance(pkt['subAddr'], int) and pkt['subAddr'].find("/") > -1:
pkt['subAddr'] = pkt['subAddr'].split("/")[0]
# 处理子地址
if pkt['subAddr'] == '平铺' or not pkt['subAddr']:
# 平铺:11~26,没有填写的默认为平铺
pkt['subAddr'] = '11~26'
elif pkt['subAddr'] == '深度':
# 深度:11
pkt['subAddr'] = '11'
pkt['burst'] = "突发" in f"{pkt['interval']}"
# 处理帧号
if pkt['burst']:
# 突发:ALL
pkt['frameNum'] = 'ALL'
elif not pkt['frameNum']:
# 有
pkt['frameNum'] = ''
# todo: 处理传输方向
rt_addr = pkt['rtAddr']
sub_addr = pkt['subAddr']
trans_ser = pkt['transSer']
frame_no = pkt['frameNum'].replace('|', ',')
if trans_ser == 'GetData':
# 取数
pkt_id = f"RT{rt_addr}SUB{sub_addr}"
vals = f"{rt_addr}/{sub_addr}/0xAA/{frame_no}/"
rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals, pkts)
elif trans_ser == 'DataBlock':
# 数据块
direct = '0xAA'
if pkt['transDirect'] == '发':
direct = '0xBB'
pkt_id = f"RT{rt_addr}SUB{sub_addr}{direct}"
vals = f"{rt_addr}/{sub_addr}/{direct}/ALL/"
rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, pkt_id, vals, pkts)
_pkts = []
for k in rt_pkt_map:
_pkts.append(rt_pkt_map[k])
bus_items = data_templates.get_bus_datas(_pkts)
seq = 1
sub_key_nodes = list(filter(lambda it: 'is_key' in it, bus_items))
has_key = any(sub_key_nodes)
rule_pk = rule_enc.C_ENC_PK
sub_key = ''
key_items = []
self.compute_length_pos(bus_items)
for item in bus_items:
if item['type'] == 'enc':
if has_key:
_prop_enc = create_any_pkt(proj_pk, rule_pk, item, seq, name_path, ds, 'ENC', sub_key_nodes,
key_items)
else:
_prop_enc, rule_stream, _ = create_enc_pkt(proj_pk, rule_pk, item, rule_enc.C_ENC_PK, seq,
name_path, ds, '001', 'ENC')
else:
# 参数
_prop_enc = create_prop_enc(proj_pk, rule_pk, item, get_data_ty(item), seq)
if item.__contains__('is_key'):
sub_key += _prop_enc.C_ENCITEM_PK + '/'
key_items.append(
{"pk": _prop_enc.C_ENCITEM_PK,
'id': _prop_enc.C_SEGMENT_ID,
'name': _prop_enc.C_NAME,
'val': ''})
seq += 1
if sub_key:
rule_enc.C_KEY = sub_key
update_rule_enc(rule_enc)
async def gen_tc(self):
# 数据帧格式
frame_task = self.gen_tc_transfer_frame_format()
# 遥控包格式
pkt_format_task = self.gen_tc_pkt_format()
# 遥控包列表
instructions_task = self.gen_tc_transfer_pkts()
result = await asyncio.gather(frame_task, pkt_format_task, instructions_task)
frame = result[0]
pkt_format = result[1]
instructions = result[2]
tasks = []
for inst in instructions:
# 遥控指令数据区内容
tasks.append(self.gen_tc_details(inst))
await asyncio.gather(*tasks)
for inst in instructions:
inst['type'] = 'insUnit'
format_text = json.dumps(pkt_format, ensure_ascii=False)
format_text = utils.replace_tpl_paras(format_text, inst)
pf = json.loads(format_text)
pf['name'] = inst['name']
pf['code'] = inst['code']
data_area = next(filter(lambda x: x['name'] == '应用数据区', pf['children']))
data_area['children'].append(inst)
frame['subPkts'].append(pf)
self.order = 0
def build_def(item: dict):
if item['type'] in ['enum', 'sendFlag']:
if isinstance(item['enums'], str):
enums = json.loads(item['enums'])
else:
enums = item['enums']
return json.dumps({"EnumItems": enums, "CanInput": True}, ensure_ascii=False)
elif item['type'] == 'length':
return None
elif item['type'] == 'checkSum':
return json.dumps({"ChecksumType": item['value']['type']})
elif item['type'] == 'subPkt':
return json.dumps({"CanInput": False})
elif item['type'] in ['combPkt', 'insUnitList', 'input']:
return None
elif item['type'] == 'insUnit':
return '{"MinLength":null,"MaxLength":null,"IsSubPackage":false,"InputParams":[],"OutPutParams":[],"MatchItems":[]}'
elif item['type'] == 'pkt':
return '''{"MaxLength":1024,"IsSplit8":false,"Split8Start":null,"Split8End":null,"PadCode":null,"Alignment":null,"InputParams":[],"OutPutParams":[],"MatchItems":[]}'''
elif item['type'] == 'pktSeqCnt':
return json.dumps(
{"FirstPackValue": "PackCount", "MiddlePackValue": "PackIndex", "LastPackValue": "PackIndex",
"IndependPackValue": "InsUnitCount"})
elif 'value' in item:
return item['value']
def create_tc_format(parent_pk, field, parent_parent_pk=None):
"""
创建遥控格式
数据库数据结构:
帧字段 parent_pk=null, pk=pk_001, type=1
匿名字段(子包) parent_pk=pk_001, pk=pk_002, type=22
字段1 parent_pk=pk_002, pk=pk_003, type=15
字段2 parent_pk=pk_002, pk=pk_004, type=15
包字段 parent_pk=pk_001, pk=pk_005, type=1
匿名字段(子包) parent_pk=pk_005, pk=pk_006, type=22
字段3 parent_pk=pk_006, pk=pk_007, type=15
指令单元 parent_pk=pk_005, pk=pk_007, type=4
字段4 parent_pk=pk_007, pk=pk_008, type=15
:param parent_pk: 父级pk
:param field: 格式字段
:param parent_parent_pk: 父级的父级pk
:return:
"""
field['order'] = self.order
self.order += 1
field['def'] = build_def(field)
if 'length' in field:
field['bitWidth'] = field['length']
field['bitOrder'] = None
field['attr'] = make_attr(field)
if field['type'] == 'length' and 'value' in field and field['value']:
val = field['value']
field['range'] = val['start'] + "~" + val['end']
field['formula'] = val['formula']
# 即时输入长度为null则是变长字段,需要把类型改为variableLength
if field['type'] == 'input' and field['length'] is None:
field['type'] = 'variableLength'
if isinstance(field['value'], dict):
field['range'] = f'{field["value"]["minLength"]}~{field["value"]["maxLength"]}'
# 枚举值默认值设置
if field['type'] == 'enum' and len(field['enums']) and not next(
filter(lambda x: 'default' in x and x['default'], field['enums']), None):
field['enums'][0]['default'] = True
# 校验和
if field['type'] == 'checkSum':
field['range'] = f'{field["value"]["start"]}~{field["value"]["end"]}'
ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field)
ins_format_pk = ins_format.C_INS_FORMAT_PK
if 'children' in field:
autocode = 1
if field['type'] == 'pkt':
info = {
'order': self.order,
'type': 'subPkt',
'def': json.dumps({"CanInput": False})
}
ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format_pk, info)
self.order += 1
for child in field['children']:
child['autocode'] = autocode
autocode += 1
if field['type'] == 'insUnitList':
_parent_pk = parent_parent_pk
else:
_parent_pk = ins_format.C_INS_FORMAT_PK
create_tc_format(_parent_pk, child, ins_format_pk)
if 'subPkts' in field:
for _pkt in field['subPkts']:
create_tc_format(ins_format_pk, _pkt, parent_pk)
create_tc_format(None, frame)
async def gen_tc_transfer_frame_format(self):
_msg = '''
# 角色
你是一名资深的软件工程师。
# 指令
分析遥控传送帧格式,提取遥控传送帧格式的字段定义。
# 需求
要提取值的帧格式字段:
- 版本号:const,二进制,以B结尾;
- 通过标志:const,二进制,以B结尾;
- 控制命令标志:const,二进制,以B结尾;
- 空闲位:const,二进制,以B结尾;
- 航天器标识:const,十六进制,以0x开头,如果是二进制或十进制需要转换为十六进制;
- 虚拟信道标识:sendFlag,发送标记,默认为“任务注入帧”,所有的值都要列举出来;
# 数据类型
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
- sendFlag:发送标记,类似枚举,定义样例:[{"n":"name","v":"value","c":"code","default":true}],n表示名称,v表示值,c表示code(没有空着),default表示是默认值;
- checkSum:校验和,如果是校验和类型还需要分析校验和的算法,并保存在value的type中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
# 约束
- 以JSON格式输出;
- 仅输出JSON文本,不要输出任何其他文本。
# 输出例子:
{
"版本号": "00B",
"通过标志": "0",
...
}
'''
def validation(gen_text):
json.loads(gen_text)
doc_text = self.get_text_with_entity(['遥控帧格式'])
text = await asyncio.to_thread(
lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_frame.json', doc_text=doc_text,
validation=validation))
result: dict = json.loads(text)
format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
format_text = utils.replace_tpl_paras(format_text, result)
frame = json.loads(format_text)
return frame
async def gen_tc_pkt_format(self):
_msg = '''
# 角色
你是一名资深的软件工程师。
# 指令
分析遥控包格式,提取遥控包格式的字段定义。
# 需求
要提取值的包格式字段:
- packetVersionNumber:包版本号,const,二进制;
- packetType:包类型,const,二进制;
- dataFieldHeaderFlag:数据区头标志,const,二进制;
- sequenceFlags:序列标志,const,二进制;
- ccsdsSecondaryHeaderFlag:副导头标志,const,二进制;
- tcPktVersionNumber:遥控包版本号,const,二进制;
- acknowledgmentFlag:命令正确应答,const,二进制;
- sourceAddr:源地址,const,十六进制。
# 数据类型
- 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头;
- 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义,
- 枚举值:enum,
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法,并保存在value的type中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)
- 即时输入:input,如果是即时输入value的值为变长定义。
## 长度类型的范围定义描述
{"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"}
- start:起始字段code,长度包括起始字段,字段描述中说明了起始字段,
- end:结束字段code,长度包括结束字段,字段描述中说明了结束字段,
- formula:计算公式,如果没有计算相关描述则表示不需要计算公式。
## 即使输入类型的变长定义描述
{"minLength": "最小长度", "maxLength": "最大长度", "variableLength": true}
- minLength:最小长度,
- maxLength:最大长度,
- variableLength:是否是变长。
计算公式定义:
- BYTES:按字节计算;
- N-x:总字节数减x,例如总字节数减1的公式为N-1。
# 约束
- 以JSON格式输出;
- 仅输出JSON文本,不要输出任何其他文本。
# 输出例子:
{
"packetVersionNumber": "00B",
"packetType": "1B",
...
}
'''
def validation(gen_text):
json.loads(gen_text)
doc_text = self.get_text_with_entity(['遥控包格式'])
text = await asyncio.to_thread(
lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_pkt.json', doc_text=doc_text,
validation=validation))
result = json.loads(text)
format_text = utils.read_from_file('tpl/tc_pkt_format.json')
format_text = utils.replace_tpl_paras(format_text, result)
pkt_format = json.loads(format_text)
return pkt_format
async def gen_tc_transfer_pkts(self):
_msg = '''
# 角色
你是一名资深的软件工程师。
# 指令
分析文档列出所有的遥控指令。
# 约束
- 应用过程标识:应用过程标识就是APID,一般会在名称后的括号中列出来;
- code:指令代号,如果没有填写或者是“/”则使用空字符串代替;
- name:指令名称,根据表格行内容提取,注意是行内容,注意名称需要提取完整,如果有多列则合并用-分割;
- shortName:指令名称,根据表格内容提取;
- apid: 应用过程标识符;
- serviceType:服务类型;
- serviceSubtype:服务子类型;
- dataArea:应用数据区,提取表格中的应用数据区内容。
# 输出例子:
[{
"name": "aaa-xxx",
"shortName": "xxx"
"code":"pkt",
"apid":"0xAA",
"serviceType":"0x1",
"serviceSubtype":"0x2",
"dataArea": ""
}]
'''
def validation(gen_text):
json.loads(gen_text)
doc_text = self.get_text_with_entity(['APID分配'])
text = await asyncio.to_thread(
lambda: self.generate_tc_text(_msg, f'{self.json_path}/tc_transfer_pkts.json', doc_text=doc_text,
validation=validation))
pkts = json.loads(text)
return pkts
async def gen_tc_details(self, pkt):
result = []
tc_name = pkt['shortName']
tc_code = pkt['code']
pkt['name'] = f'{tc_code} {tc_name}'
_msg = f"""
# 角色
你是一个资深软件工程师。
# 指令
分析文档,从文档中提取遥控指令名称为“{tc_name}”代号为“{tc_code}”的指令应用数据区定义。
有些文档内容非常简单仅仅包含特定字节的内容描述,如果是这种文档,则每个特定字节的内容描述定义为一个字段,字段类型根据字节内容确定。
""" + """
# 字段类型
- 固定码字:const,数值,二进制以B结尾,十进制,十六进制以0x开头;
- 长度:length,如果字段描述内容为数据区域的长度则表示是长度,长度的value为数值、null或范围定义,
- 枚举值:enum,
- 校验和:checkSum,如果是校验和类型还需要分析校验和的算法是什么以及校验数据域范围,并保存在value中,例如:{ "type":"CRC-CCITT", "start": "START", "end":"END" },
- 即时输入:input,如果是即时输入value的值为变长定义。
## 长度类型的范围定义描述
{"start": "起始字段code", "end": "结束字段code", "formula": "计算公式"}
- start:起始字段code,长度包括起始字段,字段描述中说明了起始字段,
- end:结束字段code,长度包括结束字段,字段描述中说明了结束字段,
- formula:计算公式,如果没有长度特殊计算相关描述则使用BYTES。
计算公式定义:
- BYTES:按字节计算,字节数;
- N-x:总字节数减x,例如总字节数减1的公式为N-1。
## 即使输入类型的变长定义描述
{"minLength": "最小长度", "maxLength": "最大长度", "variableLength": true}
- minLength:最小长度,
- maxLength:最大长度,
- variableLength:是否是变长。
# 字段类型分析方法
- 根据字段描述分析字段的类型;
- 字段描述中明确指定了字段值的,类型为const;
- 字段描述中没有明确指定字段值,但是罗列了取值范围的,类型为enum;
- 字段描述中如果没有明确指定字段值也没有罗列取值范围的,类型为input;
- 字段如果描述了当前指令中的数据域长度以及长度范围则是长度类型length,否则不是长度类型;
- 如果和数据域有关,类型为const;
- 校验和类型:字段如果与当前指令数据区的校验和有关则为校验和类型否则不是校验和类型,分析校验和的算法,并保存在value中,校验和算法包括:字节异或(ByteXOR)、累加和取反(SumNot)、累加和(AddSum)、应用循环冗余(CRC-CCITT)、CRC8(CRC8)、ISO和校验(ISOSum)、奇校验(Odd)、偶校验(Even)、其他(Other)。
# 约束
## 字段属性
- code 如果没有明确定义则使用名称的英文翻译,尽量简短,如果没有填写或者为斜线表示没有明确定义;
- length 自动转换为bit长度,必须是数值、null或范围定义,不能为0;
- value 根据字段描述提取字段值,字段值一般为数值类型,需要根据字段类型来分析,如果是length类型value的值为范围定义;
- enums 枚举类型的字段必须要有enums,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""};
## 字段类型
- 长度类型字段的范围定义中的start和end必须是生成结果中的字段code,长度范围包括start和end,必须使用长度描述中的字段;
- 如果没有长度范围描述则不是长度类型;
- 校验和类型字段必须描述的是当前指令数据域校验和,如果描述的不是当前指令的数据域校验和则不是校验和类型;
- 输出数据结构为数组,数组元素为字段信息;
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
# 输出例子:
[
{
"name": "para1",
"code": "para1",
"length": 8,
"type": "const",
"value": "0xAA"
},
{
"name": "para2",
"code": "para2",
"length": 8,
"type": "length",
"value": {"start": "data", "end": "data", "formula": "BYTES"}
},
{
"name": "para3",
"code": "para3",
"length": 8,
"type": "enum",
"value": "",
"enums": [{"n":"参数1","v":"0x0A","c":"Para1"}]
},
{
"name": "数据",
"code": "data",
"length": null,
"type": "input",
"value": ""
},
{
"name": "校验和",
"code": "checksum",
"length": 2,
"type": "checkSum",
"value": { "type": "CRC-CCITT", "start":"para1", "end":"data" }
}
]
"""
def validation(gen_text):
fields = json.loads(gen_text)
for field in fields:
if field['type'] == 'length':
if field['value'] is None:
raise Exception('length类型的value不能为空')
if 'start' not in field['value'] or 'end' not in field['value']:
raise Exception('length类型的value必须包含start和end')
if field['value']['start'] not in [f['code'] for f in fields]:
raise Exception('length类型的value的start字段必须在fields中')
if field['value']['end'] not in [f['code'] for f in fields]:
raise Exception('length类型的value的end字段必须在fields中')
elif field['type'] == 'enum':
if 'enums' not in field:
raise Exception('enum类型的field必须包含enums')
if len(field['enums']) == 0:
raise Exception('enum类型的field的enums不能为空')
for enum in field['enums']:
if 'n' not in enum or 'v' not in enum:
raise Exception('enum类型的field的enums的元素必须包含n、v、c')
if enum['n'] == '' or enum['v'] == '':
raise Exception('enum类型的field的enums的元素不能为空')
doc_text = self.get_text_with_entity([tc_name])
if doc_text == '':
doc_text = self.get_text_with_tc_name(tc_name)
if doc_text == '':
doc_text = pkt['dataArea']
text = await asyncio.to_thread(self.generate_tc_text, _msg,
f"{self.json_path}/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json",
doc_text=doc_text, validation=validation)
result = json.loads(text)
pkt['children'] = result
def get_text_with_tc_name(self, tc_name: str):
entities = doc_dbh.get_entities_by_type('指令格式配置')
entity_names = '\n'.join([f'- {e.name}' for e in entities])
msg = f"""
# 需求
请从下列指令名称中匹配一个与“{tc_name}”相似度最高的指令名称。
指令名称列表:
{entity_names}
"""
name = self.generate_text(msg,None)
entity = next(filter(lambda e: e.name == name, entities,None))
if entity:
return self.get_text_with_entity([entity.name])
else:
return ''
def tc_data_generate():
exe_path = os.path.dirname(__file__) + "/db_tc_generator/InstructionGenerator.exe"
db_path = os.path.dirname(__file__) + "/db.db"
try:
# 超时时间240秒
result = subprocess.run([exe_path, db_path], timeout=240)
print(result.stdout)
print(result.returncode)
except subprocess.TimeoutExpired:
print("警告:指令数据生成失败!")
def main():
try:
project_path = r'D:\projects\KnowledgeBase'
doc_dbh.set_project_path(project_path)
# 启动大模型处理流程
asyncio.run(DbStructFlow(f'{project_path}').run())
# 生成指令数据表
tc_data_generate()
except KeyboardInterrupt:
if g_completion:
g_completion.close()
if __name__ == '__main__':
main()