Python


Timeout装饰器

timeout装饰器实现了超时回调的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import asyncio


class Timeout:
def __init__(self, wait, on_timeout):
self.wait = wait
self.on_timeout = on_timeout
self.timer = None

def __call__(self, fn):
def wrapped(*args, **kwargs):
# 执行原函数
result = fn(*args, **kwargs)
# 取消之前的定时器
if self.timer is not None:
self.timer.cancel()
# 创建新的定时器
self.timer = threading.Timer(self.wait, self._on_timer)
self.timer.start()
return result
return wrapped

def _on_timer(self):
asyncio.run(self.on_timeout())

应用样例(active_message):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def call_back():
global message_package
message, group_id, user_id = message_package['message'], message_package['group_id'], message_package['user_id']
message_package = {
"message": "",
"group_id": "",
"user_id": ""
}
await message_reply(message, group_id, user_id)

@Timeout(wait=10, on_timeout=call_back)
async def active_message(message, group_id, user_id):
global message_package
message_package['message'] += f"{message}\n"
message_package['group_id'] = str(group_id)
message_package['user_id'] = str(user_id)

async def group_handle(message, group_id, user_id):
if message == "开启对话":
return await start_conversation(group_id, user_id)
elif message == "停止对话":
return await stop_conversation(group_id, user_id)
elif message == "遗忘对话":
return await forget_conversation(group_id, user_id)
elif message == "重启对话":
return await restart_conversation(group_id, user_id)
elif user_id in active_conversations:
await active_message(message, group_id, user_id)
else:
return "对话未开启,请输入'开启对话'以开始聊天。"


Record装饰器

record装饰器实现了记录消息到db数据库的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import sqlite3
import os
import stat
import platform
from config import get_value
value = get_value()

def Record(func):
# __group_message_record__
async def __group_message_record__(message_objects):
if message_objects['message_type'] == 'group':
message_time = message_objects['time']
message = message_objects['message']
group_id = message_objects['group_id']
user_id = message_objects['user_id']

# 群文件夹
data_path = value.data_path + '/{group_id}/'.format(group_id=group_id)
group_path = data_path
if not os.path.exists(data_path):
os.makedirs(data_path)
if platform.system() != 'Windows':
os.chmod(data_path, stat.S_IRWXO)

data_path = group_path + '/chat_record.db'
record_file = data_path
if not os.path.isfile(data_path):
conn = sqlite3.connect(data_path)
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE IF NOT EXISTS record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qq_id TEXT NOT NULL,
content TEXT NOT NULL,
time TEXT NOT NULL
)
''')

conn.commit()
conn.close()
else:
conn = sqlite3.connect(record_file)
cursor = conn.cursor()

cursor.execute('''
INSERT INTO record (qq_id, content, time)
VALUES (?, ?, ?)
''', (user_id, message, message_time))

conn.commit()
conn.close()
if message_objects['message_type'] == 'private':
print("私聊消息")
await func(message_objects)
return __group_message_record__

message_objects消息格式样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"self_id": "BOT_ID",
"user_id": "USER_ID",
"time": 1730539514,
"message_id": 1756106583,
"message_seq": 1756106583,
"real_id": 1756106583,
"message_type": "group",
"sender": {
"user_id": "USER_ID",
"nickname": "用户A",
"card": "洛",
"role": "owner"
},
"raw_message": "[CQ:at,qq=USER_ID] 你好世界[CQ:face,id=63]",
"font": 14,
"sub_type": "normal",
"message": "[CQ:at,qq=USER_ID] 你好世界[CQ:face,id=63]",
"message_format": "string",
"post_type": "message",
"group_id": GROUP_ID
}

应用样例:

1
2
3
4
5
6
7
8
9
10
@Record
async def message_handle(message_objects):
if message_objects['message_type'] == 'group':
message = message_objects['message']
group_id = message_objects['group_id']
user_id = message_objects['user_id']

await plugin_manager.handle_group_message(message, group_id, user_id)
if message_objects['message_type'] == 'private':
pass


Driver装饰器

driver装饰器用于插件启动前,停止前函数调用声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from typing import Callable, Coroutine, Any, List

class Driver:
def __init__(self):
self._startup_callbacks: List[Callable[[], Coroutine[Any, Any, None]]] = []
self._shutdown_callbacks: List[Callable[[], Coroutine[Any, Any, None]]] = []

def on_startup(self, func: Callable[[], Coroutine[Any, Any, None]]):
self._startup_callbacks.append(func)
return func

def on_shutdown(self, func: Callable[[], Coroutine[Any, Any, None]]):
self._shutdown_callbacks.append(func)
return func

async def run_startup(self):
for callback in self._startup_callbacks:
await callback()

async def run_shutdown(self):
for callback in self._shutdown_callbacks:
await callback()

driver = Driver()

调用样例:

1
2
3
4
5
6
7
8
9
10
11
12
from luo9 import get_driver

driver = get_driver()

@driver.on_startup
async def _():
print("插件启动前处理")


@driver.on_shutdown
async def _():
print("插件停止前处理")

被on_startup标记的函数会在bot启动前(插件载入阶段)进行调用

被on_shutdown标记的函数会在bot停止前(插件停止阶段)进行调用