# -*- coding: utf-8 -*-
|
#
|
# @author:
|
# @date:
|
# @version:
|
# @description:
|
import os
|
import re
|
import time
|
import json
|
|
import data_templates
|
from knowledgebase.db.doc_db_helper import doc_dbh
|
from knowledgebase.llm import llm
|
from knowledgebase import utils
|
|
from langchain_core.prompts import ChatPromptTemplate
|
from langchain_core.messages import HumanMessage, SystemMessage
|
import textwrap
|
|
from knowledgebase.log import Log
|
|
USE_CACHE = True
|
|
|
class JsonGenerate:
|
project: dict
|
devs: list[dict]
|
cadu: dict
|
vcs: list[dict]
|
tm_pkts: list[dict]
|
vc_pkts: list[dict]
|
bus_pkts: list[dict]
|
tc_frame: dict
|
tc_pkt_format: dict
|
tc_pkts: dict
|
|
def __init__(self):
|
self.llm = llm
|
self.systemPrompt = """
|
# 角色
|
你是一名资深的软件工程师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。
|
|
## 技能
|
### 技能 1:文档分析(包括 markdown 文档)
|
1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。
|
2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。
|
3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。
|
|
### 技能 2:通信协议分析
|
1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。
|
|
## 背景知识
|
###软件主要功能与运行机制总结如下:
|
1. 数据采集和处理:
|
DIU负责根据卫星的工作状态或模式提供遥测数据,包括模拟量(AN)、总线信号(BL)以及温度(TH)和数字量(DS),并将这些信息打包,通过总线发送给SMU。
|
SMU则收集硬通道上的遥测参数,并通过总线接收DIU采集的信息。
|
2. 多路复用与数据传输:
|
遥测源包被组织成E-PDU,进一步复用为M-PDU,并填充到VCDU中构成遥测帧。
|
利用CCSDS AOS CADU格式进行遥测数据的多路复用和传输。
|
3. 虚拟信道(VC)调度机制:
|
通过常规遥测VC、突发数据VC、延时遥测VC、记录数据VC以及回放VC实现不同类型的数据下传。
|
4. 遥控指令处理:
|
上行遥控包括直接指令和间接指令,需经过格式验证后转发给相应单机执行。
|
遥控帧通过特定的虚拟信道(VC)进行传输。
|
这些知识需要你记住,再后续的处理中可以帮助你理解要处理的数据。
|
|
## 目标导向
|
1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。
|
|
## 规则
|
1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析,每次只分析同一个型号的文档。
|
2. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。
|
3. 文档都是对于数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布、各个字段的大小和位置等信息,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,长度字段使用 length 表示,位置字段使用 pos 表示,如果为变长使用“"变长"”表示。
|
4. 如果有层级,使用树形 JSON 输出,如果有子节点,子节点 key 使用children;需保证一次输出的数据结构统一,并且判断每个层级是什么类型,输出类型字段(type),类型字段的 key 使用 type,类型包括:型号(project)、设备(dev)、封装包(enc)、线性包(linear)、参数(para),封装包子级有数据包,所以type为enc,线性包子级只有参数,所以type为linear;每个层级都包含偏移位置(pos),每个层级的偏移位置从0开始。
|
5. 名称相关的字段的 key 使用name;代号、编号或者唯一标识相关的字段的key使用id,id由数字、英文字母、下划线组成且以英文字母开头,长度尽量简短;序号相关的字段的key使用number;偏移位置相关字段的key使用pos;其他没有举例的字段使用精简的翻译作为字段的key;每个结构必须包含name和id。
|
6. 遥测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级嵌套传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构。
|
7. 数据包字段包括:name、id、type、pos、length、children;参数字段包括:name、id、pos、type、length;必须包含pos和length字段。
|
8. 常用id参考:遥测(TM)、遥控(TC)、总线(BUS)、版本号(Ver)、应用过程标识(APID)。
|
9. 注意:一定要记得morkdown文档中会将一些特殊字符进行转义,以此来保证文档的正确性,这些转义符号(也就是反斜杠‘\’)不需要在结果中输出。
|
10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。
|
|
## 限制:
|
- id和code的命名规则:英文字母、数字、下划线组成,且以英文字母或下划线开头。
|
- 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
|
- 不输出任何注释等描述性信息。
|
"""
|
|
# 模型调用
|
def call_model(self, msg: str, cache_file: str, doc: str, validation=None, try_cnt=3) -> str:
|
"""
|
调用大模型
|
:param msg: 问题
|
:param cache_file: 生成结果缓存文件
|
:param doc: 文档文本
|
:param validation: 校验函数,(text: str)-> None
|
:param try_cnt: 失败重试次数
|
:return: 生成的文本
|
"""
|
if USE_CACHE and os.path.isfile(cache_file):
|
with open(cache_file, 'r', encoding='utf-8') as f:
|
text = f.read()
|
else:
|
s = time.time()
|
messages = [SystemMessage(self.systemPrompt)]
|
for info in doc:
|
messages.append(HumanMessage(info))
|
prompt = ChatPromptTemplate.from_messages(messages)
|
chain = prompt | self.llm
|
# 去除多余的缩进
|
msg = textwrap.dedent(msg).strip()
|
resp = chain.invoke({"msg": msg})
|
text = resp.content
|
if validation:
|
try:
|
validation(text)
|
except BaseException as e:
|
Log.error(e)
|
if try_cnt <= 0:
|
raise RuntimeError('生成失败,重试次数太多,强制结束!')
|
return self.call_model(msg, cache_file, validation, try_cnt - 1)
|
if cache_file:
|
with open(cache_file, 'w', encoding='utf-8') as f:
|
f.write(text)
|
Log.info(f'耗时:{time.time() - s}')
|
return text
|
|
@staticmethod
|
def get_text_with_entity(entity_names: list[str]) -> str:
|
"""
|
根据实体词获取文档文本
|
:param entity_names: str - 实体词名称
|
:return: str - 文本内容
|
"""
|
return doc_dbh.get_text_with_entities(entity_names)
|
|
def run(self):
|
# 根据文档,生成结构化数据
|
self.handle_tm_structured_data()
|
self.handle_tc_structured_data()
|
|
# region start 遥测
|
def handle_tm_structured_data(self):
|
self.gen_project()
|
self.gen_device()
|
|
# 获取项目信息
|
def gen_project(self):
|
_msg = """
|
# 指令
|
根据文档内容分析型号信息,型号字段包括:名称和代号。
|
# 例子
|
{"name":"xxx","id":"xxx"}
|
"""
|
doc_text = self.get_text_with_entity(['系统概述'])
|
text = self.call_model(_msg, 'out/型号信息.json', doc_text)
|
self.project = json.loads(text)
|
Log.info('型号信息:' + self.project)
|
|
# 获取设备信息
|
def gen_device(self):
|
_msg = """
|
# 指令
|
我需要从文档提取设备列表信息,你要帮助我完成设备列表信息提取。
|
# 需求
|
输出分系统下的硬件产品(设备)列表,硬件产品名称一般会包含“管理单元”或者“接口单元”;
|
# 字段包括:
|
- 名称(name):设备名称;
|
- 代号(code):设备代号;
|
- 是否包含遥控遥测(hasTcTm):标识该硬件产品是否包含遥控遥测的功能,布尔值true或false;
|
- 是否包含温度量模拟量等数据的采集(hasTemperatureAnalog):标识该硬件产品是否包含温度量等信息的采集功能,布尔值true或false;
|
- 是否有总线硬件产品(hasBus):标识该设备是否属于总线硬件产品,是否有RT地址,布尔值true或false;
|
# 约束
|
- 如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符;
|
- 数据结构最外层为数组,数组元素为设备信息
|
- 仅输出JSON,不要输出JSON以外的任何字符。
|
# 例子
|
[
|
{
|
"name": "系统管理单元",
|
"code": "SMU",
|
"hasTcTm": true,
|
"hasTemperatureAnalog": false,
|
"hasBus": true
|
},
|
{
|
"name": "1553B总线",
|
"code": "1553",
|
"hasTcTm": true,
|
"hasTemperatureAnalog": true,
|
"hasBus": true
|
}
|
]
|
"""
|
doc_text = self.get_text_with_entity(['系统概述', '总线管理'])
|
text = self.call_model(_msg, 'out/设备列表.json', doc_text)
|
Log.info('设备列表:' + text)
|
|
self.devs = json.loads(text)
|
# 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
|
like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), self.devs))
|
for dev in like_smu_devs:
|
self.gen_tm_frame(dev)
|
# 总线
|
hasBus = any(d['hasBus'] for d in self.devs)
|
if hasBus:
|
self.gen_bus()
|
|
def gen_tm_frame(self, dev):
|
# 插入域参数列表
|
insert_domain = self.gen_insert_domain_params(dev)
|
# VC源包格式
|
vc_pkt_fields = data_templates.vc_pkt_fields
|
# 获取虚拟信道 vc
|
self.vcs = self.gen_vc(dev)
|
for vc in self.vcs:
|
vc['children'] = []
|
vc['VCID'] = str(int(vc['VCID'], 2))
|
for field in vc_pkt_fields:
|
if field['name'] == '数据域':
|
field['children'] = []
|
vc['children'].append(dict(field))
|
|
def build_vcid_content(vcs):
|
_vcs = []
|
for _vc in vcs:
|
_vcs.append(_vc['name'] + ',' + _vc['VCID'])
|
return ' '.join(_vcs)
|
|
# VCID 字段内容
|
vcid_content = build_vcid_content(self.vcs)
|
# 遥测帧结构由模板生成,只需提供特定参数
|
tm_data = {
|
"vcidContent": vcid_content,
|
'insertDomain': insert_domain,
|
}
|
self.cadu = data_templates.get_tm_frame(tm_data)
|
|
# 获取vc源包
|
self.vc_pkts = self.gen_pkt_vc(dev)
|
# 获取源包列表
|
self.tm_pkts = self.gen_pkts(dev)
|
|
# 获取VC下面的遥测包数据
|
for vc in self.vcs:
|
# 此VC下的遥测包过滤
|
_vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts)
|
for _pkt in _vc_pkts:
|
# 判断遥测包是否有详细定义
|
if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
|
continue
|
# 获取包详情
|
_pkt = self.gen_pkt_details(_pkt['name'], _pkt['id'])
|
epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
|
if epdu and _pkt:
|
_pkt['children'] = _pkt['datas']
|
_last_par = _pkt['children'][len(_pkt['children']) - 1]
|
_pkt['length'] = (_last_par['pos'] + _last_par['length'])
|
_pkt['pos'] = 0
|
if 'children' not in epdu:
|
epdu['children'] = []
|
# 添加解析规则后缀防止重复
|
_pkt['id'] = _pkt['id'] + '_' + vc['VCID']
|
# 给包名加代号前缀
|
if not _pkt['name'].startswith(_pkt['id']):
|
_pkt['name'] = _pkt['id'] + '_' + _pkt['name']
|
epdu['children'].append(_pkt)
|
apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
|
ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
|
sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
|
_pkt['vals'] = \
|
f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
|
# 重新计数起始偏移
|
self.compute_length_pos(self.cadu['children'])
|
|
def compute_length_pos(self, items: list):
|
length = 0
|
pos = 0
|
for child in items:
|
if 'children' in child:
|
self.compute_length_pos(child['children'])
|
child['pos'] = pos
|
if 'length' in child and isinstance(child['length'], int):
|
length = length + child['length']
|
pos = pos + child['length']
|
# node['length'] = length
|
|
def gen_insert_domain_params(self, dev):
|
_msg = """
|
# 指令
|
我需要从文档中提取插入域的参数列表,你要帮助我完成插入域参数列表的提取。
|
# 需求
|
分析文档,输出插入域的参数列表,将所有参数全部输出。
|
参数信息字段包括:name(参数名称)、id(参数代号)、pos(参数起始bit位置)、length(参数bit长度)、type(类型:para)。
|
注意:
|
1个字节的长度为8位,使用B0-B7来表示,请精确计算参数长度。
|
文档中位置描述信息可能存在跨字节的情况,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。
|
# 约束
|
- 不要遗漏任何参数;
|
- 数据结构最外层为数组,数组元素为参数信息对象;
|
- 仅输出JSON文本。
|
# 例子
|
[
|
{
|
"name": "遥测模式字",
|
"id": "TMS215",
|
"pos": 0,
|
"length": 8,
|
"type": "para"
|
}
|
]
|
"""
|
|
def validation(gen_text):
|
params = json.loads(gen_text)
|
assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组'
|
assert len(params), '插入域参数列表不能为空'
|
|
doc_text = self.get_text_with_entity(['插入域'])
|
result = self.call_model(_msg, 'out/' + dev.code + '_插入域参数列表.json', doc_text,
|
validation)
|
Log.info('插入域参数列表:' + result)
|
return json.loads(result)
|
|
def gen_vc(self, dev):
|
_msg = """
|
# 指令
|
我需要从文档中提取虚拟信道列表,你要帮助我完成虚拟信道列表的提取。
|
# 需求
|
请分析文档中的遥测包格式以及遥测虚拟信道,输出遥测虚拟信道列表。
|
字段包括:id(虚拟信道代号)、name(虚拟信道名称)、VCID(虚拟信道VCID,二进制)、format(根据虚拟信道类型获取对应的数据包的格式的名称)
|
# 上下文
|
深入理解文档中描述的关系,例如:文档中描述了常规遥测是常规数据的下传信道,并且还描述了分系统常规遥测参数包就是实时遥测参数包,并且文档中对实时遥测参数包的格式进行了描述,所以常规遥测VC应该输出为:{"id": "1", "name": "常规遥测VC", "VCID": "0", "format": "实时遥测参数包"}
|
# 约束
|
- 数据结构最外层为数组,数组元素为虚拟信道信息;
|
- format:必须是数据包格式的名称;
|
- 仅输出JSON文本。
|
# 例子:
|
[
|
{
|
"id": "VC0",
|
"name": "空闲信道",
|
"VCID": "111111",
|
"format": "空闲包"
|
}
|
]
|
"""
|
|
def validation(gen_text):
|
vcs = json.loads(gen_text)
|
assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
|
doc_text = self.get_text_with_entity(['虚拟信道定义'])
|
result = self.call_model(_msg, 'out/' + dev.code + '_虚拟信道.json', doc_text, validation)
|
Log.info('虚拟信道:' + result)
|
return json.loads(result)
|
|
def gen_pkt_vc(self, dev):
|
_msg = """
|
# 指令
|
我需要从文档中提取遥测源包信息,你要帮助我完成遥测源包信息的提取。
|
# 需求
|
根据”遥测源包下传时机定义“章节的内容输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)。
|
# 约束
|
- 从”遥测源包下传时机定义“章节中提取遥测源包信息;
|
- 所属虚拟信道:必须是文档中描述的遥测虚拟信道代号(序号);
|
- 下传时机:与表格中定义的一致;
|
- 不要遗漏任何遥测源包。
|
# 例子:
|
[
|
{
|
"id": "PMS001",
|
"name": "数管数字量快速源包",
|
"vcs": ["VC1"],
|
"timeTags": ["实时"]
|
},
|
]
|
"""
|
|
def validation(gen_text):
|
pkts = json.loads(gen_text)
|
assert len(pkts), 'VC源包列表不能为空'
|
|
text = self.call_model(_msg, 'out/' + dev.code + '_遥测源包下传时机.json', ['遥测源包下传时机'], validation)
|
Log.info('遥测源包所属虚拟信道:' + text)
|
return json.loads(text)
|
|
def gen_pkts(self, dev):
|
_msg = """
|
# 指令
|
我需要从文档中提取遥测包数据,你要根据文档内容帮我完成遥测包数据的提取。
|
# 需求
|
输出文档中描述的遥测包列表,遥测包字段包括:名称(name)、代号(id)、是否有参数(hasParams)。
|
字段描述:
|
1.名称:遥测包的名称;
|
2.代号:遥测包的代号;
|
3.是否有参数:表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者类似”详见xxx“则是没有参数表。
|
# 约束
|
- name:名称中不要包含代号,仅从文档中提取源包名称;
|
- hasParams:值为布尔值,true或false;
|
- 如果没有代号,使用遥测包名称的英文翻译代替;
|
- 如果没有名称用代号代替;
|
- 不要漏掉任何遥测包;
|
- 数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。
|
# 例子
|
[
|
{
|
"name": "数管数字量快速源包",
|
"id": "PMS001",
|
"hasParams": true
|
}
|
]
|
"""
|
result = self.call_model(_msg, 'out/' + dev.code + '_源包列表.json', ['这里是文档中抽取的内容'])
|
Log.info('遥测源包列表:' + result)
|
return json.loads(result)
|
|
def gen_pkt_details(self, pkt_name, pkt_id):
|
cache_file = f'out/数据包-{pkt_name}.json'
|
if not os.path.isfile(cache_file):
|
# 先问最后一个参数的字节位置
|
Log.info(f'遥测源包“{pkt_name}”信息:')
|
_msg = f"""
|
# 指令
|
我需要从文档中提取遥测源包的最后一个参数的bit位置和数据域参数个数,你要帮我完成参数bit位置和数据域参数个数的提取。
|
# 需求
|
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包的最后一个参数的bit位置和数据域参数个数。
|
# 约束
|
- 遥测源包的内容在一个表格中定义,表格结束则包内容结束;
|
- 数据域中每一行对应一个参数;
|
- 不要跨表格提取;
|
- 字节位置中字节位置是从1开始的,bit位置是从0开始的;
|
- bit位置计算公式为:(N-1)*8+B,其中N是字节数,B是bit数;
|
- 仅输出json,不要输出其他任何字符。
|
# 例子:
|
{"last_par_pos":128, "par_num": 20}
|
"""
|
text = self.call_model(_msg, '', ['这里是文档中抽取的内容'])
|
result = json.loads(text)
|
last_par_pos = result['last_par_pos']
|
par_num = result['par_num']
|
|
_msg = f"""
|
# 指令
|
我需要从文档中提取遥测源包信息列表,你要帮我完成遥测源包信息列表的提取。
|
# 需求
|
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包。
|
注意:最后一个参数的起始bit偏移位置为{last_par_pos}。
|
""" + """
|
遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
|
包头的属性的字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
|
数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
|
|
包头属性包括:包版本号、包类型、副导头标识、应用过程标识、序列标记、包序列计数、包长、服务、子服务。
|
包头属性的长度:包版本号(3)、包类型(1)、副导头标识(1)、应用过程标识(11)、序列标记(2)、包序列计数(14)、包长(16)、服务(8)、子服务(8)。
|
|
表格单元格合并说明:包格中存在单元格合并的情况,如果水平或垂直相邻的单元格内容一样那么这几个内容一样的单元格有可能是一个合并单元格在分析时应该当作合并单元格分析。
|
# 约束
|
- 代号命名规则:数字、英文字母和下划线组成且以英文字母和下划线开头;
|
- 如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
|
- 如果有代号严格依照文档中的代号,文档中的代号如果不符合代号命名规则将特殊字符转换为下划线,例如:Rsv-1转换为Rsv_1;
|
- 你需要理解数据包的位置信息,由位置信息得到长度,并且将所有输出单位统一转换为 bits;
|
- pos字段:数值类型,从0开始计算,由长度(length)累加得到;
|
- 应用过程标识:如果不是十六进制转换为十六进制,转换完成后要验证是否正确,以0x开头,;
|
- 包头后面的每一行都对应一个参数,逐行输出参数,不要遗漏任何参数;
|
- 类似”保留(Rsv)“的行也要当参数生成;
|
- 重复的行也要生成;
|
- 注意包内容的范围,不要提取到其他包中的内容,包内容都在同一个表格中;
|
- 字节顺序:值为大端“B”,小端“L”,默认为“B”;
|
- 输出严格按照文档中的内容生成,不要创造文档中不存在的内容;
|
- 仅输出json,不要输出任何其他内容。
|
# 例子
|
{
|
"name": "数管缓变遥测包",
|
"id": "PMS003",
|
"type": "linear",
|
"headers": [
|
{
|
"name": "包标识",
|
"id": "packetIdentifier",
|
"pos": 0,
|
"content": "000",
|
"length": 8,
|
"type": "para"
|
}
|
],
|
"datas": [
|
{
|
"name": "XXX包",
|
"id": "XXX",
|
"pos": 0,
|
"length": 8,
|
"byteOrder": ""
|
}
|
]
|
"""
|
|
def validation(gen_text):
|
_pkt = json.loads(gen_text)
|
with open(f'out/tmp/{time.time()}.json', 'w') as f:
|
f.write(gen_text)
|
assert 'headers' in _pkt, '包结构中必须包含headers字段'
|
assert 'datas' in _pkt, '包结构中必须包含datas字段'
|
Log.info(f'参数个数:{len(_pkt["datas"])}')
|
# assert par_num == len(_pkt['datas']), f'数据域参数个数不对!预计{par_num}个,实际{len(_pkt["datas"])}'
|
assert last_par_pos == _pkt['datas'][-1]['pos'], '最后一个参数的字节位置不对!'
|
|
result = self.call_model(_msg, f'out/数据包-{pkt_name}.json', [], ['这里是文档中抽取的内容'], validation)
|
Log.info(f'数据包“{pkt_name}”信息:' + result)
|
pkt = json.loads(result)
|
else:
|
pkt = json.loads(utils.read_from_file(cache_file))
|
pkt_len = 0
|
for par in pkt['datas']:
|
par['pos'] = pkt_len
|
pkt_len += par['length']
|
pkt['length'] = pkt_len
|
return pkt
|
|
def gen_bus(self):
|
_msg = """
|
# 指令
|
我需要从文档中提取经总线的数据包列表,你要帮助我完成经总线的数据包列表的提取。
|
# 需求
|
请析文档,列出总线通信包传输约定中描述的所有数据包列表,
|
数据包字段包括:id(数据包代号)、name(数据包名称)、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
|
transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向)。
|
# 约束
|
- frameNum:使用文档中的文本不要做任何转换;
|
- subAddr:值为“深度”、“平铺”、“数字”或null;
|
- 是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线;
|
- 传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输);
|
- 传输方向分”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发;
|
- 是否突发:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发;
|
- 不要漏掉任何一个数据包;
|
- 数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
|
# 例子
|
[
|
{
|
"id": "PCS005",
|
"name": "总线管理(内部指令)",
|
"apid": "418",
|
"service": "(1, 2)",
|
"length": 1,
|
"interval": 1000,
|
"subAddr": null,
|
"frameNum": "1|2",
|
"transSer": "DataBlock",
|
"note": "",
|
"rtAddr": 28,
|
"rt": "数据接口单元XIU",
|
"throughBus": true,
|
"burst": true,
|
"transDirect": "发"
|
}
|
]
|
"""
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
result = self.call_model(_msg, 'out/总线.json', ['这里是文档中抽取的内容'], validation)
|
Log.info('总线数据包:' + result)
|
|
pkts = json.loads(result)
|
# 筛选经总线的数据包
|
pkts = list(filter(lambda it: it['throughBus'], pkts))
|
# 筛选有apid的数据包
|
pkts = list(filter(lambda it: it['apid'], pkts))
|
|
pkts2 = []
|
# todo 这一步应该通过数据库筛选,数据库中已经有所有遥测包以及遥测包对应的定义段落文本
|
for pkt in pkts:
|
if self.pkt_in_tm_pkts(pkt["name"]):
|
pkts2.append(pkt)
|
for pkt in pkts2:
|
self.gen_pkt_details(pkt['name'], pkt['id'])
|
_pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
|
if _pkt:
|
pkt['children'] = []
|
pkt['children'].extend(_pkt['datas'])
|
pkt['length'] = _pkt['length']
|
self.bus_pkts = pkts
|
|
def pkt_in_tm_pkts(self, pkt_name):
|
_msg = f"""
|
# 指令
|
我需要从文档中分析判读是否有某个遥测包的字段表描述,你要帮助我判断。
|
# 问题
|
文档中有遥测包“{pkt_name}”的字段表描述吗?
|
注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。
|
# 约束
|
- 根据文档内容输出;
|
- 遥测包名称必须完全匹配;
|
- 输出“无”或“有”,不要输出其他任何内容。
|
# 例子
|
有
|
"""
|
text = self.call_model(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', ['这里是文档中抽取的内容'])
|
Log.info(f'文档中有无“{pkt_name}”的字段描述:' + text)
|
return text == '有'
|
|
# endregion 遥测-end
|
|
# region start 遥控
|
def handle_tc_structured_data(self):
|
# 数据帧格式
|
self.tc_frame = self.gen_tc_transfer_frame_format()
|
# 遥控包格式
|
self.tc_pkt_format = self.gen_tc_pkt_format()
|
# 遥控包列表
|
self.tc_pkts = self.gen_tc_transfer_pkts()
|
for pkt in self.tc_pkts:
|
# 遥控包数据区内容
|
self.gen_tc_pkt_details(pkt)
|
|
def gen_tc_transfer_frame_format(self):
|
_msg = '''
|
# 指令
|
分析遥控传送帧格式,提取遥控传送帧格式的字段定义。
|
# 需求
|
要提取值的帧格式字段:
|
- 版本号:const,二进制,以B结尾;
|
- 通过标志:const,二进制,以B结尾;
|
- 控制命令标志:const,二进制,以B结尾;
|
- 空闲位:const,二进制,以B结尾;
|
- 航天器标识:const,十六进制,以0x开头;
|
- 虚拟信道标识:sendFlag,发送标记,默认为“任务注入帧”,所有的值都要列举出来;
|
# 数据类型
|
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
|
- sendFlag:发送标记,类似枚举,定义样例:[{"n":"name","v":"value","c":"code","default":true}],n表示名称,v表示值,c表示code(没有空着),default表示是默认值;
|
# 约束
|
- 以JSON格式输出;
|
- 仅输出JSON文本,不要输出任何其他文本。
|
# 输出例子:
|
{
|
"版本号": "00B",
|
"通过标志": "0",
|
...
|
}
|
'''
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
text = self.call_model(_msg, 'out/tc_transfer_frame.json', ['这里是文档中抽取的内容'], validation)
|
result: dict = json.loads(text)
|
format_text = utils.read_from_file('tpl/tc_transfer_frame.json')
|
format_text = utils.replace_tpl_paras(format_text, result)
|
frame = json.loads(format_text)
|
Log.info('遥控帧格式:' + format_text)
|
return frame
|
|
def gen_tc_pkt_format(self):
|
_msg = '''
|
# 指令
|
分析遥控包格式,提取遥控包格式的字段定义。
|
# 需求
|
要提取值的包格式字段:
|
- 包版本号: const,二进制;
|
- 包类型: const,二进制;
|
- 数据区头标志: const,二进制;
|
- 序列标志: const,二进制;
|
- 包长:length,
|
- 副导头标志: const,二进制;
|
- 遥控包版本号: const,二进制;
|
- 命令正确应答: const,二进制;
|
- 源地址: const,十六进制。
|
# 数据类型
|
- const:固定码字,数值,二进制以B结尾,十进制,十六进制以0x开头;
|
# 约束
|
- 以JSON格式输出;
|
- 仅输出JSON文本,不要输出任何其他文本。
|
# 输出例子:
|
{
|
"包版本号": "00B",
|
"包类型": "1B",
|
...
|
}
|
'''
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
text = self.call_model(_msg, 'out/tc_transfer_pkt.json', ['这里是文档中抽取的内容'], validation)
|
result = json.loads(text)
|
|
format_text = utils.read_from_file('tpl/tc_pkt_format.json')
|
format_text = utils.replace_tpl_paras(format_text, result)
|
pkt_format = json.loads(format_text)
|
Log.info('遥控包格式:' + format_text)
|
return pkt_format
|
|
def gen_tc_transfer_pkts(self):
|
_msg = '''
|
# 指令
|
分析文档列出所有的遥控源包。
|
# 输出例子:
|
[{
|
"name": "xxx",
|
"code":"pkt",
|
"应用过程标识符":"0xAA",
|
"服务类型":"0x1",
|
"服务子类型":"0x2"
|
}]
|
'''
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
text = self.call_model(_msg, 'out/tc_transfer_pkts.json', ['这里是文档中抽取的内容'], validation)
|
Log.info('遥控包列表:' + text)
|
return json.loads(text)
|
|
def gen_tc_pkt_details(self, pkt):
|
tc_name = pkt['name']
|
tc_code = pkt['code']
|
pkt['name'] = f'{tc_code} {tc_name}'
|
_msg = f"""
|
# 指令
|
分析文档,从文档中提取遥控指令名称为“{tc_name}”代号为“{tc_code}”的指令应用数据区定义。
|
""" + """
|
# 约束
|
- code 如果没有明确定义则使用名称的英文翻译,尽量简短;
|
- length 自动转换为bit长度,必须是数值或null,不能为0;
|
- value 根据字段描述提取;
|
- enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""};
|
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
|
|
# 字段类型
|
- 固定码字:const,
|
- 长度:length,
|
- 枚举值:enum,
|
- 校验和:checkSum,
|
- 即时输入:input。
|
|
# 字段类型分析方法
|
- 根据字段描述分析字段的类型;
|
- 字段描述中明确指定了字段值的,类型为const;
|
- 字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum;
|
- 字段如果是和“长度”有关,类型为length;
|
- 如果和数据域有关,类型为const;
|
- 字段如果和校验和有关,类型为checkSum。
|
|
# 输出例子:
|
[
|
{
|
"name": "para1",
|
"code": "para1",
|
"length": 8,
|
"type": "const",
|
"value": "0xAA"
|
}
|
...
|
]
|
"""
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
result = self.call_model(_msg, f'out/遥控指令数据域-{tc_code}-{utils.to_file_name(tc_name)}.json',
|
['这里是文档中抽取的内容'], validation)
|
Log.info('遥控指令数据域:' + result)
|
|
# endregion 遥控-end
|