import uuid from enum import Enum from threading import RLock from sqlalchemy.orm import sessionmaker, scoped_session from knowledgebase.db.models import get_engine, TProject, TDevice, TDataStream, TDevStream, TRule, TRuleEnc, TPropertyEnc, \ TPropertyLinear, TRuleStream, TEncLinear, TRuleLinear, TParameter, TParameterType, TExtendInfo, TRulekeyInfo, \ TInsFormat from hashlib import md5 _session = None _para_id_map = {} db_lock = RLock() def get_session(): global _session return _session def init_db_helper(): # 创建一个会话工厂 session_factory = sessionmaker(bind=get_engine()) # 创建一个会话对象 Session = scoped_session(session_factory) global _session _session = Session() def th_safe(func): def wrapper(*args, **kwargs): with db_lock: result = func(*args, **kwargs) return result return wrapper def get_pk(): n = uuid.uuid4().hex pk = md5(n.encode('utf-8')).hexdigest() return pk @th_safe def create_project(sat_id, sat_name, proj_code, proj_name, desc, date_time, ) -> TProject: """ 创建project :param sat_id: :param sat_name: :param proj_code: :param proj_name: :param desc: :param date_time: :return: 创建完成的project """ project = TProject(C_PROJECT_PK=get_pk(), C_SAT_ID=sat_id, C_SAT_NAME=sat_name, C_PROJECT_CODE=proj_code, C_DESCRIPTION=desc, C_HASH=uuid.uuid4().int & 0xffffffff, C_PROJECT_NAME=proj_name, C_DATETIME=date_time, C_CREATEOR='') _session.add(project) _session.commit() return project @th_safe def create_device(device_id, device_name, device_type, dll, project_pk): """ 创建device :param device_id: :param device_name: :param device_type: :param dll: :param project_pk: :return: """ device = TDevice(C_DEV_PK=get_pk(), C_DEV_ID=device_id, C_DEV_NAME=device_name, C_DEV_TYPE=device_type, C_DLL=dll, C_PROJECT_PK=project_pk) _session.add(device) _session.commit() return device @th_safe def create_extend_info(proj_pk, prop_id, prop_name, val, fk): ext_info = TExtendInfo( C_PK=get_pk(), C_PROJECT_PK=proj_pk, C_PROPERTY_ID=prop_id, C_PROPERTY_NAME=prop_name, C_VAL=val, C_FOREIGN_PK=fk ) _session.add(ext_info) _session.commit() @th_safe def create_data_stream(proj_pk, dev_pk, name, code, data_ty, direct, rule_id, rule_ty, rule_pk=None): """ 创建data_stream :param proj_pk: :param dev_pk: :param name: :param code: :param data_ty: :param direct: :param rule_id: :param rule_ty: :return: """ ds = TDataStream(C_STREAM_PK=get_pk(), C_PROJECT_PK=proj_pk, C_STREAM_ID=code, C_DATA_TYPE=data_ty, C_STREAM_DIR=direct, C_NAME=name, C_DESCRIPTION='', C_RULE_ID=rule_id, C_RULE_TYPE=rule_ty) _session.add(ds) link = TDevStream(C_PK=get_pk(), C_DEV_PK=dev_pk, C_STREAM_PK=ds.C_STREAM_PK, C_PROJECT_PK=proj_pk) _session.add(link) rule_enc = None # 创建解析规则 if rule_pk is None: rule_pk = get_pk() if rule_ty == '001': # 封装包 rule_enc = _create_rule_enc(proj_pk, rule_pk, rule_id, rule_id) rule = _create_rule(proj_pk, ds.C_STREAM_PK, rule_id, name, None, None, '0') rule = _create_rule(proj_pk, rule_pk, rule_id, rule_id, None, ds.C_STREAM_PK, '1') # rule stream rule_stream = _create_rule_stream(proj_pk, rule_pk, ds.C_STREAM_PK, ds.C_STREAM_ID, ds.C_NAME, ds.C_STREAM_DIR, f"{ds.C_NAME}/{rule_id}/") _session.add(rule_stream) _session.commit() return ds, rule_stream, rule_enc @th_safe def create_rule(proj_pk, rule_pk, rule_id, rule_name, rule_len, parent_pk, flag, actual_parent_pk=None): return _create_rule(proj_pk, rule_pk, rule_id, rule_name, rule_len, parent_pk, flag, actual_parent_pk) def _create_rule(proj_pk, rule_pk, rule_id, rule_name, rule_len, parent_pk, flag, actual_parent_pk=None): rule = TRule( C_PK=get_pk(), C_PROJECT_PK=proj_pk, C_RULE_PK=rule_pk, C_RULE_ID=rule_id, C_RULE_NAME=rule_name, C_RULE_LENGTH=rule_len, C_PARENT_PK=parent_pk, C_FLAG=flag, C_ACTUAL_PARENT_PK=actual_parent_pk ) _session.add(rule) _session.commit() return rule @th_safe def find_rule_by_rule_id(rule_id): return _session.query(TRule).filter(TRule.C_RULE_ID == rule_id).first() @th_safe def create_rule_stream(proj_pk, rule_pk, stream_pk, stream_id, stream_name, stream_dir, _path): return _create_rule_stream(proj_pk, rule_pk, stream_pk, stream_id, stream_name, stream_dir, _path) def _create_rule_stream(proj_pk, rule_pk, stream_pk, stream_id, stream_name, stream_dir, _path): rule_stream = TRuleStream( C_PK=get_pk(), C_PROJECT_PK=proj_pk, C_RULE_PK=rule_pk, C_STREAM_PK=stream_pk, C_STREAM_ID=stream_id, C_STREAM_NAME=stream_name, C_STREAM_DIR=stream_dir, C_PATH=_path ) _session.add(rule_stream) _session.commit() return rule_stream @th_safe def create_ref_ds_rule_stream(proj_pk, stream_pk, stream_id, stream_name, stream_dir, target_stream_pk): items: list = _session.query(TRuleStream).filter(TRuleStream.C_STREAM_PK == target_stream_pk).all() for it in items: _path = it.C_PATH if len(_path.split('/')) == 3: continue _path = f'{stream_name}/{stream_id}/'.join(_path.split('/')[2:]) + '/' _create_rule_stream(proj_pk, it.C_RULE_PK, stream_pk, stream_id, stream_name, stream_dir, _path) @th_safe def create_rule_enc(proj_pk, enc_pk, enc_id, name, content=None): return _create_rule_enc(proj_pk, enc_pk, enc_id, name, content) def _create_rule_enc(proj_pk, enc_pk, enc_id, name, content=None): rule_enc = TRuleEnc( C_ENC_PK=enc_pk, C_PROJECT_PK=proj_pk, C_ENC_ID=enc_id, C_NAME=name, C_CONTENT=content, ) _session.add(rule_enc) _session.commit() return rule_enc @th_safe def create_rule_linear(proj_pk, linear_pk, linear_id, name, length, content): rule_linear = TRuleLinear( C_LINEAR_PK=linear_pk, C_PROJECT_PK=proj_pk, C_LINEAR_ID=linear_id, C_NAME=name, C_LENGTH=length, C_PACKAGE_TYPE=None, C_REL_LINEAR_PK=None, C_CONTENT=content ) _session.add(rule_linear) _session.commit() return rule_linear @th_safe def create_property_enc(proj_pk, enc_pk, segment_id, name, ty, content, offset, length, msb_first, mask, cond, seq, rel_enc_item_pk, para_id): property_enc = TPropertyEnc( C_ENCITEM_PK=get_pk(), C_ENC_PK=enc_pk, C_SEGMENT_ID=segment_id, C_NAME=name, C_TYPE=ty, C_CONTENT=content, C_PUBLISH=None, C_OFFSET=offset, C_LENGTH=length, C_MSBFIRST=msb_first, C_MASK=mask, C_CONDITION=cond, C_PROJECT_PK=proj_pk, C_SEQ=seq, C_REL_ENCITEM_PK=rel_enc_item_pk, C_PAR_ID=para_id ) _session.add(property_enc) para = TParameter( C_PAR_PK=get_pk(), C_PROJECT_PK=proj_pk, C_PAR_CODE=segment_id, C_PAR_NAME=name, C_SUBSYS=None, C_TYPE='0', C_UNIT=None, C_VALUE_RANGE=None, C_DIS_REQUIRE=None, C_MODULUS=None, C_PARAMS=None, C_PRECISION='0', C_REG_PK=None, C_METHOD_PK=None ) _session.add(para) if ty == 'ENUM' and content: items: list = content.split(' ') for item in items: idx = items.index(item) name, val = item.split(',') pt = TParameterType( C_PK=get_pk(), C_TYPE_ID=f'{idx}', C_TYPE_NAME=name, C_VALUE=val, C_DATA_TYPE=None, C_PAR_PK=para.C_PAR_PK, C_PROJECT_PK=proj_pk ) _session.add(pt) _session.commit() return property_enc def get_para_id(para_id): for i in range(1, 9999): _id = f'{i}'.zfill(4) _para_id = para_id + '_' + _id if _para_id not in _para_id_map: _para_id_map[_para_id] = True return _para_id para_id_pk_map = {} @th_safe def create_property_linear(proj_pk, linear_pk, para_id, name, ty, content, offset, length, msb_first, mask, cond, calc_expr, simuval, reg_par, params, seq): par_pk = get_pk() if para_id in para_id_pk_map: par_pk = para_id_pk_map[para_id] property_linear = TPropertyLinear( C_PK=get_pk(), C_LINEAR_PK=linear_pk, C_PAR_ID=para_id, C_TYPE=ty, C_CONTENT=content, C_OFFSET=offset, C_LENGTH=length, C_MSBFIRST=msb_first, C_MASK=mask, C_CONDITION=cond, C_CALC_EXPR=calc_expr, C_PAR_PK=par_pk, C_SIMUVAL=simuval, C_REG_PAR=reg_par, C_PARAMS=params, C_PROJECT_PK=proj_pk, C_SEQ=seq, C_REL_PK=None ) _session.add(property_linear) if para_id not in para_id_pk_map: if para_id in _para_id_map: get_para_id(para_id) para = TParameter( C_PAR_PK=property_linear.C_PAR_PK, C_PROJECT_PK=proj_pk, C_PAR_CODE=para_id, C_PAR_NAME=name, C_SUBSYS=None, C_TYPE=None, C_UNIT=None, C_VALUE_RANGE=None, C_DIS_REQUIRE=None, C_MODULUS=None, C_PARAMS=None, C_PRECISION='0', C_REG_PK=None, C_METHOD_PK=None ) _session.add(para) if ty == 'ENUM' and content: items: list = content.split(' ') for item in items: idx = items.index(item) name, val = item.split(',') pt = TParameterType( C_PK=get_pk(), C_TYPE_ID=f'{idx}', C_TYPE_NAME=name, C_VALUE=val, C_DATA_TYPE=None, C_PAR_PK=para.C_PAR_PK, C_PROJECT_PK=proj_pk ) _session.add(pt) _session.commit() return property_linear @th_safe def create_enc_linear(proj_pk, enc_item_pk, ty, vals=None, linear_pk=None): """ 创建 t_enc_linear :param proj_pk: 工程pk :param enc_item_pk: :param ty: 001:封装包,002:线性包 :param vals: 逻辑封装包的key值 :return: """ if linear_pk is None: linear_pk = get_pk() enc_linear = TEncLinear( C_PK=get_pk(), C_LINEAR_PK=linear_pk, C_ENCITEM_PK=enc_item_pk, C_VALS=vals, C_PROJECT_PK=proj_pk, C_TYPE=ty, C_FOLDER_PK=None ) _session.add(enc_linear) _session.commit() return enc_linear @th_safe def update_rule_enc(rule_enc): # 更新 _session.query(TRuleEnc).filter(TRuleEnc.C_ENC_PK == rule_enc.C_ENC_PK).update({ TRuleEnc.C_KEY: rule_enc.C_KEY }) _session.commit() @th_safe def create_rulekey_info(proj_pk, rule_pk, rule_id, rule_name, key_pk, key_id, key_name, key_val): info = TRulekeyInfo( C_PK=get_pk(), C_PROJECT_PK=proj_pk, C_RULE_PK=rule_pk, C_RULE_ID=rule_id, C_RULE_NAME=rule_name, C_KEY_PK=key_pk, C_KEY_ID=key_id, C_KEY_NAME=key_name, C_KEY_VAL=key_val ) _session.add(info) _session.commit() ins_ty = { "pkt": 1, "subPkt": 22, "combPkt": 12, "const": 15, "length": 17, "enum": 18, "sendFlag": 26, "checkSum": 20, "insUnit": 4, "insUnitList": 11, "input": 19, "pktSeqCnt": 25, "variableLength": 14 } class BinaryType(Enum): Integer = 0 # 整型 Float = 1 # 浮点型 Ascii = 2 # ASCII class NumberDataType(Enum): Unsigned = 0 # 无符号整型 SignInteger = 1 # 有符号整型 Phy = 2 # 物理量 Bytes = 3 # 多字节码 class InputFormat(Enum): Binary = 0 # 二进制 Decimal = 1 # 十进制 Hex = 2 # 十六进制 class ProcessMethod(Enum): Dirct = 0 # 直读 Equivalent = 1 # 当量 Formula = 2 # 公式 Script = 3 # 脚本 class ExpressionType(Enum): Count = 0 # 个数计算 Formula = 1 # 数值计算 class EnumType(Enum): NameValue = 0 # 名值对枚举 Range = 1 # 范围枚举 Classify = 2 # 多级分类 InsCategory = 3 # 指令类别枚举 InsUnit = 4 # 指令单元枚举 InsFormat = 5 # 指令格式枚举 def make_attr(field: dict): """ 获取字段定义的ATTR。 位掩码,用于标识节点类型。 类型:0~2 BinaryType; 3~5 DataType; 6~8: InputFormat; 9 : IsSubPackage; 10: IsSendFlag; 11~13: ProcessMethod; 14~16: ExpressionType; 17~19: EnumType :param field: 字段信息 :return: """ attr = 0 # 即时输入,无符号整数,十进制,直读 if field['type'] == ins_ty['input']: attr |= (NumberDataType.Unsigned.value << 3) | (InputFormat.Decimal.value << 6) | ( ProcessMethod.Dirct.value << 11) # 是否是子包 attr |= (1 << 9) if field['type'] == ins_ty['subPkt'] else 0 # 是否是发送标记 attr |= (1 << 10) if field['type'] == ins_ty['sendFlag'] else 0 # 计算类型 # 枚举类型 return attr @th_safe def create_ins_format(proj_pk: str, parent_pk: str, info: dict) -> TInsFormat: ins_format = TInsFormat( C_INS_FORMAT_PK=get_pk(), C_PROJECT_PK=proj_pk, C_PARENT_PK=parent_pk, C_ORDER=info['order'] if 'order' in info else 0, C_AUTOCODE=info['autocode'] if 'autocode' in info else None, C_NAME=info['name'] if 'name' in info else '', C_CODE=info['code'] if 'code' in info else '', C_TYPE=ins_ty[info['type']] if 'type' in info else 0, C_DEF=info['def'] if 'def' in info else None, C_BIT_WIDTH=info['bitWidth'] if 'bitWidth' in info else None, C_BIT_ORDER=info['bitOrder'] if 'bitOrder' in info else 0, C_ATTR=info['attr'] if 'attr' in info else 0, C_RANGE=info['range'] if 'range' in info else None, C_CONDITION='', C_FORMULA=info['formula'] if 'formula' in info else '', C_NUMBER='', ) _session.add(ins_format) _session.commit() return ins_format