背景

洛玖机器人的插件系统基于 FFI 消息总线:每个插件是独立的动态库(DLL/SO),运行在自己的 OS 线程中,通过 luo9_coreextern "C" 函数进行进程内 pub/sub 通信。

传统的消息分发是广播模式——宿主 publish 一条消息,所有 subscriber 同时收到。这简单高效,但有一个问题:无法控制消息的分发顺序,也无法让某个插件”拦截”消息,阻止后续插件处理。

最近为洛玖实现了插件优先级和消息阻断机制,关键在于:插件代码零修改

核心思路:从广播到定向分发

原来的模型

1
2
3
4
5
6
7
8
宿主 dispatch_message()


Bus::topic("luo9_message").publish(json)

├──► 插件A 的队列 (fan-out 副本)
├──► 插件B 的队列 (fan-out 副本)
└──► 插件C 的队列 (fan-out 副本)

所有插件同时收到消息,没有顺序,没有阻断。

新的模型

1
2
3
4
5
6
7
8
9
宿主 priority_dispatch_message()

▼ 按优先级降序遍历

├──► Bus::publish_to("luo9_message", json, [插件A的sub_id])
│ (插件A 是高优先级,block_enabled=true)
│ → 插件A 收到消息后,停止分发

✗ 插件B 和 插件C 不会收到这条消息

关键变化:用 publish_to(定向推送)替代 publish(广播)。

技术实现

1. Bus 层:新增 publish_tounsubscribe

luo9_core 的 Bus 实现中,新增了两个核心函数:

publish_to(定向发布):与 publish 的广播行为不同,publish_to 只向指定的 subscriber ID 列表推送消息。内部实现会跳过已标记为 dead 的 subscriber,并在推送完成后通过 notify_all 唤醒可能阻塞的消费者线程。

unsubscribe(取消订阅):将 subscriber 标记为 dead,移除其消息队列,然后唤醒阻塞在 wait_pop 上的线程。被唤醒的线程会检测到 dead 状态并返回哨兵消息,从而让插件优雅退出。

Bus 内部维护了一个 per-topic 的 dead set,publishpublish_topopwait_pop 都会在操作前检查该 set,确保已取消订阅的 subscriber 不会再收到任何消息。

2. 宿主层:预分配 subscriber

传统模式下,每个插件启动时自行调用 Bus::topic("luo9_message").subscribe() 创建 subscriber。

新模式下,宿主在加载插件时预先为插件创建 subscriber

1
2
3
4
5
6
7
8
9
fn create_subscribers(plugin_name: &str) -> HashMap<String, usize> {
let topics = ["luo9_message", "luo9_notice", "luo9_meta_event", "luo9_task", "luo9_send"];
let mut ids = HashMap::new();
for topic in &topics {
let id = Bus::topic(topic).subscribe().unwrap();
ids.insert(topic.to_string(), id);
}
ids
}

然后通过 FFI 将这些 subscriber ID 传递给插件的 SDK:

1
2
3
4
5
6
7
8
#[repr(C)]
pub struct PluginSubscribers {
pub message_sub_id: i32,
pub meta_event_sub_id: i32,
pub notice_sub_id: i32,
pub task_sub_id: i32,
pub send_sub_id: i32,
}

宿主在 spawn 插件线程时,先调用 luo9_init_subscribers 传递映射:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn run_plugin(lib: Arc<Library>, plugin_name: &str, subscriber_ids: HashMap<String, usize>) {
// 1. 传递预分配的 subscriber ID
if let Ok(init_fn) = lib.get::<InitSubscribersFn>(b"luo9_init_subscribers\0") {
let subs = PluginSubscribersRaw {
message_sub_id: subscriber_ids.get("luo9_message").copied().unwrap_or(0) as i32,
// ... 其他 topic
};
init_fn(&subs);
}

// 2. 调用插件入口
let plugin_main: Symbol<unsafe extern "C" fn()> = lib.get(b"plugin_main\0").unwrap();
plugin_main();
}

3. SDK 层:透明替换

插件的 SDK 中,Topic::subscribe() 会检查是否有预分配的 subscriber ID:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pub static PRECREATED_SUBSCRIBERS: OnceLock<Mutex<HashMap<String, usize>>> = OnceLock::new();

impl<'a> Topic<'a> {
pub fn subscribe(&self) -> Result<usize, BusError> {
// 检查是否有预分配的 subscriber_id
if let Some(map) = PRECREATED_SUBSCRIBERS.get() {
if let Some(&id) = map.lock().unwrap().get(self.name) {
return Ok(id); // 直接返回预分配的 ID
}
}
// 否则正常创建 subscriber
let ret = unsafe { luo9_bus_subscribe(topic.as_ptr()) };
// ...
}
}

这就是插件代码零修改的秘密:插件照常调用 Bus::topic("luo9_message").subscribe(),但 SDK 内部返回的是宿主预分配的 ID。插件完全感知不到区别。

4. 优先级分发

宿主维护一个按优先级排序的分发列表,存储在 RwLock 中实现无锁快速读取:

1
2
3
4
5
6
7
8
9
10
static DISPATCH_LIST: RwLock<Vec<DispatchEntry>> = RwLock::new(Vec::new());

pub struct DispatchEntry {
pub name: String,
pub priority: i32,
pub block_enabled: bool,
pub message_sub_id: Option<usize>,
pub notice_sub_id: Option<usize>,
pub meta_event_sub_id: Option<usize>,
}

注意 message_sub_id 的类型是 Option<usize> 而非 usize。这里有一个关键的设计考量——subscriber_id 从 0 开始

分发逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pub fn priority_dispatch_message(msg: Message) {
let payload = serde_json::to_string(&PluginData::Message(msg)).unwrap();
let list = DISPATCH_LIST.read().unwrap();

for entry in list.iter() {
// 跳过未订阅该 topic 的插件
let Some(sub_id) = entry.message_sub_id else {
continue;
};

// 定向推送到这个插件的 subscriber 队列
Bus::topic("luo9_message")
.publish_to(&payload, &[sub_id])
.unwrap();

// 如果这个插件启用了阻断,停止分发
if entry.block_enabled {
break;
}
}
}

5. subscriber_id 从 0 开始:一个踩坑记录

实现过程中遇到了一个隐蔽的 bug:第一个加载的插件 subscriber_id 为 0,被错误地当作”未订阅”处理。

最初的设计中,DispatchEntry 使用 usize 类型:

1
2
3
pub struct DispatchEntry {
pub message_sub_id: usize, // 0 表示"未订阅"
}

分发时用 == 0 判断是否跳过:

1
2
3
if entry.message_sub_id == 0 {
continue; // 跳过未订阅的插件
}

这在大多数语言中不会有问题(ID 通常从 1 开始),但 luo9_core 的 subscriber_id 是 per-topic 从 0 开始递增的。第一个加载的插件 plugin_doro 获得了 luo9_message 的 subscriber_id = 0,结果被错误跳过。

修复方案:将 DispatchEntry 改为 Option<usize>,用 None 表示”未订阅”,用 Some(0) 表示”合法的 subscriber_id 0”:

1
2
// 之前:message_sub_id: h.subscriber_ids.get("luo9_message").copied().unwrap_or(0),
// 之后:message_sub_id: h.subscriber_ids.get("luo9_message").copied(),

分发时用模式匹配替代数值比较:

1
2
3
// 之前:if entry.message_sub_id == 0 { continue; }
// 之后:
let Some(sub_id) = entry.message_sub_id else { continue };

这个教训很简单:永远不要用魔术值表示”无”,用 Option

6. 热重载:Sentinel 机制

禁用插件时,需要让插件线程退出。但插件的 plugin_main 通常阻塞在 wait_pop 上(Condvar 等待),如何唤醒它?

答案是 sentinel(哨兵消息)luo9_core 定义了一个特殊的哨兵字符串 __luo9_unsubscribed__,当 subscriber 被取消订阅后,wait_pop 会返回该哨兵而非正常消息。

禁用流程:

1
2
3
4
5
6
// PluginHandle
pub fn unsubscribe_all(&self) {
for (topic, &sub_id) in &self.subscriber_ids {
Bus::topic(topic).unsubscribe(sub_id);
}
}

unsubscribe 会:

  1. 将 subscriber_id 加入 dead set
  2. 移除 subscriber(队列被丢弃)
  3. notify_all 唤醒阻塞的 wait_pop

被唤醒的 wait_pop 检测到 subscriber 已被移除且在 dead set 中,返回哨兵消息。SDK 的 wait_pop 识别到哨兵后返回 Err(BusError::Unsubscribed),插件的循环自然退出:

1
2
3
4
5
6
7
8
9
// 插件代码(无需修改)
loop {
let msg = match Bus::topic("luo9_message").wait_pop(sub_id) {
Ok(msg) => msg,
Err(BusError::Unsubscribed) => break, // 收到取消订阅信号,退出
Err(e) => { error!("错误: {:?}", e); continue; }
};
// 处理消息...
}

插件线程退出后,Arc<Library> 的引用计数归零,dlclose 自动执行,动态库被卸载。热重载时只需重新加载 .dll、创建新 subscriber、spawn 新线程。

7. 配置持久化

插件的优先级和阻断配置通过主配置文件持久化:

1
2
3
4
5
6
7
8
9
[[plugins.plugins]]
name = "plugin_doro"
priority = 3
block_enabled = true

[[plugins.plugins]]
name = "plugin_epic"
priority = 1
block_enabled = false

宿主启动时,init_global_manager 将配置中的 priorityblock_enabled 应用到每个插件的 PluginHandlePluginInfo

1
2
3
4
5
6
7
for mut handle in handles {
if let Some(entry) = config_entries.iter().find(|e| e.name == handle.name) {
handle.priority = entry.priority;
handle.block_enabled = entry.block_enabled;
}
manager.register_handle(handle);
}

WebUI 修改优先级或阻断时,同步写回配置文件,确保重启后配置不丢失。

架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
┌─────────────────────────────────────┐
│ 宿主 (luo9_bot) │
│ │
│ PluginManager │
│ ├── PluginHandle A (priority=10) │
│ │ ├── subscriber_ids: { │
│ │ │ message: 0, notice: 0 } │
│ │ ├── block_enabled: true │
│ │ └── thread_handle: JoinHandle │
│ ├── PluginHandle B (priority=5) │
│ │ └── subscriber_ids: { │
│ │ message: 1, notice: 1 } │
│ └── PluginHandle C (priority=0) │
│ └── subscriber_ids: { │
│ message: 2, notice: 2 } │
│ │
│ DISPATCH_LIST: RwLock<Vec<...>> │
│ (按 priority 降序排列) │
│ │
│ priority_dispatch_message() │
│ │ │
│ ├─► publish_to(msg, [0]) ← A │
│ │ A.block_enabled → break │
│ ├─► publish_to(msg, [1]) ← B │
│ └─► publish_to(msg, [2]) ← C │
└──────────────┬──────────────────────┘

┌──────────────▼──────────────────────┐
│ luo9_core (FFI 消息总线) │
│ │
│ 每个 topic 维护独立的 subscriber │
│ 队列,per-topic 编号从 0 开始。 │
│ │
│ publish_to: 定向推送到指定队列 │
│ unsubscribe: 标记 dead + 唤醒线程 │
│ wait_pop: dead 时返回哨兵消息 │
└─────────────────────────────────────┘

subscriber ID 是 per-topic 的——每个 topic 独立编号,从 0 开始。插件 A 在 luo9_message 上是 0,在 luo9_notice 上也是 0,但它们是不同的 subscriber。

总结

改动 插件感知
Core Bus 新增 publish_tounsubscribe、dead set、sentinel
SDK subscribe() 返回预分配 ID,wait_pop 识别 sentinel
宿主 优先级分发、热重载、句柄管理、配置持久化
插件 零改动 完全透明

核心设计原则:将复杂性下沉到基础设施层,让上层保持简单。 插件开发者只需要关心业务逻辑,优先级、阻断、热重载全部由宿主和 SDK 自动处理。


这套机制的灵感来自操作系统的进程调度:高优先级进程先执行,可以抢占低优先级进程的 CPU 时间。只不过这里”抢占”的是消息的接收权。