From 1e85c429ceaad860aba16d1f518160d263c094c0 Mon Sep 17 00:00:00 2001 From: lyg <1543117173@qq.com> Date: 星期二, 08 四月 2025 11:48:52 +0800 Subject: [PATCH] 生成指令帧和包格式结构 --- .gitignore | 4 knowledgebase/markitdown/__about__.py | 4 data_templates.py | 337 ++++ knowledgebase/db/models.py | 3 knowledgebase/markitdown/__main__.py | 82 + db_struct_flow.py | 1218 +++++++++++++--- requirements.txt | 0 knowledgebase/markitdown/__init__.py | 11 knowledgebase/db/__init__.py | 0 knowledgebase/db/data_creator.py | 327 ++++ tc_frame_format.json | 77 + knowledgebase/__init__.py | 0 knowledgebase/utils.py | 11 main.py | 32 knowledgebase/markitdown/_markitdown.py | 1708 ++++++++++++++++++++++++ knowledgebase/db/db_helper.py | 408 +++++ prompts.json | 14 17 files changed, 3,968 insertions(+), 268 deletions(-) diff --git a/.gitignore b/.gitignore index 64c1b54..ddcc4f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ /db.db -/out +/out_bak /doc /datas /.conda +/docs +/out* \ No newline at end of file diff --git a/data_templates.py b/data_templates.py new file mode 100644 index 0000000..ebf1a92 --- /dev/null +++ b/data_templates.py @@ -0,0 +1,337 @@ +vc_pkt_fields = [ + { + "name": "鐗堟湰鍙�", + "id": "Ver", + "pos": 0, + "length": 3, + "type": "para", + "content": "0", + "dataTy": "INVAR" + }, + { + "name": "绫诲瀷", + "id": "TM_Type", + "pos": 3, + "length": 1, + "type": "para", + "content": "0", + "dataTy": "INVAR" + }, + { + "name": "鍓澶存爣蹇�", + "id": "Vice_Head", + "pos": 4, + "length": 1, + "type": "para", + "dataTy": "RANDOM" + }, + { + "name": "搴旂敤杩囩▼鏍囪瘑绗�", + "id": "Proc_Sign", + "pos": 5, + "length": 11, + "type": "para", + "dataTy": "ENUM", + "is_key": True + }, + { + "name": "鍒嗙粍鏍囧織", + "id": "Group_Sign", + "pos": 16, + "length": 2, + "type": "para", + "content": "3", + "dataTy": "INVAR" + }, + { + "name": "鍖呭簭鍒楄鏁�", + "id": "Package_Count", + "pos": 18, + "length": 14, + "type": "para", + "dataTy": "RANDOM" + }, + { + "name": "鍖呴暱", + "id": "Pack_Len", + "pos": 32, + "length": 16, + "type": "para", + "content": "1Bytes/EPDU_Data.length - 1", + "dataTy": "LEN" + }, + { + "name": "鏈嶅姟", + "id": "service", + "pos": 48, + "length": 8, + "type": "para", + "content": None, + "dataTy": "ENUM", + "is_key": True + }, + { + "name": "瀛愭湇鍔�", + "id": "subservice", + "pos": 56, + "length": 8, + "type": "para", + "content": None, + "dataTy": "ENUM", + "is_key": True + }, + { + "name": "鏁版嵁鍩�", + "id": "EPDU_DATA", + "pos": 64, + "length": "length-current", + "type": "any", + "children": [] + } +] + + +def get_tm_frame(data): + return { + "name": "閬ユ祴甯�", + "id": "TM_Frame", + "type": "enc", + "pos": 0, + "length": 8192, + "children": [ + { + "name": "鍚屾澶�", + "id": "Sync_Head", + "type": "para", + "pos": 0, + "content": "0x1ACFFC1D", + "dataTy": "INVAR", + "length": 32 + }, + { + "name": "VCDU", + "id": "VCDU", + "type": "enc", + "pos": 32, + "length": 8160, + "content": "1", + "children": [ + { + "name": "浼犺緭甯х増鏈彿", + "id": "Ver_", + "type": "para", + "pos": 0, + "length": 2, + "content": "01B", + "dataTy": "INVAR" + }, + { + "name": "鑸ぉ鍣ㄦ爣璇嗙SCID", + "id": "SCID", + "type": "para", + "pos": 2, + "length": 8, + "content": "0x01", + "dataTy": "INVAR" + }, + { + "name": "铏氭嫙淇¢亾鏍囪瘑绗CID", + "id": "VCID", + "type": "para", + "pos": 10, + "length": 6, + "content": data['vcidContent'], + "dataTy": "ENUM" + }, + { + "name": "VCDU璁℃暟", + "id": "VCDUCnt", + "type": "para", + "pos": 16, + "length": 24, + "content": "0:16777215:1", + "dataTy": "INCREASE" + }, + { + "name": "鍥炴斁鏍囧織", + "id": "PlaybackFlag", + "type": 
"para", + "pos": 40, + "length": 1, + "content": "鍥炴斁,1 涓嶅洖鏀�,0", + "dataTy": "ENUM" + }, + { + "name": "淇濈暀浣�", + "id": "spare", + "type": "para", + "pos": 41, + "length": 7, + "content": "0", + "dataTy": "INVAR" + }, + { + "name": "鎻掑叆鍩�", + "id": "InsertionDomain", + "type": "linear", + "pos": 48, + "length": 640, + "content": None, + "children": data['insertDomain'] + }, + { + "name": "浼犺緭甯ф暟鎹煙", + "id": "DataDomain", + "type": "enc", + "pos": 688, + "length": 7456 + }, + { + "name": "浼犺緭甯у熬", + "id": "FrameTail", + "type": "para", + "pos": 8144, + "length": 16, + "content": "CRC_check;1;All;this.START+0;this.CURRENT-1", + "dataTy": "CHECKSUM" + } + ] + } + ] + } + + +def get_bus_datas(pkts): + return [ + { + "name": "浼犺緭娑堟伅绫诲瀷", + "id": "BMessageType", + "type": "para", + "pos": 0, + "length": 8, + "content": "骞挎挱BC/RT_浼犵粺,0x00 骞挎挱RT/RT_浼犵粺,0xFF BC/RT_浼犵粺,0x11 RT/RT_浼犵粺,0x12 鏃跺垎澶嶇敤妯″紡鐨凚C/RT,0x21", + "dataTy": "ENUM", + "is_key": True + }, + { + "name": "娑堟伅浼犺緭鏍煎紡鍙婃秷鎭綋", + "id": "BMessagePro", + "type": "enc", + "pos": 8, + "length": "length-current", + "children": [ + { + "name": "BMessagePro", + "id": "BMessagePro", + "type": "enc", + "pos": 0, + "length": "length-current", + "vals": "0x11/", + "children": [ + { + "id": "BRT_Add", + "name": "RT鍦板潃", + "type": "para", + "pos": 0, + "content": "1,1 2,2 3,3 4,4 5,5 6,6 7,7 8,8 9,9 10,10 11,11 12,12 13,13 14,14 15,15 16,16 17,17 18,18 19,19 20,20 21,21 22,22 23,23 24,24 25,25 26,26 27,27 28,28 29,29 30,30 31,31", + "length": 8, + "dataTy": "ENUM", + "is_key": True, + }, + { + "id": "BSub_add", + "name": "瀛愬湴鍧�", + "type": "para", + "pos": 8, + "content": "1,1 2,2 3,3 4,4 5,5 6,6 7,7 8,8 9,9 10,10 11,11 12,12 13,13 14,14 15,15 16,16 17,17 18,18 19,19 20,20 21,21 22,22 23,23 24,24 25,25 26,26 27,27 28,28 29,29 30,30 31,31", + "length": 8, + "dataTy": "ENUM", + "is_key": True, + }, + { + "id": "BT_R_M", + "name": "浼犺緭鏂瑰悜/鏂瑰紡浠e彿", + "type": "para", + "pos": 16, + "content": "RT2BC,0xAA BC2RT,0xBB 鏂瑰紡瀛�,0xCC", + "length": 8, + "dataTy": "ENUM", + "is_key": True + }, + { + "id": "BFrame", + "name": "甯у彿", + "type": "para", + "pos": 24, + "content": "0:19:1", + "length": 8, + "dataTy": "INCREASE", + "is_key": True + }, + { + "id": "BusA_B", + "name": "鎬荤嚎A/B", + "type": "para", + "pos": 32, + "content": "A鎬荤嚎,1 B鎬荤嚎,0", + "length": 8, + "dataTy": "ENUM" + }, + { + "id": "BErrorFlag", + "name": "Error Flag(status word.bit12)", + "type": "para", + "pos": 40, + "content": None, + "length": 16, + "dataTy": "RANDOM" + }, + { + "id": "BControlWord", + "name": "ControlWord", + "type": "para", + "pos": 56, + "content": None, + "length": 16, + "dataTy": "RANDOM" + }, + { + "id": "BCommandWord", + "name": "CommandWord", + "type": "para", + "pos": 72, + "content": None, + "length": 16, + "dataTy": "RANDOM" + }, + { + "id": "BStatusWord", + "name": "StatusWord", + "type": "para", + "pos": 88, + "content": None, + "length": 16, + "dataTy": "RANDOM" + }, + { + "id": "BTime", + "name": "浼犺緭鏃堕棿", + "type": "para", + "pos": 104, + "content": None, + "length": 64, + "dataTy": "RANDOM" + }, + { + "id": "SA7_258", + "name": "缁煎悎鏁扮鍗曞厓鏁版嵁鍧椾紶杈�", + "type": "any", + "pos": 168, + "length": "length-current", + "children": pkts + } + ] + } + ] + } + ] diff --git a/db_struct_flow.py b/db_struct_flow.py index ade1e21..bf2817a 100644 --- a/db_struct_flow.py +++ b/db_struct_flow.py @@ -1,20 +1,52 @@ import os +import time from datetime import datetime from openai import OpenAI -from pathlib import Path import re import json -import copy -from datas import pkt_vc, pkt_datas, 
dev_pkt, proj_data -from db.db_generate import create_project, create_device, create_data_stream -from db.models import TProject, TDevice +import data_templates +from knowledgebase.db.db_helper import create_project, create_device, create_data_stream, \ + update_rule_enc, create_extend_info, create_ref_ds_rule_stream, create_ins_format +from knowledgebase.db.data_creator import create_prop_enc, create_enc_pkt, get_data_ty, create_any_pkt + +from knowledgebase.db.models import TProject + +file_map = { + "鏂囨。鍚堝苟": "./doc/鏂囨。鍚堝苟.md", + "閬ユ祴婧愬寘璁捐鎶ュ憡": "./doc/XA-5D鏃犱汉鏈哄垎绯荤粺鎺㈡祴婧愬寘璁捐鎶ュ憡锛堝叕寮�锛�.md", + "閬ユ祴澶х翰": "./doc/XA-5D鏃犱汉鏈烘帰娴嬪ぇ绾诧紙鍏紑锛�.md", + "鎬荤嚎浼犺緭閫氫俊甯у垎閰�": "./doc/XA-5D鏃犱汉鏈�1314A鎬荤嚎浼犺緭閫氫俊甯у垎閰嶏紙鍏紑锛�.md", + "搴旂敤杞欢鐢ㄦ埛闇�姹�": "./doc/XA-5D鏃犱汉鏈鸿蒋浠剁敤鎴烽渶姹傦紙鍏紑锛�.docx.md", + "鎸囦护鏍煎紡": "./doc/ZL鏍煎紡(鍏紑).docx.md" +} +# file_map = { +# "閬ユ祴婧愬寘璁捐鎶ュ憡": "./docs/HY-4A鏁扮鍒嗙郴缁熼仴娴嬫簮鍖呰璁℃姤鍛� Z 240824 鏇存敼3(鍐呴儴) .docx.md", +# "閬ユ祴澶х翰": "./docs/HY-4A鍗槦閬ユ祴澶х翰 Z 240824 鏇存敼3锛堝唴閮級.docx.md", +# "鎬荤嚎浼犺緭閫氫俊甯у垎閰�": "./docs/HY-4A鍗槦1553B鎬荤嚎浼犺緭閫氫俊甯у垎閰� Z 240824 鏇存敼3锛堝唴閮級.docx.md", +# "搴旂敤杞欢鐢ㄦ埛闇�姹�": "./docs/HY-4A鏁扮鍒嗙郴缁熷簲鐢ㄨ蒋浠剁敤鎴烽渶姹傦紙鏄熷姟绠$悊鍒嗗唽锛� Z 240831 鏇存敼4锛堝唴閮級.docx.md" +# } +# file_map = { +# "鏂囨。鍚堝苟": "./doc/鏂囨。鍚堝苟.md", +# "閬ユ祴婧愬寘璁捐鎶ュ憡": "./doc/XA-5D鏃犱汉鏈哄垎绯荤粺鎺㈡祴婧愬寘璁捐鎶ュ憡锛堝叕寮�锛�.md", +# "閬ユ祴澶х翰": "./doc/XA-5D鏃犱汉鏈烘帰娴嬪ぇ绾诧紙鍏紑锛�.md", +# "鎬荤嚎浼犺緭閫氫俊甯у垎閰�": "./doc/XA-5D鏃犱汉鏈�1314A鎬荤嚎浼犺緭閫氫俊甯у垎閰嶏紙鍏紑锛�.md" +# } BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1' API_KEY = 'sk-15ecf7e273ad4b729c7f7f42b542749e' -MODEL_NAME = 'qwen-long' +MODEL_NAME = 'qwen2.5-14b-instruct-1m' +# BASE_URL = 'http://10.74.15.164:11434/v1/' +# API_KEY = 'ollama' +# MODEL_NAME = 'qwen2.5:32b-128k' + +# BASE_URL = 'http://10.74.15.164:1001/api' +# API_KEY = 'sk-a909385bc14d4491a718b6ee264c3227' +# MODEL_NAME = 'qwen2.5:32b-128k' + +USE_CACHE = True assistant_msg = """ # 瑙掕壊 浣犳槸涓�涓笓涓氱殑鏂囨。閫氫俊鍒嗘瀽甯堬紝鎿呴暱杩涜鏂囨。鍒嗘瀽鍜岄�氫俊鍗忚鍒嗘瀽锛屽悓鏃惰兘澶熻В鏋� markdown 绫诲瀷鐨勬枃妗c�傛嫢鏈夋垚鐔熷噯纭殑鏂囨。闃呰涓庡垎鏋愯兘鍔涳紝鑳藉濡ュ杽澶勭悊澶氭枃妗i棿瀛樺湪寮曠敤鍏崇郴鐨勫鏉傛儏鍐点�� @@ -25,64 +57,259 @@ 2. 鍒嗘瀽鏂囨。鐨勭粨鏋勩�佷富棰樺拰閲嶇偣鍐呭锛屽悓鏍峰彧渚濇嵁鏂囨。杩涜琛ㄨ堪銆� 3. 濡傛灉鏂囨。闂村瓨鍦ㄥ紩鐢ㄥ叧绯伙紝姊崇悊寮曠敤鑴夌粶锛屾槑纭悇鏂囨。涔嬮棿鐨勫叧鑱旓紝涓斾粎鍛堢幇鏂囨。涓綋鐜扮殑鍐呭銆� - ### 鎶�鑳� 2锛氶�氫俊鍗忚鍒嗘瀽 1. 鎺ユ敹閫氫俊鍗忚鐩稿叧淇℃伅锛岀悊瑙e崗璁殑瑙勫垯鍜屾祦绋嬶紝浠呬緷鎹墍缁欎俊鎭繘琛屽垎鏋愩�� +## 鑳屾櫙鐭ヨ瘑 +###杞欢涓昏鍔熻兘涓庤繍琛屾満鍒舵�荤粨濡備笅锛� +1. 鏁版嵁閲囬泦鍜屽鐞嗭細 + DIU璐熻矗鏍规嵁鍗槦鐨勫伐浣滅姸鎬佹垨妯″紡鎻愪緵閬ユ祴鏁版嵁锛屽寘鎷ā鎷熼噺锛圓N锛夈�佹�荤嚎淇″彿锛圔L锛変互鍙婃俯搴︼紙TH锛夊拰鏁板瓧閲忥紙DS锛夛紝骞跺皢杩欎簺淇℃伅鎵撳寘锛岄�氳繃鎬荤嚎鍙戦�佺粰SMU銆� + SMU鍒欐敹闆嗙‖閫氶亾涓婄殑閬ユ祴鍙傛暟锛屽苟閫氳繃鎬荤嚎鎺ユ敹DIU閲囬泦鐨勪俊鎭�� +2. 澶氳矾澶嶇敤涓庢暟鎹紶杈擄細 + 閬ユ祴婧愬寘琚粍缁囨垚E-PDU锛岃繘涓�姝ュ鐢ㄤ负M-PDU锛屽苟濉厖鍒癡CDU涓瀯鎴愰仴娴嬪抚銆� + 鍒╃敤CCSDS AOS CADU鏍煎紡杩涜閬ユ祴鏁版嵁鐨勫璺鐢ㄥ拰浼犺緭銆� +3. 铏氭嫙淇¢亾锛圴C锛夎皟搴︽満鍒讹細 + 閫氳繃甯歌閬ユ祴VC銆佺獊鍙戞暟鎹甐C銆佸欢鏃堕仴娴媀C銆佽褰曟暟鎹甐C浠ュ強鍥炴斁VC瀹炵幇涓嶅悓绫诲瀷鐨勬暟鎹笅浼犮�� +4. 閬ユ帶鎸囦护澶勭悊锛� + 涓婅閬ユ帶鍖呮嫭鐩存帴鎸囦护鍜岄棿鎺ユ寚浠わ紝闇�缁忚繃鏍煎紡楠岃瘉鍚庤浆鍙戠粰鐩稿簲鍗曟満鎵ц銆� + 閬ユ帶甯ч�氳繃鐗瑰畾鐨勮櫄鎷熶俊閬擄紙VC锛夎繘琛屼紶杈撱�� +杩欎簺鐭ヨ瘑闇�瑕佷綘璁颁綇锛屽啀鍚庣画鐨勫鐞嗕腑鍙互甯姪浣犵悊瑙h澶勭悊鐨勬暟鎹�� + ## 鐩爣瀵煎悜 1. 閫氳繃瀵规枃妗e拰閫氫俊鍗忚鐨勫垎鏋愶紝涓虹敤鎴锋彁渚涙竻鏅般�佸噯纭殑鏁版嵁缁撴瀯锛屽府鍔╃敤鎴锋洿濂藉湴鐞嗚В鍜屼娇鐢ㄧ浉鍏充俊鎭�� -2. 浠� JSON 鏍煎紡缁勭粐杈撳嚭鍐呭锛岀‘淇濇暟鎹粨鏋勭殑瀹屾暣鎬у拰鍙鎬с�� ## 瑙勫垯 -1. 姣忎竴涓瀷鍙烽兘浼氭湁涓�濂楁枃妗o紝闇�鍑嗙‘鍒ゆ柇鏄惁涓哄悓涓�涓瀷鍙风殑鏂囨。鍚庡啀杩涜鏁翠綋鍒嗘瀽銆� -2. 姣忔鍙垎鏋愬悓涓�涓瀷鍙枫�� -3. 澶у鏁版枃妗g粨鏋勪负锛氬瀷鍙蜂笅鍖呭惈璁惧锛岃澶囦笅鍖呭惈鏁版嵁娴侊紝鏁版嵁娴佷笅鍖呭惈鏁版嵁甯э紝鏁版嵁甯т腑鏈変竴鍧楁槸鍖呭煙锛屽寘鍩熶腑浼氭寕杞藉悇绉嶇被鍨嬬殑鏁版嵁鍖呫�� -4. 杩欎簺鏂囨。閮芥槸鏁版嵁浼犺緭鍗忚鐨勬弿杩帮紝鍦ㄦ暟鎹祦銆佹暟鎹抚銆佹暟鎹寘绛変紶杈撳疄浣撲腑閮芥弿杩颁簡鍚勪釜瀛楁鐨勫垎甯冨拰姣忎釜瀛楁鐨勫ぇ灏忥紝涓斿ぇ灏忓崟浣嶄笉缁熶竴锛岄渶鐞嗚В杩欎簺鍗曚綅锛屽苟灏嗘墍鏈夎緭鍑哄崟浣嶇粺涓�涓� bits锛岀粺涓�浣跨敤length琛ㄧず銆� -5. 
濡傛灉鏈夊眰绾э紝浣跨敤鏍戝舰 JSON 杈撳嚭锛屽瓙鑺傜偣 key 浣跨敤children锛涢渶淇濊瘉鐩稿悓绫诲瀷鐨勬暟鎹粨鏋勭粺涓�锛屽苟涓斿垽鏂瘡涓眰绾ф槸浠�涔堢被鍨嬶紝杈撳嚭绫诲瀷瀛楁锛岀被鍨嬪瓧娈电殑 key 浣跨敤 type 锛涗緥濡傚綋鍓嶅眰绾т负瀛楁鏃朵娇鐢細type:"field"锛涘綋鍓嶅眰绾т负璁惧鏃朵娇鐢細type:"device" -6.鍚嶇О鐩稿叧鐨勫瓧娈电殑 key 浣跨敤name锛涗唬鍙锋垨鑰呭敮涓�鏍囪瘑鐩稿叧鐨勫瓧娈电殑key浣跨敤id锛涘簭鍙风浉鍏崇殑瀛楁鐨刱ey浣跨敤number锛涘叾浠栨病鏈変妇渚嬬殑瀛楁浣跨敤绮剧畝鐨勭炕璇戜綔涓哄瓧娈电殑key锛� -7.鎺㈡祴甯т负CADU锛屽叾涓寘鍚悓姝ュご鍜孷CDU锛屾寜鐓т範鎯渶瑕佷娇鐢╒CDU灞傜骇鍖呭惈涓嬩竴灞傜骇涓紶杈撳抚涓诲澶淬�佷紶杈撳抚鎻掑叆鍩熴�佷紶杈撳抚鏁版嵁鍩熴�佷紶杈撳抚灏剧殑缁撴瀯 - +1. 姣忎竴涓瀷鍙烽兘浼氭湁涓�濂楁枃妗o紝闇�鍑嗙‘鍒ゆ柇鏄惁涓哄悓涓�涓瀷鍙风殑鏂囨。鍚庡啀杩涜鏁翠綋鍒嗘瀽锛屾瘡娆″彧鍒嗘瀽鍚屼竴涓瀷鍙风殑鏂囨。銆� +2. 澶у鏁版枃妗g粨鏋勪负锛氬瀷鍙蜂笅鍖呭惈璁惧锛岃澶囦笅鍖呭惈鏁版嵁娴侊紝鏁版嵁娴佷笅鍖呭惈鏁版嵁甯э紝鏁版嵁甯т腑鏈変竴鍧楁槸鍖呭煙锛屽寘鍩熶腑浼氭寕杞藉悇绉嶇被鍨嬬殑鏁版嵁鍖呫�� +3. 鏂囨。閮芥槸瀵逛簬鏁版嵁浼犺緭鍗忚鐨勬弿杩帮紝鍦ㄦ暟鎹祦銆佹暟鎹抚銆佹暟鎹寘绛変紶杈撳疄浣撲腑閮芥弿杩颁簡鍚勪釜瀛楁鐨勫垎甯冦�佸悇涓瓧娈电殑澶у皬鍜屼綅缃瓑淇℃伅锛屼笖澶у皬鍗曚綅涓嶇粺涓�锛岄渶鐞嗚В杩欎簺鍗曚綅锛屽苟灏嗘墍鏈夎緭鍑哄崟浣嶇粺涓�涓� bits锛岄暱搴﹀瓧娈典娇鐢� length 琛ㄧず锛屼綅缃瓧娈典娇鐢� pos 琛ㄧず锛屽鏋滀负鍙橀暱浣跨敤鈥�"鍙橀暱"鈥濊〃绀恒�� +4. 濡傛灉鏈夊眰绾э紝浣跨敤鏍戝舰 JSON 杈撳嚭锛屽鏋滄湁瀛愯妭鐐癸紝瀛愯妭鐐� key 浣跨敤children锛涢渶淇濊瘉涓�娆¤緭鍑虹殑鏁版嵁缁撴瀯缁熶竴锛屽苟涓斿垽鏂瘡涓眰绾ф槸浠�涔堢被鍨嬶紝杈撳嚭绫诲瀷瀛楁锛坱ype锛夛紝绫诲瀷瀛楁鐨� key 浣跨敤 type锛岀被鍨嬪寘鎷細鍨嬪彿锛坧roject锛夈�佽澶囷紙dev锛夈�佸皝瑁呭寘锛坋nc锛夈�佺嚎鎬у寘锛坙inear锛夈�佸弬鏁帮紙para锛夛紝灏佽鍖呭瓙绾ф湁鏁版嵁鍖咃紝鎵�浠ype涓篹nc锛岀嚎鎬у寘瀛愮骇鍙湁鍙傛暟锛屾墍浠ype涓簂inear锛涙瘡涓眰绾ч兘鍖呭惈鍋忕Щ浣嶇疆锛坧os锛夛紝姣忎釜灞傜骇鐨勫亸绉讳綅缃粠0寮�濮嬨�� +5. 鍚嶇О鐩稿叧鐨勫瓧娈电殑 key 浣跨敤name锛涗唬鍙枫�佺紪鍙锋垨鑰呭敮涓�鏍囪瘑鐩稿叧鐨勫瓧娈电殑key浣跨敤id锛宨d鐢辨暟瀛椼�佽嫳鏂囧瓧姣嶃�佷笅鍒掔嚎缁勬垚涓斾互鑻辨枃瀛楁瘝寮�澶达紝闀垮害灏介噺绠�鐭紱搴忓彿鐩稿叧鐨勫瓧娈电殑key浣跨敤number锛涘亸绉讳綅缃浉鍏冲瓧娈电殑key浣跨敤pos锛涘叾浠栨病鏈変妇渚嬬殑瀛楁浣跨敤绮剧畝鐨勭炕璇戜綔涓哄瓧娈电殑key锛涙瘡涓粨鏋勫繀椤诲寘鍚玭ame鍜宨d銆� +6. 閬ユ祴甯т负CADU锛屽叾涓寘鍚悓姝ュご鍜孷CDU锛屾寜鐓т範鎯渶瑕佷娇鐢╒CDU灞傜骇宓屽浼犺緭甯т富瀵煎ご銆佷紶杈撳抚鎻掑叆鍩熴�佷紶杈撳抚鏁版嵁鍩熴�佷紶杈撳抚灏剧殑缁撴瀯銆� +7. 鏁版嵁鍖呭瓧娈靛寘鎷細name銆乮d銆乼ype銆乸os銆乴ength銆乧hildren锛涘弬鏁板瓧娈靛寘鎷細name銆乮d銆乸os銆乼ype銆乴ength锛涘繀椤诲寘鍚玴os鍜宭ength瀛楁銆� +8. 甯哥敤id鍙傝�冿細閬ユ祴锛圱M锛夈�侀仴鎺э紙TC锛夈�佹�荤嚎锛圔US锛夈�佺増鏈彿锛圴er锛夈�佸簲鐢ㄨ繃绋嬫爣璇嗭紙APID锛夈�� +9. 娉ㄦ剰锛氫竴瀹氳璁板緱morkdown鏂囨。涓細灏嗕竴浜涚壒娈婂瓧绗﹁繘琛岃浆涔夛紝浠ユ鏉ヤ繚璇佹枃妗g殑姝g‘鎬э紝杩欎簺杞箟绗﹀彿锛堜篃灏辨槸鍙嶆枩鏉犫�榎鈥欙級涓嶉渶瑕佸湪缁撴灉涓緭鍑恒�� +10. 浠� JSON 鏍煎紡缁勭粐杈撳嚭鍐呭锛岀‘淇濇暟鎹粨鏋勭殑瀹屾暣鎬у拰鍙鎬э紝娉ㄦ剰锛氱敓鎴愮殑JSON璇硶鏍煎紡蹇呴』绗﹀悎json瑙勮寖锛岄伩鍏嶅嚭鐜伴敊璇�� + ## 闄愬埗锛� - 鎵�杈撳嚭鐨勫唴瀹瑰繀椤绘寜鐓SON鏍煎紡杩涜缁勭粐锛屼笉鑳藉亸绂绘鏋惰姹傦紝涓斾弗鏍奸伒寰枃妗e唴瀹硅繘琛岃緭鍑猴紝鍙緭鍑� JSON 锛屼笉瑕佽緭鍑哄叾瀹冩枃瀛椼�� -- 涓嶈緭鍑轰换浣曟敞閲婄瓑鎻忚堪鎬т俊鎭� - +- 涓嶈緭鍑轰换浣曟敞閲婄瓑鎻忚堪鎬т俊鎭�� """ + +g_completion = None + + +def read_from_file(cache_file): + with open(cache_file, 'r', encoding='utf-8') as f: + text = f.read() + return text + + +def save_to_file(text, file_path): + if USE_CACHE: + with open(file_path, 'w', encoding='utf-8') as f: + f.write(text) + + +json_pat = re.compile(r'```json(.*?)```', re.DOTALL) + + +def remove_markdown(text): + # 浣跨敤姝e垯琛ㄨ揪寮忔彁鍙杍son鏂囨湰 + try: + return json_pat.findall(text)[0] + except IndexError: + return text + + +def rt_pkt_map_gen(pkt, trans_ser, rt_pkt_map, pkt_id, vals): + # 閫昏緫灏佽鍖咃紝鏁版嵁鍧椾紶杈撶殑鍙湁涓�涓紝鍙栨暟鐨勬牴鎹甊T鍦板潃銆佸瓙鍦板潃鍜屽抚鍙峰垝鍒� + frame_num = pkt['frameNum'] + if trans_ser == '鏁版嵁鍧椾紶杈�': + # 鏁版嵁鍧椾紶杈撴牴鎹甊T鍦板潃鍜屽瓙鍦板潃鍒掑垎 + key = f'{pkt["rt"]}_{pkt["subAddr"]}' + name = f'{pkt["rt"]}_{pkt["subAddr"]}_{trans_ser}' + else: + # 鍙栨暟鏍规嵁RT鍦板潃銆佸瓙鍦板潃鍜屽抚鍙峰垝鍒� + key = f'{pkt["rt"]}_{pkt["subAddr"]}_{pkt["frameNum"]}' + name = f'{pkt["rt"]}_{pkt["subAddr"]}_甯у彿{frame_num}_{trans_ser}' + # + if key not in rt_pkt_map: + rt_pkt_map[key] = { + "name": name, + "id": pkt_id, + "type": "logic", + "pos": 0, + "content": "CYCLEBUFFER,Message,28,0xFFFF", + "length": "", + "vals": vals, + "children": [] + } + frame = f'{pkt["frameNum"]}' + + interval = f'{pkt["interval"]}'.replace(".", "_") + if trans_ser == '鍙栨暟': + _key = f'RT{pkt["rtAddr"]}Frame{frame.replace("|", "_")}_Per{interval}' + else: + # 鏁版嵁鍧椾紶杈� + if pkt['burst']: + _key = f'RT{pkt["rtAddr"]}FrameALL' + else: + _key = f'RT{pkt["rtAddr"]}Frame{frame}Per{interval}' + + _pkt = next(filter(lambda it: it['name'] == _key, rt_pkt_map[key]['children']), None) + if _pkt is None: + ext_info = None + if trans_ser == '鏁版嵁鍧椾紶杈�' 
and not pkt['burst']: + # 鏁版嵁鍧椾紶杈撲笖鏈夊懆鏈熺殑鍖呴渶瑕� + ext_info = [{"id": "PeriodTriger", "name": "鏃跺垎澶嶇敤鎬荤嚎瑙﹀彂灞炴��", "val": f"{pkt['interval']}"}, + {"id": "FrameNumber", "name": "鏃跺垎澶嶇敤鍗忚甯у彿", "val": frame}] + _pkt = { + "name": _key, + "id": _key, + "type": "enc", + "pos": 0, + "content": "1:N;EPDU", + "length": "length", + "extInfo": ext_info, + "children": [ + { + "id": "C02_ver", + "name": "閬ユ祴鐗堟湰", + "type": "para", + "pos": 0, + "length": 3, + "dataTy": "INVAR", + "content": "0" + }, + { + "id": "C02_type", + "name": "绫诲瀷", + "type": "para", + "pos": 3, + "length": 1, + "dataTy": "INVAR", + "content": "0" + }, + { + "id": "C02_viceHead", + "name": "鍓澶存爣璇�", + "type": "para", + "pos": 4, + "length": 1, + "content": "1", + "dataTy": "INVAR" + }, + { + "id": "C02_PackSign", + "name": "APID", + "type": "para", + "pos": 5, + "length": 11, + "is_key": True, + "dataTy": "ENUM" + }, + { + "id": "C02_SerCtr_1", + "name": "搴忓垪鏍囪", + "type": "para", + "pos": 16, + "length": 2, + "content": "3" + }, + { + "id": "C02_SerCtr_2", + "name": "鍖呭簭璁℃暟", + "type": "para", + "pos": 18, + "length": 14, + "content": "0:167772:1", + "dataTy": "INCREASE" + }, + { + "id": "C02_PackLen", + "name": "鍖呴暱", + "type": "para", + "pos": 32, + "length": 16, + "content": "1Bytes/C02_Data.length+1", + "dataTy": "LEN" + }, + { + "id": "C02_Ser", + "name": "鏈嶅姟", + "type": "para", + "pos": 48, + "length": 8, + "is_key": True, + "dataTy": "ENUM" + }, + { + "id": "C02_SubSer", + "name": "瀛愭湇鍔�", + "type": "para", + "pos": 56, + "length": 8, + "is_key": True, + "dataTy": "ENUM" + }, + { + "id": "C02_Data", + "name": "鏁版嵁鍖�", + "type": "linear", + "pos": 64, + "length": 'length-current', + "children": [] + }, + ] + } + rt_pkt_map[key]['children'].append(_pkt) + # 鏁版嵁鍖轰笅闈㈢殑鍖� + data_area = next(filter(lambda it: it['name'] == '鏁版嵁鍖�', _pkt['children']), None) + ser_sub_ser: str = pkt['service'] + ser = '' + sub_ser = '' + if ser_sub_ser: + nums = re.findall(r'\d+', ser_sub_ser) + if len(nums) == 2: + ser = nums[0] + sub_ser = nums[1] + if 'children' not in pkt: + pkt['children'] = [] + p_name = pkt['id'] + '_' + pkt['name'] + + data_area['children'].append({ + "name": p_name, + "id": pkt["id"], + "type": "linear", + "pos": 0, + "length": pkt["length"], + "vals": f"0x{pkt['apid']}/{ser}/{sub_ser}/", + "children": pkt['children'], + }) + + +def build_vcid_content(vcs): + _vcs = [] + for vc in vcs: + _vcs.append(vc['name'] + ',' + vc['VCID']) + return ' '.join(_vcs) class DbStructFlow: - files = [] - file_objects = [] + # 宸ョ▼ + proj: TProject = None + # 閬ユ祴婧愬寘鍒楄〃锛屼粎鍖呭悕绉般�佸寘id鍜宧asParams + tm_pkts = [] + # vc婧愬寘 + vc_pkts = [] - def __init__(self, doc_files): + def __init__(self): self.client = OpenAI( api_key=API_KEY, base_url=BASE_URL, # api_key="ollama", # base_url="http://192.168.1.48:11434/v1/", ) - if doc_files: - self.files = doc_files - self.load_file_objs() - self.delete_all_files() - self.upload_files() - - def load_file_objs(self): - file_stk = self.client.files.list() - self.file_objects = file_stk.data - - def delete_all_files(self): - for file_object in self.file_objects: - self.client.files.delete(file_object.id) - - def upload_file(self, file_path): - file_object = self.client.files.create(file=Path(file_path), purpose="file-extract") - return file_object - - def upload_files(self): - self.file_objects = [] - for file_path in self.files: - file_object = self.upload_file(file_path) - self.file_objects.append(file_object) def run(self): # 鐢熸垚鍨嬪彿缁撴瀯 @@ -90,78 +317,24 @@ # 鐢熸垚鏁版嵁娴佺粨鏋� CADU # 鐢熸垚VCDU缁撴瀯 # 鐢熸垚閬ユ祴鏁版嵁鍖呯粨鏋� - 
proj = self.gen_project([]) - # proj = TProject(C_PROJECT_PK='2e090a487c1a4f7f741be3a437374e2f') + self.proj = self.gen_project() - devs = self.gen_device([], proj) - # with open('datas/璁惧鍒楄〃.json', 'w', encoding='utf8') as f: - # json.dump(devs, f, ensure_ascii=False, indent=4) - # - # proj['devices'] = devs - # - # messages = [] - # cadu = self.gen_tm_frame(messages) - # with open("datas/鎺㈡祴甯�.json", 'w', encoding='utf8') as f: - # json.dump(cadu, f, ensure_ascii=False, indent=4) - # - # messages = [] - # vcs = self.gen_vc(messages) - # with open('datas/铏氭嫙淇¢亾.json', 'w', encoding='utf8') as f: - # json.dump(vcs, f, ensure_ascii=False, indent=4) - # - # messages = [] - # pkt_vcs = self.gen_pkt_vc(messages) - # with open('datas/VC婧愬寘.json', 'w', encoding='utf8') as f: - # json.dump(pkt_vcs, f, ensure_ascii=False, indent=4) - # - # messages = [] - # dev_pkts = self.gen_dev_pkts(messages) - # with open('datas/璁惧婧愬寘.json', 'w', encoding='utf8') as f: - # json.dump(dev_pkts, f, ensure_ascii=False, indent=4) - # - # messages = [] - # _pkts = self.gen_pkts() - # pkts = [] - # for pkt in _pkts: - # _pkt = self.gen_pkt_details(pkt['name']) - # pkts.append(_pkt) - # with open('datas/婧愬寘鍒楄〃.json', 'w', encoding='utf8') as f: - # json.dump(pkts, f, ensure_ascii=False, indent=4) - # - # for dev in devs: - # ds = dev['data_streams'][0] - # _cadu = copy.deepcopy(cadu) - # ds['cadu'] = _cadu - # _vcdu = next(filter(lambda it: it['name'] == '浼犺緭甯�', _cadu['children'])) - # vcdu_data = next(filter(lambda it: it['name'] == '浼犺緭甯ф暟鎹煙', _vcdu['children'])) - # _vcs = copy.deepcopy(vcs) - # vcdu_data['children'] = _vcs - # dev_pkt = next(filter(lambda it: it['name'] == dev['name'], dev_pkts), None) - # if dev_pkt is None: - # continue - # for pkt in dev_pkt['pkts']: - # for vc in _vcs: - # _pkt = next( - # filter(lambda it: it['name'] == pkt['name'] and it['vcs'].__contains__(vc['code']), pkt_vcs), - # None) - # if _pkt: - # if vc.__contains__('pkts') is False: - # vc['pkts'] = [] - # _pkt = next(filter(lambda it: it['name'] == _pkt['name'], pkts), None) - # if _pkt: - # vc['pkts'].append(_pkt) - # - # with open("datas/鍨嬪彿.json", 'w', encoding='utf8') as f: - # json.dump(proj, f, ensure_ascii=False, indent=4) + devs = self.gen_device(self.proj) + + # self.gen_tc() return '' - def _gen(self, msgs, msg): + def _gen(self, msgs, msg, files=None): + if files is None: + files = [file_map['鏂囨。鍚堝苟']] messages = [] if msgs is None else msgs + doc_text = '' + for file in files: + doc_text += '\n' + read_from_file(file) if len(messages) == 0: - # 濡傛灉鏄涓�娆℃彁闂姞鍏ユ枃妗� + # 濡傛灉鏄涓�娆℃彁闂姞鍏ystem娑堟伅 messages.append({'role': 'system', 'content': assistant_msg}) - for file_object in self.file_objects: - messages.append({'role': 'system', 'content': 'fileid://' + file_object.id}) + messages.append({'role': 'user', 'content': "浠ヤ笅鏄枃妗e唴瀹癸細\n" + doc_text}) messages.append({'role': 'user', 'content': msg}) completion = self.client.chat.completions.create( @@ -171,34 +344,71 @@ temperature=0.0, top_p=0, timeout=30 * 60000, - max_completion_tokens=1000000 + max_completion_tokens=1000000, + seed=0 # stream_options={"include_usage": True} ) - + g_completion = completion text = '' for chunk in completion: if chunk.choices[0].delta.content is not None: text += chunk.choices[0].delta.content print(chunk.choices[0].delta.content, end="") print("") + g_completion = None return text - def gen_project(self, messages): - _msg = f""" -鏍规嵁鏂囨。杈撳嚭鍨嬪彿淇℃伅锛屽瀷鍙峰瓧娈靛寘鎷細鍚嶇О鍜屼唬鍙凤紝浠呰緭鍑哄瀷鍙风殑灞炴�э紝涓嶈緭鍑哄叾浠栧眰绾ф暟鎹� - """ - print('鍨嬪彿淇℃伅锛�') - text = self._gen(messages, 
_msg) - messages.append({'role': 'assistant', 'content': text}) - text = self.remove_markdown(text) - proj_dict = json.loads(text) - # return proj_dict + def generate_text(self, msg, cache_file, msgs=None, files=None, validation=None, try_cnt=5): + if msgs is None: + msgs = [] + if USE_CACHE and os.path.isfile(cache_file): + text = read_from_file(cache_file) + else: + s = time.time() + text = self._gen(msgs, msg, files) + text = remove_markdown(text) + if validation: + try: + validation(text) + except BaseException as e: + print(e) + if try_cnt <= 0: + raise RuntimeError('鐢熸垚澶辫触锛岄噸璇曟鏁板お澶氾紝寮哄埗缁撴潫锛�') + return self.generate_text(msg, cache_file, msgs, files, validation, try_cnt - 1) + save_to_file(text, cache_file) + print(f'鑰楁椂锛歿time.time() - s}') + return text + + def generate_tc_text(self, msg, cache_file, messages=None, files=None, validation=None, try_cnt=5): + if messages is None: + messages = [] + doc_text = '' + for file in files: + doc_text += '\n' + read_from_file(file) + if len(messages) == 0: + # 濡傛灉鏄涓�娆℃彁闂姞鍏ystem娑堟伅 + messages.append({'role': 'user', 'content': "浠ヤ笅鏄枃妗e唴瀹癸細\n" + doc_text}) + return self.generate_text(msg, cache_file, messages, files, validation, try_cnt) + + def gen_project(self): + # _msg = """ + # 鏍规嵁鏂囨。杈撳嚭鍨嬪彿淇℃伅锛屽瀷鍙峰瓧娈靛寘鎷細鍚嶇О鍜屼唬鍙枫�備粎杈撳嚭鍨嬪彿杩欎竴绾с�� + # 渚嬪锛歿"name":"xxx","id":"xxx"} + # """ + # print('鍨嬪彿淇℃伅锛�') + # text = self.generate_text(_msg, 'out/鍨嬪彿淇℃伅.json', files=[file_map['搴旂敤杞欢鐢ㄦ埛闇�姹�']]) + # proj_dict = json.loads(text) + # 宸ョ▼淇℃伅浠庣郴缁熻幏鍙� + proj_dict = { + "id": "JB200001", + "name": "HY-4A" + } code = proj_dict['id'] name = proj_dict['name'] proj = create_project(code, name, code, name, "", datetime.now()) return proj - def gen_device(self, messages, proj): + def gen_device(self, proj): """ 璁惧鍒楄〃鐢熸垚瑙勫垯锛� 1.濡傛枃妗d腑鏈�1553鍗忚鎻忚堪锛屽姞鍏�1553璁惧 @@ -208,28 +418,29 @@ 璁惧绫诲瀷锛氬伐鎺ф満[0]銆�1553B[1] - :param messages: + :param proj: :return: """ proj_pk = proj.C_PROJECT_PK devices = [] _msg = f""" -杈撳嚭鎵�鏈夎澶囧垪琛紝璁惧瀛楁鍖呮嫭鍚嶇О锛坣ame)銆佷唬鍙凤紙code锛夛紝濡傛灉娌℃湁浠e彿鍒欎娇鐢ㄥ悕绉扮殑鑻辨枃缈昏瘧缂╁啓浠f浛涓旂缉鍐欓暱搴︿笉瓒呰繃5涓瓧绗︼紝JSON鏍煎紡锛屽苟涓旂粰姣忎釜璁惧澧炲姞涓変釜瀛楁锛岀涓�涓瓧娈礹asTcTm鈥滄槸鍚﹀寘鍚仴鎺ч仴娴嬧�濓紝鍒ゆ柇璇ヨ澶囨槸鍚﹀寘鍚仴鎺ч仴娴嬬殑鍔熻兘锛涚浜屼釜瀛楁hasTemperatureAnalog鈥滄槸鍚﹀寘鍚俯搴﹂噺銆佹ā鎷熼噺绛夋暟鎹殑閲囬泦鈥濓紝鍒ゆ柇璇ヨ澶囨槸鍚﹀寘鍚俯搴﹂噺绛変俊鎭殑閲囬泦鍔熻兘锛涚涓変釜瀛楁hasBus鈥滄槸鍚︽槸鎬荤嚎璁惧鈥濓紝鍒ゆ柇璇ヨ澶囨槸鍚﹀睘浜庢�荤嚎璁惧锛屾槸鍚︽湁RT鍦板潃锛涙瘡涓瓧娈电殑鍊奸兘浣跨敤true鎴杅alse鏉ヨ〃绀恒�� -浠呰緭鍑篔SON锛屼笉瑕佽緭鍑篔SON浠ュ鐨勪换浣曞瓧绗︺�� +杈撳嚭鍒嗙郴缁熶笅鐨勭‖浠朵骇鍝侊紙璁惧锛夊垪琛紝瀛楁鍖呮嫭锛氬悕绉�(name)銆佷唬鍙�(code)锛岀‖浠朵骇鍝佸悕绉颁竴鑸細鍖呭惈鈥滅鐞嗗崟鍏冣�濇垨鑰呪�滄帴鍙e崟鍏冣�濓紝濡傛灉娌℃湁浠e彿鍒欎娇鐢ㄥ悕绉扮殑鑻辨枃缂╁啓浠f浛缂╁啓闀垮害涓嶈秴杩�5涓瓧绗�; +骞朵笖缁欐瘡涓‖浠朵骇鍝佸鍔犱笁涓瓧娈碉細绗竴涓瓧娈礹asTcTm鈥滄槸鍚﹀寘鍚仴鎺ч仴娴嬧�濓紝鍒ゆ柇璇ョ‖浠朵骇鍝佹槸鍚﹀寘鍚仴鎺ч仴娴嬬殑鍔熻兘銆� +绗簩涓瓧娈礹asTemperatureAnalog鈥滄槸鍚﹀寘鍚俯搴﹂噺銆佹ā鎷熼噺绛夋暟鎹殑閲囬泦鈥濓紝鍒ゆ柇璇ョ‖浠朵骇鍝佹槸鍚﹀寘鍚俯搴﹂噺绛変俊鎭殑閲囬泦鍔熻兘銆� +绗笁涓瓧娈礹asBus鈥滄槸鍚︽槸鎬荤嚎纭欢浜у搧鈥濓紝鍒ゆ柇璇ヨ澶囨槸鍚﹀睘浜庢�荤嚎纭欢浜у搧锛屾槸鍚︽湁RT鍦板潃锛涙瘡涓瓧娈电殑鍊奸兘浣跨敤true鎴杅alse鏉ヨ〃绀恒�� +浠呰緭鍑篔SON锛岀粨鏋勬渶澶栧眰涓烘暟缁勶紝鏁扮粍鍏冪礌涓鸿澶囦俊鎭紝涓嶈杈撳嚭JSON浠ュ鐨勪换浣曞瓧绗︺�� """ print('璁惧鍒楄〃锛�') - text = self._gen(messages, _msg) - text = self.remove_markdown(text) + cache_file = 'out/璁惧鍒楄〃.json' + + def validation(gen_text): + _devs = json.loads(gen_text) + assert isinstance(_devs, list), '鏁版嵁缁撴瀯鏈�澶栧眰涓嶆槸鏁扮粍' + assert next(filter(lambda it: it['name'].endswith('绠$悊鍗曞厓'), _devs), None), '鐢熸垚鐨勮澶囧垪琛ㄤ腑娌℃湁绠$悊鍗曞厓' + + text = self.generate_text(_msg, cache_file, files=[file_map['搴旂敤杞欢鐢ㄦ埛闇�姹�']], validation=validation) devs = json.loads(text) - hasBus = any(d['hasBus'] for d in devs) - if hasBus: - # 鎬荤嚎璁惧 - dev = create_device("B1553", "1553鎬荤嚎", '1', 'StandardProCommunicationDev', proj_pk) - devices.append(dev) - # 鍒涘缓鏁版嵁娴� - ds_u153 = create_data_stream(proj_pk, dev.C_DEV_PK, 'ECSS涓婅鎬荤嚎鏁版嵁', 'U153', 'B153', '0', 'E153', '001') - ds_d153 = 
create_data_stream(proj_pk, dev.C_DEV_PK, 'ECSS涓嬭鎬荤嚎鏁版嵁', 'D153', 'B153', '1', 'E153', '001') # 绫籗MU璁惧锛屽寘鍚仴娴嬪拰閬ユ帶鍔熻兘锛屽悕绉扮粨灏句负鈥滅鐞嗗崟鍏冣�� like_smu_devs = list(filter(lambda it: it['hasTcTm'] and it['name'].endswith('绠$悊鍗曞厓'), devs)) @@ -237,13 +448,31 @@ dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) devices.append(dev) # 鍒涘缓鏁版嵁娴� - ds_tmfl = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS閬ユ祴', 'TMFL', 'TMFL', '1', 'TMFL', '001') - ds_tcfl = create_data_stream(proj_pk, dev.C_DEV_PK, '閬ユ帶鎸囦护', 'TCFL', 'TCFL', '0', 'TCFL', '006') + ds_tmfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, 'AOS閬ユ祴', 'TMF1', 'TMFL', '1', 'TMF1', + '001') + self.gen_tm_frame(proj_pk, rule_stream.C_RULE_PK, ds_tmfl, rule_stream.C_PATH) + # ds_tcfl, rule_stream, _ = create_data_stream(proj_pk, dev.C_DEV_PK, '閬ユ帶鎸囦护', 'TCFL', 'TCFL', '0', 'TCFL', + # '006') + hasBus = any(d['hasBus'] for d in devs) + if hasBus: + # 鎬荤嚎璁惧 + dev = create_device("1553", "1553鎬荤嚎", '1', 'StandardProCommunicationDev', proj_pk) + create_extend_info(proj_pk, "BusType", "鎬荤嚎绫诲瀷", "ECSS_Standard", dev.C_DEV_PK) + devices.append(dev) + # 鍒涘缓鏁版嵁娴� + ds_u153, rs_u153, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '涓婅鎬荤嚎鏁版嵁', 'U15E', 'B153', + '0', '1553', '001') + # 鍒涘缓鎬荤嚎缁撴瀯 + self.gen_bus(proj_pk, rule_enc, '1553', ds_u153, rs_u153.C_PATH, dev.C_DEV_NAME) + ds_d153, rule_stream, rule_enc = create_data_stream(proj_pk, dev.C_DEV_PK, '涓嬭鎬荤嚎鏁版嵁', 'D15E', 'B153', + '1', '1553', '001', rs_u153.C_RULE_PK) + create_ref_ds_rule_stream(proj_pk, rule_stream.C_STREAM_PK, rule_stream.C_STREAM_ID, + rule_stream.C_STREAM_NAME, rule_stream.C_STREAM_DIR, rs_u153.C_STREAM_PK) # 绫籖TU璁惧锛屽寘鍚俯搴﹂噺鍜屾ā鎷熼噺鍔熻兘锛屽悕绉扮粨灏句负鈥滄帴鍙e崟鍏冣�� - like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('鎺ュ彛鍗曞厓'), devs)) - for dev in like_rtu_devs: - dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) + # like_rtu_devs = list(filter(lambda it: it['hasTemperatureAnalog'] and it['name'].endswith('鎺ュ彛鍗曞厓'), devs)) + # for dev in like_rtu_devs: + # dev = create_device(dev['code'], dev['name'], '0', 'StandardProCommunicationDev', proj.C_PROJECT_PK) # for dev in like_rtu_devs: # dev = create_device(dev['code'], dev['name'], '0', '', proj.C_PROJECT_PK) @@ -252,151 +481,626 @@ # ds_tmfl = create_data_stream(proj.C_PROJECT_PK, '娓╁害閲�', 'TMFL', 'TMFL', '1', 'TMFL', '001') # ds_tcfl = create_data_stream(proj.C_PROJECT_PK, '妯℃嫙閲�', 'TCFL', 'TCFL', '0', 'TCFL', '006') - print() - # 鎬荤嚎璁惧 - # print('鏄惁鏈夋�荤嚎璁惧锛�', end='') - # _msg = "鏂囨。涓弿杩扮殑鏈夋�荤嚎鐩稿叧鍐呭鍚楋紵浠呭洖绛旓細鈥滄湁鈥濇垨鈥滄棤鈥濓紝涓嶈杈撳嚭鍏朵粬鏂囨湰銆�" - # text = self._gen([], _msg) - # if text == "鏈�": - # _msg = f""" - # 鏂囨。涓弿杩扮殑鎬荤嚎鍨嬪彿鏄灏戯紝浠呰緭鍑烘�荤嚎鍨嬪彿涓嶈杈撳嚭鍨嬪彿浠ュ鐨勫叾浠栦换浣曟枃鏈紝鎬荤嚎鍨嬪彿鐢辨暟瀛楀拰鑻辨枃瀛楁瘝缁勬垚銆� - # """ - # print('璁惧ID锛�') - # dev_code = self._gen([], _msg) - # dev = create_device(dev_code, dev_code, '1', '', proj.C_PROJECT_PK) - # devices.append(dev) - - # 绫籗MU杞欢 - # print('鏄惁鏈夌被SMU璁惧锛�', end='') - # _msg = "鏂囨。涓湁鎻忚堪閬ユ祴鍜岄仴鎺у姛鑳藉悧锛熶粎鍥炵瓟锛氣�滄湁鈥濇垨鈥滄棤鈥濓紝涓嶈杈撳嚭鍏朵粬鏂囨湰銆�" - # text = self._gen([], _msg) - # if text == "鏈�": - # # 绯荤粺绠$悊鍗曞厓 - # print('鏄惁鏈夌郴缁熺鐞嗗崟鍏冿紙SMU锛夛細', end='') - # _msg = f"鏂囨。涓湁鎻忚堪绯荤粺绠$悊鍗曞厓锛圫MU锛夊悧锛熶粎鍥炵瓟鈥滄湁鈥濇垨鈥滄棤鈥濓紝涓嶈杈撳嚭鍏朵粬鏂囨湰銆�" - # text = self._gen([], _msg) - # if text == "鏈�": - # dev = create_device("SMU", "绯荤粺绠$悊鍗曞厓", '0', '', proj.C_PROJECT_PK) - # devices.append(dev) - # # 涓績鎺у埗鍗曞厓锛圕TU锛� - # print('鏄惁鏈変腑蹇冩帶鍒跺崟鍏冿紙CTU锛夛細', end='') - # _msg = f"鏂囨。涓湁鎻忚堪涓績鎺у埗鍗曞厓锛圕TU锛夊悧锛熶粎鍥炵瓟鈥滄湁鈥濇垨鈥滄棤鈥濓紝涓嶈杈撳嚭鍏朵粬鏂囨湰銆�" - # 
text = self._gen([], _msg) - # if text == "鏈�": - # dev = create_device("CTU", "涓績鎺у埗鍗曞厓", '0', '', proj.C_PROJECT_PK) - # devices.append(dev) - # - # # 绫籖TU - # print('鏄惁鏈夌被RTU璁惧锛�', end='') - # _msg = "鏂囨。涓湁鎻忚堪妯℃嫙閲忛噰闆嗗拰娓╁害閲忛噰闆嗗姛鑳藉悧锛熶粎鍥炵瓟锛氣�滄湁鈥濇垨鈥滄棤鈥濓紝涓嶈杈撳嚭鍏朵粬鏂囨湰銆�" - # text = self._gen([], _msg) - # if text == "鏈�": - # dev = create_device("RTU", "杩滅疆鍗曞厓", '0', '', proj.C_PROJECT_PK) - # devices.append(dev) - # device_dicts = json.loads(text) - # for device_dict in device_dicts: - # data_stream = {'name': '鏁版嵁娴�', 'code': 'DS'} - # device_dict['data_streams'] = [data_stream] - # - # return device_dicts return devices - def gen_tm_frame(self, messages): - _msg = f""" -杈撳嚭鎺㈡祴甯х殑缁撴瀯锛屾帰娴嬪抚瀛楁鍖呮嫭锛氭帰娴嬪抚浠e彿(id)銆佹帰娴嬪抚鍚嶇О(name)銆侀暱搴�(length)銆佷笅绾ф暟鎹崟鍏冨垪琛紙children锛夈�備唬鍙峰鏋滄病鏈夊垯鐢ㄥ悕绉扮殑鑻辨枃缈昏瘧锛屽寘鎷笅绾ф暟鎹崟鍏冦�� + def gen_insert_domain_params(self): + _msg = """ +鍒嗘瀽鏂囨。锛岃緭鍑烘彃鍏ュ煙鐨勫弬鏁板垪琛紝灏嗘墍鏈夊弬鏁板叏閮ㄨ緭鍑猴紝涓嶈鏈夐仐婕忋�� +鏁版嵁缁撴瀯鏈�澶栧眰涓烘暟缁勶紝鏁扮粍鍏冪礌涓哄弬鏁颁俊鎭璞★紝鍙傛暟淇℃伅瀛楁鍖呮嫭锛歯ame銆乮d銆乸os銆乴ength銆乼ype銆� +1涓瓧鑺傜殑闀垮害涓�8浣嶏紝浣跨敤B0-B7鏉ヨ〃绀猴紝璇疯鐪熻绠楀弬鏁伴暱搴︺�� +鏂囨。涓綅缃弿杩颁俊鎭彲鑳藉瓨鍦ㄨ法瀛楄妭鐨勬儏鍐碉紝锛屼緥濡傦細"Byte1_B6~Byte2_B0":琛ㄧず浠庣1涓瓧鑺傜殑绗�7浣嶅埌绗�2涓瓧鑺傜殑绗�1浣嶏紝闀垮害鏄�3;"Byte27_B7~Byte28_B0":琛ㄧず浠庣27涓瓧鑺傜殑绗�8浣嶅埌绗�28涓瓧鑺傜殑绗�1浣嶏紝闀垮害鏄�2銆� +""" + print('鎻掑叆鍩熷弬鏁板垪琛細') + files = [file_map['閬ユ祴澶х翰']] + + def validation(gen_text): + params = json.loads(gen_text) + assert isinstance(params, list), '鎻掑叆鍩熷弬鏁板垪琛ㄦ暟鎹粨鏋勬渶澶栧眰蹇呴』鏄暟缁�' + assert len(params), '鎻掑叆鍩熷弬鏁板垪琛ㄤ笉鑳戒负绌�' + + text = self.generate_text(_msg, './out/鎻掑叆鍩熷弬鏁板垪琛�.json', files=files, validation=validation) + return json.loads(text) + + def gen_tm_frame_data(self): + _msg = """ """ - print('鎺㈡祴甯т俊鎭細') - text = self._gen(messages, _msg) - messages.append({'role': 'assistant', 'content': text}) - text = self.remove_markdown(text) - cadu = json.loads(text) + files = [file_map['閬ユ祴澶х翰']] + + def validation(gen_text): + pass + + def gen_tm_frame(self, proj_pk, rule_pk, ds, name_path): + # 鎻掑叆鍩熷弬鏁板垪琛� + insert_domain = self.gen_insert_domain_params() + + # VC婧愬寘鏍煎紡 + vc_pkt_fields = data_templates.vc_pkt_fields # self.gen_pkt_format() + + # 鑾峰彇铏氭嫙淇¢亾 vc + vcs = self.gen_vc() + for vc in vcs: + vc['children'] = [] + vc['VCID'] = str(int(vc['VCID'], 2)) + for field in vc_pkt_fields: + if field['name'] == '鏁版嵁鍩�': + field['children'] = [] + vc['children'].append(dict(field)) + + # VCID 瀛楁鍐呭 + vcid_content = build_vcid_content(vcs) + + # 閬ユ祴甯х粨鏋勭敱妯℃澘鐢熸垚锛屽彧闇�鎻愪緵鐗瑰畾鍙傛暟 + tm_data = { + "vcidContent": vcid_content, + 'insertDomain': insert_domain, + } + cadu = data_templates.get_tm_frame(tm_data) + + # VC婧愬寘 + self.vc_pkts = self.gen_pkt_vc() + # 閬ユ祴婧愬寘璁捐涓殑婧愬寘鍒楄〃 + self.tm_pkts = self.gen_pkts() + + # 澶勭悊VC涓嬮潰鐨勯仴娴嬪寘鏁版嵁 + for vc in vcs: + # 姝C涓嬬殑閬ユ祴鍖呰繃婊� + _vc_pkts = filter(lambda it: it['vcs'].__contains__(vc['id']), self.vc_pkts) + for _pkt in _vc_pkts: + # 鍒ゆ柇閬ユ祴鍖呮槸鍚︽湁璇︾粏瀹氫箟 + if not next(filter(lambda it: it['name'] == _pkt['name'] and it['hasParams'], self.tm_pkts), None): + continue + # 鑾峰彇鍖呰鎯� + _pkt = self.gen_pkt_details(_pkt['name'], _pkt['id']) + epdu = next(filter(lambda it: it['name'] == '鏁版嵁鍩�', vc['children']), None) + if epdu and _pkt: + _pkt['children'] = _pkt['datas'] + _last_par = _pkt['children'][len(_pkt['children']) - 1] + _pkt['length'] = (_last_par['pos'] + _last_par['length']) + _pkt['pos'] = 0 + if 'children' not in epdu: + epdu['children'] = [] + # 娣诲姞瑙f瀽瑙勫垯鍚庣紑闃叉閲嶅 + _pkt['id'] = _pkt['id'] + '_' + vc['VCID'] + # 缁欏寘鍚嶅姞浠e彿鍓嶇紑 + if not _pkt['name'].startswith(_pkt['id']): + _pkt['name'] = _pkt['id'] + '_' + _pkt['name'] + epdu['children'].append(_pkt) + apid_node = next(filter(lambda it: 
it['name'].__contains__('搴旂敤杩囩▼'), _pkt['headers']), None) + ser_node = next(filter(lambda it: it['name'] == '鏈嶅姟', _pkt['headers']), None) + sub_ser_node = next(filter(lambda it: it['name'] == '瀛愭湇鍔�', _pkt['headers']), None) + _pkt['vals'] = \ + f"{apid_node['content']}/{int(ser_node['content'], 16)}/{int(sub_ser_node['content'], 16)}/" + + # 閲嶆柊璁℃暟璧峰鍋忕Щ + self.compute_length_pos(cadu['children']) + + # 灏嗘暟鎹彃鍏ユ暟鎹簱 + seq = 1 + for cadu_it in cadu['children']: + if cadu_it['name'] == 'VCDU': + # VCDU + # 灏嗕俊閬撴浛鎹㈠埌鏁版嵁鍩熶綅缃� + vc_data = next(filter(lambda it: it['name'].__contains__('鏁版嵁鍩�'), cadu_it['children']), None) + if vc_data: + idx = cadu_it['children'].index(vc_data) + cadu_it['children'].pop(idx) + for vc in vcs: + # 澶勭悊铏氭嫙淇¢亾灞炴�� + vc['type'] = 'logic' + vc['length'] = vc_data['length'] + vc['pos'] = vc_data['pos'] + vc['content'] = 'CCSDSMPDU' + vcid = vc['VCID'] + vc['condition'] = f'VCID=={vcid}' + # 灏嗚櫄鎷熶俊閬撴彃鍏ュ埌VCDU + cadu_it['children'].insert(idx, vc) + idx += 1 + for vc in vcs: + self.compute_length_pos(vc['children']) + + # 璁剧疆VCID鐨刢ontent + vcid_node = next(filter(lambda it: it['name'].__contains__('VCID'), cadu_it['children']), None) + if vcid_node: + vcid_node['content'] = vcid_content + + create_enc_pkt(proj_pk, rule_pk, cadu_it, rule_pk, seq, name_path, ds, '001', 'ENC') + else: + # 鍙傛暟 + create_prop_enc(proj_pk, rule_pk, cadu_it, get_data_ty(cadu_it), seq) + seq += 1 return cadu - def gen_vc(self, messages): - _msg = f""" -杈撳嚭鎺㈡祴铏氭嫙淇¢亾鐨勫垝鍒嗭紝涓嶉渶瑕佹弿杩颁俊鎭紝浣跨敤涓�涓暟缁勮緭鍑猴紝瀛楁鍖呮嫭锛氫唬鍙�(code)銆乿cid銆佸悕绉�(name)銆� - """ + def gen_vc(self): + _msg = """ +璇峰垎鏋愭枃妗d腑鐨勯仴娴嬪寘鏍煎紡锛岃緭鍑洪仴娴嬭櫄鎷熶俊閬撶殑鍒掑垎锛屾暟鎹粨鏋勬渶澶栧眰涓烘暟缁勶紝鏁扮粍鍏冪礌涓鸿櫄鎷熶俊閬撲俊鎭瓧鍏革紝瀛楀吀鍖呭惈浠ヤ笅閿�煎锛� +id: 铏氭嫙淇¢亾浠e彿 +name: 铏氭嫙淇¢亾鍚嶇О +VCID: 铏氭嫙淇¢亾VCID锛堜簩杩涘埗锛� +format: 鏍规嵁铏氭嫙淇¢亾绫诲瀷鑾峰彇瀵瑰簲鐨勬暟鎹寘鐨勬牸寮忕殑鍚嶇О +娣卞叆鐞嗚В鏂囨。涓弿杩扮殑鍏崇郴锛屼緥濡傦細鏂囨。涓弿杩颁簡甯歌閬ユ祴鏄父瑙勬暟鎹殑涓嬩紶淇¢亾锛屽苟涓旇繕鎻忚堪浜嗗垎绯荤粺甯歌閬ユ祴鍙傛暟鍖呭氨鏄疄鏃堕仴娴嬪弬鏁板寘锛屽苟涓旀枃妗d腑瀵瑰疄鏃堕仴娴嬪弬鏁板寘鐨勬牸寮忚繘琛屼簡鎻忚堪锛屾墍浠ュ父瑙勯仴娴媀C搴旇杈撳嚭涓猴細{"id": "1", "name": "甯歌閬ユ祴VC", "VCID": "0", "format": "瀹炴椂閬ユ祴鍙傛暟鍖�"} +""" + + def validation(gen_text): + vcs = json.loads(gen_text) + assert next(filter(lambda it: re.match('^[0-1]+$', it['VCID']), vcs)), '鐢熸垚鐨刅CID蹇呴』鏄簩杩涘埗' + print('铏氭嫙淇¢亾锛�') - text = self._gen(messages, _msg) - messages.append({'role': 'assistant', 'content': text}) - text = self.remove_markdown(text) + text = self.generate_text(_msg, "out/铏氭嫙淇¢亾.json", files=[file_map['閬ユ祴澶х翰']], validation=validation) vcs = json.loads(text) return vcs - def gen_dev_pkts(self, messages): + def gen_dev_pkts(self): _msg = f""" -杈撳嚭鏂囨。涓帰娴嬫簮鍖呯被鍨嬪畾涔夋弿杩扮殑璁惧浠ュ強璁惧涓嬮潰鐨勬帰娴嬪寘锛屾暟鎹粨鏋勶細鏈�澶栧眰涓鸿澶囧垪琛� > 鎺㈡祴鍖呭垪琛�(pkts)锛岃澶囧瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id)锛屾簮鍖呭瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id) +杈撳嚭鏂囨。涓仴娴嬫簮鍖呯被鍨嬪畾涔夋弿杩扮殑璁惧浠ュ強璁惧涓嬮潰鐨勯仴娴嬪寘锛屾暟鎹粨鏋勶細鏈�澶栧眰涓烘暟缁� > 璁惧 > 閬ユ祴鍖呭垪琛�(pkts)锛岃澶囧瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id)锛屾簮鍖呭瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id) """ - print('璁惧鎺㈡祴婧愬寘淇℃伅锛�') - file = next(filter(lambda it: it.filename == 'XA-5D鏃犱汉鏈哄垎绯荤粺鎺㈡祴婧愬寘璁捐鎶ュ憡锛堝叕寮�锛�.md', self.file_objects), - None) - messages = [{'role': 'system', 'content': assistant_msg}, {'role': 'system', 'content': 'fileid://' + file.id}] - text = self._gen(messages, _msg) - messages.append({'role': 'assistant', 'content': text}) - text = self.remove_markdown(text) + print('璁惧閬ユ祴婧愬寘淇℃伅锛�') + files = [file_map["閬ユ祴婧愬寘璁捐鎶ュ憡"]] + text = self.generate_text(_msg, 'out/璁惧鏁版嵁鍖�.json', [], files) dev_pkts = json.loads(text) return dev_pkts - def gen_pkt_details(self, pkt_name): + def pkt_in_tm_pkts(self, pkt_name): + cache_file = f'out/鏁版嵁鍖�-{pkt_name}.json' + if os.path.isfile(cache_file): + return True + files = [file_map['閬ユ祴婧愬寘璁捐鎶ュ憡']] + 
print(f'鏂囨。涓湁鏃犫�渰pkt_name}鈥濈殑瀛楁鎻忚堪锛�', end='') _msg = f""" -杈撳嚭鏂囨。涓弿杩扮殑鈥渰pkt_name}鈥濇帰娴嬪寘銆� -鎺㈡祴鍖呭瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id)銆佸寘澶村睘鎬у垪琛�(headers)銆佹暟鎹煙鍙傛暟鍒楄〃(datas)锛� -鍖呭ご灞炴�у瓧娈靛寘鎷細浣嶇疆(pos)銆佸悕绉�(name)銆佷唬鍙�(id)銆佸畾涔�(val)锛� -鏁版嵁鍩熷弬鏁板瓧娈靛寘鎷細浣嶇疆(pos)銆佸悕绉�(name)銆佷唬鍙�(id)銆佸瓧鑺傞『搴�(byteOrder)锛� -濡傛灉娌℃湁浠e彿鐢ㄥ悕绉扮殑鑻辨枃缈昏瘧浠f浛锛屽鏋滄病鏈夊悕绉扮敤浠e彿浠f浛锛� -杈撳嚭鍐呭浠呰緭鍑簀son锛屼笉瑕佽緭鍑轰换浣曞叾浠栧唴瀹癸紒 - """ - print(f'鎺㈡祴婧愬寘鈥渰pkt_name}鈥濅俊鎭細') - file = next(filter(lambda it: it.filename == 'XA-5D鏃犱汉鏈哄垎绯荤粺鎺㈡祴婧愬寘璁捐鎶ュ憡锛堝叕寮�锛�.md', self.file_objects), - None) - messages = [{'role': 'system', 'content': assistant_msg}, {'role': 'system', 'content': 'fileid://' + file.id}] - text = self._gen(messages, _msg) - messages.append({'role': 'assistant', 'content': text}) - text = self.remove_markdown(text) - pkt = json.loads(text) +鏂囨。涓湁閬ユ祴鍖呪�渰pkt_name}鈥濈殑瀛楁琛ㄦ弿杩板悧锛熼仴娴嬪寘鍚嶇О蹇呴』瀹屽叏鍖归厤銆傝緭鍑猴細鈥滄棤鈥濇垨鈥滄湁鈥濓紝涓嶈杈撳嚭鍏朵粬浠讳綍鍐呭銆� +娉ㄦ剰锛氶仴娴嬪寘鐨勫瓧娈佃〃绱ф帴鐫�閬ユ祴鍖呯珷鑺傛爣棰橈紝濡傛灉绔犺妭鏍囬鍚庨潰鐪佺暐浜嗘垨鑰呰瑙亁xx鍒欐槸娌℃湁瀛楁琛ㄦ弿杩般�� +鏍规嵁鏂囨。鍐呭杈撳嚭銆�""" + text = self.generate_text(_msg, f'out/pkts/鏈夋棤鏁版嵁鍖�-{pkt_name}.txt', [], files) + return text == '鏈�' + + def gen_pkt_details(self, pkt_name, pkt_id): + cache_file = f'out/鏁版嵁鍖�-{pkt_name}.json' + files = [file_map['閬ユ祴婧愬寘璁捐鎶ュ憡']] + if not os.path.isfile(cache_file): + _msg = f""" +杈撳嚭鏂囨。涓弿杩扮殑鍚嶇О涓衡�渰pkt_name}鈥濅唬鍙蜂负鈥渰pkt_id}鈥濋仴娴嬪寘锛� +閬ユ祴鍖呭瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id)銆佺被鍨�(type)銆佸寘澶村睘鎬у垪琛�(headers)銆佹暟鎹煙鍙傛暟鍒楄〃(datas)锛岀被鍨嬩负 linear锛� +鍖呭ご灞炴�у瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id)銆佷綅缃�(pos)銆佸畾涔�(content)銆侀暱搴�(length)銆佺被鍨�(type)锛岀被鍨嬩负 para锛� +鏁版嵁鍩熷弬鏁板瓧娈靛寘鎷細鍙傛暟鍚嶇О(name)銆佸弬鏁颁唬鍙�(id)銆佷綅缃�(pos)銆侀暱搴�(length)銆佸瓧鑺傞『搴�(byteOrder)锛岀被鍨嬩负 para锛� +濡傛灉娌℃湁鍚嶇О鐢ㄤ唬鍙蜂唬鏇匡紝濡傛灉娌℃湁浠e彿鐢ㄥ悕绉扮殑鑻辨枃缈昏瘧浠f浛锛岀炕璇戝敖閲忕畝鐭紱 +浣犻渶瑕佺悊瑙f暟鎹寘鐨勪綅缃俊鎭紝骞朵笖灏嗘墍鏈夎緭鍑哄崟浣嶇粺涓�杞崲涓� bits锛屼綅缃瓧娈电殑杈撳嚭鏍煎紡蹇呴』涓烘暟鍊肩被鍨�; +鏁版嵁缁撴瀯浠呭彧鍖呭惈閬ユ祴鍖咃紝浠呰緭鍑簀son锛屼笉瑕佽緭鍑轰换浣曞叾浠栧唴瀹广��""" + print(f'閬ユ祴婧愬寘鈥渰pkt_name}鈥濅俊鎭細') + + def validation(gen_text): + _pkt = json.loads(gen_text) + assert 'headers' in _pkt, '鍖呯粨鏋勪腑蹇呴』鍖呭惈headers瀛楁' + assert 'datas' in _pkt, '鍖呯粨鏋勪腑蹇呴』鍖呭惈datas瀛楁' + + text = self.generate_text(_msg, cache_file, [], files, validation) + pkt = json.loads(text) + else: + pkt = json.loads(read_from_file(cache_file)) + pkt_len = 0 + for par in pkt['datas']: + par['pos'] = pkt_len + pkt_len += par['length'] + pkt['length'] = pkt_len return pkt def gen_pkts(self): _msg = f""" -杈撳嚭鏂囨。涓弿杩扮殑鎺㈡祴鍖呫�� -鎺㈡祴鍖呭瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id)锛� +杈撳嚭鏂囨。涓弿杩扮殑閬ユ祴鍖呫�� +閬ユ祴鍖呭瓧娈靛寘鎷細鍚嶇О(name)銆佷唬鍙�(id)銆乭asParams锛� +鍚嶇О涓笉瑕佸寘鍚唬鍙凤紝 +hasParams琛ㄧず褰撳墠閬ユ祴鍖呮槸鍚︽湁鍙傛暟鍒楄〃锛岄仴娴嬪寘鐨勫弬鏁拌〃绱ф帴鐫�閬ユ祴鍖呯珷鑺傛爣棰橈紝濡傛灉绔犺妭鏍囬鍚庨潰鐪佺暐浜嗘垨鑰呰瑙亁xx鍒欐槸娌℃湁鍙傛暟琛紝 濡傛灉娌℃湁浠e彿鐢ㄥ悕绉扮殑鑻辨枃缈昏瘧浠f浛锛屽鏋滄病鏈夊悕绉扮敤浠e彿浠f浛锛� -椤剁骇缁撴瀯鐩存帴浠庢帰娴嬪寘寮�濮嬶紝涓嶅寘鎷帰娴嬪寘涓嬮潰鐨勫弬鏁般�� - """ - print(f'鎺㈡祴婧愬寘鍒楄〃锛�') - file = next( - filter(lambda it: it.filename == 'XA-5D鏃犱汉鏈哄垎绯荤粺鎺㈡祴婧愬寘璁捐鎶ュ憡锛堝叕寮�锛�.md', self.file_objects), - None) - messages = [{'role': 'system', 'content': assistant_msg}, - {'role': 'system', 'content': 'fileid://' + file.id}] - text = self._gen(messages, _msg) - messages.append({'role': 'assistant', 'content': text}) - text = self.remove_markdown(text) +鏁版嵁缁撴瀯鏈�澶栧眰涓烘暟缁勬暟缁勫厓绱犱负閬ユ祴鍖咃紝涓嶅寘鎷仴娴嬪寘涓嬮潰鐨勫弬鏁般�� +""" + print(f'閬ユ祴婧愬寘鍒楄〃锛�') + files = [file_map['閬ユ祴婧愬寘璁捐鎶ュ憡']] + text = self.generate_text(_msg, 'out/婧愬寘鍒楄〃.json', [], files) pkt = json.loads(text) return pkt - def gen_pkt_vc(self, messages): + def gen_pkt_vc(self): _msg = f""" -鏍规嵁鎺㈡祴婧愬寘涓嬩紶鏃舵満瀹氫箟锛岃緭鍑哄悇涓帰娴嬫簮鍖呬俊鎭垪琛紝椤剁骇缁撴瀯涓烘暟缁勫厓绱犱负鎺㈡祴婧愬寘锛屾簮鍖呭瓧娈靛寘鎷細鍖呬唬鍙�(id)锛屽悕绉�(name)锛屾墍灞炶櫄鎷熶俊閬�(vcs)锛屼笅浼犳椂鏈猴紙timeTags锛� +鏍规嵁閬ユ祴婧愬寘涓嬩紶鏃舵満瀹氫箟锛岃緭鍑哄悇涓仴娴嬫簮鍖呬俊鎭垪琛紝椤剁骇缁撴瀯涓烘暟缁勫厓绱犱负閬ユ祴婧愬寘锛屾簮鍖呭瓧娈靛寘鎷細鍖呬唬鍙�(id)锛屽悕绉�(name)锛屾墍灞炶櫄鎷熶俊閬�(vcs)锛屼笅浼犳椂鏈猴紙timeTags锛� """ - print('鎺㈡祴婧愬寘鎵�灞炶櫄鎷熶俊閬擄細') - text = 
self._gen(messages, _msg) - messages.append({'role': 'assistant', 'content': text}) - text = self.remove_markdown(text) + files = [file_map['閬ユ祴澶х翰']] + print('閬ユ祴婧愬寘鎵�灞炶櫄鎷熶俊閬擄細') + + def validation(gen_text): + pkts = json.loads(gen_text) + assert len(pkts), 'VC婧愬寘鍒楄〃涓嶈兘涓虹┖' + + text = self.generate_text(_msg, 'out/閬ユ祴VC婧愬寘.json', files=files, validation=validation) pkt_vcs = json.loads(text) return pkt_vcs - def remove_markdown(self, text): - # 鍘绘帀寮�澶寸殑```json - text = re.sub(r'^```json', '', text) - # 鍘绘帀缁撳熬鐨刞``json - text = re.sub(r'```$', '', text) - return text + def gen_pkt_format(self): + _msg = f""" +璇蜂粩缁嗗垎绯绘枃妗o紝杈撳嚭鍚勪釜鏁版嵁鍖呯殑鏍煎紡锛屾暟鎹粨鏋勬渶澶栧眰涓烘暟缁勶紝鏁扮粍鍏冪礌涓烘暟鎹寘鏍煎紡锛屽皢涓诲澶寸殑瀛愮骇鎻愬崌鍒颁富瀵煎ご杩欎竴绾у苟涓斿幓闄や富瀵煎ご锛屾暟鎹寘type涓簂ogic锛屽寘鏁版嵁鍩焧ype涓篴ny銆� +鍖呮牸寮廲hildren鍖呮嫭锛氱増鏈彿(id:Ver)銆佺被鍨�(id:TM_Type)銆佸壇瀵煎ご鏍囧織(id:Vice_Head)銆佸簲鐢ㄨ繃绋嬫爣璇嗙(id:Proc_Sign)銆佸垎缁勬爣蹇�(id:Group_Sign)銆佸寘搴忓垪璁℃暟(id:Package_Count)銆佸寘闀�(id:Pack_Len)銆佹暟鎹煙(id:EPDU_DATA)銆� +children鍏冪礌鐨勫瓧娈靛寘鎷細name銆乮d銆乸os銆乴ength銆乼ype +娉ㄦ剰锛氱敓鎴愮殑JSON璇硶鏍煎紡瑕佸悎娉曘�� +""" + print('閬ユ祴鍖呮牸寮忥細') + text = self.generate_text(_msg, 'out/鏁版嵁鍖呮牸寮�.json', files=[file_map['閬ユ祴澶х翰']]) + pkt_formats = json.loads(text) + return pkt_formats + + def compute_length_pos(self, items: list): + length = 0 + pos = 0 + for child in items: + if 'children' in child: + self.compute_length_pos(child['children']) + child['pos'] = pos + if 'length' in child and isinstance(child['length'], int): + length = length + child['length'] + pos = pos + child['length'] + # node['length'] = length + + def gen_bus(self, proj_pk, rule_enc, rule_id, ds, name_path, dev_name): + _msg = f""" +璇锋瀽鏂囨。锛屽垪鍑烘�荤嚎閫氫俊鍖呬紶杈撶害瀹氫腑鎻忚堪鐨勬墍鏈夋暟鎹寘鍒楄〃锛� +鏁版嵁鍖呭瓧娈靛寘鎷細id銆乶ame銆乤pid(16杩涘埗瀛楃涓�)銆乻ervice(鏈嶅姟瀛愭湇鍔�)銆乴ength(bit闀垮害)銆乮nterval(浼犺緭鍛ㄦ湡)銆乻ubAddr(瀛愬湴鍧�/妯″紡)銆乫rameNum(閫氫俊甯у彿)銆� +transSer(浼犺緭鏈嶅姟)銆乶ote(澶囨敞)銆乺tAddr(鎵�灞濺T鐨勫湴鍧�鍗佽繘鍒�)銆乺t(鎵�灞瀝t鍚嶇О)銆乼hroughBus(鏄惁缁忚繃鎬荤嚎)銆乥urst(鏄惁绐佸彂)銆乼ransDirect(浼犺緭鏂瑰悜)锛� +鏁版嵁缁撴瀯鏈�澶栧眰鏄暟缁勶紝鏁扮粍鍏冪礌涓烘暟鎹寘锛屼互JSON鏍煎紡杈撳嚭锛屼笉瑕佽緭鍑篔SON浠ュ鐨勪换浣曟枃鏈�� +閫氫俊甯у彿锛氫娇鐢ㄦ枃妗d腑鐨勬枃鏈笉瑕佸仛浠讳綍杞崲銆� +subAddr锛氬�间负鈥滄繁搴︹�濄�佲�滃钩閾衡�濄�佲�滄暟瀛椻�濇垨null銆� +鏄惁缁忚繃鎬荤嚎鐨勫垽鏂緷鎹細鈥滃娉ㄢ�濆垪濉啓浜嗗唴瀹圭被浼尖�滀笉缁忚繃鎬荤嚎鈥濈殑鏂囧瓧琛ㄧず涓嶇粡杩囨�荤嚎鍚﹀垯缁忚繃鎬荤嚎銆� +浼犺緭鏈嶅姟鍒嗕笁绉嶏細SetData(缃暟)銆丟etData(鍙栨暟)銆丏ataBlock(鏁版嵁鍧椾紶杈�)銆� +浼犺緭鏂瑰悜鍒嗭細鈥濇敹鈥滃拰鈥濆彂鈥滐紝浼犺緭鏈嶅姟濡傛灉鏄�濆彇鏁扳�滄槸鈥濇敹鈥滐紝濡傛灉鏄�濇暟鎹潡浼犺緭鈥滃垯鏍规嵁鍖呮墍鍦ㄧ殑鍒嗙郴缁熶互鍙婅〃鏍肩殑鈥濅紶杈撴柟鍚戔�滃垪杩涜鍒ゆ柇锛屽垽鏂浜嶴MU鏉ヨ鏄敹杩樻槸鍙戙�� +鏄惁绐佸彂鐨勫垽鏂緷鎹細鏍规嵁琛ㄦ牸涓殑鈥濅紶杈撳懆鏈熲�滃垪杩涜鍒ゆ柇锛屽鏋滃~鍐欎簡绫讳技鈥濈獊鍙戔�滅殑鏂囧瓧琛ㄧず鏄獊鍙戝惁鍒欒〃绀轰笉鏄獊鍙戙�� +""" + print('鎬荤嚎鏁版嵁鍖咃細') + + def validation(gen_text): + json.loads(gen_text) + + text = self.generate_text(_msg, 'out/鎬荤嚎.json', files=[file_map['鎬荤嚎浼犺緭閫氫俊甯у垎閰�']], validation=validation) + pkts = json.loads(text) + # 绛涢�夌粡鎬荤嚎鐨勬暟鎹寘 + pkts = list(filter(lambda it: it['throughBus'], pkts)) + no_apid_pkts = list(filter(lambda it: not it['apid'], pkts)) + # 绛涢�夋湁apid鐨勬暟鎹寘 + pkts = list(filter(lambda it: it['apid'], pkts)) + + pkts2 = [] + for pkt in pkts: + if self.pkt_in_tm_pkts(pkt["name"]): + pkts2.append(pkt) + for pkt in pkts2: + _pkt = self.gen_pkt_details(pkt['name'], pkt['id']) + if _pkt: + pkt['children'] = [] + pkt['children'].extend(_pkt['datas']) + pkt['length'] = _pkt['length'] + rt_pkt_map = {} + for pkt in pkts: + # 鏍规嵁鏁版嵁鍧椾紶杈撳拰鍙栨暟鍒嗙粍 + # 閫昏緫灏佽鍖呯殑瑙f瀽瑙勫垯ID锛歊T[rt鍦板潃]SUB[瀛愬湴鍧�]S(S浠h〃鍙栨暟锛屾柟鍚戞槸AA琛ㄧず鍙戦�侊紱R浠h〃缃暟锛屾柟鍚戞槸BB琛ㄧず鎺ュ彈) + # 鍙栨暟锛氶�昏緫灏佽鍖呮牴鎹瓙鍦板潃鍜屽抚鍙风粍鍚堝垱寤猴紝鏈夊嚑涓粍鍚堝氨鍒涘缓鍑犱釜閫昏緫灏佽鍖� + # 鏁版嵁鍧楋細鍙湁涓�涓�昏緫灏佽鍖� + + # 澶勭悊瀛愬湴鍧� + if pkt['burst']: + # 绐佸彂鍖呭瓙鍦板潃鏄�18~26 + pkt['subAddr'] = 26 + elif pkt['subAddr'] == '骞抽摵' or pkt['subAddr'] is None: + # 骞抽摵锛�11~26锛屾病鏈夊~鍐欑殑榛樿涓哄钩閾� + pkt['subAddr'] = 26 + elif pkt['subAddr'] == '娣卞害': + # 娣卞害锛�11 + pkt['subAddr'] = 11 + + # 澶勭悊甯у彿 + if 
pkt['burst']: + # 绐佸彂锛欰LL + pkt['frameNum'] = 'ALL' + elif not pkt['frameNum']: + # 鏈� + pkt['frameNum'] = '' + + # todo: 澶勭悊浼犺緭鏂瑰悜 + + rt_addr = pkt['rtAddr'] + sub_addr = pkt['subAddr'] + trans_ser = pkt['transSer'] + + frame_no = pkt['frameNum'].replace('|', ',') + + if trans_ser == 'GetData': + # 鍙栨暟 + pkt_id = f"RT{rt_addr}SUB{sub_addr}" + vals = f"{rt_addr}/{sub_addr}/0xAA/{frame_no}/" + rt_pkt_map_gen(pkt, '鍙栨暟', rt_pkt_map, pkt_id, vals) + elif trans_ser == 'DataBlock': + # 鏁版嵁鍧� + direct = '0xAA' + rt_pkt_map_gen(pkt, '鏁版嵁鍧椾紶杈�', rt_pkt_map, f"RT{rt_addr}SUB{sub_addr}{direct}", + f"{rt_addr}/{sub_addr}/{direct}/ALL/") + _pkts = [] + for k in rt_pkt_map: + _pkts.append(rt_pkt_map[k]) + + bus_items = data_templates.get_bus_datas(_pkts) + seq = 1 + sub_key_nodes = list(filter(lambda it: 'is_key' in it, bus_items)) + has_key = any(sub_key_nodes) + rule_pk = rule_enc.C_ENC_PK + sub_key = '' + key_items = [] + self.compute_length_pos(bus_items) + for item in bus_items: + if item['type'] == 'enc': + if has_key: + _prop_enc = create_any_pkt(proj_pk, rule_pk, item, seq, name_path, ds, 'ENC', sub_key_nodes, + key_items) + else: + _prop_enc, rule_stream, _ = create_enc_pkt(proj_pk, rule_pk, item, rule_enc.C_ENC_PK, seq, + name_path, ds, '001', 'ENC') + else: + # 鍙傛暟 + _prop_enc = create_prop_enc(proj_pk, rule_pk, item, get_data_ty(item), seq) + if item.__contains__('is_key'): + sub_key += _prop_enc.C_ENCITEM_PK + '/' + key_items.append( + {"pk": _prop_enc.C_ENCITEM_PK, + 'id': _prop_enc.C_SEGMENT_ID, + 'name': _prop_enc.C_NAME, + 'val': ''}) + seq += 1 + if sub_key: + rule_enc.C_KEY = sub_key + update_rule_enc(rule_enc) + + def gen_tc(self): + # 鏁版嵁甯ф牸寮� + frame = self.gen_tc_transfer_frame() + # 鏁版嵁鍖呮牸寮� + pkt_format = self.gen_tc_transfer_pkt() + # 鏁版嵁鍖呭垪琛� + pkts = self.gen_tc_transfer_pkts() + for pkt in pkts: + pf = json.loads(json.dumps(pkt_format)) + pf['name'] = pkt['name'] + ph = next(filter(lambda x: x['name'] == '涓诲澶�', pf['children']), None) + apid = next(filter(lambda x: x['name'] == '搴旂敤杩涚▼鏍囪瘑绗�(APID)', ph['children']), None) + apid['value'] = pkt['apid'] + apid['type'] = 'const' + sh = next(filter(lambda x: x['name'] == '鍓澶�', pf['children']), None) + ser = next(filter(lambda x: x['name'] == '鏈嶅姟绫诲瀷', sh['children']), None) + sub_ser = next(filter(lambda x: x['name'] == '鏈嶅姟瀛愮被鍨�', sh['children']), None) + ser['value'] = pkt['server'] + ser['type'] = 'const' + sub_ser['value'] = pkt['subServer'] + sub_ser['type'] = 'const' + frame['subPkts'].append(pf) + self.order = 0 + + def build_def(item: dict): + if item['type'] == 'enum': + return json.dumps({"EnumItems": item['enums'], "CanInput": True}) + elif item['type'] == 'length': + return None + elif item['type'] == 'checkSum': + return json.dumps({"ChecksumType": "CRC-CCITT"}) + elif item['type'] == 'subPkt': + return json.dumps({"CanInput": False}) + elif item['type'] == 'combPkt': + return None + elif 'value' in item: + return item['value'] + + def create_tc_format(parent_pk, field): + field['order'] = self.order + self.order += 1 + field['def'] = build_def(field) + if 'length' in field: + field['bitWidth'] = field['length'] + field['bitOrder'] = None + field['attr'] = 0 + if field['type'] == 'length': + val = field['value'] + field['range'] = val['start'] + "~" + val['end'] + field['formula'] = val['formula'] + ins_format = create_ins_format(self.proj.C_PROJECT_PK, parent_pk, field) + if 'children' in field: + autocode = 1 + if field['type'] == 'pkt': + ins_format = create_ins_format(self.proj.C_PROJECT_PK, 
ins_format.C_INS_FORMAT_PK, + {'order': self.order, 'type': 'subPkt', + 'def': json.dumps({"CanInput": False})}) + self.order += 1 + for child in field['children']: + child['autocode'] = autocode + autocode += 1 + create_tc_format(ins_format.C_INS_FORMAT_PK, child) + # if 'subPkts' in field: + # for pkt in field['subPkts']: + # ins_format = create_ins_format(self.proj.C_PROJECT_PK, ins_format.C_INS_FORMAT_PK, + # {'order': self.order, 'type': 'subPkt', + # 'def': json.dumps({"CanInput": False})}) + # create_tc_format(ins_format.C_INS_FORMAT_PK, pkt) + + create_tc_format(None, frame) + + def gen_tc_transfer_frame(self): + _msg = ''' +鍒嗘瀽YK浼犻�佸抚鏍煎紡锛屾彁鍙朰K浼犻�佸抚鐨勬暟鎹粨鏋勶紝涓嶅寘鎷暟鎹寘鐨勬暟鎹粨鏋勩�� +## 缁忛獙锛� +瀛楁绫诲瀷鍖呮嫭锛� +1.缁勫悎鍖咃細combPkt锛� +2.鍥哄畾鐮佸瓧锛歝onst锛� +3.闀垮害锛歭ength锛� +4.鏋氫妇鍊硷細enum锛� +5.鏍¢獙鍜岋細checkSum锛� +6.鏁版嵁鍖猴細subPkt銆� + +鏍规嵁瀛楁鎻忚堪鍒嗘瀽瀛楁鐨勭被鍨嬶紝鍒嗘瀽鏂规硶锛� +1.瀛楁鎻忚堪涓槑纭寚瀹氫簡瀛楁鍊肩殑锛岀被鍨嬩负const锛� +2.瀛楁涓病鏈夋槑纭寚瀹氬瓧娈靛�硷紝浣嗘槸缃楀垪浜嗗彇鍊艰寖鍥寸殑锛岀被鍨嬩负enum锛� +3.瀛楁鎻忚堪涓鏋滃瓨鍦ㄥ灞傜骇鎻忚堪鍒欑埗绾у瓧娈电殑绫诲瀷涓篶ombPkt锛� +4.瀛楁濡傛灉鏄拰鈥滈暱搴︹�濇湁鍏筹紝绫诲瀷涓簂ength锛� +5.濡傛灉鍜屾暟鎹煙鏈夊叧锛岀被鍨嬩负subPkt锛� +6.瀛楁濡傛灉鍜屾牎楠屽拰鏈夊叧锛岀被鍨嬩负checkSum銆� + +瀛楁鍊兼彁鍙栨柟娉曪細 +1.瀛楁鎻忚堪涓槑纭寚瀹氫簡瀛楁鍊硷紝 +2.闀垮害瀛楁鐨勫�艰鏍规嵁鎻忚堪纭畾璧锋瀛楁鑼冨洿浠ュ強璁$畻鍏紡锛寁alue鏍煎紡渚嬪锛歿"start":"<code>","end":"<code>","formula":"N-1"}锛屾敞鎰忥細start鍜宔nd鐨勫�间负瀛楁code銆� + +## 闄愬埗锛� +- length 鑷姩杞崲涓篵it闀垮害銆� +- value 鏍规嵁瀛楁鎻忚堪鎻愬彇銆� +- enums 鏈変簺瀛楁鏄灇涓惧�硷紝鏍规嵁瀛楁鎻忚堪鎻愬彇锛屾灇涓惧厓绱犵殑鏁版嵁缁撴瀯涓簕"n":"","v":"","c":""}銆� +- 杈撳嚭鍐呭蹇呴』涓轰弗鏍肩殑json锛屼笉鑳借緭鍑洪櫎json浠ュ鐨勪换浣曞唴瀹广�� + +瀛楁鏁版嵁缁撴瀯锛� +涓诲澶� + 鐗堟湰鍙枫�侀�氳繃鏍囧織銆佹帶鍒跺懡浠ゆ爣蹇椼�佺┖闂蹭綅銆丠TQ鏍囪瘑銆佽櫄鎷熶俊閬撴爣璇嗐�佸抚闀裤�佸抚搴忓垪鍙� +浼犻�佸抚鏁版嵁鍩� +甯у樊閿欐帶鍒跺煙銆� + +# 杈撳嚭鍐呭渚嬪瓙锛� +{ + "name": "YK甯�", + "type": "pkt" + "children":[ + { + "name": "涓诲澶�", + "code": "primaryHeader", + "length": 2, + "value": "00", + "type": "combPkt", + "children": [ + { + "name": "鐗堟湰鍙�", + "code": "verNum" + "length": 1, + "value": "00" + } + ] + } + ], + "subPkts":[] +} +''' + + def validation(gen_text): + json.loads(gen_text) + + text = self.generate_tc_text(_msg, 'out/tc_transfer_frame.json', files=[file_map['鎸囦护鏍煎紡']], + validation=validation) + frame = json.loads(text) + return frame + + def gen_tc_transfer_pkt(self): + _msg = ''' +浠呭垎鏋怸K鍖呮牸寮忥紝鎻愬彇YK鍖呮暟鎹粨鏋勩�� +## 缁忛獙锛� + +瀛楁绫诲瀷鍖呮嫭锛� +1.缁勫悎鍖咃細combPkt锛� +2.鍥哄畾鐮佸瓧锛歝onst锛� +3.闀垮害锛歭ength锛� +4.鏋氫妇鍊硷細enum锛� +5.鏍¢獙鍜岋細checkSum锛� +6.鏁版嵁鍖猴細subPkt銆� + +鏍规嵁瀛楁鎻忚堪鍒嗘瀽瀛楁鐨勭被鍨嬶紝鍒嗘瀽鏂规硶锛� +1.瀛楁鎻忚堪涓槑纭寚瀹氫簡瀛楁鍊肩殑锛岀被鍨嬩负const锛� +2.瀛楁涓病鏈夋槑纭寚瀹氬瓧娈靛�硷紝浣嗘槸缃楀垪浜嗗彇鍊艰寖鍥寸殑锛岀被鍨嬩负enum锛� +3.瀛楁鎻忚堪涓鏋滃瓨鍦ㄥ灞傜骇鎻忚堪鍒欑埗绾у瓧娈电殑绫诲瀷涓篶ombPkt锛� +4.瀛楁濡傛灉鏄拰鈥滈暱搴︹�濇湁鍏筹紝绫诲瀷涓簂ength锛� +5.濡傛灉鍜屾暟鎹煙鏈夊叧锛岀被鍨嬩负subPkt锛� +6.瀛楁濡傛灉鍜屾牎楠屽拰鏈夊叧锛岀被鍨嬩负checkSum銆� + +瀛楁鍊兼彁鍙栨柟娉曪細 +1.瀛楁鎻忚堪涓槑纭寚瀹氫簡瀛楁鍊硷紝 +2.闀垮害瀛楁鐨勫�艰鏍规嵁鎻忚堪纭畾璧锋瀛楁鑼冨洿浠ュ強璁$畻鍏紡锛寁alue鏍煎紡渚嬪锛歿"start":"<code>","end":"<code>","formula":"N-1"}锛屾敞鎰忥細start鍜宔nd鐨勫�间负瀛楁code銆� + +## 闄愬埗锛� +- length 鑷姩杞崲涓篵it闀垮害銆� +- value 鏍规嵁瀛楁鎻忚堪鎻愬彇銆� +- enums 鏈変簺瀛楁鏄灇涓惧�硷紝鏍规嵁瀛楁鎻忚堪鎻愬彇锛屾灇涓惧厓绱犵殑鏁版嵁缁撴瀯涓簕"n":"","v":"","c":""}銆� +- 杈撳嚭鍐呭蹇呴』涓轰弗鏍肩殑json锛屼笉鑳借緭鍑洪櫎json浠ュ鐨勪换浣曞唴瀹广�� + +瀛楁鏁版嵁缁撴瀯锛� +涓诲澶� + 鍖呰瘑鍒� + 鍖呯増鏈彿銆佸寘绫诲瀷銆佹暟鎹尯澶存爣蹇椼�佸簲鐢ㄨ繘绋嬫爣璇嗙(APID) + 鍖呭簭鍒楁帶鍒� + 搴忓垪鏍囧織 + 鍖呭簭鍒楄鏁� + 鍖呴暱 +鍓澶� + CCSDS鍓澶存爣蹇� + YK鍖呯増鏈彿 + 鍛戒护姝g‘搴旂瓟锛圓ck锛� + 鏈嶅姟绫诲瀷 + 鏈嶅姟瀛愮被鍨� + 婧愬湴鍧� +搴旂敤鏁版嵁鍖� +甯у樊閿欐帶鍒跺煙銆� + +# 杈撳嚭鍐呭渚嬪瓙锛� +{ + "name": "YK鍖�", + "type": "pkt" + "children":[ + { + "name": "涓诲澶�", + "code": "primaryHeader", + "length": 2, + "value": "00", + "type": "combPkt", + "children": [ + { + "name": "鐗堟湰鍙�", + "code": "verNum" + "length": 1, + "value": "00" + } + ] + } + ], + "subPkts":[] +} +''' + + def validation(gen_text): + json.loads(gen_text) + + text = self.generate_tc_text(_msg, 'out/tc_transfer_pkt.json', files=[file_map['鎸囦护鏍煎紡']], + validation=validation) + pkt_format = json.loads(text) + return 
pkt_format + + def gen_tc_transfer_pkts(self): + _msg = ''' +鍒嗘瀽鏂囨。鍒楀嚭鎵�鏈夌殑閬ユ帶婧愬寘銆� +## 鏁版嵁缁撴瀯濡備笅锛� +[{ +"name": "xxx", +"code":"pkt", +"apid":"0xAA", +"server":"0x1", +"subServer":"0x2" +}] +''' + + def validation(gen_text): + json.loads(gen_text) + + text = self.generate_tc_text(_msg, 'out/tc_transfer_pkts.json', files=[file_map['鎸囦护鏍煎紡']], + validation=validation) + pkts = json.loads(text) + return pkts if __name__ == '__main__': - md_file = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\鏂囨。鍚堝苟.md' - md_file2 = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\XA-5D鏃犱汉鏈哄垎绯荤粺鎺㈡祴婧愬寘璁捐鎶ュ憡锛堝叕寮�锛�.md' - # 鍚姩澶фā鍨嬪鐞嗘祦绋� - ret_text = DbStructFlow([md_file, md_file2]).run() + try: + os.makedirs("./out/pkts", exist_ok=True) + # 鍚姩澶фā鍨嬪鐞嗘祦绋� + ret_text = DbStructFlow().run() + except KeyboardInterrupt: + if g_completion: + g_completion.close() diff --git a/db/__init__.py b/knowledgebase/__init__.py similarity index 100% copy from db/__init__.py copy to knowledgebase/__init__.py diff --git a/db/__init__.py b/knowledgebase/db/__init__.py similarity index 100% rename from db/__init__.py rename to knowledgebase/db/__init__.py diff --git a/knowledgebase/db/data_creator.py b/knowledgebase/db/data_creator.py new file mode 100644 index 0000000..9158e8f --- /dev/null +++ b/knowledgebase/db/data_creator.py @@ -0,0 +1,327 @@ +import math + +from knowledgebase.db.db_helper import create_property_enc, \ + create_rule, create_rule_stream, create_rule_enc, create_enc_linear, create_rule_linear, create_property_linear, \ + update_rule_enc, create_extend_info, create_rulekey_info +from knowledgebase.utils import get_bit_mask + +enc_ty_flag_map = { + "DS": "0", + "ENC": "1", + "LOGICENC": "2", + "ANY": "3", + "LINEAR": "4", +} + + +def get_byte_len_str(node: dict): + length = node['length'] + if node['type'] != 'linear': + return length + if isinstance(length, int): + length = f'{math.ceil(length / 8)}' + + # if 'children' in node and len(node['children']): + # last = node['children'][-1:].pop() + # if isinstance(last['length'], int): + # length = last['pos'] + last['length'] + # length = f'{math.ceil(length / 8)}' + return length + + +def check_gen_content(func, check_fun, try_cnt=6): + try: + ret = func() + check_fun(ret) + return ret + except BaseException as e: + if try_cnt <= 0: + print('鐢熸垚澶辫触锛�') + raise e + print(f'鐢熸垚鍐呭鏈夎閲嶆柊鐢熸垚锛岀{6 - try_cnt}娆°��') + return check_gen_content(func, check_fun, try_cnt - 1) + + +def get_data_ty(node: dict): + data_ty = 'INVAR' + if 'dataTy' in node: + data_ty = node['dataTy'] + return data_ty + + +def create_prop_enc(proj_pk, enc_pk, node, ty, seq): + bit_length = node['length'] + if isinstance(bit_length, int): + pos = node['pos'] + byte_length = math.ceil((pos % 8 + bit_length) / 8) + + start = node['pos'] % 8 + end = start + bit_length - 1 + + if start == 0 and bit_length % 8 == 0: + mask = 'ALL' + else: + mask = hex(get_bit_mask(start, end)) + else: + mask = 'ALL' + byte_length = bit_length + para_id = f'{node["id"]}' + offset = f'{node["pos"] // 8}' + content = None + if 'content' in node: + content = node['content'] + cond = None + if 'condition' in node: + cond = node['condition'] + prop_enc = create_property_enc(proj_pk, enc_pk, node['id'], node['name'], ty, content, f'{offset}', + f'{byte_length}', '1', mask, cond, seq, '', para_id) + return prop_enc + + +def create_prop_linear(proj_pk, linear_pk, node, seq): + bit_length = node['length'] + if isinstance(bit_length, int): + byte_length = math.ceil(bit_length / 8) + start = node['pos'] % 8 + end = start + 
bit_length - 1 + + mask = hex(get_bit_mask(start, end)) + else: + mask = 'ALL' + byte_length = bit_length + para_id = f'{node["id"]}' + offset = f'{node["pos"] // 8}' + return create_property_linear(proj_pk, linear_pk, para_id, node['name'], 'INVAR', '0', f'{offset}', + f'{byte_length}', None, mask, None, None, None, None, None, seq) + + +def create_key_liner_pkt(proj_pk, rule_pk, node, parent_rule_pk, seq, name_path, ds, content, + actual_parent_pk=None): + # 鍒涘缓绾挎�у寘锛岀埗绾у寘鍚瓙鍖呬富閿瓧娈电殑鎯呭喌 + # 鍒涘缓瑙f瀽瑙勫垯 + rule_name = node['rule_name'] if 'rule_name' in node else node['name'] + rule = create_rule(proj_pk, rule_pk, node['id'], rule_name, get_byte_len_str(node), parent_rule_pk, + enc_ty_flag_map['LINEAR'], actual_parent_pk) + rule_linear = create_rule_linear(proj_pk, rule_pk, node['id'], node['name'], get_byte_len_str(node), content) + # 鍒涘缓t_rule_stream + rule_stream = create_rule_stream(proj_pk, + rule.C_RULE_PK, + ds.C_STREAM_PK, + ds.C_STREAM_ID, + ds.C_NAME, + ds.C_STREAM_DIR, + f"{name_path}{node['name']}/") + if 'children' in node: + seq = 1 + for child in node['children']: + # 鍒涘缓绾挎�у寘鍙傛暟 + create_prop_linear(proj_pk, rule_linear.C_LINEAR_PK, child, seq) + seq = seq + 1 + return rule + + +def create_liner_pkt(proj_pk, linear_pk, node, parent_rule_pk, seq, name_path, ds, content): + # 鍒涘缓绾挎�у寘 + prop_enc = create_prop_enc(proj_pk, linear_pk, node, 'LINEAR', seq) + # 鍒涘缓 enc_linear + enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '002') + rule_pk = enc_linear.C_LINEAR_PK + # 鍒涘缓瑙f瀽瑙勫垯 + length = get_byte_len_str(node) + rule_name = node['rule_name'] if 'rule_name' in node else node['name'] + rule = create_rule(proj_pk, rule_pk, node['id'], rule_name, length, parent_rule_pk, + enc_ty_flag_map['LINEAR']) + rule_linear = create_rule_linear(proj_pk, rule_pk, node['id'], node['name'], length, content) + # 鍒涘缓t_rule_stream + rule_stream = create_rule_stream(proj_pk, + rule.C_RULE_PK, + ds.C_STREAM_PK, + ds.C_STREAM_ID, + ds.C_NAME, + ds.C_STREAM_DIR, + f"{name_path}{node['name']}/") + if 'children' in node: + seq = 1 + for child in node['children']: + # 鍒涘缓绾挎�у寘鍙傛暟 + create_prop_linear(proj_pk, rule_linear.C_LINEAR_PK, child, seq) + seq = seq + 1 + + +def create_any_pkt(proj_pk, linear_pk, node, seq, name_path, ds, pkt_ty, sub_key_nodes, key_items=None): + # any娌℃湁t_rule銆乼_rule_enc銆乼_rule_linear + prop_enc = create_prop_enc(proj_pk, linear_pk, node, pkt_ty, seq) + rule_name = node['rule_name'] if 'rule_name' in node else node['name'] + length = get_byte_len_str(node) + rule = create_rule(proj_pk, prop_enc.C_ENCITEM_PK, node['id'], rule_name, length, prop_enc.C_ENC_PK, + enc_ty_flag_map['ANY'], None) + if 'children' in node: + child_seq = 1 + for child in node['children']: + vals = None + if 'vals' in child: + values = [] + vals = child['vals'] + if vals.endswith("/"): + vals = vals[:-1] + values.extend(vals.split("/")) + for i in range(0, len(key_items)): + key_items[i]['val'] = values[i] + node_name = '銆�' + for i in range(0, len(values)): + sub_key_node = sub_key_nodes[i] + val = values[i] + node_name += f'{sub_key_node["name"]}={val}' + node_name += '銆�' + child['rule_name'] = child['name'] + node_name + + if child['type'] == 'enc': # 灏佽鍖� + enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '001', vals) + _, __, _rule = create_enc_pkt(proj_pk, enc_linear.C_LINEAR_PK, child, enc_linear.C_ENCITEM_PK, + child_seq, name_path, ds, '001', 'ENC', parent_has_key=True, + actual_parent_pk=prop_enc.C_ENC_PK) + if key_items: + for it in key_items: + 
create_rulekey_info(proj_pk, _rule.C_RULE_PK, _rule.C_RULE_ID, _rule.C_RULE_NAME, it['pk'], + it['id'], it['name'], it['val']) + elif child['type'] == 'linear': # 绾挎�у寘 + # 鏌ヨ宸叉湁鐨剅ule + _rule = None # find_rule_by_rule_id(child['id']) + enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '002', vals, + _rule.C_RULE_PK if _rule else None) + if not _rule: + _rule = create_key_liner_pkt(proj_pk, enc_linear.C_LINEAR_PK, child, enc_linear.C_ENCITEM_PK, + child_seq, name_path, ds, None, prop_enc.C_ENC_PK) + if key_items: + for it in key_items: + create_rulekey_info(proj_pk, _rule.C_RULE_PK, _rule.C_RULE_ID, _rule.C_RULE_NAME, + it['pk'], it['id'], it['name'], it['val']) + else: + # 鍒涘缓瑙f瀽瑙勫垯 + rule_name = node['rule_name'] if 'rule_name' in node else node['name'] + _rule = create_rule(proj_pk, _rule.C_RULE_PK, child['id'], rule_name, child['length'], + prop_enc.C_ENCITEM_PK, enc_ty_flag_map['LINEAR'], prop_enc.C_ENC_PK) + # rule_linear = create_rule_linear(proj_pk, enc_linear.C_LINEAR_PK, node['id'], node['name'], + # node['length'], None) + # 鍒涘缓t_rule_stream + rule_stream = create_rule_stream(proj_pk, + _rule.C_RULE_PK, + ds.C_STREAM_PK, + ds.C_STREAM_ID, + ds.C_NAME, + ds.C_STREAM_DIR, + f"{name_path}{child['name']}/") + elif child['type'] == 'logic': # 閫昏緫灏佽鍖� + enc_linear = create_enc_linear(proj_pk, prop_enc.C_ENCITEM_PK, '005', vals) + _, __, _rule = create_enc_pkt(proj_pk, enc_linear.C_LINEAR_PK, child, enc_linear.C_ENCITEM_PK, + child_seq, + name_path, ds, '005', 'ENC', True, prop_enc.C_ENC_PK) + if key_items: + for it in key_items: + create_rulekey_info(proj_pk, _rule.C_RULE_PK, _rule.C_RULE_ID, _rule.C_RULE_NAME, + it['pk'], it['id'], it['name'], it['val']) + child_seq += 1 + return prop_enc + + +def create_enc_pkt(proj_pk, linear_pk, node, parent_rule_pk, seq, name_path, ds, ty, pkt_ty, + parent_has_key=False, actual_parent_pk=None): + """ + 鍒涘缓灏佽鍖� + :param enc_pk: + :param proj_pk: + :param node: + :param parent_rule_pk: + :param seq: + :param name_path: + :param ds: + :param ty: + :param is_logic_enc: + :return: + """ + prop_enc = None + key_items = [] + length = get_byte_len_str(node) + # 鏌ヨ宸叉湁鐨剅ule + if not parent_has_key: + # 鍒涘缓灏佽鍖� + prop_enc = create_prop_enc(proj_pk, linear_pk, node, pkt_ty, seq) + encitem_pk = prop_enc.C_ENCITEM_PK + vals = None + if 'vals' in node: + vals = node['vals'] + '/' + enc_linear = create_enc_linear(proj_pk, encitem_pk, ty, vals) + rule_pk = enc_linear.C_LINEAR_PK + else: + rule_pk = linear_pk + # 鍒涘缓灏佽鍖呬笅闈㈢殑瑙f瀽瑙勫垯 + + rule_name = node['rule_name'] if 'rule_name' in node else node['name'] + rule = create_rule(proj_pk, rule_pk, node['id'], rule_name, length, parent_rule_pk, + enc_ty_flag_map[pkt_ty], actual_parent_pk) + rule_enc = create_rule_enc(proj_pk, rule.C_RULE_PK, node['id'], node['name'], + node['content'] if 'content' in node else None) + name_path = f"{name_path}{node['name']}/" + rule_stream = create_rule_stream(proj_pk, + rule.C_RULE_PK, + ds.C_STREAM_PK, + ds.C_STREAM_ID, + ds.C_NAME, + ds.C_STREAM_DIR, + name_path) + + if 'extInfo' in node and node['extInfo']: + for info in node['extInfo']: + create_extend_info(proj_pk, info['id'], info['name'], info['val'], rule.C_RULE_PK) + + if 'children' in node: + child_seq = 1 + sub_key_nodes = list(filter(lambda it: it.__contains__('is_key'), node['children'])) + has_key = len(sub_key_nodes) > 0 + sub_key = '' + for child in node['children']: + if child['type'] == 'enc': # 灏佽鍖� + if has_key: + create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds, 'ENC', 
sub_key_nodes) + else: + create_enc_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq, + name_path, ds, '001', 'ENC') + elif child['type'] == 'para': # 鏁版嵁娈靛弬鏁� + _prop_enc = create_prop_enc(proj_pk, rule_enc.C_ENC_PK, child, get_data_ty(child), child_seq) + is_key = False + if 'is_key' in child: + is_key = child['is_key'] + if is_key: + sub_key += _prop_enc.C_ENCITEM_PK + '/' + key_items.append( + {"pk": _prop_enc.C_ENCITEM_PK, + 'id': _prop_enc.C_SEGMENT_ID, + 'name': _prop_enc.C_NAME, + 'val': ''}) + elif child['type'] == 'linear': # 绾挎�у寘 + if has_key: + create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds, 'LINEAR', sub_key_nodes, + key_items) + else: + create_liner_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq, + name_path, ds, None) + elif child['type'] == 'logic': # 閫昏緫灏佽鍖� + if has_key: + create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds, 'LOGICENC', sub_key_nodes, + key_items) + else: + create_enc_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq, name_path, ds, + '005', 'LOGICENC') + elif child['type'] == 'any': # 浠绘剰鍖� + if has_key: + create_any_pkt(proj_pk, rule.C_RULE_PK, child, child_seq, name_path, ds, 'ANY', sub_key_nodes, + key_items) + else: + create_enc_pkt(proj_pk, rule.C_RULE_PK, child, rule_pk, child_seq, name_path, ds, + '005', 'ANY') + child_seq += 1 + if sub_key: + rule_enc.C_KEY = sub_key + update_rule_enc(rule_enc) + + return prop_enc, rule_stream, rule diff --git a/knowledgebase/db/db_helper.py b/knowledgebase/db/db_helper.py new file mode 100644 index 0000000..cf94ba0 --- /dev/null +++ b/knowledgebase/db/db_helper.py @@ -0,0 +1,408 @@ +import uuid + +from sqlalchemy.orm import sessionmaker, scoped_session + +from knowledgebase.db.models import engine, TProject, TDevice, TDataStream, TDevStream, TRule, TRuleEnc, TPropertyEnc, \ + TPropertyLinear, TRuleStream, TEncLinear, TRuleLinear, TParameter, TParameterType, TExtendInfo, TRulekeyInfo, \ + TInsFormat + +from hashlib import md5 + +# 鍒涘缓涓�涓細璇濆伐鍘� +session_factory = sessionmaker(bind=engine) +# 鍒涘缓涓�涓細璇濆璞� +Session = scoped_session(session_factory) +session = Session() + +_para_id_map = {} + + +def get_pk(): + n = uuid.uuid4().hex + pk = md5(n.encode('utf-8')).hexdigest() + return pk + + +def create_project(sat_id, sat_name, proj_code, proj_name, desc, date_time, ) -> TProject: + """ + 鍒涘缓project + :param sat_id: + :param sat_name: + :param proj_code: + :param proj_name: + :param desc: + :param date_time: + :return: 鍒涘缓瀹屾垚鐨刾roject + """ + project = TProject(C_PROJECT_PK=get_pk(), C_SAT_ID=sat_id, C_SAT_NAME=sat_name, C_PROJECT_CODE=proj_code, + C_DESCRIPTION=desc, C_HASH=uuid.uuid4().int & 0xffffffff, C_PROJECT_NAME=proj_name, + C_DATETIME=date_time, + C_CREATEOR='') + session.add(project) + session.commit() + return project + + +def create_device(device_id, device_name, device_type, dll, project_pk): + """ + 鍒涘缓device + :param device_id: + :param device_name: + :param device_type: + :param dll: + :param project_pk: + :return: + """ + device = TDevice(C_DEV_PK=get_pk(), C_DEV_ID=device_id, C_DEV_NAME=device_name, C_DEV_TYPE=device_type, C_DLL=dll, + C_PROJECT_PK=project_pk) + session.add(device) + session.commit() + return device + + +def create_extend_info(proj_pk, prop_id, prop_name, val, fk): + ext_info = TExtendInfo( + C_PK=get_pk(), + C_PROJECT_PK=proj_pk, + C_PROPERTY_ID=prop_id, + C_PROPERTY_NAME=prop_name, + C_VAL=val, + C_FOREIGN_PK=fk + ) + session.add(ext_info) + session.commit() + + +def create_data_stream(proj_pk, dev_pk, name, code, 
data_ty, direct, rule_id, rule_ty, rule_pk=None): + """ + 鍒涘缓data_stream + :param proj_pk: + :param dev_pk: + :param name: + :param code: + :param data_ty: + :param direct: + :param rule_id: + :param rule_ty: + :return: + """ + ds = TDataStream(C_STREAM_PK=get_pk(), + C_PROJECT_PK=proj_pk, + C_STREAM_ID=code, + C_DATA_TYPE=data_ty, + C_STREAM_DIR=direct, + C_NAME=name, + C_DESCRIPTION='', + C_RULE_ID=rule_id, + C_RULE_TYPE=rule_ty) + session.add(ds) + link = TDevStream(C_PK=get_pk(), C_DEV_PK=dev_pk, C_STREAM_PK=ds.C_STREAM_PK, C_PROJECT_PK=proj_pk) + session.add(link) + rule_enc = None + # 鍒涘缓瑙f瀽瑙勫垯 + if rule_pk is None: + rule_pk = get_pk() + if rule_ty == '001': + # 灏佽鍖� + rule_enc = create_rule_enc(proj_pk, rule_pk, rule_id, rule_id) + + rule = create_rule(proj_pk, ds.C_STREAM_PK, rule_id, name, None, None, '0') + rule = create_rule(proj_pk, rule_pk, rule_id, rule_id, None, ds.C_STREAM_PK, '1') + # rule stream + rule_stream = create_rule_stream(proj_pk, + rule_pk, + ds.C_STREAM_PK, + ds.C_STREAM_ID, + ds.C_NAME, + ds.C_STREAM_DIR, + f"{ds.C_NAME}/{rule_id}/") + session.add(rule_stream) + session.commit() + return ds, rule_stream, rule_enc + + +def create_rule(proj_pk, rule_pk, rule_id, rule_name, rule_len, parent_pk, flag, actual_parent_pk=None): + rule = TRule( + C_PK=get_pk(), + C_PROJECT_PK=proj_pk, + C_RULE_PK=rule_pk, + C_RULE_ID=rule_id, + C_RULE_NAME=rule_name, + C_RULE_LENGTH=rule_len, + C_PARENT_PK=parent_pk, + C_FLAG=flag, + C_ACTUAL_PARENT_PK=actual_parent_pk + ) + session.add(rule) + session.commit() + return rule + + +def find_rule_by_rule_id(rule_id): + return session.query(TRule).filter(TRule.C_RULE_ID == rule_id).first() + + +def create_rule_stream(proj_pk, rule_pk, stream_pk, stream_id, stream_name, stream_dir, _path): + rule_stream = TRuleStream( + C_PK=get_pk(), + C_PROJECT_PK=proj_pk, + C_RULE_PK=rule_pk, + C_STREAM_PK=stream_pk, + C_STREAM_ID=stream_id, + C_STREAM_NAME=stream_name, + C_STREAM_DIR=stream_dir, + C_PATH=_path + ) + session.add(rule_stream) + session.commit() + return rule_stream + + +def create_ref_ds_rule_stream(proj_pk, stream_pk, stream_id, stream_name, stream_dir, target_stream_pk): + items: list = session.query(TRuleStream).filter(TRuleStream.C_STREAM_PK == target_stream_pk).all() + for it in items: + _path = it.C_PATH + if len(_path.split('/')) == 3: + continue + _path = f'{stream_name}/{stream_id}/'.join(_path.split('/')[2:]) + '/' + create_rule_stream(proj_pk, it.C_RULE_PK, stream_pk, stream_id, stream_name, stream_dir, _path) + + +def create_rule_enc(proj_pk, enc_pk, enc_id, name, content=None): + rule_enc = TRuleEnc( + C_ENC_PK=enc_pk, + C_PROJECT_PK=proj_pk, + C_ENC_ID=enc_id, + C_NAME=name, + C_CONTENT=content, + ) + session.add(rule_enc) + session.commit() + return rule_enc + + +def create_rule_linear(proj_pk, linear_pk, linear_id, name, length, content): + rule_linear = TRuleLinear( + C_LINEAR_PK=linear_pk, + C_PROJECT_PK=proj_pk, + C_LINEAR_ID=linear_id, + C_NAME=name, + C_LENGTH=length, + C_PACKAGE_TYPE=None, + C_REL_LINEAR_PK=None, + C_CONTENT=content + ) + session.add(rule_linear) + session.commit() + return rule_linear + + +def create_property_enc(proj_pk, enc_pk, segment_id, name, ty, content, offset, + length, msb_first, mask, cond, seq, rel_enc_item_pk, para_id): + property_enc = TPropertyEnc( + C_ENCITEM_PK=get_pk(), + C_ENC_PK=enc_pk, + C_SEGMENT_ID=segment_id, + C_NAME=name, + C_TYPE=ty, + C_CONTENT=content, + C_PUBLISH=None, + C_OFFSET=offset, + C_LENGTH=length, + C_MSBFIRST=msb_first, + C_MASK=mask, + C_CONDITION=cond, + 
C_PROJECT_PK=proj_pk, + C_SEQ=seq, + C_REL_ENCITEM_PK=rel_enc_item_pk, + C_PAR_ID=para_id + ) + session.add(property_enc) + para = TParameter( + C_PAR_PK=get_pk(), + C_PROJECT_PK=proj_pk, + C_PAR_CODE=segment_id, + C_PAR_NAME=name, + C_SUBSYS=None, + C_TYPE='0', + C_UNIT=None, + C_VALUE_RANGE=None, + C_DIS_REQUIRE=None, + C_MODULUS=None, + C_PARAMS=None, + C_PRECISION='0', + C_REG_PK=None, + C_METHOD_PK=None + ) + session.add(para) + if ty == 'ENUM' and content: + items: list = content.split(' ') + for item in items: + idx = items.index(item) + name, val = item.split(',') + pt = TParameterType( + C_PK=get_pk(), + C_TYPE_ID=f'{idx}', + C_TYPE_NAME=name, + C_VALUE=val, + C_DATA_TYPE=None, + C_PAR_PK=para.C_PAR_PK, + C_PROJECT_PK=proj_pk + ) + session.add(pt) + session.commit() + return property_enc + + +def get_para_id(para_id): + for i in range(1, 9999): + _id = f'{i}'.zfill(4) + _para_id = para_id + '_' + _id + if _para_id not in _para_id_map: + _para_id_map[_para_id] = True + return _para_id + + +def create_property_linear(proj_pk, linear_pk, para_id, name, ty, content, offset, + length, msb_first, mask, cond, calc_expr, simuval, reg_par, params, seq): + property_linear = TPropertyLinear( + C_PK=get_pk(), + C_LINEAR_PK=linear_pk, + C_PAR_ID=para_id, + C_TYPE=ty, + C_CONTENT=content, + C_OFFSET=offset, + C_LENGTH=length, + C_MSBFIRST=msb_first, + C_MASK=mask, + C_CONDITION=cond, + C_CALC_EXPR=calc_expr, + C_PAR_PK=get_pk(), + C_SIMUVAL=simuval, + C_REG_PAR=reg_par, + C_PARAMS=params, + C_PROJECT_PK=proj_pk, + C_SEQ=seq, + C_REL_PK=None + ) + session.add(property_linear) + if para_id in _para_id_map: + get_para_id(para_id) + para = TParameter( + C_PAR_PK=property_linear.C_PAR_PK, + C_PROJECT_PK=proj_pk, + C_PAR_CODE=para_id, + C_PAR_NAME=name, + C_SUBSYS=None, + C_TYPE=None, + C_UNIT=None, + C_VALUE_RANGE=None, + C_DIS_REQUIRE=None, + C_MODULUS=None, + C_PARAMS=None, + C_PRECISION='0', + C_REG_PK=None, + C_METHOD_PK=None + ) + session.add(para) + if ty == 'ENUM' and content: + items: list = content.split(' ') + for item in items: + idx = items.index(item) + name, val = item.split(',') + pt = TParameterType( + C_PK=get_pk(), + C_TYPE_ID=f'{idx}', + C_TYPE_NAME=name, + C_VALUE=val, + C_DATA_TYPE=None, + C_PAR_PK=para.C_PAR_PK, + C_PROJECT_PK=proj_pk + ) + session.add(pt) + session.commit() + return property_linear + + +def create_enc_linear(proj_pk, enc_item_pk, ty, vals=None, linear_pk=None): + """ + 鍒涘缓 t_enc_linear + :param proj_pk: 宸ョ▼pk + :param enc_item_pk: + :param ty: 001:灏佽鍖咃紝002:绾挎�у寘 + :param vals: 閫昏緫灏佽鍖呯殑key鍊� + :return: + """ + if linear_pk is None: + linear_pk = get_pk() + enc_linear = TEncLinear( + C_PK=get_pk(), + C_LINEAR_PK=linear_pk, + C_ENCITEM_PK=enc_item_pk, + C_VALS=vals, + C_PROJECT_PK=proj_pk, + C_TYPE=ty, + C_FOLDER_PK=None + ) + session.add(enc_linear) + session.commit() + return enc_linear + + +def update_rule_enc(rule_enc): + # 鏇存柊 + session.query(TRuleEnc).filter(TRuleEnc.C_ENC_PK == rule_enc.C_ENC_PK).update({ + TRuleEnc.C_KEY: rule_enc.C_KEY + }) + session.commit() + + +def create_rulekey_info(proj_pk, rule_pk, rule_id, rule_name, key_pk, key_id, key_name, key_val): + info = TRulekeyInfo( + C_PK=get_pk(), + C_PROJECT_PK=proj_pk, + C_RULE_PK=rule_pk, + C_RULE_ID=rule_id, + C_RULE_NAME=rule_name, + C_KEY_PK=key_pk, + C_KEY_ID=key_id, + C_KEY_NAME=key_name, + C_KEY_VAL=key_val + ) + session.add(info) + session.commit() + + +ins_ty = { + "pkt": 1, + "subPkt": 22, + "combPkt": 12, + "const": 15, + "length": 17, + "enum": 26, + "checkSum": 20, +} + + +def 
create_ins_format(proj_pk: str, parent_pk: str, info: dict) -> TInsFormat: + ins_format = TInsFormat( + C_INS_FORMAT_PK=get_pk(), + C_PROJECT_PK=proj_pk, + C_PARENT_PK=parent_pk, + C_ORDER=info['order'] if 'order' in info else 0, + C_AUTOCODE=info['autocode'] if 'autocode' in info else None, + C_NAME=info['name'] if 'name' in info else '', + C_CODE=info['code'] if 'code' in info else '', + C_TYPE=ins_ty[info['type']] if 'type' in info else 0, + C_DEF=info['def'] if 'def' in info else None, + C_BIT_WIDTH=info['bitWidth'] if 'bitWidth' in info else 0, + C_BIT_ORDER=info['bitOrder'] if 'bitOrder' in info else 0, + C_ATTR=info['attr'] if 'attr' in info else 0, + C_RANGE=info['range'] if 'range' in info else None, + C_CONDITION='', + C_FORMULA=info['formula'] if 'formula' in info else '', + C_NUMBER='', + ) + session.add(ins_format) + session.commit() + return ins_format diff --git a/db/models.py b/knowledgebase/db/models.py similarity index 99% rename from db/models.py rename to knowledgebase/db/models.py index 624ef9a..9577b28 100644 --- a/db/models.py +++ b/knowledgebase/db/models.py @@ -1,6 +1,7 @@ # coding: utf-8 from sqlalchemy import create_engine, Column, DateTime, Integer, Text from sqlalchemy.ext.declarative import declarative_base +import os Base = declarative_base() metadata = Base.metadata @@ -473,5 +474,7 @@ C_EDIT = Column(Integer) +if os.path.isfile("db.db"): + os.remove("db.db") engine = create_engine('sqlite:///db.db', echo=True) metadata.create_all(engine) diff --git a/knowledgebase/markitdown/__about__.py b/knowledgebase/markitdown/__about__.py new file mode 100644 index 0000000..a365900 --- /dev/null +++ b/knowledgebase/markitdown/__about__.py @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com> +# +# SPDX-License-Identifier: MIT +__version__ = "0.0.1a3" diff --git a/knowledgebase/markitdown/__init__.py b/knowledgebase/markitdown/__init__.py new file mode 100644 index 0000000..482f428 --- /dev/null +++ b/knowledgebase/markitdown/__init__.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com> +# +# SPDX-License-Identifier: MIT + +from ._markitdown import MarkItDown, FileConversionException, UnsupportedFormatException + +__all__ = [ + "MarkItDown", + "FileConversionException", + "UnsupportedFormatException", +] diff --git a/knowledgebase/markitdown/__main__.py b/knowledgebase/markitdown/__main__.py new file mode 100644 index 0000000..b6cf963 --- /dev/null +++ b/knowledgebase/markitdown/__main__.py @@ -0,0 +1,82 @@ +# SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com> +# +# SPDX-License-Identifier: MIT +import argparse +import sys +from textwrap import dedent +from .__about__ import __version__ +from ._markitdown import MarkItDown, DocumentConverterResult + + +def main(): + parser = argparse.ArgumentParser( + description="Convert various file formats to markdown.", + prog="markitdown", + formatter_class=argparse.RawDescriptionHelpFormatter, + usage=dedent( + """ + SYNTAX: + + markitdown <OPTIONAL: FILENAME> + If FILENAME is empty, markitdown reads from stdin. 
+ + EXAMPLE: + + markitdown example.pdf + + OR + + cat example.pdf | markitdown + + OR + + markitdown < example.pdf + + OR to save to a file use + + markitdown example.pdf -o example.md + + OR + + markitdown example.pdf > example.md + """ + ).strip(), + ) + + parser.add_argument( + "-v", + "--version", + action="version", + version=f"%(prog)s {__version__}", + help="show the version number and exit", + ) + + parser.add_argument("filename", nargs="?") + parser.add_argument( + "-o", + "--output", + help="Output file name. If not provided, output is written to stdout.", + ) + args = parser.parse_args() + + if args.filename is None: + markitdown = MarkItDown() + result = markitdown.convert_stream(sys.stdin.buffer) + _handle_output(args, result) + else: + markitdown = MarkItDown() + result = markitdown.convert(args.filename) + _handle_output(args, result) + + +def _handle_output(args, result: DocumentConverterResult): + """Handle output to stdout or file""" + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + f.write(result.text_content) + else: + print(result.text_content) + + +if __name__ == "__main__": + main() diff --git a/knowledgebase/markitdown/_markitdown.py b/knowledgebase/markitdown/_markitdown.py new file mode 100644 index 0000000..8f68ec5 --- /dev/null +++ b/knowledgebase/markitdown/_markitdown.py @@ -0,0 +1,1708 @@ +# type: ignore +import base64 +import binascii +import copy +import html +import json +import mimetypes +import os +import re +import shutil +import subprocess +import sys +import tempfile +import traceback +import zipfile +from xml.dom import minidom +from typing import Any, Dict, List, Optional, Union +from pathlib import Path +from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse +from warnings import warn, resetwarnings, catch_warnings + +import mammoth +import markdownify +import olefile +import pandas as pd +import pdfminer +import pdfminer.high_level +import pptx + +# File-format detection +import puremagic +import requests +from bs4 import BeautifulSoup +from charset_normalizer import from_path +from bs4 import BeautifulSoup + +# Optional Transcription support +IS_AUDIO_TRANSCRIPTION_CAPABLE = False +try: + # Using warnings' catch_warnings to catch + # pydub's warning of ffmpeg or avconv missing + with catch_warnings(record=True) as w: + import pydub + + if w: + raise ModuleNotFoundError + import speech_recognition as sr + + IS_AUDIO_TRANSCRIPTION_CAPABLE = True +except ModuleNotFoundError: + pass +finally: + resetwarnings() + +# Optional YouTube transcription support +try: + from youtube_transcript_api import YouTubeTranscriptApi + + IS_YOUTUBE_TRANSCRIPT_CAPABLE = True +except ModuleNotFoundError: + pass + + +class _CustomMarkdownify(markdownify.MarkdownConverter): + """ + A custom version of markdownify's MarkdownConverter. Changes include: + + - Altering the default heading style to use '#', '##', etc. + - Removing javascript hyperlinks. + - Truncating images with large data:uri sources. 
+ - Ensuring URIs are properly escaped, and do not conflict with Markdown syntax + """ + + def __init__(self, **options: Any): + options["heading_style"] = options.get("heading_style", markdownify.ATX) + # Explicitly cast options to the expected type if necessary + super().__init__(**options) + + def convert_hn(self, n: int, el: Any, text: str, convert_as_inline: bool) -> str: + """Same as usual, but be sure to start with a new line""" + if not convert_as_inline: + if not re.search(r"^\n", text): + return "\n" + super().convert_hn(n, el, text, convert_as_inline) # type: ignore + + return super().convert_hn(n, el, text, convert_as_inline) # type: ignore + + def convert_a(self, el: Any, text: str, convert_as_inline: bool): + """Same as usual converter, but removes Javascript links and escapes URIs.""" + prefix, suffix, text = markdownify.chomp(text) # type: ignore + if not text: + return "" + href = el.get("href") + title = el.get("title") + + # Escape URIs and skip non-http or file schemes + if href: + try: + parsed_url = urlparse(href) # type: ignore + if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]: # type: ignore + return "%s%s%s" % (prefix, text, suffix) + href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path)))) # type: ignore + except ValueError: # It's not clear if this ever gets thrown + return "%s%s%s" % (prefix, text, suffix) + + # For the replacement see #29: text nodes underscores are escaped + if ( + self.options["autolinks"] + and text.replace(r"\_", "_") == href + and not title + and not self.options["default_title"] + ): + # Shortcut syntax + return "<%s>" % href + if self.options["default_title"] and not title: + title = href + title_part = ' "%s"' % title.replace('"', r"\"") if title else "" + return ( + "%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix) + if href + else text + ) + + def convert_img(self, el: Any, text: str, convert_as_inline: bool) -> str: + """Same as usual converter, but removes data URIs""" + + alt = el.attrs.get("alt", None) or "" + src = el.attrs.get("knowledgebase", None) or "" + title = el.attrs.get("title", None) or "" + title_part = ' "%s"' % title.replace('"', r"\"") if title else "" + if ( + convert_as_inline + and el.parent.name not in self.options["keep_inline_images_in"] + ): + return alt + + # Remove dataURIs + if src.startswith("data:"): + src = src.split(",")[0] + "..." 
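+            # e.g. a hypothetical value such as "data:image/png;base64,iVBORw0KG..."
+            # (potentially tens of kilobytes) collapses to "data:image/png;base64...",
+            # so large inline payloads never reach the Markdown output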
+ + return "" % (alt, src, title_part) + + def convert_soup(self, soup: Any) -> str: + return super().convert_soup(soup) # type: ignore + + +class DocumentConverterResult: + """The result of converting a document to text.""" + + def __init__(self, title: Union[str, None] = None, text_content: str = ""): + self.title: Union[str, None] = title + self.text_content: str = text_content + + +class DocumentConverter: + """Abstract superclass of all DocumentConverters.""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + raise NotImplementedError() + + +class PlainTextConverter(DocumentConverter): + """Anything with content type text/plain""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Guess the content type from any file extension that might be around + content_type, _ = mimetypes.guess_type( + "__placeholder" + kwargs.get("file_extension", "") + ) + + # Only accept text files + if content_type is None: + return None + elif all( + not content_type.lower().startswith(type_prefix) + for type_prefix in ["text/", "application/json"] + ): + return None + + text_content = str(from_path(local_path).best()) + return DocumentConverterResult( + title=None, + text_content=text_content, + ) + + +class HtmlConverter(DocumentConverter): + """Anything with content type text/html""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not html + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + + result = None + with open(local_path, "rt", encoding="utf-8") as fh: + result = self._convert(fh.read()) + + return result + + def _convert(self, html_content: str) -> Union[None, DocumentConverterResult]: + """Helper function that converts and HTML string.""" + + # Parse the string + soup = BeautifulSoup(html_content, "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content + body_elm = soup.find("body") + webpage_text = "" + if body_elm: + webpage_text = _CustomMarkdownify().convert_soup(body_elm) + else: + webpage_text = _CustomMarkdownify().convert_soup(soup) + + assert isinstance(webpage_text, str) + + return DocumentConverterResult( + title=None if soup.title is None else soup.title.string, + text_content=webpage_text, + ) + + +class RSSConverter(DocumentConverter): + """Convert RSS / Atom type to markdown""" + + def convert( + self, local_path: str, **kwargs + ) -> Union[None, DocumentConverterResult]: + # Bail if not RSS type + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".xml", ".rss", ".atom"]: + return None + try: + doc = minidom.parse(local_path) + except BaseException as _: + return None + result = None + if doc.getElementsByTagName("rss"): + # A RSS feed must have a root element of <rss> + result = self._parse_rss_type(doc) + elif doc.getElementsByTagName("feed"): + root = doc.getElementsByTagName("feed")[0] + if root.getElementsByTagName("entry"): + # An Atom feed must have a root element of <feed> and at least one <entry> + result = self._parse_atom_type(doc) + else: + return None + else: + # not rss or atom + return None + + return result + + def _parse_atom_type( + self, doc: minidom.Document + ) -> Union[None, DocumentConverterResult]: + """Parse the type of an Atom feed. + + Returns None if the feed type is not recognized or something goes wrong. 
+ """ + try: + root = doc.getElementsByTagName("feed")[0] + title = self._get_data_by_tag_name(root, "title") + subtitle = self._get_data_by_tag_name(root, "subtitle") + entries = root.getElementsByTagName("entry") + md_text = f"# {title}\n" + if subtitle: + md_text += f"{subtitle}\n" + for entry in entries: + entry_title = self._get_data_by_tag_name(entry, "title") + entry_summary = self._get_data_by_tag_name(entry, "summary") + entry_updated = self._get_data_by_tag_name(entry, "updated") + entry_content = self._get_data_by_tag_name(entry, "content") + + if entry_title: + md_text += f"\n## {entry_title}\n" + if entry_updated: + md_text += f"Updated on: {entry_updated}\n" + if entry_summary: + md_text += self._parse_content(entry_summary) + if entry_content: + md_text += self._parse_content(entry_content) + + return DocumentConverterResult( + title=title, + text_content=md_text, + ) + except BaseException as _: + return None + + def _parse_rss_type( + self, doc: minidom.Document + ) -> Union[None, DocumentConverterResult]: + """Parse the type of an RSS feed. + + Returns None if the feed type is not recognized or something goes wrong. + """ + try: + root = doc.getElementsByTagName("rss")[0] + channel = root.getElementsByTagName("channel") + if not channel: + return None + channel = channel[0] + channel_title = self._get_data_by_tag_name(channel, "title") + channel_description = self._get_data_by_tag_name(channel, "description") + items = channel.getElementsByTagName("item") + if channel_title: + md_text = f"# {channel_title}\n" + if channel_description: + md_text += f"{channel_description}\n" + if not items: + items = [] + for item in items: + title = self._get_data_by_tag_name(item, "title") + description = self._get_data_by_tag_name(item, "description") + pubDate = self._get_data_by_tag_name(item, "pubDate") + content = self._get_data_by_tag_name(item, "content:encoded") + + if title: + md_text += f"\n## {title}\n" + if pubDate: + md_text += f"Published on: {pubDate}\n" + if description: + md_text += self._parse_content(description) + if content: + md_text += self._parse_content(content) + + return DocumentConverterResult( + title=channel_title, + text_content=md_text, + ) + except BaseException as _: + print(traceback.format_exc()) + return None + + def _parse_content(self, content: str) -> str: + """Parse the content of an RSS feed item""" + try: + # using bs4 because many RSS feeds have HTML-styled content + soup = BeautifulSoup(content, "html.parser") + return _CustomMarkdownify().convert_soup(soup) + except BaseException as _: + return content + + def _get_data_by_tag_name( + self, element: minidom.Element, tag_name: str + ) -> Union[str, None]: + """Get data from first child element with the given tag name. + Returns None when no such element is found. 
+ """ + nodes = element.getElementsByTagName(tag_name) + if not nodes: + return None + fc = nodes[0].firstChild + if fc: + return fc.data + return None + + +class WikipediaConverter(DocumentConverter): + """Handle Wikipedia pages separately, focusing only on the main document content.""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not Wikipedia + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): + return None + + # Parse the file + soup = None + with open(local_path, "rt", encoding="utf-8") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content + body_elm = soup.find("div", {"id": "mw-content-text"}) + title_elm = soup.find("span", {"class": "mw-page-title-main"}) + + webpage_text = "" + main_title = None if soup.title is None else soup.title.string + + if body_elm: + # What's the title + if title_elm and len(title_elm) > 0: + main_title = title_elm.string # type: ignore + assert isinstance(main_title, str) + + # Convert the page + webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup( + body_elm + ) + else: + webpage_text = _CustomMarkdownify().convert_soup(soup) + + return DocumentConverterResult( + title=main_title, + text_content=webpage_text, + ) + + +class YouTubeConverter(DocumentConverter): + """Handle YouTube specially, focusing on the video title, description, and transcript.""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not YouTube + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not url.startswith("https://www.youtube.com/watch?"): + return None + + # Parse the file + soup = None + with open(local_path, "rt", encoding="utf-8") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Read the meta tags + assert soup.title is not None and soup.title.string is not None + metadata: Dict[str, str] = {"title": soup.title.string} + for meta in soup(["meta"]): + for a in meta.attrs: + if a in ["itemprop", "property", "name"]: + metadata[meta[a]] = meta.get("content", "") + break + + # We can also try to read the full description. 
This is more prone to breaking, since it reaches into the page implementation + try: + for script in soup(["script"]): + content = script.text + if "ytInitialData" in content: + lines = re.split(r"\r?\n", content) + obj_start = lines[0].find("{") + obj_end = lines[0].rfind("}") + if obj_start >= 0 and obj_end >= 0: + data = json.loads(lines[0][obj_start : obj_end + 1]) + attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore + if attrdesc: + metadata["description"] = str(attrdesc["content"]) + break + except Exception: + pass + + # Start preparing the page + webpage_text = "# YouTube\n" + + title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore + assert isinstance(title, str) + + if title: + webpage_text += f"\n## {title}\n" + + stats = "" + views = self._get(metadata, ["interactionCount"]) # type: ignore + if views: + stats += f"- **Views:** {views}\n" + + keywords = self._get(metadata, ["keywords"]) # type: ignore + if keywords: + stats += f"- **Keywords:** {keywords}\n" + + runtime = self._get(metadata, ["duration"]) # type: ignore + if runtime: + stats += f"- **Runtime:** {runtime}\n" + + if len(stats) > 0: + webpage_text += f"\n### Video Metadata\n{stats}\n" + + description = self._get(metadata, ["description", "og:description"]) # type: ignore + if description: + webpage_text += f"\n### Description\n{description}\n" + + if IS_YOUTUBE_TRANSCRIPT_CAPABLE: + transcript_text = "" + parsed_url = urlparse(url) # type: ignore + params = parse_qs(parsed_url.query) # type: ignore + if "v" in params: + assert isinstance(params["v"][0], str) + video_id = str(params["v"][0]) + try: + youtube_transcript_languages = kwargs.get( + "youtube_transcript_languages", ("en",) + ) + # Must be a single transcript. + transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore + transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore + # Alternative formatting: + # formatter = TextFormatter() + # formatter.format_transcript(transcript) + except Exception: + pass + if transcript_text: + webpage_text += f"\n### Transcript\n{transcript_text}\n" + + title = title if title else soup.title.string + assert isinstance(title, str) + + return DocumentConverterResult( + title=title, + text_content=webpage_text, + ) + + def _get( + self, + metadata: Dict[str, str], + keys: List[str], + default: Union[str, None] = None, + ) -> Union[str, None]: + for k in keys: + if k in metadata: + return metadata[k] + return default + + def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type + if isinstance(json, list): + for elm in json: + ret = self._findKey(elm, key) + if ret is not None: + return ret + elif isinstance(json, dict): + for k in json: + if k == key: + return json[k] + else: + ret = self._findKey(json[k], key) + if ret is not None: + return ret + return None + + +class IpynbConverter(DocumentConverter): + """Converts Jupyter Notebook (.ipynb) files to Markdown.""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not ipynb + extension = kwargs.get("file_extension", "") + if extension.lower() != ".ipynb": + return None + + # Parse and convert the notebook + result = None + with open(local_path, "rt", encoding="utf-8") as fh: + notebook_content = json.load(fh) + result = self._convert(notebook_content) + + return result + + def _convert(self, notebook_content: dict) -> Union[None, DocumentConverterResult]: + 
"""Helper function that converts notebook JSON content to Markdown.""" + try: + md_output = [] + title = None + + for cell in notebook_content.get("cells", []): + cell_type = cell.get("cell_type", "") + source_lines = cell.get("source", []) + + if cell_type == "markdown": + md_output.append("".join(source_lines)) + + # Extract the first # heading as title if not already found + if title is None: + for line in source_lines: + if line.startswith("# "): + title = line.lstrip("# ").strip() + break + + elif cell_type == "code": + # Code cells are wrapped in Markdown code blocks + md_output.append(f"```python\n{''.join(source_lines)}\n```") + elif cell_type == "raw": + md_output.append(f"```\n{''.join(source_lines)}\n```") + + md_text = "\n\n".join(md_output) + + # Check for title in notebook metadata + title = notebook_content.get("metadata", {}).get("title", title) + + return DocumentConverterResult( + title=title, + text_content=md_text, + ) + + except Exception as e: + raise FileConversionException( + f"Error converting .ipynb file: {str(e)}" + ) from e + + +class BingSerpConverter(DocumentConverter): + """ + Handle Bing results pages (only the organic search results). + NOTE: It is better to use the Bing API + """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a Bing SERP + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not re.search(r"^https://www\.bing\.com/search\?q=", url): + return None + + # Parse the query parameters + parsed_params = parse_qs(urlparse(url).query) + query = parsed_params.get("q", [""])[0] + + # Parse the file + soup = None + with open(local_path, "rt", encoding="utf-8") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Clean up some formatting + for tptt in soup.find_all(class_="tptt"): + if hasattr(tptt, "string") and tptt.string: + tptt.string += " " + for slug in soup.find_all(class_="algoSlug_icon"): + slug.extract() + + # Parse the algorithmic results + _markdownify = _CustomMarkdownify() + results = list() + for result in soup.find_all(class_="b_algo"): + # Rewrite redirect urls + for a in result.find_all("a", href=True): + parsed_href = urlparse(a["href"]) + qs = parse_qs(parsed_href.query) + + # The destination is contained in the u parameter, + # but appears to be base64 encoded, with some prefix + if "u" in qs: + u = ( + qs["u"][0][2:].strip() + "==" + ) # Python 3 doesn't care about extra padding + + try: + # RFC 4648 / Base64URL" variant, which uses "-" and "_" + a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8") + except UnicodeDecodeError: + pass + except binascii.Error: + pass + + # Convert to markdown + md_result = _markdownify.convert_soup(result).strip() + lines = [line.strip() for line in re.split(r"\n+", md_result)] + results.append("\n".join([line for line in lines if len(line) > 0])) + + webpage_text = ( + f"## A Bing search for '{query}' found the following results:\n\n" + + "\n\n".join(results) + ) + + return DocumentConverterResult( + title=None if soup.title is None else soup.title.string, + text_content=webpage_text, + ) + + +class PdfConverter(DocumentConverter): + """ + Converts PDFs to Markdown. Most style information is ignored, so the results are essentially plain-text. 
+ """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a PDF + extension = kwargs.get("file_extension", "") + if extension.lower() != ".pdf": + return None + + return DocumentConverterResult( + title=None, + text_content=pdfminer.high_level.extract_text(local_path), + ) + + +class DocxConverter(HtmlConverter): + """ + Converts DOCX files to Markdown. Style information (e.g.m headings) and tables are preserved where possible. + """ + + def table_unmerge(self,html): + # 瑙f瀽HTML + soup = BeautifulSoup(html, "html.parser") + + # 鑾峰彇鎵�鏈夎〃鏍� + tables = soup.find_all("table") + + # 閬嶅巻姣忎釜琛ㄦ牸 + for table in tables: + # 鑾峰彇琛ㄦ牸鐨勮鏁板拰鍒楁暟 + rows = table.find_all("tr") + row_count = len(rows) + col_count = max([len(row.find_all(["td", "th"])) for row in rows]) + + # 鍒涘缓涓�涓簩缁存暟缁勬潵瀛樺偍琛ㄦ牸鐨勬暟鎹� + data = [] + for i in range(row_count): + data.append([]) + + # 閬嶅巻姣忎釜鍗曞厓鏍� + for i, row in enumerate(rows): + cells = row.find_all(["td", "th"]) + for j, cell in enumerate(cells): + # 鑾峰彇涔嬪墠鐨勬墍鏈夊悎骞跺崟鍏冩牸鏁伴噺 + # 鑾峰彇鍗曞厓鏍肩殑琛屽垪鏁� + rowspan = int(cell.get("rowspan", 1)) + colspan = int(cell.get("colspan", 1)) + + data[i].append([cell.get_text().strip(), rowspan, colspan]) + + # 姘村钩鍚堝苟 + for i in range(len(data)): + row = data[i] + for j in range(len(row) - 1, -1, -1): + col = row[j] + v, rs, cs = col + col[2] = 1 + for k in range(1, cs): + row.insert(j + k, [v, rs, 1]) + # 鍨傜洿鍚堝苟 + for i in range(len(data)): + row = data[i] + for j in range(len(row)): + col = row[j] + v, rs, cs = col + col[1] = 1 + for k in range(1, rs): + data[i + k].insert(j, [v, 1, cs]) + + # 灏哾ata杞负value浜岀淮鏁扮粍 + result = [] + for i in range(len(data)): + row = data[i] + result.append([]) + for j in range(len(row)): + col = row[j] + v, rs, cs = col + result[i].append(v) + + # 灏嗚〃鏍肩殑鏁版嵁杞崲涓篋ataFrame + df = pd.DataFrame(result) + + # 灏咲ataFrame杞崲涓篐TML琛ㄦ牸 + html_table = df.to_html(index=False, header=False) + + # 灏咹TML琛ㄦ牸鏇挎崲鍘熸潵鐨勮〃鏍� + table.replace_with(BeautifulSoup(html_table, "html.parser")) + return str(soup) + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a DOCX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".docx": + return None + + result = None + with open(local_path, "rb") as docx_file: + style_map = kwargs.get("style_map", None) + + result = mammoth.convert_to_html(docx_file, style_map=style_map) + html_content = self.table_unmerge(result.value) + result = self._convert(html_content) + + return result + + +class XlsxConverter(HtmlConverter): + """ + Converts XLSX files to Markdown, with each sheet presented as a separate Markdown table. + """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLSX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".xlsx": + return None + + sheets = pd.read_excel(local_path, sheet_name=None, engine="openpyxl") + md_content = "" + for s in sheets: + md_content += f"## {s}\n" + html_content = sheets[s].to_html(index=False) + md_content += self._convert(html_content).text_content.strip() + "\n\n" + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + +class XlsConverter(HtmlConverter): + """ + Converts XLS files to Markdown, with each sheet presented as a separate Markdown table. 
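+    Reading relies on the xlrd engine, which in current releases only handles the
+    legacy binary .xls format; .xlsx workbooks are covered by XlsxConverter above.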
+ """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLS + extension = kwargs.get("file_extension", "") + if extension.lower() != ".xls": + return None + + sheets = pd.read_excel(local_path, sheet_name=None, engine="xlrd") + md_content = "" + for s in sheets: + md_content += f"## {s}\n" + html_content = sheets[s].to_html(index=False) + md_content += self._convert(html_content).text_content.strip() + "\n\n" + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + +class PptxConverter(HtmlConverter): + """ + Converts PPTX files to Markdown. Supports heading, tables and images with alt text. + """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a PPTX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".pptx": + return None + + md_content = "" + + presentation = pptx.Presentation(local_path) + slide_num = 0 + for slide in presentation.slides: + slide_num += 1 + + md_content += f"\n\n<!-- Slide number: {slide_num} -->\n" + + title = slide.shapes.title + for shape in slide.shapes: + # Pictures + if self._is_picture(shape): + # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069 + alt_text = "" + try: + alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "") + except Exception: + pass + + # A placeholder name + filename = re.sub(r"\W", "", shape.name) + ".jpg" + md_content += ( + "\n\n" + ) + + # Tables + if self._is_table(shape): + html_table = "<html><body><table>" + first_row = True + for row in shape.table.rows: + html_table += "<tr>" + for cell in row.cells: + if first_row: + html_table += "<th>" + html.escape(cell.text) + "</th>" + else: + html_table += "<td>" + html.escape(cell.text) + "</td>" + html_table += "</tr>" + first_row = False + html_table += "</table></body></html>" + md_content += ( + "\n" + self._convert(html_table).text_content.strip() + "\n" + ) + + # Charts + if shape.has_chart: + md_content += self._convert_chart_to_markdown(shape.chart) + + # Text areas + elif shape.has_text_frame: + if shape == title: + md_content += "# " + shape.text.lstrip() + "\n" + else: + md_content += shape.text + "\n" + + md_content = md_content.strip() + + if slide.has_notes_slide: + md_content += "\n\n### Notes:\n" + notes_frame = slide.notes_slide.notes_text_frame + if notes_frame is not None: + md_content += notes_frame.text + md_content = md_content.strip() + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + def _is_picture(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE: + return True + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER: + if hasattr(shape, "image"): + return True + return False + + def _is_table(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE: + return True + return False + + def _convert_chart_to_markdown(self, chart): + md = "\n\n### Chart" + if chart.has_title: + md += f": {chart.chart_title.text_frame.text}" + md += "\n\n" + data = [] + category_names = [c.label for c in chart.plots[0].categories] + series_names = [s.name for s in chart.series] + data.append(["Category"] + series_names) + + for idx, category in enumerate(category_names): + row = [category] + for series in chart.series: + row.append(series.values[idx]) + data.append(row) + + markdown_table = [] + for row in data: + markdown_table.append("| " + " | ".join(map(str, row)) + " |") + header = 
markdown_table[0] + separator = "|" + "|".join(["---"] * len(data[0])) + "|" + return md + "\n".join([header, separator] + markdown_table[1:]) + + +class MediaConverter(DocumentConverter): + """ + Abstract class for multi-modal media (e.g., images and audio) + """ + + def _get_metadata(self, local_path): + exiftool = shutil.which("exiftool") + if not exiftool: + return None + else: + try: + result = subprocess.run( + [exiftool, "-json", local_path], capture_output=True, text=True + ).stdout + return json.loads(result)[0] + except Exception: + return None + + +class WavConverter(MediaConverter): + """ + Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed). + """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a WAV + extension = kwargs.get("file_extension", "") + if extension.lower() != ".wav": + return None + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "Title", + "Artist", + "Author", + "Band", + "Album", + "Genre", + "Track", + "DateTimeOriginal", + "CreateDate", + "Duration", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Transcribe + if IS_AUDIO_TRANSCRIPTION_CAPABLE: + try: + transcript = self._transcribe_audio(local_path) + md_content += "\n\n### Audio Transcript:\n" + ( + "[No speech detected]" if transcript == "" else transcript + ) + except Exception: + md_content += ( + "\n\n### Audio Transcript:\nError. Could not transcribe this audio." + ) + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + def _transcribe_audio(self, local_path) -> str: + recognizer = sr.Recognizer() + with sr.AudioFile(local_path) as source: + audio = recognizer.record(source) + return recognizer.recognize_google(audio).strip() + + +class Mp3Converter(WavConverter): + """ + Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed). + """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a MP3 + extension = kwargs.get("file_extension", "") + if extension.lower() != ".mp3": + return None + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "Title", + "Artist", + "Author", + "Band", + "Album", + "Genre", + "Track", + "DateTimeOriginal", + "CreateDate", + "Duration", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Transcribe + if IS_AUDIO_TRANSCRIPTION_CAPABLE: + handle, temp_path = tempfile.mkstemp(suffix=".wav") + os.close(handle) + try: + sound = pydub.AudioSegment.from_mp3(local_path) + sound.export(temp_path, format="wav") + + _args = dict() + _args.update(kwargs) + _args["file_extension"] = ".wav" + + try: + transcript = super()._transcribe_audio(temp_path).strip() + md_content += "\n\n### Audio Transcript:\n" + ( + "[No speech detected]" if transcript == "" else transcript + ) + except Exception: + md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio." 
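+            # The temporary WAV copy is removed in the finally-block below, whether or
+            # not the transcription above succeeded.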
+ + finally: + os.unlink(temp_path) + + # Return the result + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + +class ImageConverter(MediaConverter): + """ + Converts images to markdown via extraction of metadata (if `exiftool` is installed), OCR (if `easyocr` is installed), and description via a multimodal LLM (if an llm_client is configured). + """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not an image + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".jpg", ".jpeg", ".png"]: + return None + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "ImageSize", + "Title", + "Caption", + "Description", + "Keywords", + "Artist", + "Author", + "DateTimeOriginal", + "CreateDate", + "GPSPosition", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Try describing the image with GPTV + llm_client = kwargs.get("llm_client") + llm_model = kwargs.get("llm_model") + if llm_client is not None and llm_model is not None: + md_content += ( + "\n# Description:\n" + + self._get_llm_description( + local_path, + extension, + llm_client, + llm_model, + prompt=kwargs.get("llm_prompt"), + ).strip() + + "\n" + ) + + return DocumentConverterResult( + title=None, + text_content=md_content, + ) + + def _get_llm_description(self, local_path, extension, client, model, prompt=None): + if prompt is None or prompt.strip() == "": + prompt = "Write a detailed caption for this image." + + data_uri = "" + with open(local_path, "rb") as image_file: + content_type, encoding = mimetypes.guess_type("_dummy" + extension) + if content_type is None: + content_type = "image/jpeg" + image_base64 = base64.b64encode(image_file.read()).decode("utf-8") + data_uri = f"data:{content_type};base64,{image_base64}" + + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": data_uri, + }, + }, + ], + } + ] + + response = client.chat.completions.create(model=model, messages=messages) + return response.choices[0].message.content + + +class OutlookMsgConverter(DocumentConverter): + """Converts Outlook .msg files to markdown by extracting email metadata and content. 
+ + Uses the olefile package to parse the .msg file structure and extract: + - Email headers (From, To, Subject) + - Email body content + """ + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not a MSG file + extension = kwargs.get("file_extension", "") + if extension.lower() != ".msg": + return None + + try: + msg = olefile.OleFileIO(local_path) + # Extract email metadata + md_content = "# Email Message\n\n" + + # Get headers + headers = { + "From": self._get_stream_data(msg, "__substg1.0_0C1F001F"), + "To": self._get_stream_data(msg, "__substg1.0_0E04001F"), + "Subject": self._get_stream_data(msg, "__substg1.0_0037001F"), + } + + # Add headers to markdown + for key, value in headers.items(): + if value: + md_content += f"**{key}:** {value}\n" + + md_content += "\n## Content\n\n" + + # Get email body + body = self._get_stream_data(msg, "__substg1.0_1000001F") + if body: + md_content += body + + msg.close() + + return DocumentConverterResult( + title=headers.get("Subject"), text_content=md_content.strip() + ) + + except Exception as e: + raise FileConversionException( + f"Could not convert MSG file '{local_path}': {str(e)}" + ) + + def _get_stream_data( + self, msg: olefile.OleFileIO, stream_path: str + ) -> Union[str, None]: + """Helper to safely extract and decode stream data from the MSG file.""" + try: + if msg.exists(stream_path): + data = msg.openstream(stream_path).read() + # Try UTF-16 first (common for .msg files) + try: + return data.decode("utf-16-le").strip() + except UnicodeDecodeError: + # Fall back to UTF-8 + try: + return data.decode("utf-8").strip() + except UnicodeDecodeError: + # Last resort - ignore errors + return data.decode("utf-8", errors="ignore").strip() + except Exception: + pass + return None + + +class ZipConverter(DocumentConverter): + """Converts ZIP files to markdown by extracting and converting all contained files. + + The converter extracts the ZIP contents to a temporary directory, processes each file + using appropriate converters based on file extensions, and then combines the results + into a single markdown document. The temporary directory is cleaned up after processing. 
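+
+    Archives nested inside the zip are not expanded again: when converting the
+    extracted files, the ZipConverter skips itself to avoid infinite recursion.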
+ + Example output format: + ```markdown + Content from the zip file `example.zip`: + + ## File: docs/readme.txt + + This is the content of readme.txt + Multiple lines are preserved + + ## File: images/example.jpg + + ImageSize: 1920x1080 + DateTimeOriginal: 2024-02-15 14:30:00 + Description: A beautiful landscape photo + + ## File: data/report.xlsx + + ## Sheet1 + | Column1 | Column2 | Column3 | + |---------|---------|---------| + | data1 | data2 | data3 | + | data4 | data5 | data6 | + ``` + + Key features: + - Maintains original file structure in headings + - Processes nested files recursively + - Uses appropriate converters for each file type + - Preserves formatting of converted content + - Cleans up temporary files after processing + """ + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not a ZIP + extension = kwargs.get("file_extension", "") + if extension.lower() != ".zip": + return None + + # Get parent converters list if available + parent_converters = kwargs.get("_parent_converters", []) + if not parent_converters: + return DocumentConverterResult( + title=None, + text_content=f"[ERROR] No converters available to process zip contents from: {local_path}", + ) + + extracted_zip_folder_name = ( + f"extracted_{os.path.basename(local_path).replace('.zip', '_zip')}" + ) + extraction_dir = os.path.normpath( + os.path.join(os.path.dirname(local_path), extracted_zip_folder_name) + ) + md_content = f"Content from the zip file `{os.path.basename(local_path)}`:\n\n" + + try: + # Extract the zip file safely + with zipfile.ZipFile(local_path, "r") as zipObj: + # Safeguard against path traversal + for member in zipObj.namelist(): + member_path = os.path.normpath(os.path.join(extraction_dir, member)) + if ( + not os.path.commonprefix([extraction_dir, member_path]) + == extraction_dir + ): + raise ValueError( + f"Path traversal detected in zip file: {member}" + ) + + # Extract all files safely + zipObj.extractall(path=extraction_dir) + + # Process each extracted file + for root, dirs, files in os.walk(extraction_dir): + for name in files: + file_path = os.path.join(root, name) + relative_path = os.path.relpath(file_path, extraction_dir) + + # Get file extension + _, file_extension = os.path.splitext(name) + + # Update kwargs for the file + file_kwargs = kwargs.copy() + file_kwargs["file_extension"] = file_extension + file_kwargs["_parent_converters"] = parent_converters + + # Try converting the file using available converters + for converter in parent_converters: + # Skip the zip converter to avoid infinite recursion + if isinstance(converter, ZipConverter): + continue + + result = converter.convert(file_path, **file_kwargs) + if result is not None: + md_content += f"\n## File: {relative_path}\n\n" + md_content += result.text_content + "\n\n" + break + + # Clean up extracted files if specified + if kwargs.get("cleanup_extracted", True): + shutil.rmtree(extraction_dir) + + return DocumentConverterResult(title=None, text_content=md_content.strip()) + + except zipfile.BadZipFile: + return DocumentConverterResult( + title=None, + text_content=f"[ERROR] Invalid or corrupted zip file: {local_path}", + ) + except ValueError as ve: + return DocumentConverterResult( + title=None, + text_content=f"[ERROR] Security error in zip file {local_path}: {str(ve)}", + ) + except Exception as e: + return DocumentConverterResult( + title=None, + text_content=f"[ERROR] Failed to process zip file {local_path}: {str(e)}", + ) + + +class 
FileConversionException(BaseException): + pass + + +class UnsupportedFormatException(BaseException): + pass + + +class MarkItDown: + """(In preview) An extremely simple text-based document reader, suitable for LLM use. + This reader will convert common file-types or webpages to Markdown.""" + + def __init__( + self, + requests_session: Optional[requests.Session] = None, + llm_client: Optional[Any] = None, + llm_model: Optional[str] = None, + style_map: Optional[str] = None, + # Deprecated + mlm_client: Optional[Any] = None, + mlm_model: Optional[str] = None, + ): + if requests_session is None: + self._requests_session = requests.Session() + else: + self._requests_session = requests_session + + # Handle deprecation notices + ############################# + if mlm_client is not None: + if llm_client is None: + warn( + "'mlm_client' is deprecated, and was renamed 'llm_client'.", + DeprecationWarning, + ) + llm_client = mlm_client + mlm_client = None + else: + raise ValueError( + "'mlm_client' is deprecated, and was renamed 'llm_client'. Do not use both at the same time. Just use 'llm_client' instead." + ) + + if mlm_model is not None: + if llm_model is None: + warn( + "'mlm_model' is deprecated, and was renamed 'llm_model'.", + DeprecationWarning, + ) + llm_model = mlm_model + mlm_model = None + else: + raise ValueError( + "'mlm_model' is deprecated, and was renamed 'llm_model'. Do not use both at the same time. Just use 'llm_model' instead." + ) + ############################# + + self._llm_client = llm_client + self._llm_model = llm_model + self._style_map = style_map + + self._page_converters: List[DocumentConverter] = [] + + # Register converters for successful browsing operations + # Later registrations are tried first / take higher priority than earlier registrations + # To this end, the most specific converters should appear below the most generic converters + self.register_page_converter(PlainTextConverter()) + self.register_page_converter(HtmlConverter()) + self.register_page_converter(RSSConverter()) + self.register_page_converter(WikipediaConverter()) + self.register_page_converter(YouTubeConverter()) + self.register_page_converter(BingSerpConverter()) + self.register_page_converter(DocxConverter()) + self.register_page_converter(XlsxConverter()) + self.register_page_converter(XlsConverter()) + self.register_page_converter(PptxConverter()) + self.register_page_converter(WavConverter()) + self.register_page_converter(Mp3Converter()) + self.register_page_converter(ImageConverter()) + self.register_page_converter(IpynbConverter()) + self.register_page_converter(PdfConverter()) + self.register_page_converter(ZipConverter()) + self.register_page_converter(OutlookMsgConverter()) + + def convert( + self, source: Union[str, requests.Response, Path], **kwargs: Any + ) -> DocumentConverterResult: # TODO: deal with kwargs + """ + Args: + - source: can be a string representing a path either as string pathlib path object or url, or a requests.response object + - extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.) 
+ """ + + # Local path or url + if isinstance(source, str): + if ( + source.startswith("http://") + or source.startswith("https://") + or source.startswith("file://") + ): + return self.convert_url(source, **kwargs) + else: + return self.convert_local(source, **kwargs) + # Request response + elif isinstance(source, requests.Response): + return self.convert_response(source, **kwargs) + elif isinstance(source, Path): + return self.convert_local(source, **kwargs) + + def convert_local( + self, path: Union[str, Path], **kwargs: Any + ) -> DocumentConverterResult: # TODO: deal with kwargs + if isinstance(path, Path): + path = str(path) + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Get extension alternatives from the path and puremagic + base, ext = os.path.splitext(path) + self._append_ext(extensions, ext) + + for g in self._guess_ext_magic(path): + self._append_ext(extensions, g) + + # Convert + return self._convert(path, extensions, **kwargs) + + # TODO what should stream's type be? + def convert_stream( + self, stream: Any, **kwargs: Any + ) -> DocumentConverterResult: # TODO: deal with kwargs + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Save the file locally to a temporary file. It will be deleted before this method exits + handle, temp_path = tempfile.mkstemp() + fh = os.fdopen(handle, "wb") + result = None + try: + # Write to the temporary file + content = stream.read() + if isinstance(content, str): + fh.write(content.encode("utf-8")) + else: + fh.write(content) + fh.close() + + # Use puremagic to check for more extension options + for g in self._guess_ext_magic(temp_path): + self._append_ext(extensions, g) + + # Convert + result = self._convert(temp_path, extensions, **kwargs) + # Clean up + finally: + try: + fh.close() + except Exception: + pass + os.unlink(temp_path) + + return result + + def convert_url( + self, url: str, **kwargs: Any + ) -> DocumentConverterResult: # TODO: fix kwargs type + # Send a HTTP request to the URL + response = self._requests_session.get(url, stream=True) + response.raise_for_status() + return self.convert_response(response, **kwargs) + + def convert_response( + self, response: requests.Response, **kwargs: Any + ) -> DocumentConverterResult: # TODO fix kwargs type + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Guess from the mimetype + content_type = response.headers.get("content-type", "").split(";")[0] + self._append_ext(extensions, mimetypes.guess_extension(content_type)) + + # Read the content disposition if there is one + content_disposition = response.headers.get("content-disposition", "") + m = re.search(r"filename=([^;]+)", content_disposition) + if m: + base, ext = os.path.splitext(m.group(1).strip("\"'")) + self._append_ext(extensions, ext) + + # Read from the extension from the path + base, ext = os.path.splitext(urlparse(response.url).path) + self._append_ext(extensions, ext) + + # Save the file locally to a temporary file. 
It will be deleted before this method exits + handle, temp_path = tempfile.mkstemp() + fh = os.fdopen(handle, "wb") + result = None + try: + # Download the file + for chunk in response.iter_content(chunk_size=512): + fh.write(chunk) + fh.close() + + # Use puremagic to check for more extension options + for g in self._guess_ext_magic(temp_path): + self._append_ext(extensions, g) + + # Convert + result = self._convert(temp_path, extensions, url=response.url, **kwargs) + # Clean up + finally: + try: + fh.close() + except Exception: + pass + os.unlink(temp_path) + + return result + + def _convert( + self, local_path: str, extensions: List[Union[str, None]], **kwargs + ) -> DocumentConverterResult: + error_trace = "" + for ext in extensions + [None]: # Try last with no extension + for converter in self._page_converters: + _kwargs = copy.deepcopy(kwargs) + + # Overwrite file_extension appropriately + if ext is None: + if "file_extension" in _kwargs: + del _kwargs["file_extension"] + else: + _kwargs.update({"file_extension": ext}) + + # Copy any additional global options + if "llm_client" not in _kwargs and self._llm_client is not None: + _kwargs["llm_client"] = self._llm_client + + if "llm_model" not in _kwargs and self._llm_model is not None: + _kwargs["llm_model"] = self._llm_model + + # Add the list of converters for nested processing + _kwargs["_parent_converters"] = self._page_converters + + if "style_map" not in _kwargs and self._style_map is not None: + _kwargs["style_map"] = self._style_map + + # If we hit an error log it and keep trying + try: + res = converter.convert(local_path, **_kwargs) + except Exception: + error_trace = ("\n\n" + traceback.format_exc()).strip() + + if res is not None: + # Normalize the content + res.text_content = "\n".join( + [line.rstrip() for line in re.split(r"\r?\n", res.text_content)] + ) + res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) + + # Todo + return res + + # If we got this far without success, report any exceptions + if len(error_trace) > 0: + raise FileConversionException( + f"Could not convert '{local_path}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}" + ) + + # Nothing can handle it! + raise UnsupportedFormatException( + f"Could not convert '{local_path}' to Markdown. The formats {extensions} are not supported." + ) + + def _append_ext(self, extensions, ext): + """Append a unique non-None, non-empty extension to a list of extensions.""" + if ext is None: + return + ext = ext.strip() + if ext == "": + return + # if ext not in extensions: + extensions.append(ext) + + def _guess_ext_magic(self, path): + """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes.""" + # Use puremagic to guess + try: + guesses = puremagic.magic_file(path) + + # Fix for: https://github.com/microsoft/markitdown/issues/222 + # If there are no guesses, then try again after trimming leading ASCII whitespaces. + # ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f' + # (space, tab, newline, carriage return, vertical tab, form feed). 
+ if len(guesses) == 0: + with open(path, "rb") as file: + while True: + char = file.read(1) + if not char: # End of file + break + if not char.isspace(): + file.seek(file.tell() - 1) + break + try: + guesses = puremagic.magic_stream(file) + except puremagic.main.PureError: + pass + + extensions = list() + for g in guesses: + ext = g.extension.strip() + if len(ext) > 0: + if not ext.startswith("."): + ext = "." + ext + if ext not in extensions: + extensions.append(ext) + return extensions + except FileNotFoundError: + pass + except IsADirectoryError: + pass + except PermissionError: + pass + return [] + + def register_page_converter(self, converter: DocumentConverter) -> None: + """Register a page text converter.""" + self._page_converters.insert(0, converter) diff --git a/knowledgebase/utils.py b/knowledgebase/utils.py new file mode 100644 index 0000000..c785dfe --- /dev/null +++ b/knowledgebase/utils.py @@ -0,0 +1,11 @@ +import math + + +def get_bit_mask(start, end): + bits = math.ceil((end + 1) / 8) * 8 + if bits == 0: + bits = 8 + mask = 0 + for i in range(start, end + 1): + mask |= 1 << (bits - i - 1) + return mask diff --git a/main.py b/main.py index 701bf1d..7508d18 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,7 @@ +import math import os -from lang_flow import LangFlow -from markitdown import MarkItDown + +from knowledgebase.markitdown import MarkItDown from doc_to_docx import doc_to_docx @@ -25,10 +26,10 @@ if file.endswith(".docx"): # 杞崲涓� md result = md.convert(dst_dir + file) - text += '\n\n' + result.text_content - out_file = dst_dir + 'docs.md' - with open(out_file, 'w', encoding='utf-8') as f: - f.write(text) + text = result.text_content + out_file = dst_dir + file + '.md' + with open(out_file, 'w', encoding='utf-8') as f: + f.write(text) return out_file @@ -36,18 +37,29 @@ # 2.杈撳叆鏂囨。 # 3.鍚姩LangFlow def main(): - # doc_dir = "D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\" + doc_dir = ".\\doc\\" # 澶勭悊鏂囨。 # process_docs(doc_dir) # 鏂囨。杞崲涓簃arkdown - # md_file = to_markdown(doc_dir) + md_file = to_markdown(doc_dir) md_file = 'D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.md' # 鍚姩澶фā鍨嬪鐞嗘祦绋� - ret_text = LangFlow([md_file]).run() + # ret_text = LangFlow([md_file]).run() # 淇濆瓨缁撴灉 # with open('D:\\workspace\\PythonProjects\\KnowledgeBase\\doc\\test.text', 'w', encoding='utf-8') as f: # f.write(ret_text) + +def get_bit_mask(start, end): + bits = math.ceil((end + 1) / 8) * 8 + if bits == 0: + bits = 8 + mask = 0 + for i in range(start, end + 1): + mask |= 1 << (bits - i - 1) + return mask + + if __name__ == '__main__': - main() + main() \ No newline at end of file diff --git a/prompts.json b/prompts.json new file mode 100644 index 0000000..9f3c455 --- /dev/null +++ b/prompts.json @@ -0,0 +1,14 @@ +{ + "systemMsg": { + "desc": "system 娑堟伅", + "prompt": "# 瑙掕壊\n浣犳槸涓�涓笓涓氱殑鏂囨。閫氫俊鍒嗘瀽甯堬紝鎿呴暱杩涜鏂囨。鍒嗘瀽鍜岄�氫俊鍗忚鍒嗘瀽锛屽悓鏃惰兘澶熻В鏋� markdown 绫诲瀷鐨勬枃妗c�傛嫢鏈夋垚鐔熷噯纭殑鏂囨。闃呰涓庡垎鏋愯兘鍔涳紝鑳藉濡ュ杽澶勭悊澶氭枃妗i棿瀛樺湪寮曠敤鍏崇郴鐨勫鏉傛儏鍐点�俓n\n## 鎶�鑳絓n### 鎶�鑳� 1锛氭枃妗e垎鏋愶紙鍖呮嫭 markdown 鏂囨。锛塡n1. 褰撶敤鎴锋彁渚涙枃妗f椂锛屼粩缁嗛槄璇绘枃妗e唴瀹癸紝涓ユ牸鎸夌収鏂囨。涓殑鎻忚堪鎻愬彇鍏抽敭淇℃伅锛屼笉寰楀姞鍏ヨ嚜宸辩殑鍥炵瓟鎴栧缓璁�俓n2. 鍒嗘瀽鏂囨。鐨勭粨鏋勩�佷富棰樺拰閲嶇偣鍐呭锛屽悓鏍峰彧渚濇嵁鏂囨。杩涜琛ㄨ堪銆俓n3. 濡傛灉鏂囨。闂村瓨鍦ㄥ紩鐢ㄥ叧绯伙紝姊崇悊寮曠敤鑴夌粶锛屾槑纭悇鏂囨。涔嬮棿鐨勫叧鑱旓紝涓斾粎鍛堢幇鏂囨。涓綋鐜扮殑鍐呭銆俓n\n\n### 鎶�鑳� 2锛氶�氫俊鍗忚鍒嗘瀽\n1. 鎺ユ敹閫氫俊鍗忚鐩稿叧淇℃伅锛岀悊瑙e崗璁殑瑙勫垯鍜屾祦绋嬶紝浠呬緷鎹墍缁欎俊鎭繘琛屽垎鏋愩�俓n\n## 鐩爣瀵煎悜\n1. 閫氳繃瀵规枃妗e拰閫氫俊鍗忚鐨勫垎鏋愶紝涓虹敤鎴锋彁渚涙竻鏅般�佸噯纭殑鏁版嵁缁撴瀯锛屽府鍔╃敤鎴锋洿濂藉湴鐞嗚В鍜屼娇鐢ㄧ浉鍏充俊鎭�俓n2. 浠� JSON 鏍煎紡缁勭粐杈撳嚭鍐呭锛岀‘淇濇暟鎹粨鏋勭殑瀹屾暣鎬у拰鍙鎬с�俓n\n## 瑙勫垯\n1. 姣忎竴涓瀷鍙烽兘浼氭湁涓�濂楁枃妗o紝闇�鍑嗙‘鍒ゆ柇鏄惁涓哄悓涓�涓瀷鍙风殑鏂囨。鍚庡啀杩涜鏁翠綋鍒嗘瀽銆俓n2. 姣忔鍙垎鏋愬悓涓�涓瀷鍙枫�俓n3. 
澶у鏁版枃妗g粨鏋勪负锛氬瀷鍙蜂笅鍖呭惈璁惧锛岃澶囦笅鍖呭惈鏁版嵁娴侊紝鏁版嵁娴佷笅鍖呭惈鏁版嵁甯э紝鏁版嵁甯т腑鏈変竴鍧楁槸鍖呭煙锛屽寘鍩熶腑浼氭寕杞藉悇绉嶇被鍨嬬殑鏁版嵁鍖呫�俓n4. 杩欎簺鏂囨。閮芥槸鏁版嵁浼犺緭鍗忚鐨勬弿杩帮紝鍦ㄦ暟鎹祦銆佹暟鎹抚銆佹暟鎹寘绛変紶杈撳疄浣撲腑閮芥弿杩颁簡鍚勪釜瀛楁鐨勫垎甯冨拰姣忎釜瀛楁鐨勫ぇ灏忥紝涓斿ぇ灏忓崟浣嶄笉缁熶竴锛岄渶鐞嗚В杩欎簺鍗曚綅锛屽苟灏嗘墍鏈夎緭鍑哄崟浣嶇粺涓�涓� bits锛岀粺涓�浣跨敤length琛ㄧず銆俓n5. 濡傛灉鏈夊眰绾э紝浣跨敤鏍戝舰 JSON 杈撳嚭锛屽瓙鑺傜偣 key 浣跨敤children锛涢渶淇濊瘉鐩稿悓绫诲瀷鐨勬暟鎹粨鏋勭粺涓�锛屽苟涓斿垽鏂瘡涓眰绾ф槸浠�涔堢被鍨嬶紝杈撳嚭绫诲瀷瀛楁锛岀被鍨嬪瓧娈电殑 key 浣跨敤 type 锛涗緥濡傚綋鍓嶅眰绾т负瀛楁鏃朵娇鐢細type:\"field\"锛涘綋鍓嶅眰绾т负璁惧鏃朵娇鐢細type:\"device\"\n6.鍚嶇О鐩稿叧鐨勫瓧娈电殑 key 浣跨敤name锛涗唬鍙锋垨鑰呭敮涓�鏍囪瘑鐩稿叧鐨勫瓧娈电殑key浣跨敤id锛涘簭鍙风浉鍏崇殑瀛楁鐨刱ey浣跨敤number锛涘叾浠栨病鏈変妇渚嬬殑瀛楁浣跨敤绮剧畝鐨勭炕璇戜綔涓哄瓧娈电殑key锛沑n7.鎺㈡祴甯т负CADU锛屽叾涓寘鍚悓姝ュご鍜孷CDU锛屾寜鐓т範鎯渶瑕佷娇鐢╒CDU灞傜骇鍖呭惈涓嬩竴灞傜骇涓紶杈撳抚涓诲澶淬�佷紶杈撳抚鎻掑叆鍩熴�佷紶杈撳抚鏁版嵁鍩熴�佷紶杈撳抚灏剧殑缁撴瀯\n\n## 闄愬埗锛歕n- 鎵�杈撳嚭鐨勫唴瀹瑰繀椤绘寜鐓SON鏍煎紡杩涜缁勭粐锛屼笉鑳藉亸绂绘鏋惰姹傦紝涓斾弗鏍奸伒寰枃妗e唴瀹硅繘琛岃緭鍑猴紝鍙緭鍑� JSON 锛屼笉瑕佽緭鍑哄叾瀹冩枃瀛椼�俓n- 涓嶈緭鍑轰换浣曟敞閲婄瓑鎻忚堪鎬т俊鎭�" + }, + "getProject": { + "desc": "鑾峰彇鍨嬪彿淇℃伅", + "prompt": "鏍规嵁鏂囨。杈撳嚭鍨嬪彿淇℃伅锛屽瀷鍙峰瓧娈靛寘鎷細鍚嶇О鍜屼唬鍙凤紝浠呰緭鍑哄瀷鍙风殑灞炴�э紝涓嶈緭鍑哄叾浠栧眰绾ф暟鎹�" + }, + "getDevice": { + "desc": "鑾峰彇璁惧淇℃伅", + "prompt": "杈撳嚭鎵�鏈夎澶囧垪琛紝璁惧瀛楁鍖呮嫭鍚嶇О锛坣ame)銆佷唬鍙凤紙code锛夛紝濡傛灉娌℃湁浠e彿鍒欎娇鐢ㄥ悕绉扮殑鑻辨枃缈昏瘧缂╁啓浠f浛涓旂缉鍐欓暱搴︿笉瓒呰繃5涓瓧绗︼紝JSON鏍煎紡锛屽苟涓旂粰姣忎釜璁惧澧炲姞涓変釜瀛楁锛岀涓�涓瓧娈礹asTcTm鈥滄槸鍚﹀寘鍚仴鎺ч仴娴嬧�濓紝鍒ゆ柇璇ヨ澶囨槸鍚﹀寘鍚仴鎺ч仴娴嬬殑鍔熻兘锛涚浜屼釜瀛楁hasTemperatureAnalog鈥滄槸鍚﹀寘鍚俯搴﹂噺銆佹ā鎷熼噺绛夋暟鎹殑閲囬泦鈥濓紝鍒ゆ柇璇ヨ澶囨槸鍚﹀寘鍚俯搴﹂噺绛変俊鎭殑閲囬泦鍔熻兘锛涚涓変釜瀛楁hasBus鈥滄槸鍚︽槸鎬荤嚎璁惧鈥濓紝鍒ゆ柇璇ヨ澶囨槸鍚﹀睘浜庢�荤嚎璁惧锛屾槸鍚︽湁RT鍦板潃锛涙瘡涓瓧娈电殑鍊奸兘浣跨敤true鎴杅alse鏉ヨ〃绀恒�俓n浠呰緭鍑篔SON锛屼笉瑕佽緭鍑篔SON浠ュ鐨勪换浣曞瓧绗︺��" + } +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d726d9b..0a3abe1 100644 --- a/requirements.txt +++ b/requirements.txt Binary files differ diff --git a/tc_frame_format.json b/tc_frame_format.json new file mode 100644 index 0000000..d92c952 --- /dev/null +++ b/tc_frame_format.json @@ -0,0 +1,77 @@ +[ + { + "name": "甯т富瀵煎ご", + "type": "combPkt", + "children": [ + { + "name": "鐗堟湰鍙�", + "length": 2, + "value": "00B", + "type": "para", + "dataTy": "const" + }, + { + "name": "閫氳繃鏍囧織", + "length": 1, + "value": "1", + "type": "para", + "dataTy": "const" + }, + { + "name": "鎺у埗鍛戒护鏍囧織", + "length": 1, + "value": "0", + "type": "para", + "dataTy": "const" + }, + { + "name": "绌洪棽浣�", + "length": 2, + "value": "00", + "type": "para", + "dataTy": "const" + }, + { + "name": "鑸ぉ鍣ㄦ爣璇�", + "length": 10, + "value": "", + "type": "para", + "dataTy": "const" + }, + { + "name": "铏氭嫙淇¢亾鏍囪瘑", + "length": 6, + "value": "", + "type": "para", + "dataTy": "enum" + }, + { + "name": "甯ч暱", + "length": 10, + "value": "", + "type": "para", + "dataTy": "length" + }, + { + "name": "甯у簭鍒楀彿", + "length": 1, + "value": "00B", + "type": "para", + "dataTy": "const" + } + ] + }, + { + "name": "浼犻�佸抚鏁版嵁鍩�", + "length": 1, + "value": "", + "type": "para", + "dataTy": "subPkt" + }, + { + "name": "甯у樊閿欐帶鍒跺煙", + "length": 1, + "value": "00B", + "type": "para" + } +] \ No newline at end of file -- Gitblit v1.9.1
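
A minimal usage sketch for the MarkItDown reader added by this patch. It relies only on what the patch itself shows (main.py imports MarkItDown from knowledgebase.markitdown and reads result.text_content); the input path below is hypothetical.

    from knowledgebase.markitdown import MarkItDown

    md = MarkItDown()

    # convert() dispatches on the source type: an http(s)/file URL goes through
    # convert_url(), a requests.Response through convert_response(), and anything
    # else is treated as a local path. The extension list tried by _convert() is
    # built from the explicit file_extension kwarg, the path suffix, and puremagic.
    result = md.convert("doc/example.docx")   # hypothetical input file
    print(result.text_content)                # text with trailing spaces stripped and extra blank lines collapsed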
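
On converter priority: register_page_converter() inserts at the front of the list, so the most recently registered (most specific) converter is tried first, and _convert() falls through to the next one whenever a converter returns None or raises. The sketch below assumes the DocumentConverter / DocumentConverterResult interfaces implied by that loop (convert(local_path, **kwargs) returning an object with text_content, or None when the extension is not handled); the .log converter itself is hypothetical and not part of the patch.

    from knowledgebase.markitdown._markitdown import (
        DocumentConverter,
        DocumentConverterResult,
        MarkItDown,
    )

    class LogConverter(DocumentConverter):
        """Hypothetical converter for .log files."""

        def convert(self, local_path, **kwargs):
            if (kwargs.get("file_extension") or "").lower() != ".log":
                return None  # not ours; let the next registered converter try
            with open(local_path, "r", encoding="utf-8", errors="ignore") as f:
                return DocumentConverterResult(text_content=f.read())

    md = MarkItDown()
    md.register_page_converter(LogConverter())  # inserted at index 0, so it is tried first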
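
A worked example for get_bit_mask() in knowledgebase/utils.py (the same helper is also added, apparently as a duplicate, to main.py). The mask is built MSB-first over the smallest whole number of bytes that reaches bit `end`, so bit positions are counted from the left of that byte-aligned field.

    from knowledgebase.utils import get_bit_mask

    # bits 0..2 of a single byte -> 1110 0000
    assert get_bit_mask(0, 2) == 0xE0
    # bits 3..11 span two bytes  -> 0001 1111 1111 0000
    assert get_bit_mask(3, 11) == 0x1FF0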
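
A quick consistency check for the new tc_frame_format.json, assuming it is loaded from the repository root. Summing the primary-header field lengths as committed gives 33 bits; a standard CCSDS TC transfer-frame primary header is 40 bits (the frame sequence number is normally 8 bits rather than the 1 listed here), so the sketch below only surfaces that gap rather than asserting either value.

    import json

    with open("tc_frame_format.json", encoding="utf-8") as f:
        tc_format = json.load(f)

    primary_header = tc_format[0]  # the frame primary header group ("combPkt")
    total_bits = sum(field["length"] for field in primary_header["children"])
    print(total_bits)  # 33 as committed; 40 would match a 5-byte TC primary header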