Python
Timeout装饰器
timeout装饰器实现了超时回调的功能:每次调用被装饰的函数都会重置计时器;若在 wait 秒内没有新的调用,则触发 on_timeout 回调。
import asyncio
import functools
import inspect
import threading


class Timeout:
    """Decorator that runs a callback once calls to the wrapped function go quiet.

    Each call to the decorated function cancels the pending timer (if any)
    and starts a fresh ``threading.Timer``, so ``on_timeout`` fires ``wait``
    seconds after the most recent call returned.

    Args:
        wait: Delay in seconds handed to ``threading.Timer``.
        on_timeout: Zero-argument callable invoked from the timer thread.
            If it returns a coroutine, that coroutine is driven to completion
            with ``asyncio.run``; a plain function is simply called.
    """

    def __init__(self, wait, on_timeout):
        self.wait = wait
        self.on_timeout = on_timeout
        # The currently pending timer, or None before the first call.
        self.timer = None

    def __call__(self, fn):
        """Wrap *fn* so that every call re-arms the timeout timer."""

        @functools.wraps(fn)  # keep fn's __name__/__doc__ on the wrapper
        def wrapped(*args, **kwargs):
            # For an async fn this is the coroutine object; the caller awaits it.
            result = fn(*args, **kwargs)
            if self.timer is not None:
                self.timer.cancel()
            self.timer = threading.Timer(self.wait, self._on_timer)
            self.timer.start()
            return result

        return wrapped

    def _on_timer(self):
        """Invoke the timeout callback; runs in the timer's own thread."""
        outcome = self.on_timeout()
        # Only coroutines need an event loop; asyncio.run() rejects anything
        # else, which previously made plain callbacks raise after running.
        if inspect.iscoroutine(outcome):
            asyncio.run(outcome)
应用样例(active_message):
async def call_back():
    """Send the buffered text as one reply and reset the shared buffer."""
    global message_package
    pending = message_package
    message = pending['message']
    group_id = pending['group_id']
    user_id = pending['user_id']
    # Start a fresh buffer before replying.
    message_package = {"message": "", "group_id": "", "user_id": ""}
    await message_reply(message, group_id, user_id)


@Timeout(wait=10, on_timeout=call_back)
async def active_message(message, group_id, user_id):
    """Append one incoming message to the shared buffer.

    The decorator re-arms a 10 second timer on every call; ``call_back``
    is the function it invokes when that timer fires.
    """
    message_package['message'] = message_package['message'] + f"{message} \n"
    message_package.update(group_id=str(group_id), user_id=str(user_id))


async def group_handle(message, group_id, user_id):
    """Dispatch a group message to a conversation command or the buffer.

    Returns the result of the matching command handler, a hint string when
    no conversation is active for ``user_id``, or ``None`` when the message
    was only buffered.
    """
    if message == "开启对话":
        return await start_conversation(group_id, user_id)
    if message == "停止对话":
        return await stop_conversation(group_id, user_id)
    if message == "遗忘对话":
        return await forget_conversation(group_id, user_id)
    if message == "重启对话":
        return await restart_conversation(group_id, user_id)
    if user_id not in active_conversations:
        return "对话未开启,请输入'开启对话'以开始聊天。"
    # Active conversation: only buffer the text, no immediate reply.
    await active_message(message, group_id, user_id)
Record装饰器
record装饰器会把群聊消息记录到对应群目录下的 SQLite 数据库文件 chat_record.db 中,然后再调用被装饰的处理函数。
import functools
import os
import platform
import sqlite3
import stat
from contextlib import closing

from config import get_value

value = get_value()

_CREATE_RECORD_TABLE = '''
    CREATE TABLE IF NOT EXISTS record (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        qq_id TEXT NOT NULL,
        content TEXT NOT NULL,
        time TEXT NOT NULL
    )
'''

_INSERT_RECORD = '''
    INSERT INTO record (qq_id, content, time)
    VALUES (?, ?, ?)
'''


def _store_group_message(message_objects):
    """Insert one group message into ``<data_path>/<group_id>/chat_record.db``."""
    message_time = message_objects['time']
    message = message_objects['message']
    group_id = message_objects['group_id']
    user_id = message_objects['user_id']

    group_dir = os.path.join(value.data_path, str(group_id))
    if not os.path.exists(group_dir):
        os.makedirs(group_dir, exist_ok=True)
        if platform.system() != 'Windows':
            # S_IRWXO alone (mode 007) would remove the owner's own access;
            # keep the "others" bits and add the owner's.
            os.chmod(group_dir, stat.S_IRWXU | stat.S_IRWXO)

    db_path = os.path.join(group_dir, 'chat_record.db')
    # closing() guarantees the connection is released even if a statement fails.
    with closing(sqlite3.connect(db_path)) as conn:
        # Ensure the table exists and insert in the same pass, so the first
        # message of a new group is stored rather than dropped.
        conn.execute(_CREATE_RECORD_TABLE)
        conn.execute(_INSERT_RECORD, (user_id, message, message_time))
        conn.commit()


def Record(func):
    """Decorate an async message handler so group messages are logged first.

    Args:
        func: Async callable taking the ``message_objects`` dict.

    Returns:
        An async wrapper that stores group messages, prints a marker for
        private messages, then awaits ``func`` and returns its result.
    """

    @functools.wraps(func)
    async def __group_message_record__(message_objects):
        if message_objects['message_type'] == 'group':
            _store_group_message(message_objects)
        if message_objects['message_type'] == 'private':
            print("私聊消息")
        return await func(message_objects)

    return __group_message_record__
message_objects消息格式样例如下:
{
    "self_id": "BOT_ID",
    "user_id": "USER_ID",
    "time": 1730539514,
    "message_id": 1756106583,
    "message_seq": 1756106583,
    "real_id": 1756106583,
    "message_type": "group",
    "sender": {
        "user_id": "USER_ID",
        "nickname": "用户A",
        "card": "洛",
        "role": "owner"
    },
    "raw_message": "[CQ:at,qq=USER_ID] 你好世界[CQ:face,id=63]",
    "font": 14,
    "sub_type": "normal",
    "message": "[CQ:at,qq=USER_ID] 你好世界[CQ:face,id=63]",
    "message_format": "string",
    "post_type": "message",
    "group_id": GROUP_ID
}
应用样例:
@Record
async def message_handle(message_objects):
    """Forward group messages to the plugin manager; private messages are ignored."""
    message_type = message_objects['message_type']
    if message_type == 'group':
        message, group_id, user_id = (
            message_objects['message'],
            message_objects['group_id'],
            message_objects['user_id'],
        )
        await plugin_manager.handle_group_message(message, group_id, user_id)
    elif message_type == 'private':
        # Placeholder: private messages are not handled.
        pass
Driver装饰器
driver装饰器用于声明需要在插件启动前、停止前调用的函数。
from typing import Any, Callable, Coroutine, List

# Shape of a lifecycle hook: a zero-argument coroutine function.
_AsyncHook = Callable[[], Coroutine[Any, Any, None]]


class Driver:
    """Registry of async hooks to run at startup and at shutdown."""

    def __init__(self):
        self._startup_callbacks: List[_AsyncHook] = []
        self._shutdown_callbacks: List[_AsyncHook] = []

    def on_startup(self, func: _AsyncHook):
        """Register *func* as a startup hook and return it unchanged."""
        self._startup_callbacks.append(func)
        return func

    def on_shutdown(self, func: _AsyncHook):
        """Register *func* as a shutdown hook and return it unchanged."""
        self._shutdown_callbacks.append(func)
        return func

    async def run_startup(self):
        """Await every startup hook, one after another, in registration order."""
        await self._run_all(self._startup_callbacks)

    async def run_shutdown(self):
        """Await every shutdown hook, one after another, in registration order."""
        await self._run_all(self._shutdown_callbacks)

    @staticmethod
    async def _run_all(callbacks):
        """Await each hook in *callbacks* sequentially."""
        for hook in callbacks:
            await hook()


# Module-level instance.
driver = Driver()
调用样例:
from luo9 import get_driver

driver = get_driver()


@driver.on_startup
async def _():
    """Startup hook: print a marker message."""
    print("插件启动前处理")


@driver.on_shutdown
async def _():
    """Shutdown hook: print a marker message."""
    print("插件停止前处理")
被on_startup标记的函数会在bot启动前(插件载入阶段)进行调用
被on_shutdown标记的函数会在bot停止前(插件停止阶段)进行调用