import math
|
|
from knowledgebase.db.db_helper import create_property_enc, \
|
create_rule, create_rule_stream, create_rule_enc, create_enc_linear, create_rule_linear, create_property_linear, \
|
update_rule_enc, create_extend_info, create_rulekey_info
|
from knowledgebase.utils import get_bit_mask
|
|
# Packet-kind name -> flag string. In this module the looked-up value is
# passed to create_rule() as its packet-type flag argument.
# ENC = encapsulation packet, LOGICENC = logical encapsulation packet,
# ANY = "any" packet, LINEAR = linear packet.
# NOTE(review): "DS" presumably stands for a data segment - confirm with callers.
enc_ty_flag_map = dict(
    DS="0",
    ENC="1",
    LOGICENC="2",
    ANY="3",
    LINEAR="4",
)
|
|
|
def get_byte_len_str(node: dict):
    """Return the length value to store for ``node``.

    For a node whose ``type`` is ``'linear'`` and whose ``length`` is an
    integer, the length is treated as a bit count and converted to a whole
    number of bytes (rounded up), returned as a decimal string.  In every
    other case the ``length`` value is returned unchanged.

    :param node: packet description; must contain ``length`` and ``type``.
    :return: byte count as ``str`` for integer-length linear nodes,
        otherwise the original ``length`` value.
    """
    size = node['length']
    is_linear = node['type'] == 'linear'
    if is_linear and isinstance(size, int):
        # Bits -> bytes, rounding partial bytes up.
        return f'{math.ceil(size / 8)}'
    return size
|
|
|
def check_gen_content(func, check_fun, try_cnt=6):
    """Call ``func`` until ``check_fun`` accepts its result, retrying on failure.

    ``func()`` is invoked and its result is passed to ``check_fun``.  If
    either call raises, the pair is retried up to ``try_cnt`` more times
    (so at most ``try_cnt + 1`` attempts are made in total).

    :param func: zero-argument callable producing the content.
    :param check_fun: one-argument callable that raises when the content
        is not acceptable; its return value is ignored.
    :param try_cnt: number of retries allowed after the first attempt.
    :return: the first result of ``func()`` that ``check_fun`` accepted.
    :raises Exception: the error from the last attempt, once the retry
        budget is exhausted.
    """
    retries_done = 0
    while True:
        try:
            ret = func()
            check_fun(ret)
        # Only ordinary errors are retried; KeyboardInterrupt / SystemExit
        # must propagate immediately instead of triggering more attempts.
        except Exception:
            if retries_done >= try_cnt:
                print('生成失败!')
                # Bare raise keeps the original traceback intact.
                raise
            retries_done += 1
            # Report the retry number relative to the actual budget rather
            # than a hard-coded 6, so the count is right for any try_cnt.
            print(f'生成内容有误重新生成,第{retries_done}次。')
        else:
            return ret
|
|
|
def get_data_ty(node: dict):
    """Return the node's ``dataTy`` value, or ``'INVAR'`` when it has none.

    :param node: parameter description dict.
    :return: ``node['dataTy']`` if the key is present, else ``'INVAR'``.
    """
    return node.get('dataTy', 'INVAR')
|
|
|
def create_prop_enc(proj_pk, enc_pk, node, ty, seq):
    """Create the enc property record for ``node`` via ``create_property_enc``.

    When ``node['length']`` is an integer it is treated as a bit length:
    the byte length covers the bit offset inside the first byte plus the
    field itself, and a hex bit mask is computed unless the field is
    byte-aligned and a whole number of bytes (then the mask is ``'ALL'``).
    A non-integer length is passed through as the byte length with mask
    ``'ALL'``.

    :param proj_pk: project key forwarded to ``create_property_enc``.
    :param enc_pk: key of the owning enc, forwarded unchanged.
    :param node: dict with ``length``, ``pos``, ``id``, ``name`` and
        optionally ``content`` and ``condition``.
    :param ty: type value forwarded unchanged.
    :param seq: sequence number forwarded unchanged.
    :return: whatever ``create_property_enc`` returns.
    """
    bits = node['length']
    if not isinstance(bits, int):
        # Non-integer length: forwarded as-is, whole field selected.
        span_bytes = bits
        bit_mask = 'ALL'
    else:
        first_bit = node['pos'] % 8
        last_bit = first_bit + bits - 1
        # Bytes touched by the field, counting the leading bit offset.
        span_bytes = math.ceil((first_bit + bits) / 8)
        byte_aligned = first_bit == 0 and bits % 8 == 0
        bit_mask = 'ALL' if byte_aligned else hex(get_bit_mask(first_bit, last_bit))

    param_id = f'{node["id"]}'
    byte_offset = f'{node["pos"] // 8}'
    return create_property_enc(
        proj_pk, enc_pk, node['id'], node['name'], ty, node.get('content'),
        byte_offset, f'{span_bytes}', '1', bit_mask, node.get('condition'),
        seq, '', param_id)
|
|
|
def create_prop_linear(proj_pk, linear_pk, node, seq):
    """Create the linear-packet property record for ``node``.

    An integer ``node['length']`` is treated as a bit length: it is
    rounded up to whole bytes and a hex bit mask is derived from the bit
    offset within the byte.  A non-integer length is passed through with
    mask ``'ALL'``.

    :param proj_pk: project key forwarded to ``create_property_linear``.
    :param linear_pk: key of the owning linear packet.
    :param node: dict with ``length``, ``pos``, ``id`` and ``name``.
    :param seq: sequence number forwarded as the last argument.
    :return: whatever ``create_property_linear`` returns.
    """
    bits = node['length']
    if isinstance(bits, int):
        first_bit = node['pos'] % 8
        bit_mask = hex(get_bit_mask(first_bit, first_bit + bits - 1))
        # NOTE(review): unlike create_prop_enc, the bit offset is not added
        # before rounding up to bytes - confirm this is intentional.
        span_bytes = math.ceil(bits / 8)
    else:
        span_bytes = bits
        bit_mask = 'ALL'

    param_id = f'{node["id"]}'
    byte_offset = f'{node["pos"] // 8}'
    return create_property_linear(
        proj_pk, linear_pk, param_id, node['name'], 'INVAR', '0', byte_offset,
        f'{span_bytes}', None, bit_mask, None, None, None, None, None, seq)
|
|
|
def create_key_liner_pkt(proj_pk, rule_pk, node, parent_rule_pk, seq, name_path, ds, content,
                         actual_parent_pk=None):
    """Create a linear packet for the case where the parent holds the key fields.

    Creates the parse rule, the linear rule and the rule/stream link for
    ``node``, then one linear property per entry of ``node['children']``
    (numbered from 1).

    :param proj_pk: project key.
    :param rule_pk: key used for both the rule and the linear rule.
    :param node: packet description (``id``, ``name``, ``length``, ``type``,
        optional ``rule_name`` and ``children``).
    :param parent_rule_pk: parent key forwarded to ``create_rule``.
    :param seq: not used by this function; children are always renumbered
        starting at 1.
    :param name_path: path prefix; the node name plus ``/`` is appended.
    :param ds: stream object providing ``C_STREAM_PK``, ``C_STREAM_ID``,
        ``C_NAME`` and ``C_STREAM_DIR``.
    :param content: forwarded to ``create_rule_linear``.
    :param actual_parent_pk: forwarded to ``create_rule``.
    :return: the rule returned by ``create_rule``.
    """
    if 'rule_name' in node:
        display_name = node['rule_name']
    else:
        display_name = node['name']
    byte_len = get_byte_len_str(node)

    # Parse rule and its linear counterpart share the same key.
    rule = create_rule(proj_pk, rule_pk, node['id'], display_name, byte_len, parent_rule_pk,
                       enc_ty_flag_map['LINEAR'], actual_parent_pk)
    rule_linear = create_rule_linear(proj_pk, rule_pk, node['id'], node['name'], byte_len, content)

    # Link the rule to the data stream (t_rule_stream).
    create_rule_stream(proj_pk, rule.C_RULE_PK, ds.C_STREAM_PK, ds.C_STREAM_ID, ds.C_NAME,
                       ds.C_STREAM_DIR, f"{name_path}{node['name']}/")

    # One linear-packet parameter per child.
    for child_seq, child in enumerate(node.get('children', ()), start=1):
        create_prop_linear(proj_pk, rule_linear.C_LINEAR_PK, child, child_seq)
    return rule
|
|
|
def create_liner_pkt(proj_pk, linear_pk, node, parent_rule_pk, seq, name_path, ds, content):
    """Create a linear packet together with its enc property and rules.

    Creates the enc property for ``node`` (type ``'LINEAR'``), an
    enc-linear link of kind ``'002'``, the parse rule, the linear rule and
    the rule/stream link, then one linear property per child (numbered
    from 1).  Nothing is returned.

    :param proj_pk: project key.
    :param linear_pk: key of the owner passed to ``create_prop_enc``.
    :param node: packet description (``id``, ``name``, ``length``, ``type``,
        ``pos``, optional ``rule_name`` and ``children``).
    :param parent_rule_pk: parent key forwarded to ``create_rule``.
    :param seq: sequence number of this packet's enc property.
    :param name_path: path prefix; the node name plus ``/`` is appended.
    :param ds: stream object providing ``C_STREAM_PK``, ``C_STREAM_ID``,
        ``C_NAME`` and ``C_STREAM_DIR``.
    :param content: forwarded to ``create_rule_linear``.
    """
    # Enc property for the packet, then its enc_linear link.
    prop_enc = create_prop_enc(proj_pk, linear_pk, node, 'LINEAR', seq)
    enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '002')
    rule_pk = enc_linear.C_LINEAR_PK

    # Parse rule and linear rule.
    byte_len = get_byte_len_str(node)
    if 'rule_name' in node:
        display_name = node['rule_name']
    else:
        display_name = node['name']
    rule = create_rule(proj_pk, rule_pk, node['id'], display_name, byte_len, parent_rule_pk,
                       enc_ty_flag_map['LINEAR'])
    rule_linear = create_rule_linear(proj_pk, rule_pk, node['id'], node['name'], byte_len, content)

    # Link the rule to the data stream (t_rule_stream).
    create_rule_stream(proj_pk, rule.C_RULE_PK, ds.C_STREAM_PK, ds.C_STREAM_ID, ds.C_NAME,
                       ds.C_STREAM_DIR, f"{name_path}{node['name']}/")

    # One linear-packet parameter per child.
    for child_seq, child in enumerate(node.get('children', ()), start=1):
        create_prop_linear(proj_pk, rule_linear.C_LINEAR_PK, child, child_seq)
|
|
|
def _create_rulekey_infos(proj_pk, rule, key_items):
    """Create one rule-key record per key item for ``rule``; no-op if none."""
    for item in key_items or ():
        create_rulekey_info(proj_pk, rule.C_RULE_PK, rule.C_RULE_ID, rule.C_RULE_NAME,
                            item['pk'], item['id'], item['name'], item['val'])


def create_any_pkt(proj_pk, linear_pk, node, seq, name_path, ds, pkt_ty, sub_key_nodes, key_items=None):
    """Create a key-selected packet container and its child packets.

    An enc property and a rule (flagged ``ANY``) are created for ``node``
    itself; no rule_enc / rule_linear record is created for it.  Each
    child of type ``enc``, ``linear`` or ``logic`` then gets an enc-linear
    link carrying the child's ``vals`` and its own packet records, plus
    rule-key records when ``key_items`` is non-empty.  Children of any
    other type are skipped (they still consume a sequence number).

    When a child has ``vals`` (a ``/``-separated string, optional trailing
    ``/``), the values are written into ``key_items[i]['val']`` in order
    and ``child['rule_name']`` is set to the child name followed by the
    ``name=value`` pairs.  Both ``child`` and ``key_items`` are mutated.

    :param proj_pk: project key.
    :param linear_pk: owner key passed to ``create_prop_enc``.
    :param node: container description (``id``, ``name``, ``length``,
        ``type``, ``pos``, optional ``rule_name`` and ``children``).
    :param seq: sequence number of the container's enc property.
    :param name_path: path prefix forwarded to the child creators.
    :param ds: stream object forwarded to the child creators.
    :param pkt_ty: type value stored on the container's enc property.
    :param sub_key_nodes: key nodes of the parent; ``sub_key_nodes[i]['name']``
        labels the i-th value in ``vals``.
    :param key_items: optional list of dicts with ``pk``, ``id``, ``name``
        and ``val``; ``None`` means no key records are written.
    :return: the enc property created for ``node``.
    :raises IndexError: if ``vals`` has fewer values than ``key_items`` or
        more values than ``sub_key_nodes``.
    """
    prop_enc = create_prop_enc(proj_pk, linear_pk, node, pkt_ty, seq)
    rule_name = node['rule_name'] if 'rule_name' in node else node['name']
    create_rule(proj_pk, prop_enc.C_ENCITEM_PK, node['id'], rule_name, get_byte_len_str(node),
                prop_enc.C_ENC_PK, enc_ty_flag_map['ANY'], None)

    for child_seq, child in enumerate(node.get('children', ()), start=1):
        vals = None
        if 'vals' in child:
            vals = child['vals']
            if vals.endswith("/"):
                vals = vals[:-1]
            values = vals.split("/")
            # key_items defaults to None; previously len(None) raised
            # TypeError here whenever a caller omitted it.
            for idx, item in enumerate(key_items or ()):
                item['val'] = values[idx]
            pairs = ''.join(f'{sub_key_nodes[idx]["name"]}={val}'
                            for idx, val in enumerate(values))
            child['rule_name'] = child['name'] + '【' + pairs + '】'

        child_type = child['type']
        if child_type == 'enc':  # encapsulation packet
            enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '001', vals)
            _, _, child_rule = create_enc_pkt(proj_pk, enc_linear.C_LINEAR_PK, child,
                                              enc_linear.C_ENCITEM_PK, child_seq, name_path, ds,
                                              '001', 'ENC', parent_has_key=True,
                                              actual_parent_pk=prop_enc.C_ENC_PK)
            _create_rulekey_infos(proj_pk, child_rule, key_items)
        elif child_type == 'linear':  # linear packet
            # Lookup of an already existing rule is disabled, so a new keyed
            # linear rule is always created and no rule key is pre-linked.
            enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '002', vals, None)
            child_rule = create_key_liner_pkt(proj_pk, enc_linear.C_LINEAR_PK, child,
                                              enc_linear.C_ENCITEM_PK, child_seq, name_path, ds,
                                              None, prop_enc.C_ENC_PK)
            _create_rulekey_infos(proj_pk, child_rule, key_items)
        elif child_type == 'logic':  # logical encapsulation packet
            # NOTE(review): pkt_ty is 'ENC' here although create_enc_pkt uses
            # 'LOGICENC' for non-keyed logic children - confirm intended.
            enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '005', vals)
            _, _, child_rule = create_enc_pkt(proj_pk, enc_linear.C_LINEAR_PK, child,
                                              enc_linear.C_ENCITEM_PK, child_seq, name_path, ds,
                                              '005', 'ENC', True, prop_enc.C_ENC_PK)
            _create_rulekey_infos(proj_pk, child_rule, key_items)
    return prop_enc
|
|
|
def create_enc_pkt(proj_pk, linear_pk, node, parent_rule_pk, seq, name_path, ds, ty, pkt_ty,
                   parent_has_key=False, actual_parent_pk=None):
    """Create an encapsulation packet and, recursively, everything below it.

    Unless ``parent_has_key`` is set, an enc property and an enc-linear
    link are created for ``node`` first and the link's key becomes the
    rule key; otherwise ``linear_pk`` is used as the rule key directly.
    The rule, rule_enc and rule/stream link are then created, followed by
    extension info entries and the children.

    Children of type ``para`` become enc properties; those with a truthy
    ``is_key`` are collected as key items and their item keys are stored
    (``/``-terminated) in ``rule_enc.C_KEY``.  Packet children (``enc``,
    ``linear``, ``logic``, ``any``) are created through ``create_any_pkt``
    when any child carries an ``is_key`` entry, otherwise through
    ``create_enc_pkt`` / ``create_liner_pkt``.

    :param proj_pk: project key.
    :param linear_pk: owner key for the enc property, or the rule key
        itself when ``parent_has_key`` is true.
    :param node: packet description (``id``, ``name``, ``length``, ``type``,
        optional ``rule_name``, ``vals``, ``content``, ``extInfo``,
        ``children``).
    :param parent_rule_pk: parent key forwarded to ``create_rule``.
    :param seq: sequence number of this packet's enc property.
    :param name_path: path prefix; the node name plus ``/`` is appended.
    :param ds: stream object providing ``C_STREAM_PK``, ``C_STREAM_ID``,
        ``C_NAME`` and ``C_STREAM_DIR``.
    :param ty: kind code forwarded to ``create_enc_linear``.
    :param pkt_ty: key into ``enc_ty_flag_map``; also the enc property type.
    :param parent_has_key: skip creating the enc property / enc-linear link.
    :param actual_parent_pk: forwarded to ``create_rule``.
    :return: ``(prop_enc, rule_stream, rule)``; ``prop_enc`` is ``None``
        when ``parent_has_key`` is true.
    """
    prop_enc = None
    key_items = []
    length = get_byte_len_str(node)

    if parent_has_key:
        rule_pk = linear_pk
    else:
        # Create the encapsulation packet's own property and link.
        prop_enc = create_prop_enc(proj_pk, linear_pk, node, pkt_ty, seq)
        vals = node['vals'] + '/' if 'vals' in node else None
        enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, ty, vals)
        rule_pk = enc_linear.C_LINEAR_PK

    # Parse rule below the encapsulation packet.
    if 'rule_name' in node:
        display_name = node['rule_name']
    else:
        display_name = node['name']
    rule = create_rule(proj_pk, rule_pk, node['id'], display_name, length, parent_rule_pk,
                       enc_ty_flag_map[pkt_ty], actual_parent_pk)
    rule_enc = create_rule_enc(proj_pk, rule.C_RULE_PK, node['id'], node['name'],
                               node.get('content'))
    name_path = f"{name_path}{node['name']}/"
    rule_stream = create_rule_stream(proj_pk, rule.C_RULE_PK, ds.C_STREAM_PK, ds.C_STREAM_ID,
                                     ds.C_NAME, ds.C_STREAM_DIR, name_path)

    for info in node.get('extInfo') or ():
        create_extend_info(proj_pk, info['id'], info['name'], info['val'], rule.C_RULE_PK)

    if 'children' not in node:
        return prop_enc, rule_stream, rule

    children = node['children']
    # Presence of the 'is_key' entry (not its truthiness) marks a key node.
    sub_key_nodes = [entry for entry in children if 'is_key' in entry]
    has_key = bool(sub_key_nodes)
    sub_key = ''
    parent_pk = rule.C_RULE_PK

    for child_seq, child in enumerate(children, start=1):
        kind = child['type']
        if kind == 'para':  # data-segment parameter
            item = create_prop_enc(proj_pk, rule_enc.C_ENC_PK, child, get_data_ty(child), child_seq)
            if child.get('is_key', False):
                sub_key += item.C_ENCITEM_PK + '/'
                key_items.append({"pk": item.C_ENCITEM_PK,
                                  'id': item.C_SEGMENT_ID,
                                  'name': item.C_NAME,
                                  'val': ''})
        elif kind == 'enc' and has_key:  # keyed encapsulation packet
            # NOTE(review): key_items is not forwarded here, unlike the
            # linear / logic / any branches - confirm whether intentional.
            create_any_pkt(proj_pk, parent_pk, child, child_seq, name_path, ds, 'ENC', sub_key_nodes)
        elif kind == 'enc':  # encapsulation packet
            create_enc_pkt(proj_pk, parent_pk, child, rule_pk, child_seq, name_path, ds,
                           '001', 'ENC')
        elif kind == 'linear' and has_key:  # keyed linear packet
            create_any_pkt(proj_pk, parent_pk, child, child_seq, name_path, ds, 'LINEAR',
                           sub_key_nodes, key_items)
        elif kind == 'linear':  # linear packet
            create_liner_pkt(proj_pk, parent_pk, child, rule_pk, child_seq, name_path, ds, None)
        elif kind == 'logic' and has_key:  # keyed logical encapsulation packet
            create_any_pkt(proj_pk, parent_pk, child, child_seq, name_path, ds, 'LOGICENC',
                           sub_key_nodes, key_items)
        elif kind == 'logic':  # logical encapsulation packet
            create_enc_pkt(proj_pk, parent_pk, child, rule_pk, child_seq, name_path, ds,
                           '005', 'LOGICENC')
        elif kind == 'any' and has_key:  # keyed "any" packet
            create_any_pkt(proj_pk, parent_pk, child, child_seq, name_path, ds, 'ANY',
                           sub_key_nodes, key_items)
        elif kind == 'any':  # "any" packet
            create_enc_pkt(proj_pk, parent_pk, child, rule_pk, child_seq, name_path, ds,
                           '005', 'ANY')

    if sub_key:
        # Persist the collected key item keys on the rule_enc record.
        rule_enc.C_KEY = sub_key
        update_rule_enc(rule_enc)

    return prop_enc, rule_stream, rule
|