import uuid
from hashlib import md5

from sqlalchemy.orm import sessionmaker, scoped_session

from knowledgebase.db.models import (
    engine, TProject, TDevice, TDataStream, TDevStream, TRule, TRuleEnc, TPropertyEnc,
    TPropertyLinear, TRuleStream, TEncLinear, TRuleLinear, TParameter, TParameterType,
    TExtendInfo, TRulekeyInfo, TInsFormat,
)

# Create a session factory bound to the project's engine.
session_factory = sessionmaker(bind=engine)
# Create the scoped-session registry and the module-wide session object
# that every helper below adds to and commits on.
Session = scoped_session(session_factory)
session = Session()

# Parameter ids already handed out by get_para_id() (id -> True).
_para_id_map = {}


def get_pk():
    """Return a new primary key: the MD5 hex digest of a random UUID4 hex string."""
    n = uuid.uuid4().hex
    return md5(n.encode('utf-8')).hexdigest()


def create_project(sat_id, sat_name, proj_code, proj_name, desc, date_time, ) -> TProject:
    """
    Create a project row, add it to the session and commit.

    :param sat_id: stored in C_SAT_ID
    :param sat_name: stored in C_SAT_NAME
    :param proj_code: stored in C_PROJECT_CODE
    :param proj_name: stored in C_PROJECT_NAME
    :param desc: stored in C_DESCRIPTION
    :param date_time: stored in C_DATETIME
    :return: the created project
    """
    project = TProject(
        C_PROJECT_PK=get_pk(),
        C_SAT_ID=sat_id,
        C_SAT_NAME=sat_name,
        C_PROJECT_CODE=proj_code,
        C_DESCRIPTION=desc,
        # Low 32 bits of a random UUID.
        C_HASH=uuid.uuid4().int & 0xffffffff,
        C_PROJECT_NAME=proj_name,
        C_DATETIME=date_time,
        C_CREATEOR='',
    )
    session.add(project)
    session.commit()
    return project


def create_device(device_id, device_name, device_type, dll, project_pk):
    """
    Create a device row, add it to the session and commit.

    :param device_id: stored in C_DEV_ID
    :param device_name: stored in C_DEV_NAME
    :param device_type: stored in C_DEV_TYPE
    :param dll: stored in C_DLL
    :param project_pk: primary key of the owning project
    :return: the created device
    """
    device = TDevice(
        C_DEV_PK=get_pk(),
        C_DEV_ID=device_id,
        C_DEV_NAME=device_name,
        C_DEV_TYPE=device_type,
        C_DLL=dll,
        C_PROJECT_PK=project_pk,
    )
    session.add(device)
    session.commit()
    return device


def create_extend_info(proj_pk, prop_id, prop_name, val, fk):
    """
    Create an extend-info row, add it to the session and commit.

    :param proj_pk: primary key of the owning project
    :param prop_id: stored in C_PROPERTY_ID
    :param prop_name: stored in C_PROPERTY_NAME
    :param val: stored in C_VAL
    :param fk: stored in C_FOREIGN_PK
    :return: the created extend-info row (returned for consistency with the
        other ``create_*`` helpers)
    """
    ext_info = TExtendInfo(
        C_PK=get_pk(),
        C_PROJECT_PK=proj_pk,
        C_PROPERTY_ID=prop_id,
        C_PROPERTY_NAME=prop_name,
        C_VAL=val,
        C_FOREIGN_PK=fk,
    )
    session.add(ext_info)
    session.commit()
    return ext_info


def create_data_stream(proj_pk, dev_pk, name, code, data_ty, direct, rule_id, rule_ty, rule_pk=None):
    """
    Create a data stream together with its device link, rules and rule stream.

    :param proj_pk: primary key of the owning project
    :param dev_pk: primary key of the device the stream is linked to
    :param name: stream name (C_NAME)
    :param code: stream id (C_STREAM_ID)
    :param data_ty: stored in C_DATA_TYPE
    :param direct: stored in C_STREAM_DIR
    :param rule_id: stored in C_RULE_ID and used for the rule rows
    :param rule_ty: stored in C_RULE_TYPE; '001' additionally creates a TRuleEnc
    :param rule_pk: primary key for the rule; generated when None
    :return: tuple ``(data_stream, rule_stream, rule_enc)``; ``rule_enc`` is
        None unless ``rule_ty == '001'``
    """
    ds = TDataStream(
        C_STREAM_PK=get_pk(),
        C_PROJECT_PK=proj_pk,
        C_STREAM_ID=code,
        C_DATA_TYPE=data_ty,
        C_STREAM_DIR=direct,
        C_NAME=name,
        C_DESCRIPTION='',
        C_RULE_ID=rule_id,
        C_RULE_TYPE=rule_ty,
    )
    session.add(ds)

    link = TDevStream(
        C_PK=get_pk(),
        C_DEV_PK=dev_pk,
        C_STREAM_PK=ds.C_STREAM_PK,
        C_PROJECT_PK=proj_pk,
    )
    session.add(link)

    # Create the parsing rule.
    rule_enc = None
    if rule_pk is None:
        rule_pk = get_pk()
    if rule_ty == '001':
        # Encapsulation package.
        rule_enc = create_rule_enc(proj_pk, rule_pk, rule_id, rule_id)

    # Two rule rows: one keyed by the stream pk (flag '0', no parent) and one
    # keyed by the rule pk (flag '1') whose parent is the stream.
    create_rule(proj_pk, ds.C_STREAM_PK, rule_id, name, None, None, '0')
    create_rule(proj_pk, rule_pk, rule_id, rule_id, None, ds.C_STREAM_PK, '1')

    # Rule stream; create_rule_stream() already adds and commits it, which
    # also commits the pending stream and link rows.
    rule_stream = create_rule_stream(proj_pk, rule_pk, ds.C_STREAM_PK, ds.C_STREAM_ID, ds.C_NAME,
                                     ds.C_STREAM_DIR, f"{ds.C_NAME}/{rule_id}/")
    session.commit()
    return ds, rule_stream, rule_enc


def create_rule(proj_pk, rule_pk, rule_id, rule_name, rule_len, parent_pk, flag, actual_parent_pk=None):
    """
    Create a rule row, add it to the session and commit.

    :param proj_pk: primary key of the owning project
    :param rule_pk: stored in C_RULE_PK
    :param rule_id: stored in C_RULE_ID
    :param rule_name: stored in C_RULE_NAME
    :param rule_len: stored in C_RULE_LENGTH
    :param parent_pk: stored in C_PARENT_PK
    :param flag: stored in C_FLAG
    :param actual_parent_pk: stored in C_ACTUAL_PARENT_PK
    :return: the created rule
    """
    rule = TRule(
        C_PK=get_pk(),
        C_PROJECT_PK=proj_pk,
        C_RULE_PK=rule_pk,
        C_RULE_ID=rule_id,
        C_RULE_NAME=rule_name,
        C_RULE_LENGTH=rule_len,
        C_PARENT_PK=parent_pk,
        C_FLAG=flag,
        C_ACTUAL_PARENT_PK=actual_parent_pk,
    )
    session.add(rule)
    session.commit()
    return rule


def find_rule_by_rule_id(rule_id):
    """Return the first TRule whose C_RULE_ID equals *rule_id*, or None if there is none."""
    return session.query(TRule).filter(TRule.C_RULE_ID == rule_id).first()


def create_rule_stream(proj_pk, rule_pk, stream_pk, stream_id, stream_name, stream_dir, _path):
    """
    Create a rule-stream row, add it to the session and commit.

    :param proj_pk: primary key of the owning project
    :param rule_pk: stored in C_RULE_PK
    :param stream_pk: stored in C_STREAM_PK
    :param stream_id: stored in C_STREAM_ID
    :param stream_name: stored in C_STREAM_NAME
    :param stream_dir: stored in C_STREAM_DIR
    :param _path: stored in C_PATH
    :return: the created rule stream
    """
    rule_stream = TRuleStream(
        C_PK=get_pk(),
        C_PROJECT_PK=proj_pk,
        C_RULE_PK=rule_pk,
        C_STREAM_PK=stream_pk,
        C_STREAM_ID=stream_id,
        C_STREAM_NAME=stream_name,
        C_STREAM_DIR=stream_dir,
        C_PATH=_path,
    )
    session.add(rule_stream)
    session.commit()
    return rule_stream


def create_ref_ds_rule_stream(proj_pk, stream_pk, stream_id, stream_name, stream_dir, target_stream_pk):
    """
    Copy the rule streams of *target_stream_pk* onto another stream.

    Every TRuleStream of the target stream whose C_PATH does not split into
    exactly three '/'-separated segments is re-created for the given stream
    with a rewritten path.

    :param proj_pk: primary key of the owning project
    :param stream_pk: primary key of the stream receiving the copies
    :param stream_id: id of the receiving stream
    :param stream_name: name of the receiving stream
    :param stream_dir: direction of the receiving stream
    :param target_stream_pk: primary key of the stream whose rule streams are copied
    """
    items = session.query(TRuleStream).filter(TRuleStream.C_STREAM_PK == target_stream_pk).all()
    separator = f'{stream_name}/{stream_id}/'
    for it in items:
        segments = it.C_PATH.split('/')
        if len(segments) == 3:
            continue
        # NOTE(review): the "<name>/<id>/" string is used as the *separator*
        # between the remaining path segments rather than as a prefix. This
        # looks unintended, but the behaviour is preserved as-is -- confirm
        # the expected C_PATH format before changing it.
        _path = separator.join(segments[2:]) + '/'
        create_rule_stream(proj_pk, it.C_RULE_PK, stream_pk, stream_id, stream_name, stream_dir, _path)


def create_rule_enc(proj_pk, enc_pk, enc_id, name, content=None):
    """
    Create a rule-enc row, add it to the session and commit.

    :param proj_pk: primary key of the owning project
    :param enc_pk: stored in C_ENC_PK
    :param enc_id: stored in C_ENC_ID
    :param name: stored in C_NAME
    :param content: stored in C_CONTENT
    :return: the created rule enc
    """
    rule_enc = TRuleEnc(
        C_ENC_PK=enc_pk,
        C_PROJECT_PK=proj_pk,
        C_ENC_ID=enc_id,
        C_NAME=name,
        C_CONTENT=content,
    )
    session.add(rule_enc)
    session.commit()
    return rule_enc


def create_rule_linear(proj_pk, linear_pk, linear_id, name, length, content):
    """
    Create a rule-linear row, add it to the session and commit.

    :param proj_pk: primary key of the owning project
    :param linear_pk: stored in C_LINEAR_PK
    :param linear_id: stored in C_LINEAR_ID
    :param name: stored in C_NAME
    :param length: stored in C_LENGTH
    :param content: stored in C_CONTENT
    :return: the created rule linear
    """
    rule_linear = TRuleLinear(
        C_LINEAR_PK=linear_pk,
        C_PROJECT_PK=proj_pk,
        C_LINEAR_ID=linear_id,
        C_NAME=name,
        C_LENGTH=length,
        C_PACKAGE_TYPE=None,
        C_REL_LINEAR_PK=None,
        C_CONTENT=content,
    )
    session.add(rule_linear)
    session.commit()
    return rule_linear


def _add_enum_parameter_types(content, par_pk, proj_pk):
    """
    Add one TParameterType per space-separated ``name,value`` item of *content*.

    C_TYPE_ID is the item's position in *content*, so repeated items still
    receive distinct ids. Rows are only added to the session; the caller commits.
    """
    for idx, item in enumerate(content.split(' ')):
        type_name, val = item.split(',')
        session.add(TParameterType(
            C_PK=get_pk(),
            C_TYPE_ID=f'{idx}',
            C_TYPE_NAME=type_name,
            C_VALUE=val,
            C_DATA_TYPE=None,
            C_PAR_PK=par_pk,
            C_PROJECT_PK=proj_pk,
        ))


def create_property_enc(proj_pk, enc_pk, segment_id, name, ty, content, offset, length, msb_first, mask, cond, seq,
                        rel_enc_item_pk, para_id):
    """
    Create a property-enc row plus its parameter and commit.

    A TParameter is created alongside the property; when ``ty == 'ENUM'`` and
    *content* is non-empty, one TParameterType is created per
    space-separated ``name,value`` item of *content*.

    :param proj_pk: primary key of the owning project
    :param enc_pk: stored in C_ENC_PK
    :param segment_id: stored in C_SEGMENT_ID and as the parameter's C_PAR_CODE
    :param name: stored in C_NAME and as the parameter's C_PAR_NAME
    :param ty: stored in C_TYPE
    :param content: stored in C_CONTENT
    :param offset: stored in C_OFFSET
    :param length: stored in C_LENGTH
    :param msb_first: stored in C_MSBFIRST
    :param mask: stored in C_MASK
    :param cond: stored in C_CONDITION
    :param seq: stored in C_SEQ
    :param rel_enc_item_pk: stored in C_REL_ENCITEM_PK
    :param para_id: stored in C_PAR_ID
    :return: the created property enc
    """
    property_enc = TPropertyEnc(
        C_ENCITEM_PK=get_pk(),
        C_ENC_PK=enc_pk,
        C_SEGMENT_ID=segment_id,
        C_NAME=name,
        C_TYPE=ty,
        C_CONTENT=content,
        C_PUBLISH=None,
        C_OFFSET=offset,
        C_LENGTH=length,
        C_MSBFIRST=msb_first,
        C_MASK=mask,
        C_CONDITION=cond,
        C_PROJECT_PK=proj_pk,
        C_SEQ=seq,
        C_REL_ENCITEM_PK=rel_enc_item_pk,
        C_PAR_ID=para_id,
    )
    session.add(property_enc)

    para = TParameter(
        C_PAR_PK=get_pk(),
        C_PROJECT_PK=proj_pk,
        C_PAR_CODE=segment_id,
        C_PAR_NAME=name,
        C_SUBSYS=None,
        C_TYPE='0',
        C_UNIT=None,
        C_VALUE_RANGE=None,
        C_DIS_REQUIRE=None,
        C_MODULUS=None,
        C_PARAMS=None,
        C_PRECISION='0',
        C_REG_PK=None,
        C_METHOD_PK=None,
    )
    session.add(para)

    if ty == 'ENUM' and content:
        _add_enum_parameter_types(content, para.C_PAR_PK, proj_pk)

    session.commit()
    return property_enc


def get_para_id(para_id):
    """
    Reserve and return the first unused ``<para_id>_NNNN`` id.

    Suffixes 0001 through 9998 are tried in order; the chosen id is recorded
    in ``_para_id_map``.

    :param para_id: base parameter id (a string)
    :return: the reserved id, or None when every suffix is already taken
    """
    for i in range(1, 9999):
        candidate = para_id + '_' + f'{i}'.zfill(4)
        if candidate not in _para_id_map:
            _para_id_map[candidate] = True
            return candidate
    return None


def create_property_linear(proj_pk, linear_pk, para_id, name, ty, content, offset, length, msb_first, mask, cond,
                           calc_expr, simuval, reg_par, params, seq):
    """
    Create a property-linear row plus its parameter and commit.

    A TParameter sharing the property's C_PAR_PK is created alongside it; when
    ``ty == 'ENUM'`` and *content* is non-empty, one TParameterType is created
    per space-separated ``name,value`` item of *content*.

    :param proj_pk: primary key of the owning project
    :param linear_pk: stored in C_LINEAR_PK
    :param para_id: stored in C_PAR_ID and as the parameter's C_PAR_CODE
    :param name: stored as the parameter's C_PAR_NAME
    :param ty: stored in C_TYPE
    :param content: stored in C_CONTENT
    :param offset: stored in C_OFFSET
    :param length: stored in C_LENGTH
    :param msb_first: stored in C_MSBFIRST
    :param mask: stored in C_MASK
    :param cond: stored in C_CONDITION
    :param calc_expr: stored in C_CALC_EXPR
    :param simuval: stored in C_SIMUVAL
    :param reg_par: stored in C_REG_PAR
    :param params: stored in C_PARAMS
    :param seq: stored in C_SEQ
    :return: the created property linear
    """
    property_linear = TPropertyLinear(
        C_PK=get_pk(),
        C_LINEAR_PK=linear_pk,
        C_PAR_ID=para_id,
        C_TYPE=ty,
        C_CONTENT=content,
        C_OFFSET=offset,
        C_LENGTH=length,
        C_MSBFIRST=msb_first,
        C_MASK=mask,
        C_CONDITION=cond,
        C_CALC_EXPR=calc_expr,
        C_PAR_PK=get_pk(),
        C_SIMUVAL=simuval,
        C_REG_PAR=reg_par,
        C_PARAMS=params,
        C_PROJECT_PK=proj_pk,
        C_SEQ=seq,
        C_REL_PK=None,
    )
    session.add(property_linear)

    if para_id in _para_id_map:
        # NOTE(review): the id returned by get_para_id() is discarded, so the
        # only effect is reserving the next suffixed id in _para_id_map.
        # Presumably the result was meant to replace para_id -- confirm the
        # intent before changing; behaviour is preserved as-is.
        get_para_id(para_id)

    para = TParameter(
        C_PAR_PK=property_linear.C_PAR_PK,
        C_PROJECT_PK=proj_pk,
        C_PAR_CODE=para_id,
        C_PAR_NAME=name,
        C_SUBSYS=None,
        C_TYPE=None,
        C_UNIT=None,
        C_VALUE_RANGE=None,
        C_DIS_REQUIRE=None,
        C_MODULUS=None,
        C_PARAMS=None,
        C_PRECISION='0',
        C_REG_PK=None,
        C_METHOD_PK=None,
    )
    session.add(para)

    if ty == 'ENUM' and content:
        _add_enum_parameter_types(content, para.C_PAR_PK, proj_pk)

    session.commit()
    return property_linear


def create_enc_linear(proj_pk, enc_item_pk, ty, vals=None, linear_pk=None):
    """
    Create a t_enc_linear row, add it to the session and commit.

    :param proj_pk: project pk
    :param enc_item_pk: stored in C_ENCITEM_PK
    :param ty: 001: encapsulation package, 002: linear package
    :param vals: key value of the logical encapsulation package
    :param linear_pk: stored in C_LINEAR_PK; generated when None
    :return: the created enc linear
    """
    if linear_pk is None:
        linear_pk = get_pk()
    enc_linear = TEncLinear(
        C_PK=get_pk(),
        C_LINEAR_PK=linear_pk,
        C_ENCITEM_PK=enc_item_pk,
        C_VALS=vals,
        C_PROJECT_PK=proj_pk,
        C_TYPE=ty,
        C_FOLDER_PK=None,
    )
    session.add(enc_linear)
    session.commit()
    return enc_linear


def update_rule_enc(rule_enc):
    """Write ``rule_enc.C_KEY`` to the TRuleEnc row with the same C_ENC_PK and commit."""
    # Update
    session.query(TRuleEnc).filter(TRuleEnc.C_ENC_PK == rule_enc.C_ENC_PK).update({
        TRuleEnc.C_KEY: rule_enc.C_KEY
    })
    session.commit()


def create_rulekey_info(proj_pk, rule_pk, rule_id, rule_name, key_pk, key_id, key_name, key_val):
    """
    Create a rule-key info row, add it to the session and commit.

    :param proj_pk: primary key of the owning project
    :param rule_pk: stored in C_RULE_PK
    :param rule_id: stored in C_RULE_ID
    :param rule_name: stored in C_RULE_NAME
    :param key_pk: stored in C_KEY_PK
    :param key_id: stored in C_KEY_ID
    :param key_name: stored in C_KEY_NAME
    :param key_val: stored in C_KEY_VAL
    :return: the created rule-key info row (returned for consistency with the
        other ``create_*`` helpers)
    """
    info = TRulekeyInfo(
        C_PK=get_pk(),
        C_PROJECT_PK=proj_pk,
        C_RULE_PK=rule_pk,
        C_RULE_ID=rule_id,
        C_RULE_NAME=rule_name,
        C_KEY_PK=key_pk,
        C_KEY_ID=key_id,
        C_KEY_NAME=key_name,
        C_KEY_VAL=key_val,
    )
    session.add(info)
    session.commit()
    return info


# Maps the ``type`` string of an instruction-format description to the
# numeric code stored in TInsFormat.C_TYPE.
ins_ty = {
    "pkt": 1,
    "subPkt": 22,
    "combPkt": 12,
    "const": 15,
    "length": 17,
    "enum": 26,
    "checkSum": 20,
}


def create_ins_format(proj_pk: str, parent_pk: str, info: dict) -> TInsFormat:
    """
    Create an instruction-format row from *info*, add it to the session and commit.

    Recognised keys of *info* (all optional): ``order``, ``autocode``,
    ``name``, ``code``, ``type``, ``def``, ``bitWidth``, ``bitOrder``,
    ``attr``, ``range`` and ``formula``. Missing keys fall back to 0, '' or
    None as written below.

    :param proj_pk: primary key of the owning project
    :param parent_pk: stored in C_PARENT_PK
    :param info: description of the instruction format
    :return: the created instruction format
    :raises KeyError: if ``info['type']`` is present but not a key of ``ins_ty``
    """
    ins_format = TInsFormat(
        C_INS_FORMAT_PK=get_pk(),
        C_PROJECT_PK=proj_pk,
        C_PARENT_PK=parent_pk,
        C_ORDER=info.get('order', 0),
        C_AUTOCODE=info.get('autocode'),
        C_NAME=info.get('name', ''),
        C_CODE=info.get('code', ''),
        # An unknown type string is an error; only an absent key maps to 0.
        C_TYPE=ins_ty[info['type']] if 'type' in info else 0,
        C_DEF=info.get('def'),
        C_BIT_WIDTH=info.get('bitWidth', 0),
        C_BIT_ORDER=info.get('bitOrder', 0),
        C_ATTR=info.get('attr', 0),
        C_RANGE=info.get('range'),
        C_CONDITION='',
        C_FORMULA=info.get('formula', ''),
        C_NUMBER='',
    )
    session.add(ins_format)
    session.commit()
    return ins_format