import math

from knowledgebase.db.db_helper import (
    create_property_enc,
    create_rule,
    create_rule_stream,
    create_rule_enc,
    create_enc_linear,
    create_rule_linear,
    create_property_linear,
    update_rule_enc,
    create_extend_info,
    create_rulekey_info,
)
from knowledgebase.utils import get_bit_mask

# Packet kind -> flag string handed to create_rule().
enc_ty_flag_map = {
    "DS": "0",
    "ENC": "1",
    "LOGICENC": "2",
    "ANY": "3",
    "LINEAR": "4",
}


def _rule_name_of(node):
    """Return node['rule_name'] when present, otherwise node['name']."""
    return node['rule_name'] if 'rule_name' in node else node['name']


def _create_rulekey_infos(proj_pk, rule, key_items):
    """Create one rule-key record on *rule* for every entry of *key_items*."""
    for item in key_items or ():
        create_rulekey_info(proj_pk, rule.C_RULE_PK, rule.C_RULE_ID, rule.C_RULE_NAME,
                            item['pk'], item['id'], item['name'], item['val'])


def _create_linear_children(proj_pk, linear_pk, node):
    """Create a linear property for each child of *node*, numbered from 1."""
    if 'children' in node:
        for child_seq, child in enumerate(node['children'], start=1):
            create_prop_linear(proj_pk, linear_pk, child, child_seq)


def get_byte_len_str(node: dict):
    """Return the length value to store for *node*.

    For a node whose type is 'linear' and whose length is an int, the length
    is divided by 8, rounded up, and returned as a string. In every other
    case node['length'] is returned unchanged.
    """
    length = node['length']
    if node['type'] == 'linear' and isinstance(length, int):
        return f'{math.ceil(length / 8)}'
    return length


def check_gen_content(func, check_fun, try_cnt=6):
    """Call *func* until *check_fun* accepts its result, then return it.

    :param func: zero-argument callable producing the content.
    :param check_fun: callable receiving the content; it signals rejection
        by raising an exception.
    :param try_cnt: number of retries allowed after the first attempt, so
        *func* runs at most ``try_cnt + 1`` times.
    :return: the first result for which *check_fun* did not raise.
    :raises Exception: the last exception raised by *func* or *check_fun*
        once the retries are exhausted.
    """
    retries_left = try_cnt
    retry_no = 0
    while True:
        try:
            ret = func()
            check_fun(ret)
            return ret
        # Only ordinary errors are retried; KeyboardInterrupt and SystemExit
        # must propagate immediately instead of being swallowed by the loop.
        except Exception:
            if retries_left <= 0:
                print('生成失败!')
                raise
            # Counter is relative to the caller's try_cnt, not a fixed 6.
            print(f'生成内容有误重新生成,第{retry_no}次。')
            retry_no += 1
            retries_left -= 1


def get_data_ty(node: dict):
    """Return node['dataTy'] when present, otherwise 'INVAR'."""
    return node['dataTy'] if 'dataTy' in node else 'INVAR'


def create_prop_enc(proj_pk, enc_pk, node, ty, seq):
    """Create the property-enc record describing *node* and return it.

    When node['length'] is an int, the byte length is computed from the
    offset inside the first byte (pos % 8) plus the length, rounded up to
    whole bytes. The mask is 'ALL' when the field starts at offset 0 and its
    length is a multiple of 8, otherwise the hex form of get_bit_mask().
    A non-int length is stored as is with mask 'ALL'.

    :param proj_pk: project key, forwarded to create_property_enc.
    :param enc_pk: key of the owning record, forwarded to create_property_enc.
    :param node: dict with 'id', 'name', 'pos', 'length' and optionally
        'content' and 'condition'.
    :param ty: type string stored with the record.
    :param seq: sequence number stored with the record.
    :return: whatever create_property_enc returns.
    """
    bit_length = node['length']
    pos = node['pos']
    if isinstance(bit_length, int):
        start = pos % 8
        byte_length = math.ceil((start + bit_length) / 8)
        if start == 0 and bit_length % 8 == 0:
            mask = 'ALL'
        else:
            mask = hex(get_bit_mask(start, start + bit_length - 1))
    else:
        mask = 'ALL'
        byte_length = bit_length
    content = node['content'] if 'content' in node else None
    cond = node['condition'] if 'condition' in node else None
    return create_property_enc(proj_pk, enc_pk, node['id'], node['name'], ty, content,
                               f'{pos // 8}', f'{byte_length}', '1', mask, cond, seq, '',
                               f'{node["id"]}')


def create_prop_linear(proj_pk, linear_pk, node, seq):
    """Create the linear-property record describing *node* and return it.

    :param proj_pk: project key, forwarded to create_property_linear.
    :param linear_pk: key of the owning linear record.
    :param node: dict with 'id', 'name', 'pos' and 'length'.
    :param seq: sequence number stored with the record.
    :return: whatever create_property_linear returns.
    """
    bit_length = node['length']
    pos = node['pos']
    if isinstance(bit_length, int):
        # NOTE(review): unlike create_prop_enc, the byte length here ignores
        # the offset inside the first byte (pos % 8) and 'ALL' is never used
        # for byte-aligned fields - confirm this difference is intentional.
        byte_length = math.ceil(bit_length / 8)
        start = pos % 8
        mask = hex(get_bit_mask(start, start + bit_length - 1))
    else:
        mask = 'ALL'
        byte_length = bit_length
    return create_property_linear(proj_pk, linear_pk, f'{node["id"]}', node['name'], 'INVAR', '0',
                                  f'{pos // 8}', f'{byte_length}', None, mask,
                                  None, None, None, None, None, seq)


def create_key_liner_pkt(proj_pk, rule_pk, node, parent_rule_pk, seq, name_path, ds, content,
                         actual_parent_pk=None):
    """Create a linear packet whose parent carries the sub-packet key fields.

    Creates the rule, the rule-linear record, the rule-stream record and one
    linear property per child of *node*.

    :param proj_pk: project key.
    :param rule_pk: key used for both the rule and the rule-linear record.
    :param node: packet dict ('id', 'name', 'type', 'length', optional
        'rule_name' and 'children').
    :param parent_rule_pk: parent key passed to create_rule.
    :param seq: accepted for signature compatibility; children are always
        numbered from 1.
    :param name_path: path prefix; node['name'] + '/' is appended for the
        rule stream.
    :param ds: stream object providing C_STREAM_PK, C_STREAM_ID, C_NAME and
        C_STREAM_DIR.
    :param content: content passed to create_rule_linear.
    :param actual_parent_pk: extra parent key passed to create_rule.
    :return: the created rule.
    """
    length = get_byte_len_str(node)
    rule = create_rule(proj_pk, rule_pk, node['id'], _rule_name_of(node), length, parent_rule_pk,
                       enc_ty_flag_map['LINEAR'], actual_parent_pk)
    rule_linear = create_rule_linear(proj_pk, rule_pk, node['id'], node['name'], length, content)
    # Register the rule on the data stream.
    create_rule_stream(proj_pk, rule.C_RULE_PK, ds.C_STREAM_PK, ds.C_STREAM_ID, ds.C_NAME,
                       ds.C_STREAM_DIR, f"{name_path}{node['name']}/")
    _create_linear_children(proj_pk, rule_linear.C_LINEAR_PK, node)
    return rule


def create_liner_pkt(proj_pk, linear_pk, node, parent_rule_pk, seq, name_path, ds, content):
    """Create a linear packet together with its property-enc entry.

    Creates the property-enc record, an enc-linear record of type '002', the
    rule, the rule-linear record, the rule-stream record and one linear
    property per child of *node*. Returns None.

    :param proj_pk: project key.
    :param linear_pk: key of the record owning the new property-enc entry.
    :param node: packet dict ('id', 'name', 'type', 'pos', 'length', optional
        'rule_name' and 'children').
    :param parent_rule_pk: parent key passed to create_rule.
    :param seq: sequence number of the property-enc entry.
    :param name_path: path prefix; node['name'] + '/' is appended for the
        rule stream.
    :param ds: stream object providing C_STREAM_PK, C_STREAM_ID, C_NAME and
        C_STREAM_DIR.
    :param content: content passed to create_rule_linear.
    """
    prop_enc = create_prop_enc(proj_pk, linear_pk, node, 'LINEAR', seq)
    enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '002')
    rule_pk = enc_linear.C_LINEAR_PK
    length = get_byte_len_str(node)
    rule = create_rule(proj_pk, rule_pk, node['id'], _rule_name_of(node), length, parent_rule_pk,
                       enc_ty_flag_map['LINEAR'])
    rule_linear = create_rule_linear(proj_pk, rule_pk, node['id'], node['name'], length, content)
    # Register the rule on the data stream.
    create_rule_stream(proj_pk, rule.C_RULE_PK, ds.C_STREAM_PK, ds.C_STREAM_ID, ds.C_NAME,
                       ds.C_STREAM_DIR, f"{name_path}{node['name']}/")
    _create_linear_children(proj_pk, rule_linear.C_LINEAR_PK, node)


def create_any_pkt(proj_pk, linear_pk, node, seq, name_path, ds, pkt_ty, sub_key_nodes,
                   key_items=None):
    """Create a key-selected packet and one sub-packet per child variant.

    A property-enc record and a rule flagged 'ANY' are created for *node*.
    For every child that has 'vals', the '/'-separated values are written
    into key_items[i]['val'] and appended to the child's name (as
    child['rule_name']). The child is then created according to its type
    ('enc', 'linear' or 'logic') and linked to the key items.

    :param proj_pk: project key.
    :param linear_pk: key of the record owning the new property-enc entry.
    :param node: packet dict; its 'children' are the variants.
    :param seq: sequence number of the property-enc entry.
    :param name_path: path prefix handed to the child creators.
    :param ds: stream object handed to the child creators.
    :param pkt_ty: type string stored on the property-enc entry.
    :param sub_key_nodes: key parameter nodes, indexed in step with the
        values in child['vals'].
    :param key_items: list of dicts with 'pk', 'id', 'name', 'val'; the 'val'
        entries are updated in place. None is treated as an empty list.
    :return: the created property-enc record.
    :raises IndexError: if a child supplies fewer values than there are key
        items, or more values than there are sub_key_nodes.
    """
    # None used to crash in len(key_items) as soon as a child carried 'vals'.
    if key_items is None:
        key_items = []
    prop_enc = create_prop_enc(proj_pk, linear_pk, node, pkt_ty, seq)
    # This rule is created without matching rule-enc / rule-linear records.
    create_rule(proj_pk, prop_enc.C_ENCITEM_PK, node['id'], _rule_name_of(node),
                get_byte_len_str(node), prop_enc.C_ENC_PK, enc_ty_flag_map['ANY'], None)
    if 'children' not in node:
        return prop_enc

    for child_seq, child in enumerate(node['children'], start=1):
        vals = None
        if 'vals' in child:
            vals = child['vals']
            if vals.endswith("/"):
                vals = vals[:-1]
            values = vals.split("/")
            for idx, item in enumerate(key_items):
                item['val'] = values[idx]
            pairs = ''.join(f'{sub_key_nodes[idx]["name"]}={val}' for idx, val in enumerate(values))
            child['rule_name'] = child['name'] + '【' + pairs + '】'

        child_type = child['type']
        if child_type == 'enc':
            # Encapsulation packet.
            enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '001', vals)
            _, __, child_rule = create_enc_pkt(proj_pk, enc_linear.C_LINEAR_PK, child,
                                               enc_linear.C_ENCITEM_PK, child_seq, name_path, ds,
                                               '001', 'ENC', parent_has_key=True,
                                               actual_parent_pk=prop_enc.C_ENC_PK)
            _create_rulekey_infos(proj_pk, child_rule, key_items)
        elif child_type == 'linear':
            # Linear packet. Reuse of an already existing rule is not
            # implemented, so no rule key is passed and a new rule is
            # always created.
            enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '002', vals, None)
            child_rule = create_key_liner_pkt(proj_pk, enc_linear.C_LINEAR_PK, child,
                                              enc_linear.C_ENCITEM_PK, child_seq, name_path, ds,
                                              None, prop_enc.C_ENC_PK)
            _create_rulekey_infos(proj_pk, child_rule, key_items)
        elif child_type == 'logic':
            # Logical encapsulation packet.
            # NOTE(review): the packet kind is 'ENC' here while the unkeyed
            # path in create_enc_pkt uses 'LOGICENC' - confirm intended.
            enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '005', vals)
            _, __, child_rule = create_enc_pkt(proj_pk, enc_linear.C_LINEAR_PK, child,
                                               enc_linear.C_ENCITEM_PK, child_seq, name_path, ds,
                                               '005', 'ENC', parent_has_key=True,
                                               actual_parent_pk=prop_enc.C_ENC_PK)
            _create_rulekey_infos(proj_pk, child_rule, key_items)
    return prop_enc


def create_enc_pkt(proj_pk, linear_pk, node, parent_rule_pk, seq, name_path, ds, ty, pkt_ty,
                   parent_has_key=False, actual_parent_pk=None):
    """Create an encapsulation packet and, recursively, everything below it.

    Unless *parent_has_key* is set, a property-enc record and an enc-linear
    record are created first and the enc-linear key becomes the rule key;
    otherwise *linear_pk* is used as the rule key directly. Then the rule,
    rule-enc, rule-stream and extend-info records are created and every
    child is handled according to its type. If any child dict contains an
    'is_key' entry, packet children are created through create_any_pkt.
    The keys of children whose 'is_key' is truthy are joined with '/' and
    stored in rule_enc.C_KEY.

    :param proj_pk: project key.
    :param linear_pk: key owning the property-enc entry, or the rule key
        itself when *parent_has_key* is true.
    :param node: packet dict ('id', 'name', 'type', 'pos', 'length', optional
        'rule_name', 'vals', 'content', 'extInfo', 'children').
    :param parent_rule_pk: parent key passed to create_rule.
    :param seq: sequence number of the property-enc entry.
    :param name_path: path prefix; node['name'] + '/' is appended.
    :param ds: stream object providing C_STREAM_PK, C_STREAM_ID, C_NAME and
        C_STREAM_DIR.
    :param ty: type code passed to create_enc_linear.
    :param pkt_ty: packet kind; must be a key of enc_ty_flag_map.
    :param parent_has_key: True when the caller already created the
        enc-linear record.
    :param actual_parent_pk: extra parent key passed to create_rule.
    :return: tuple (prop_enc, rule_stream, rule); prop_enc is None when
        *parent_has_key* is true.
    """
    prop_enc = None
    key_items = []
    length = get_byte_len_str(node)
    if parent_has_key:
        rule_pk = linear_pk
    else:
        # Create the property-enc entry and its enc-linear record.
        prop_enc = create_prop_enc(proj_pk, linear_pk, node, pkt_ty, seq)
        vals = node['vals'] + '/' if 'vals' in node else None
        enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, ty, vals)
        rule_pk = enc_linear.C_LINEAR_PK

    # Create the parsing rule below the encapsulation packet.
    rule = create_rule(proj_pk, rule_pk, node['id'], _rule_name_of(node), length, parent_rule_pk,
                       enc_ty_flag_map[pkt_ty], actual_parent_pk)
    rule_enc = create_rule_enc(proj_pk, rule.C_RULE_PK, node['id'], node['name'],
                               node['content'] if 'content' in node else None)
    name_path = f"{name_path}{node['name']}/"
    rule_stream = create_rule_stream(proj_pk, rule.C_RULE_PK, ds.C_STREAM_PK, ds.C_STREAM_ID,
                                     ds.C_NAME, ds.C_STREAM_DIR, name_path)
    if 'extInfo' in node and node['extInfo']:
        for info in node['extInfo']:
            create_extend_info(proj_pk, info['id'], info['name'], info['val'], rule.C_RULE_PK)

    if 'children' in node:
        children = node['children']
        sub_key_nodes = [it for it in children if 'is_key' in it]
        has_key = bool(sub_key_nodes)
        sub_key = ''
        for child_seq, child in enumerate(children, start=1):
            child_type = child['type']
            if child_type == 'enc':
                # Encapsulation packet.
                if has_key:
                    # NOTE(review): key_items is not passed here, unlike the
                    # linear/logic/any branches - confirm intended.
                    create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds,
                                   'ENC', sub_key_nodes)
                else:
                    create_enc_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq, name_path,
                                   ds, '001', 'ENC')
            elif child_type == 'para':
                # Data-segment parameter.
                child_prop = create_prop_enc(proj_pk, rule_enc.C_ENC_PK, child,
                                             get_data_ty(child), child_seq)
                if 'is_key' in child and child['is_key']:
                    sub_key += child_prop.C_ENCITEM_PK + '/'
                    key_items.append({"pk": child_prop.C_ENCITEM_PK,
                                      'id': child_prop.C_SEGMENT_ID,
                                      'name': child_prop.C_NAME,
                                      'val': ''})
            elif child_type == 'linear':
                # Linear packet.
                if has_key:
                    create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds,
                                   'LINEAR', sub_key_nodes, key_items)
                else:
                    create_liner_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq,
                                     name_path, ds, None)
            elif child_type == 'logic':
                # Logical encapsulation packet.
                if has_key:
                    create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds,
                                   'LOGICENC', sub_key_nodes, key_items)
                else:
                    create_enc_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq, name_path,
                                   ds, '005', 'LOGICENC')
            elif child_type == 'any':
                # Key-selected packet.
                if has_key:
                    create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds,
                                   'ANY', sub_key_nodes, key_items)
                else:
                    create_enc_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq, name_path,
                                   ds, '005', 'ANY')
        if sub_key:
            # Persist the '/'-terminated list of key parameter keys.
            rule_enc.C_KEY = sub_key
            update_rule_enc(rule_enc)
    return prop_enc, rule_stream, rule