logologo
开始
手册
开发
插件
API
English
简体中文
开始
手册
开发
插件
API
English
简体中文
logologo
集群模式
概述
准备工作
Kubernetes 部署
运维流程
服务拆分
开发参考
Previous Page服务拆分

#插件开发

#背景问题

在单节点环境中,插件通常可以通过进程内状态、事件或任务来完成需求;而在集群模式下,同一插件可能同时运行在多个实例上,面临以下典型问题:

  • 状态一致性:配置或运行时数据若只存储在内存中,很难在实例间同步,容易出现脏读或重复执行。
  • 任务调度:长耗时任务若无明确排队与确认机制,会造成多个实例并发执行同一任务。
  • 竞争条件:涉及 schema 变更或资源分配时,需要序列化操作,避免出现并发写入导致的冲突。

NocoBase 核心在应用层预置了多种中间件接口,帮助插件在集群环境下复用统一能力。下面将结合源码介绍缓存、同步消息、消息队列与分布式锁的用法及最佳实践。

#解决方案

#缓存组件(Cache)

对于要保存在内存中的数据,建议使用系统内置的缓存组件进行管理。

  • 通过 app.cache 获取默认缓存实例。
  • Cache 提供 set/get/del/reset 等基础操作,还支持 wrap 与 wrapWithCondition 封装缓存逻辑,以及 mset/mget/mdel 等批量方法。
  • 集群部署时建议将共享数据放入具备持久化能力的存储(如 Redis),并合理设置 ttl,避免实例重启导致缓存丢失。

示例:plugin-auth 中的缓存初始化与使用

在插件中创建并使用缓存
// packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts
async load() {
  this.cache = await this.app.cacheManager.createCache({
    name: 'auth',
    prefix: 'auth',
    store: 'redis',
  });

  await this.cache.wrap('token:config', async () => {
    const repo = this.app.db.getRepository('tokenPolicies');
    return repo.findOne({ filterByTk: 'default' });
  }, 60 * 1000);
}

#同步信号管理器(SyncMessageManager)

如果内存中的状态无法使用分布式缓存(如无法序列化),那么当状态随用户操作发生变化时,需要将变化通过同步信号通知到其他实例,以保持状态一致。

  • 插件基类已实现 sendSyncMessage,内部调用 app.syncMessageManager.publish 并自动为通道加上应用级前缀,避免通道冲突。
  • publish 可指定 transaction,消息会在数据库事务提交后再发送,保证状态与消息同步。
  • 通过 handleSyncMessage 处理其他实例发来的消息,可在 beforeLoad 阶段订阅,对配置变更、Schema 同步等场景十分适用。

示例:plugin-data-source-main 通过同步消息保持多节点 schema 一致

在插件内同步
export class PluginDataSourceMainServer extends Plugin {
  async handleSyncMessage(message) {
    if (message.type === 'syncCollection') {
      await this.app.db.getRepository('collections').load(message.collectionName);
    }
  }

  private sendSchemaChange(data, options) {
    this.sendSyncMessage(data, options); // 自动调用 app.syncMessageManager.publish
  }
}

#消息广播管理器(PubSubManager)

消息广播是同步信号的底层组件,也支持直接使用。当需要在实例间广播消息时,可通过该组件实现。

  • app.pubSubManager.subscribe(channel, handler, { debounce }) 可在实例间订阅通道;debounce 选项用于去抖动,避免重复广播导致的频繁回调。
  • publish 支持 skipSelf(默认 true)与 onlySelf,用于控制消息是否回发到本实例。
  • 需在应用启动前配置适配器(如 Redis、RabbitMQ 等),否则默认不会连接外部消息系统。

示例:plugin-async-task-manager 使用 PubSub 广播任务取消事件

广播任务取消信号
const channel = `${plugin.name}.task.cancel`;

await this.app.pubSubManager.subscribe(channel, async ({ id }) => {
  this.logger.info(`Task ${id} cancelled on other node`);
  await this.stopLocalTask(id);
});

await this.app.pubSubManager.publish(channel, { id: taskId }, { skipSelf: true });

#消息队列组件(EventQueue)

消息队列用于调度异步任务,适合处理长耗时或可重试的操作。

  • 通过 app.eventQueue.subscribe(channel, { idle, process, concurrency }) 声明消费者,process 返回 Promise,可使用 AbortSignal.timeout 控制超时。
  • publish 会自动补齐应用名前缀,并支持 timeout、maxRetries 等选项,默认适配内存队列,可根据需要切换到 RabbitMQ 等扩展适配器。
  • 在集群中,确保所有节点使用同一适配器,以避免任务在节点间割裂。

示例:plugin-async-task-manager 使用 EventQueue 调度任务

在队列中分发异步任务
this.app.eventQueue.subscribe(`${plugin.name}.task`, {
  concurrency: this.concurrency,
  idle: this.idle,
  process: async (payload, { signal }) => {
    await this.runTask(payload.id, { signal });
  },
});

await this.app.eventQueue.publish(`${plugin.name}.task`, { id: taskId }, { maxRetries: 3 });

#分布式锁管理器(LockManager)

在需要避免竞态操作时,可以使用分布式锁来序列化对资源的访问。

  • 默认提供基于进程的 local 适配器,可注册 Redis 等分布式实现;通过 app.lockManager.runExclusive(key, fn, ttl) 或 acquire/tryAcquire 控制并发。
  • ttl 用于兜底释放锁,防止异常情况下锁被永远持有。
  • 常见场景包括:Schema 变更、防止重复任务、限流等。

示例:plugin-data-source-main 使用分布式锁保护字段删除流程

序列化字段删除操作
const lockKey = `${this.name}:fields.beforeDestroy:${collectionName}`;
await this.app.lockManager.runExclusive(lockKey, async () => {
  await fieldModel.remove(options);
  this.sendSyncMessage({ type: 'removeField', collectionName, fieldName });
});

#开发建议

  • 内存状态一致性:尽量避免在开发中使用内存状态,改用缓存或同步消息保持状态一致。
  • 优先复用内置接口:统一使用 app.cache、app.syncMessageManager 等能力,避免在插件中重复实现跨节点通信逻辑。
  • 关注事务边界:带事务的操作应使用 transaction.afterCommit(syncMessageManager.publish 已内置)以保证数据与消息一致。
  • 制定退避策略:对于队列与广播任务,合理设置 timeout、maxRetries、debounce,防止在异常情况下产生新的流量洪峰。
  • 配套监控与日志:善用应用日志记录通道名称、消息载荷、锁 key 等信息,方便排查集群下的偶发问题。

通过以上能力,插件可以在不同实例间安全共享状态、同步配置、调度任务,满足集群部署场景下的稳定性与一致性要求。