import os
|
import time
|
from datetime import datetime
|
|
from openai import OpenAI
|
import re
|
import json
|
|
import data_templates
|
from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \
|
update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format
|
from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt
|
|
from knowledgebase.db.models import TProject
|
|
file_map = {
|
"文档合并": "./doc/文档合并.md",
|
"遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md",
|
"遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md",
|
"总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md",
|
"应用软件用户需求": "./doc/XA-5D无人机软件用户需求(公开).docx.md",
|
"指令格式": "./doc/ZL格式(公开).docx.md"
|
}
|
# file_map = {
|
# "遥测源包设计报告": "./docs/HY-4A数管分系统遥测源包设计报告 Z 240824 更改3(内部) .docx.md",
|
# "遥测大纲": "./docs/HY-4A卫星遥测大纲 Z 240824 更改3(内部).docx.md",
|
# "总线传输通信帧分配": "./docs/HY-4A卫星1553B总线传输通信帧分配 Z 240824 更改3(内部).docx.md",
|
# "应用软件用户需求": "./docs/HY-4A数管分系统应用软件用户需求(星务管理分册) Z 240831 更改4(内部).docx.md"
|
# }
|
# file_map = {
|
# "文档合并": "./doc/文档合并.md",
|
# "遥测源包设计报告": "./doc/XA-5D无人机分系统探测源包设计报告(公开).md",
|
# "遥测大纲": "./doc/XA-5D无人机探测大纲(公开).md",
|
# "总线传输通信帧分配": "./doc/XA-5D无人机1314A总线传输通信帧分配(公开).md"
|
# }
|
|
BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'
|
API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e'
|
MODEL_NAME = 'qwen2.5-14b-instruct-1m'
|
|
# BASE_URL = 'http://10.74.15.164:11434/v1/'
|
# API_KEY = 'ollama'
|
# MODEL_NAME = 'qwen2.5:32b-128k'
|
|
# BASE_URL = 'http://10.74.15.164:1001/api'
|
# API_KEY = 'sk-a909385bc14d4491a718b6ee264c3227'
|
# MODEL_NAME = 'qwen2.5:32b-128k'
|
|
USE_CACHE = True
|
assistant_msg = """
|
# 角色
|
你是一个专业的文档通信分析师,擅长进行文档分析和通信协议分析,同时能够解析 markdown 类型的文档。拥有成熟准确的文档阅读与分析能力,能够妥善处理多文档间存在引用关系的复杂情况。
|
|
## 技能
|
### 技能 1:文档分析(包括 markdown 文档)
|
1. 当用户提供文档时,仔细阅读文档内容,严格按照文档中的描述提取关键信息,不得加入自己的回答或建议。
|
2. 分析文档的结构、主题和重点内容,同样只依据文档进行表述。
|
3. 如果文档间存在引用关系,梳理引用脉络,明确各文档之间的关联,且仅呈现文档中体现的内容。
|
|
### 技能 2:通信协议分析
|
1. 接收通信协议相关信息,理解协议的规则和流程,仅依据所给信息进行分析。
|
|
## 背景知识
|
###软件主要功能与运行机制总结如下:
|
1. 数据采集和处理:
|
DIU负责根据卫星的工作状态或模式提供遥测数据,包括模拟量(AN)、总线信号(BL)以及温度(TH)和数字量(DS),并将这些信息打包,通过总线发送给SMU。
|
SMU则收集硬通道上的遥测参数,并通过总线接收DIU采集的信息。
|
2. 多路复用与数据传输:
|
遥测源包被组织成E-PDU,进一步复用为M-PDU,并填充到VCDU中构成遥测帧。
|
利用CCSDS AOS CADU格式进行遥测数据的多路复用和传输。
|
3. 虚拟信道(VC)调度机制:
|
通过常规遥测VC、突发数据VC、延时遥测VC、记录数据VC以及回放VC实现不同类型的数据下传。
|
4. 遥控指令处理:
|
上行遥控包括直接指令和间接指令,需经过格式验证后转发给相应单机执行。
|
遥控帧通过特定的虚拟信道(VC)进行传输。
|
这些知识需要你记住,再后续的处理中可以帮助你理解要处理的数据。
|
|
## 目标导向
|
1. 通过对文档和通信协议的分析,为用户提供清晰、准确的数据结构,帮助用户更好地理解和使用相关信息。
|
|
## 规则
|
1. 每一个型号都会有一套文档,需准确判断是否为同一个型号的文档后再进行整体分析,每次只分析同一个型号的文档。
|
2. 大多数文档结构为:型号下包含设备,设备下包含数据流,数据流下包含数据帧,数据帧中有一块是包域,包域中会挂载各种类型的数据包。
|
3. 文档都是对于数据传输协议的描述,在数据流、数据帧、数据包等传输实体中都描述了各个字段的分布、各个字段的大小和位置等信息,且大小单位不统一,需理解这些单位,并将所有输出单位统一为 bits,长度字段使用 length 表示,位置字段使用 pos 表示,如果为变长使用“"变长"”表示。
|
4. 如果有层级,使用树形 JSON 输出,如果有子节点,子节点 key 使用children;需保证一次输出的数据结构统一,并且判断每个层级是什么类型,输出类型字段(type),类型字段的 key 使用 type,类型包括:型号(project)、设备(dev)、封装包(enc)、线性包(linear)、参数(para),封装包子级有数据包,所以type为enc,线性包子级只有参数,所以type为linear;每个层级都包含偏移位置(pos),每个层级的偏移位置从0开始。
|
5. 名称相关的字段的 key 使用name;代号、编号或者唯一标识相关的字段的key使用id,id由数字、英文字母、下划线组成且以英文字母开头,长度尽量简短;序号相关的字段的key使用number;偏移位置相关字段的key使用pos;其他没有举例的字段使用精简的翻译作为字段的key;每个结构必须包含name和id。
|
6. 遥测帧为CADU,其中包含同步头和VCDU,按照习惯需要使用VCDU层级嵌套传输帧主导头、传输帧插入域、传输帧数据域、传输帧尾的结构。
|
7. 数据包字段包括:name、id、type、pos、length、children;参数字段包括:name、id、pos、type、length;必须包含pos和length字段。
|
8. 常用id参考:遥测(TM)、遥控(TC)、总线(BUS)、版本号(Ver)、应用过程标识(APID)。
|
9. 注意:一定要记得morkdown文档中会将一些特殊字符进行转义,以此来保证文档的正确性,这些转义符号(也就是反斜杠‘\’)不需要在结果中输出。
|
10. 以 JSON 格式组织输出内容,确保数据结构的完整性和可读性,注意:生成的JSON语法格式必须符合json规范,避免出现错误。
|
|
## 限制:
|
- 所输出的内容必须按照JSON格式进行组织,不能偏离框架要求,且严格遵循文档内容进行输出,只输出 JSON ,不要输出其它文字。
|
- 不输出任何注释等描述性信息。
|
"""
|
|
g_completion = None
|
|
|
def read_from_file(cache_file):
|
with open(cache_file, 'r', encoding='utf-8') as f:
|
text = f.read()
|
return text
|
|
|
def save_to_file(text, file_path):
|
if USE_CACHE:
|
with open(file_path, 'w', encoding='utf-8') as f:
|
f.write(text)
|
|
|
json_pat = re.compile(r'```json(.*?)```', re.DOTALL)
|
|
|
def remove_markdown(text):
|
# 使用正则表达式提取json文本
|
try:
|
return json_pat.findall(text)[0]
|
except IndexError:
|
return text
|
|
|
def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals):
|
# 逻辑封装包,数据块传输的只有一个,取数的根据RT地址、子地址和帧号划分
|
frame_num = pkt['frameNum']
|
if trans_ser == '数据块传输':
|
# 数据块传输根据RT地址和子地址划分
|
key = f'{pkt["rt"]}_{pkt["subAddr"]}'
|
name = f'{pkt["rt"]}_{pkt["subAddr"]}_{trans_ser}'
|
else:
|
# 取数根据RT地址、子地址和帧号划分
|
key = f'{pkt["rt"]}_{pkt["subAddr"]}_{pkt["frameNum"]}'
|
name = f'{pkt["rt"]}_{pkt["subAddr"]}_帧号{frame_num}_{trans_ser}'
|
#
|
if key not in rt_pkt_map:
|
rt_pkt_map[key] = {
|
"name": name,
|
"id": pkt_id,
|
"type": "logic",
|
"pos": 0,
|
"content": "CYCLEBUFFER,Message,28,0xFFFF",
|
"length": "",
|
"vals": vals,
|
"children": []
|
}
|
frame = f'{pkt["frameNum"]}'
|
|
interval = f'{pkt["interval"]}'.replace(".", "_")
|
if trans_ser == '取数':
|
_key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}'
|
else:
|
# 数据块传输
|
if pkt['burst']:
|
_key = f'RT{pkt["rtAddr"]}FrameALL'
|
else:
|
_key = f'RT{pkt["rtAddr"]}Frame{frame}Per{interval}'
|
|
_pkt = next(filter(lambda it: it['name'] == _key, rt_pkt_map[key]['children']), None)
|
if _pkt is None:
|
ext_info = None
|
if trans_ser == '数据块传输' and not pkt['burst']:
|
# 数据块传输且有周期的包需要
|
ext_info = [{"id": "PeriodTriger", "name": "时分复用总线触发属性", "val": f"{pkt['interval']}"},
|
{"id": "FrameNumber", "name": "时分复用协议帧号", "val": frame}]
|
_pkt = {
|
"name": _key,
|
"id": _key,
|
"type": "enc",
|
"pos": 0,
|
"content": "1:N;EPDU",
|
"length": "length",
|
"extInfo": ext_info,
|
"children": [
|
{
|
"id": "C02_ver",
|
"name": "遥测版本",
|
"type": "para",
|
"pos": 0,
|
"length": 3,
|
"dataTy": "INVAR",
|
"content": "0"
|
},
|
{
|
"id": "C02_type",
|
"name": "类型",
|
"type": "para",
|
"pos": 3,
|
"length": 1,
|
"dataTy": "INVAR",
|
"content": "0"
|
},
|
{
|
"id": "C02_viceHead",
|
"name": "副导头标识",
|
"type": "para",
|
"pos": 4,
|
"length": 1,
|
"content": "1",
|
"dataTy": "INVAR"
|
},
|
{
|
"id": "C02_PackSign",
|
"name": "APID",
|
"type": "para",
|
"pos": 5,
|
"length": 11,
|
"is_key": True,
|
"dataTy": "ENUM"
|
},
|
{
|
"id": "C02_SerCtr_1",
|
"name": "序列标记",
|
"type": "para",
|
"pos": 16,
|
"length": 2,
|
"content": "3"
|
},
|
{
|
"id": "C02_SerCtr_2",
|
"name": "包序计数",
|
"type": "para",
|
"pos": 18,
|
"length": 14,
|
"content": "0:167772:1",
|
"dataTy": "INCREASE"
|
},
|
{
|
"id": "C02_PackLen",
|
"name": "包长",
|
"type": "para",
|
"pos": 32,
|
"length": 16,
|
"content": "1Bytes/C02_Data.length+1",
|
"dataTy": "LEN"
|
},
|
{
|
"id": "C02_Ser",
|
"name": "服务",
|
"type": "para",
|
"pos": 48,
|
"length": 8,
|
"is_key": True,
|
"dataTy": "ENUM"
|
},
|
{
|
"id": "C02_SubSer",
|
"name": "子服务",
|
"type": "para",
|
"pos": 56,
|
"length": 8,
|
"is_key": True,
|
"dataTy": "ENUM"
|
},
|
{
|
"id": "C02_Data",
|
"name": "数据区",
|
"type": "linear",
|
"pos": 64,
|
"length": 'length-current',
|
"children": []
|
},
|
]
|
}
|
rt_pkt_map[key]['children'].append(_pkt)
|
# 数据区下面的包
|
data_area = next(filter(lambda it: it['name'] == '数据区', _pkt['children']), None)
|
ser_sub_ser: str = pkt['service']
|
ser = ''
|
sub_ser = ''
|
if ser_sub_ser:
|
nums = re.findall(r'\d+', ser_sub_ser)
|
if len(nums) == 2:
|
ser = nums[0]
|
sub_ser = nums[1]
|
if 'children' not in pkt:
|
pkt['children'] = []
|
p_name = pkt['id'] + '_' + pkt['name']
|
|
data_area['children'].append({
|
"name": p_name,
|
"id": pkt["id"],
|
"type": "linear",
|
"pos": 0,
|
"length": pkt["length"],
|
"vals": f"0x{pkt['apid']}/{ser}/{sub_ser}/",
|
"children": pkt['children'],
|
})
|
|
|
def build_vcid_content(vcs):
|
_vcs = []
|
for vc in vcs:
|
_vcs.append(vc['name'] + ',' + vc['VCID'])
|
return ' '.join(_vcs)
|
|
|
class DbStructFlow:
|
# 工程
|
proj: TProject = None
|
# 遥测源包列表,仅包名称、包id和hasParams
|
tm_pkts = []
|
# vc源包
|
vc_pkts = []
|
|
def __init__(self):
|
self.client = OpenAI(
|
api_key=API_KEY,
|
base_url=BASE_URL,
|
# api_key="ollama",
|
# base_url="http://192.168.1.48:11434/v1/",
|
)
|
|
def run(self):
|
# 生成型号结构
|
# 生成设备结构
|
# 生成数据流结构 CADU
|
# 生成VCDU结构
|
# 生成遥测数据包结构
|
self.proj = self.gen_project()
|
|
devs = self.gen_device(self.proj)
|
|
# self.gen_tc()
|
return ''
|
|
def _gen(self, msgs, msg, files=None):
|
if files is None:
|
files = [file_map['文档合并']]
|
messages = [] if msgs is None else msgs
|
doc_text = ''
|
for file in files:
|
doc_text += '\n' + read_from_file(file)
|
if len(messages) == 0:
|
# 如果是第一次提问加入system消息
|
messages.append({'role': 'system', 'content': assistant_msg})
|
messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text})
|
messages.append({'role': 'user', 'content': msg})
|
|
completion = self.client.chat.completions.create(
|
model=MODEL_NAME,
|
messages=messages,
|
stream=True,
|
temperature=0.0,
|
top_p=0,
|
timeout=30 * 60000,
|
max_completion_tokens=1000000,
|
seed=0
|
# stream_options={"include_usage": True}
|
)
|
g_completion = completion
|
text = ''
|
for chunk in completion:
|
if chunk.choices[0].delta.content is not None:
|
text += chunk.choices[0].delta.content
|
print(chunk.choices[0].delta.content, end="")
|
print("")
|
g_completion = None
|
return text
|
|
def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5):
|
if msgs is None:
|
msgs = []
|
if USE_CACHE and os.path.isfile(cache_file):
|
text = read_from_file(cache_file)
|
else:
|
s = time.time()
|
text = self._gen(msgs, msg, files)
|
text = remove_markdown(text)
|
if validation:
|
try:
|
validation(text)
|
except BaseException as e:
|
print(e)
|
if try_cnt <= 0:
|
raise RuntimeError('生成失败,重试次数太多,强制结束!')
|
return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt - 1)
|
save_to_file(text, cache_file)
|
print(f'耗时:{time.time() - s}')
|
return text
|
|
def generate_tc_text(self, msg, cache_file, messages=None, files=None, validation=None, try_cnt=5):
|
if messages is None:
|
messages = []
|
doc_text = ''
|
for file in files:
|
doc_text += '\n' + read_from_file(file)
|
if len(messages) == 0:
|
# 如果是第一次提问加入system消息
|
messages.append({'role': 'user', 'content': "以下是文档内容:\n" + doc_text})
|
return self.generate_text(msg, cache_file, messages, files, validation, try_cnt)
|
|
def gen_project(self):
|
# _msg = """
|
# 根据文档输出型号信息,型号字段包括:名称和代号。仅输出型号这一级。
|
# 例如:{"name":"xxx","id":"xxx"}
|
# """
|
# print('型号信息:')
|
# text = self.generate_text(_msg, 'out/型号信息.json', files=[file_map['应用软件用户需求']])
|
# proj_dict = json.loads(text)
|
# 工程信息从系统获取
|
proj_dict = {
|
"id": "JB200001",
|
"name": "HY-4A"
|
}
|
code = proj_dict['id']
|
name = proj_dict['name']
|
proj = create_project(code, name, code, name, "", datetime.now())
|
return proj
|
|
def gen_device(self, proj):
|
"""
|
设备列表生成规则:
|
1.如文档中有1553协议描述,加入1553设备
|
2.如是类SMU软件(遥测遥控包含,BC或者RT),加入对应相关设备,文档只有设备名称和设备ID,设备类型90%是标准类型
|
3.如是类RTU软件,加入对应相关设备,文档里面有设备名称和设备ID,同上
|
4.如基于软平台,如是SMU软件,加入SMU工控机设备,待定
|
|
设备类型:工控机[0]、1553B[1]
|
|
:param proj:
|
:return:
|
"""
|
proj_pk = proj.C_PROJECT_PK
|
devices = []
|
|
_msg = f"""
|
输出分系统下的硬件产品(设备)列表,字段包括:名称(name)、代号(code),硬件产品名称一般会包含“管理单元”或者“接口单元”,如果没有代号则使用名称的英文缩写代替缩写长度不超过5个字符;
|
并且给每个硬件产品增加三个字段:第一个字段hasTcTm“是否包含遥控遥测”,判断该硬件产品是否包含遥控遥测的功能、
|
第二个字段hasTemperatureAnalog“是否包含温度量、模拟量等数据的采集”,判断该硬件产品是否包含温度量等信息的采集功能、
|
第三个字段hasBus“是否是总线硬件产品”,判断该设备是否属于总线硬件产品,是否有RT地址;每个字段的值都使用true或false来表示。
|
仅输出JSON,结构最外层为数组,数组元素为设备信息,不要输出JSON以外的任何字符。
|
"""
|
print('设备列表:')
|
cache_file = 'out/设备列表.json'
|
|
def validation(gen_text):
|
_devs = json.loads(gen_text)
|
assert isinstance(_devs, list), '数据结构最外层不是数组'
|
assert next(filter(lambda it: it['name'].endswith('管理单元'), _devs), None), '生成的设备列表中没有管理单元'
|
|
text = self.generate_text(_msg, cache_file, files=[file_map['应用软件用户需求']], validation=validation)
|
devs = json.loads(text)
|
|
# 类SMU设备,包含遥测和遥控功能,名称结尾为“管理单元”
|
like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('管理单元'), devs))
|
for dev in like_smu_devs:
|
dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK)
|
devices.append(dev)
|
# 创建数据流
|
ds_tmfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS遥测', 'TMF1', 'TMFL', '1', 'TMF1',
|
'001')
|
self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH)
|
# ds_tcfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, '遥控指令', 'TCFL', 'TCFL', '0', 'TCFL',
|
# '006')
|
|
hasBus = any(d['hasBus'] for d in devs)
|
if hasBus:
|
# 总线设备
|
dev = create_device("1553", "1553总线", '1', 'StandardProCommunicationDev', proj_pk)
|
create_extend_info(proj_pk, "BusType", "总线类型", "ECSS_Standard", dev.C_DEV_PK)
|
devices.append(dev)
|
# 创建数据流
|
ds_u153, rs_u153, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '上行总线数据', 'U15E', 'B153',
|
'0', '1553', '001')
|
# 创建总线结构
|
self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME)
|
ds_d153, rule_stream, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '下行总线数据', 'D15E', 'B153',
|
'1', '1553', '001', rs_u153.C_RULE_PK)
|
create_ref_ds_rule_stream(proj_pk, rule_stream.C_STREAM_PK, rule_stream.C_STREAM_ID,
|
rule_stream.C_STREAM_NAME, rule_stream.C_STREAM_DIR, rs_u153.C_STREAM_PK)
|
# 类RTU设备,包含温度量和模拟量功能,名称结尾为“接口单元”
|
# like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('接口单元'), devs))
|
# for dev in like_rtu_devs:
|
# dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK)
|
|
# for dev in like_rtu_devs:
|
# dev = create_device(dev['code'], dev['name'], '0', '', proj.C_PROJECT_PK)
|
# devices.append(dev)
|
# # 创建数据流
|
# ds_tmfl = create_data_stream(proj.C_PROJECT_PK, '温度量', 'TMFL', 'TMFL', '1', 'TMFL', '001')
|
# ds_tcfl = create_data_stream(proj.C_PROJECT_PK, '模拟量', 'TCFL', 'TCFL', '0', 'TCFL', '006')
|
|
return devices
|
|
def gen_insert_domain_params(self):
|
_msg = """
|
分析文档,输出插入域的参数列表,将所有参数全部输出,不要有遗漏。
|
数据结构最外层为数组,数组元素为参数信息对象,参数信息字段包括:name、id、pos、length、type。
|
1个字节的长度为8位,使用B0-B7来表示,请认真计算参数长度。
|
文档中位置描述信息可能存在跨字节的情况,,例如:"Byte1_B6~Byte2_B0":表示从第1个字节的第7位到第2个字节的第1位,长度是3;"Byte27_B7~Byte28_B0":表示从第27个字节的第8位到第28个字节的第1位,长度是2。
|
"""
|
print('插入域参数列表:')
|
files = [file_map['遥测大纲']]
|
|
def validation(gen_text):
|
params = json.loads(gen_text)
|
assert isinstance(params, list), '插入域参数列表数据结构最外层必须是数组'
|
assert len(params), '插入域参数列表不能为空'
|
|
text = self.generate_text(_msg, './out/插入域参数列表.json', files=files, validation=validation)
|
return json.loads(text)
|
|
def gen_tm_frame_data(self):
|
_msg = """
|
"""
|
files = [file_map['遥测大纲']]
|
|
def validation(gen_text):
|
pass
|
|
def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path):
|
# 插入域参数列表
|
insert_domain = self.gen_insert_domain_params()
|
|
# VC源包格式
|
vc_pkt_fields = data_templates.vc_pkt_fields # self.gen_pkt_format()
|
|
# 获取虚拟信道 vc
|
vcs = self.gen_vc()
|
for vc in vcs:
|
vc['children'] = []
|
vc['VCID'] = str(int(vc['VCID'], 2))
|
for field in vc_pkt_fields:
|
if field['name'] == '数据域':
|
field['children'] = []
|
vc['children'].append(dict(field))
|
|
# VCID 字段内容
|
vcid_content = build_vcid_content(vcs)
|
|
# 遥测帧结构由模板生成,只需提供特定参数
|
tm_data = {
|
"vcidContent": vcid_content,
|
'insertDomain': insert_domain,
|
}
|
cadu = data_templates.get_tm_frame(tm_data)
|
|
# VC源包
|
self.vc_pkts = self.gen_pkt_vc()
|
# 遥测源包设计中的源包列表
|
self.tm_pkts = self.gen_pkts()
|
|
# 处理VC下面的遥测包数据
|
for vc in vcs:
|
# 此VC下的遥测包过滤
|
_vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts)
|
for _pkt in _vc_pkts:
|
# 判断遥测包是否有详细定义
|
if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None):
|
continue
|
# 获取包详情
|
_pkt = self.gen_pkt_details(_pkt['name'], _pkt['id'])
|
epdu = next(filter(lambda it: it['name'] == '数据域', vc['children']), None)
|
if epdu and _pkt:
|
_pkt['children'] = _pkt['datas']
|
_last_par = _pkt['children'][len(_pkt['children']) - 1]
|
_pkt['length'] = (_last_par['pos'] + _last_par['length'])
|
_pkt['pos'] = 0
|
if 'children' not in epdu:
|
epdu['children'] = []
|
# 添加解析规则后缀防止重复
|
_pkt['id'] = _pkt['id'] + '_' + vc['VCID']
|
# 给包名加代号前缀
|
if not _pkt['name'].startswith(_pkt['id']):
|
_pkt['name'] = _pkt['id'] + '_' + _pkt['name']
|
epdu['children'].append(_pkt)
|
apid_node = next(filter(lambda it: it['name'].__contains__('应用过程'), _pkt['headers']), None)
|
ser_node = next(filter(lambda it: it['name'] == '服务', _pkt['headers']), None)
|
sub_ser_node = next(filter(lambda it: it['name'] == '子服务', _pkt['headers']), None)
|
_pkt['vals'] = \
|
f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/"
|
|
# 重新计数起始偏移
|
self.compute_length_pos(cadu['children'])
|
|
# 将数据插入数据库
|
seq = 1
|
for cadu_it in cadu['children']:
|
if cadu_it['name'] == 'VCDU':
|
# VCDU
|
# 将信道替换到数据域位置
|
vc_data = next(filter(lambda it: it['name'].__contains__('数据域'), cadu_it['children']), None)
|
if vc_data:
|
idx = cadu_it['children'].index(vc_data)
|
cadu_it['children'].pop(idx)
|
for vc in vcs:
|
# 处理虚拟信道属性
|
vc['type'] = 'logic'
|
vc['length'] = vc_data['length']
|
vc['pos'] = vc_data['pos']
|
vc['content'] = 'CCSDSMPDU'
|
vcid = vc['VCID']
|
vc['condition'] = f'VCID=={vcid}'
|
# 将虚拟信道插入到VCDU
|
cadu_it['children'].insert(idx, vc)
|
idx += 1
|
for vc in vcs:
|
self.compute_length_pos(vc['children'])
|
|
# 设置VCID的content
|
vcid_node = next(filter(lambda it: it['name'].__contains__('VCID'), cadu_it['children']), None)
|
if vcid_node:
|
vcid_node['content'] = vcid_content
|
|
create_enc_pkt(proj_pk, rule_pk, cadu_it, rule_pk, seq, name_path, ds, '001', 'ENC')
|
else:
|
# 参数
|
create_prop_enc(proj_pk, rule_pk, cadu_it, get_data_ty(cadu_it), seq)
|
seq += 1
|
|
return cadu
|
|
def gen_vc(self):
|
_msg = """
|
请分析文档中的遥测包格式,输出遥测虚拟信道的划分,数据结构最外层为数组,数组元素为虚拟信道信息字典,字典包含以下键值对:
|
id: 虚拟信道代号
|
name: 虚拟信道名称
|
VCID: 虚拟信道VCID(二进制)
|
format: 根据虚拟信道类型获取对应的数据包的格式的名称
|
深入理解文档中描述的关系,例如:文档中描述了常规遥测是常规数据的下传信道,并且还描述了分系统常规遥测参数包就是实时遥测参数包,并且文档中对实时遥测参数包的格式进行了描述,所以常规遥测VC应该输出为:{"id": "1", "name": "常规遥测VC", "VCID": "0", "format": "实时遥测参数包"}
|
"""
|
|
def validation(gen_text):
|
vcs = json.loads(gen_text)
|
assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '生成的VCID必须是二进制'
|
|
print('虚拟信道:')
|
text = self.generate_text(_msg, "out/虚拟信道.json", files=[file_map['遥测大纲']], validation=validation)
|
vcs = json.loads(text)
|
return vcs
|
|
def gen_dev_pkts(self):
|
_msg = f"""
|
输出文档中遥测源包类型定义描述的设备以及设备下面的遥测包,数据结构:最外层为数组 > 设备 > 遥测包列表(pkts),设备字段包括:名称(name)、代号(id),源包字段包括:名称(name)、代号(id)
|
"""
|
print('设备遥测源包信息:')
|
files = [file_map["遥测源包设计报告"]]
|
text = self.generate_text(_msg, 'out/设备数据包.json', [], files)
|
dev_pkts = json.loads(text)
|
return dev_pkts
|
|
def pkt_in_tm_pkts(self, pkt_name):
|
cache_file = f'out/数据包-{pkt_name}.json'
|
if os.path.isfile(cache_file):
|
return True
|
files = [file_map['遥测源包设计报告']]
|
print(f'文档中有无“{pkt_name}”的字段描述:', end='')
|
_msg = f"""
|
文档中有遥测包“{pkt_name}”的字段表描述吗?遥测包名称必须完全匹配。输出:“无”或“有”,不要输出其他任何内容。
|
注意:遥测包的字段表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有字段表描述。
|
根据文档内容输出。"""
|
text = self.generate_text(_msg, f'out/pkts/有无数据包-{pkt_name}.txt', [], files)
|
return text == '有'
|
|
def gen_pkt_details(self, pkt_name, pkt_id):
|
cache_file = f'out/数据包-{pkt_name}.json'
|
files = [file_map['遥测源包设计报告']]
|
if not os.path.isfile(cache_file):
|
_msg = f"""
|
输出文档中描述的名称为“{pkt_name}”代号为“{pkt_id}”遥测包;
|
遥测包字段包括:名称(name)、代号(id)、类型(type)、包头属性列表(headers)、数据域参数列表(datas),类型为 linear;
|
包头属性字段包括:名称(name)、代号(id)、位置(pos)、定义(content)、长度(length)、类型(type),类型为 para;
|
数据域参数字段包括:参数名称(name)、参数代号(id)、位置(pos)、长度(length)、字节顺序(byteOrder),类型为 para;
|
如果没有名称用代号代替,如果没有代号用名称的英文翻译代替,翻译尽量简短;
|
你需要理解数据包的位置信息,并且将所有输出单位统一转换为 bits,位置字段的输出格式必须为数值类型;
|
数据结构仅只包含遥测包,仅输出json,不要输出任何其他内容。"""
|
print(f'遥测源包“{pkt_name}”信息:')
|
|
def validation(gen_text):
|
_pkt = json.loads(gen_text)
|
assert 'headers' in _pkt, '包结构中必须包含headers字段'
|
assert 'datas' in _pkt, '包结构中必须包含datas字段'
|
|
text = self.generate_text(_msg, cache_file, [], files, validation)
|
pkt = json.loads(text)
|
else:
|
pkt = json.loads(read_from_file(cache_file))
|
pkt_len = 0
|
for par in pkt['datas']:
|
par['pos'] = pkt_len
|
pkt_len += par['length']
|
pkt['length'] = pkt_len
|
return pkt
|
|
def gen_pkts(self):
|
_msg = f"""
|
输出文档中描述的遥测包。
|
遥测包字段包括:名称(name)、代号(id)、hasParams,
|
名称中不要包含代号,
|
hasParams表示当前遥测包是否有参数列表,遥测包的参数表紧接着遥测包章节标题,如果章节标题后面省略了或者详见xxx则是没有参数表,
|
如果没有代号用名称的英文翻译代替,如果没有名称用代号代替,
|
数据结构最外层为数组数组元素为遥测包,不包括遥测包下面的参数。
|
"""
|
print(f'遥测源包列表:')
|
files = [file_map['遥测源包设计报告']]
|
text = self.generate_text(_msg, 'out/源包列表.json', [], files)
|
pkt = json.loads(text)
|
return pkt
|
|
def gen_pkt_vc(self):
|
_msg = f"""
|
根据遥测源包下传时机定义,输出各个遥测源包信息列表,顶级结构为数组元素为遥测源包,源包字段包括:包代号(id),名称(name),所属虚拟信道(vcs),下传时机(timeTags)
|
"""
|
files = [file_map['遥测大纲']]
|
print('遥测源包所属虚拟信道:')
|
|
def validation(gen_text):
|
pkts = json.loads(gen_text)
|
assert len(pkts), 'VC源包列表不能为空'
|
|
text = self.generate_text(_msg, 'out/遥测VC源包.json', files=files, validation=validation)
|
pkt_vcs = json.loads(text)
|
return pkt_vcs
|
|
def gen_pkt_format(self):
|
_msg = f"""
|
请仔细分系文档,输出各个数据包的格式,数据结构最外层为数组,数组元素为数据包格式,将主导头的子级提升到主导头这一级并且去除主导头,数据包type为logic,包数据域type为any。
|
包格式children包括:版本号(id:Ver)、类型(id:TM_Type)、副导头标志(id:Vice_Head)、应用过程标识符(id:Proc_Sign)、分组标志(id:Group_Sign)、包序列计数(id:Package_Count)、包长(id:Pack_Len)、数据域(id:EPDU_DATA)。
|
children元素的字段包括:name、id、pos、length、type
|
注意:生成的JSON语法格式要合法。
|
"""
|
print('遥测包格式:')
|
text = self.generate_text(_msg, 'out/数据包格式.json', files=[file_map['遥测大纲']])
|
pkt_formats = json.loads(text)
|
return pkt_formats
|
|
def compute_length_pos(self, items: list):
|
length = 0
|
pos = 0
|
for child in items:
|
if 'children' in child:
|
self.compute_length_pos(child['children'])
|
child['pos'] = pos
|
if 'length' in child and isinstance(child['length'], int):
|
length = length + child['length']
|
pos = pos + child['length']
|
# node['length'] = length
|
|
def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name):
|
_msg = f"""
|
请析文档,列出总线通信包传输约定中描述的所有数据包列表,
|
数据包字段包括:id、name、apid(16进制字符串)、service(服务子服务)、length(bit长度)、interval(传输周期)、subAddr(子地址/模式)、frameNum(通信帧号)、
|
transSer(传输服务)、note(备注)、rtAddr(所属RT的地址十进制)、rt(所属rt名称)、throughBus(是否经过总线)、burst(是否突发)、transDirect(传输方向),
|
数据结构最外层是数组,数组元素为数据包,以JSON格式输出,不要输出JSON以外的任何文本。
|
通信帧号:使用文档中的文本不要做任何转换。
|
subAddr:值为“深度”、“平铺”、“数字”或null。
|
是否经过总线的判断依据:“备注”列填写了内容类似“不经过总线”的文字表示不经过总线否则经过总线。
|
传输服务分三种:SetData(置数)、GetData(取数)、DataBlock(数据块传输)。
|
传输方向分:”收“和”发“,传输服务如果是”取数“是”收“,如果是”数据块传输“则根据包所在的分系统以及表格的”传输方向“列进行判断,判断对于SMU来说是收还是发。
|
是否突发的判断依据:根据表格中的”传输周期“列进行判断,如果填写了类似”突发“的文字表示是突发否则表示不是突发。
|
"""
|
print('总线数据包:')
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
text = self.generate_text(_msg, 'out/总线.json', files=[file_map['总线传输通信帧分配']], validation=validation)
|
pkts = json.loads(text)
|
# 筛选经总线的数据包
|
pkts = list(filter(lambda it: it['throughBus'], pkts))
|
no_apid_pkts = list(filter(lambda it: not it['apid'], pkts))
|
# 筛选有apid的数据包
|
pkts = list(filter(lambda it: it['apid'], pkts))
|
|
pkts2 = []
|
for pkt in pkts:
|
if self.pkt_in_tm_pkts(pkt["name"]):
|
pkts2.append(pkt)
|
for pkt in pkts2:
|
_pkt = self.gen_pkt_details(pkt['name'], pkt['id'])
|
if _pkt:
|
pkt['children'] = []
|
pkt['children'].extend(_pkt['datas'])
|
pkt['length'] = _pkt['length']
|
rt_pkt_map = {}
|
for pkt in pkts:
|
# 根据数据块传输和取数分组
|
# 逻辑封装包的解析规则ID:RT[rt地址]SUB[子地址]S(S代表取数,方向是AA表示发送;R代表置数,方向是BB表示接受)
|
# 取数:逻辑封装包根据子地址和帧号组合创建,有几个组合就创建几个逻辑封装包
|
# 数据块:只有一个逻辑封装包
|
|
# 处理子地址
|
if pkt['burst']:
|
# 突发包子地址是18~26
|
pkt['subAddr'] = 26
|
elif pkt['subAddr'] == '平铺' or pkt['subAddr'] is None:
|
# 平铺:11~26,没有填写的默认为平铺
|
pkt['subAddr'] = 26
|
elif pkt['subAddr'] == '深度':
|
# 深度:11
|
pkt['subAddr'] = 11
|
|
# 处理帧号
|
if pkt['burst']:
|
# 突发:ALL
|
pkt['frameNum'] = 'ALL'
|
elif not pkt['frameNum']:
|
# 有
|
pkt['frameNum'] = ''
|
|
# todo: 处理传输方向
|
|
rt_addr = pkt['rtAddr']
|
sub_addr = pkt['subAddr']
|
trans_ser = pkt['transSer']
|
|
frame_no = pkt['frameNum'].replace('|', ',')
|
|
if trans_ser == 'GetData':
|
# 取数
|
pkt_id = f"RT{rt_addr}SUB{sub_addr}"
|
vals = f"{rt_addr}/{sub_addr}/0xAA/{frame_no}/"
|
rt_pkt_map_gen(pkt, '取数', rt_pkt_map, pkt_id, vals)
|
elif trans_ser == 'DataBlock':
|
# 数据块
|
direct = '0xAA'
|
rt_pkt_map_gen(pkt, '数据块传输', rt_pkt_map, f"RT{rt_addr}SUB{sub_addr}{direct}",
|
f"{rt_addr}/{sub_addr}/{direct}/ALL/")
|
_pkts = []
|
for k in rt_pkt_map:
|
_pkts.append(rt_pkt_map[k])
|
|
bus_items = data_templates.get_bus_datas(_pkts)
|
seq = 1
|
sub_key_nodes = list(filter(lambda it: 'is_key' in it, bus_items))
|
has_key = any(sub_key_nodes)
|
rule_pk = rule_enc.C_ENC_PK
|
sub_key = ''
|
key_items = []
|
self.compute_length_pos(bus_items)
|
for item in bus_items:
|
if item['type'] == 'enc':
|
if has_key:
|
_prop_enc = create_any_pkt(proj_pk, rule_pk, item, seq, name_path, ds, 'ENC', sub_key_nodes,
|
key_items)
|
else:
|
_prop_enc, rule_stream, _ = create_enc_pkt(proj_pk, rule_pk, item, rule_enc.C_ENC_PK, seq,
|
name_path, ds, '001', 'ENC')
|
else:
|
# 参数
|
_prop_enc = create_prop_enc(proj_pk, rule_pk, item, get_data_ty(item), seq)
|
if item.__contains__('is_key'):
|
sub_key += _prop_enc.C_ENCITEM_PK + '/'
|
key_items.append(
|
{"pk": _prop_enc.C_ENCITEM_PK,
|
'id': _prop_enc.C_SEGMENT_ID,
|
'name': _prop_enc.C_NAME,
|
'val': ''})
|
seq += 1
|
if sub_key:
|
rule_enc.C_KEY = sub_key
|
update_rule_enc(rule_enc)
|
|
def gen_tc(self):
|
# 数据帧格式
|
frame = self.gen_tc_transfer_frame()
|
# 数据包格式
|
pkt_format = self.gen_tc_transfer_pkt()
|
# 数据包列表
|
pkts = self.gen_tc_transfer_pkts()
|
for pkt in pkts:
|
pf = json.loads(json.dumps(pkt_format))
|
pf['name'] = pkt['name']
|
ph = next(filter(lambda x: x['name'] == '主导头', pf['children']), None)
|
apid = next(filter(lambda x: x['name'] == '应用进程标识符(APID)', ph['children']), None)
|
apid['value'] = pkt['apid']
|
apid['type'] = 'const'
|
sh = next(filter(lambda x: x['name'] == '副导头', pf['children']), None)
|
ser = next(filter(lambda x: x['name'] == '服务类型', sh['children']), None)
|
sub_ser = next(filter(lambda x: x['name'] == '服务子类型', sh['children']), None)
|
ser['value'] = pkt['server']
|
ser['type'] = 'const'
|
sub_ser['value'] = pkt['subServer']
|
sub_ser['type'] = 'const'
|
frame['subPkts'].append(pf)
|
self.order = 0
|
|
def build_def(item: dict):
|
if item['type'] == 'enum':
|
return json.dumps({"EnumItems": item['enums'], "CanInput": True})
|
elif item['type'] == 'length':
|
return None
|
elif item['type'] == 'checkSum':
|
return json.dumps({"ChecksumType": "CRC-CCITT"})
|
elif item['type'] == 'subPkt':
|
return json.dumps({"CanInput": False})
|
elif item['type'] == 'combPkt':
|
return None
|
elif 'value' in item:
|
return item['value']
|
|
def create_tc_format(parent_pk, field):
|
field['order'] = self.order
|
self.order += 1
|
field['def'] = build_def(field)
|
if 'length' in field:
|
field['bitWidth'] = field['length']
|
field['bitOrder'] = None
|
field['attr'] = 0
|
if field['type'] == 'length':
|
val = field['value']
|
field['range'] = val['start'] + "~" + val['end']
|
field['formula'] = val['formula']
|
ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field)
|
if 'children' in field:
|
autocode = 1
|
if field['type'] == 'pkt':
|
ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK,
|
{'order': self.order, 'type': 'subPkt',
|
'def': json.dumps({"CanInput": False})})
|
self.order += 1
|
for child in field['children']:
|
child['autocode'] = autocode
|
autocode += 1
|
create_tc_format(ins_format.C_INS_FORMAT_PK, child)
|
# if 'subPkts' in field:
|
# for pkt in field['subPkts']:
|
# ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK,
|
# {'order': self.order, 'type': 'subPkt',
|
# 'def': json.dumps({"CanInput": False})})
|
# create_tc_format(ins_format.C_INS_FORMAT_PK, pkt)
|
|
create_tc_format(None, frame)
|
|
def gen_tc_transfer_frame(self):
|
_msg = '''
|
分析YK传送帧格式,提取YK传送帧的数据结构,不包括数据包的数据结构。
|
## 经验:
|
字段类型包括:
|
1.组合包:combPkt,
|
2.固定码字:const,
|
3.长度:length,
|
4.枚举值:enum,
|
5.校验和:checkSum,
|
6.数据区:subPkt。
|
|
根据字段描述分析字段的类型,分析方法:
|
1.字段描述中明确指定了字段值的,类型为const,
|
2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum,
|
3.字段描述中如果存在多层级描述则父级字段的类型为combPkt,
|
4.字段如果是和“长度”有关,类型为length,
|
5.如果和数据域有关,类型为subPkt,
|
6.字段如果和校验和有关,类型为checkSum。
|
|
字段值提取方法:
|
1.字段描述中明确指定了字段值,
|
2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"<code>","end":"<code>","formula":"N-1"},注意:start和end的值为字段code。
|
|
## 限制:
|
- length 自动转换为bit长度。
|
- value 根据字段描述提取。
|
- enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。
|
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
|
|
字段数据结构:
|
主导头
|
版本号、通过标志、控制命令标志、空闲位、HTQ标识、虚拟信道标识、帧长、帧序列号
|
传送帧数据域
|
帧差错控制域。
|
|
# 输出内容例子:
|
{
|
"name": "YK帧",
|
"type": "pkt"
|
"children":[
|
{
|
"name": "主导头",
|
"code": "primaryHeader",
|
"length": 2,
|
"value": "00",
|
"type": "combPkt",
|
"children": [
|
{
|
"name": "版本号",
|
"code": "verNum"
|
"length": 1,
|
"value": "00"
|
}
|
]
|
}
|
],
|
"subPkts":[]
|
}
|
'''
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', files=[file_map['指令格式']],
|
validation=validation)
|
frame = json.loads(text)
|
return frame
|
|
def gen_tc_transfer_pkt(self):
|
_msg = '''
|
仅分析YK包格式,提取YK包数据结构。
|
## 经验:
|
|
字段类型包括:
|
1.组合包:combPkt,
|
2.固定码字:const,
|
3.长度:length,
|
4.枚举值:enum,
|
5.校验和:checkSum,
|
6.数据区:subPkt。
|
|
根据字段描述分析字段的类型,分析方法:
|
1.字段描述中明确指定了字段值的,类型为const,
|
2.字段中没有明确指定字段值,但是罗列了取值范围的,类型为enum,
|
3.字段描述中如果存在多层级描述则父级字段的类型为combPkt,
|
4.字段如果是和“长度”有关,类型为length,
|
5.如果和数据域有关,类型为subPkt,
|
6.字段如果和校验和有关,类型为checkSum。
|
|
字段值提取方法:
|
1.字段描述中明确指定了字段值,
|
2.长度字段的值要根据描述确定起止字段范围以及计算公式,value格式例如:{"start":"<code>","end":"<code>","formula":"N-1"},注意:start和end的值为字段code。
|
|
## 限制:
|
- length 自动转换为bit长度。
|
- value 根据字段描述提取。
|
- enums 有些字段是枚举值,根据字段描述提取,枚举元素的数据结构为{"n":"","v":"","c":""}。
|
- 输出内容必须为严格的json,不能输出除json以外的任何内容。
|
|
字段数据结构:
|
主导头
|
包识别
|
包版本号、包类型、数据区头标志、应用进程标识符(APID)
|
包序列控制
|
序列标志
|
包序列计数
|
包长
|
副导头
|
CCSDS副导头标志
|
YK包版本号
|
命令正确应答(Ack)
|
服务类型
|
服务子类型
|
源地址
|
应用数据区
|
帧差错控制域。
|
|
# 输出内容例子:
|
{
|
"name": "YK包",
|
"type": "pkt"
|
"children":[
|
{
|
"name": "主导头",
|
"code": "primaryHeader",
|
"length": 2,
|
"value": "00",
|
"type": "combPkt",
|
"children": [
|
{
|
"name": "版本号",
|
"code": "verNum"
|
"length": 1,
|
"value": "00"
|
}
|
]
|
}
|
],
|
"subPkts":[]
|
}
|
'''
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', files=[file_map['指令格式']],
|
validation=validation)
|
pkt_format = json.loads(text)
|
return pkt_format
|
|
def gen_tc_transfer_pkts(self):
|
_msg = '''
|
分析文档列出所有的遥控源包。
|
## 数据结构如下:
|
[{
|
"name": "xxx",
|
"code":"pkt",
|
"apid":"0xAA",
|
"server":"0x1",
|
"subServer":"0x2"
|
}]
|
'''
|
|
def validation(gen_text):
|
json.loads(gen_text)
|
|
text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', files=[file_map['指令格式']],
|
validation=validation)
|
pkts = json.loads(text)
|
return pkts
|
|
|
if __name__ == '__main__':
|
try:
|
os.makedirs("./out/pkts", exist_ok=True)
|
# 启动大模型处理流程
|
ret_text = DbStructFlow().run()
|
except KeyboardInterrupt:
|
if g_completion:
|
g_completion.close()
|