logologo
开始
手册
开发
插件
API
首页
English
简体中文
日本語
한국어
Español
Português
Deutsch
Français
Русский
Italiano
Türkçe
Українська
Tiếng Việt
Bahasa Indonesia
ไทย
Polski
Nederlands
Čeština
العربية
עברית
हिन्दी
Svenska
开始
手册
开发
插件
API
首页
logologo
集群模式
概述
准备工作
Kubernetes 部署
运维流程
服务拆分
开发参考
Previous Page服务拆分

#插件开发

#背景问题

在单节点环境中,插件通常可以通过进程内状态、事件或任务来完成需求;而在集群模式下,同一插件可能同时运行在多个实例上,面临以下典型问题:

  • 状态一致性:配置或运行时数据若只存储在内存中,很难在实例间同步,容易出现脏读或重复执行。
  • 任务调度:长耗时任务若无明确排队与确认机制,会造成多个实例并发执行同一任务。
  • 竞争条件:涉及 schema 变更或资源分配时,需要序列化操作,避免出现并发写入导致的冲突。

NocoBase 核心在应用层预置了多种中间件接口,帮助插件在集群环境下复用统一能力。下面将结合源码介绍缓存、同步消息、消息队列与分布式锁的用法及最佳实践。

#解决方案

#缓存组件(Cache)

对于要保存在内存中的数据,建议使用系统内置的缓存组件进行管理。

  • 通过 app.cache 获取默认缓存实例。
  • Cache 提供 set/get/del/reset 等基础操作,还支持 wrap 与 wrapWithCondition 封装缓存逻辑,以及 mset/mget/mdel 等批量方法。
  • 集群部署时建议将共享数据放入具备持久化能力的存储(如 Redis),并合理设置 ttl,避免实例重启导致缓存丢失。

示例:plugin-auth 中的缓存初始化与使用

在插件中创建并使用缓存
// packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts
async load() {
  this.cache = await this.app.cacheManager.createCache({
    name: 'auth',
    prefix: 'auth',
    store: 'redis',
  });

  await this.cache.wrap('token:config', async () => {
    const repo = this.app.db.getRepository('tokenPolicies');
    return repo.findOne({ filterByTk: 'default' });
  }, 60 * 1000);
}

#同步信号管理器(SyncMessageManager)

如果内存中的状态无法使用分布式缓存(如无法序列化),那么当状态随用户操作发生变化时,需要将变化通过同步信号通知到其他实例,以保持状态一致。

  • 插件基类已实现 sendSyncMessage,内部调用 app.syncMessageManager.publish 并自动为通道加上应用级前缀,避免通道冲突。
  • publish 可指定 transaction,消息会在数据库事务提交后再发送,保证状态与消息同步。
  • 通过 handleSyncMessage 处理其他实例发来的消息,可在 beforeLoad 阶段订阅,对配置变更、Schema 同步等场景十分适用。

示例:plugin-data-source-main 通过同步消息保持多节点 schema 一致

在插件内同步
export class PluginDataSourceMainServer extends Plugin {
  async handleSyncMessage(message) {
    if (message.type === 'syncCollection') {
      await this.app.db.getRepository('collections').load(message.collectionName);
    }
  }

  private sendSchemaChange(data, options) {
    this.sendSyncMessage(data, options); // 自动调用 app.syncMessageManager.publish
  }
}

#消息广播管理器(PubSubManager)

消息广播是同步信号的底层组件,也支持直接使用。当需要在实例间广播消息时,可通过该组件实现。

  • app.pubSubManager.subscribe(channel, handler, { debounce }) 可在实例间订阅通道;debounce 选项用于去抖动,避免重复广播导致的频繁回调。
  • publish 支持 skipSelf(默认 true)与 onlySelf,用于控制消息是否回发到本实例。
  • 需在应用启动前配置适配器(如 Redis、RabbitMQ 等),否则默认不会连接外部消息系统。

示例:plugin-async-task-manager 使用 PubSub 广播任务取消事件

广播任务取消信号
const channel = `${plugin.name}.task.cancel`;

await this.app.pubSubManager.subscribe(channel, async ({ id }) => {
  this.logger.info(`Task ${id} cancelled on other node`);
  await this.stopLocalTask(id);
});

await this.app.pubSubManager.publish(channel, { id: taskId }, { skipSelf: true });

#消息队列组件(EventQueue)

消息队列用于调度异步任务,适合处理长耗时或可重试的操作。

  • 通过 app.eventQueue.subscribe(channel, { idle, process, concurrency }) 声明消费者,process 返回 Promise,可使用 AbortSignal.timeout 控制超时。
  • publish 会自动补齐应用名前缀,并支持 timeout、maxRetries 等选项,默认适配内存队列,可根据需要切换到 RabbitMQ 等扩展适配器。
  • 在集群中,确保所有节点使用同一适配器,以避免任务在节点间割裂。

示例:plugin-async-task-manager 使用 EventQueue 调度任务

在队列中分发异步任务
this.app.eventQueue.subscribe(`${plugin.name}.task`, {
  concurrency: this.concurrency,
  idle: this.idle,
  process: async (payload, { signal }) => {
    await this.runTask(payload.id, { signal });
  },
});

await this.app.eventQueue.publish(`${plugin.name}.task`, { id: taskId }, { maxRetries: 3 });

#分布式锁管理器(LockManager)

在需要避免竞态操作时,可以使用分布式锁来序列化对资源的访问。

  • 默认提供基于进程的 local 适配器,可注册 Redis 等分布式实现;通过 app.lockManager.runExclusive(key, fn, ttl) 或 acquire/tryAcquire 控制并发。
  • ttl 用于兜底释放锁,防止异常情况下锁被永远持有。
  • 常见场景包括:Schema 变更、防止重复任务、限流等。

示例:plugin-data-source-main 使用分布式锁保护字段删除流程

序列化字段删除操作
const lockKey = `${this.name}:fields.beforeDestroy:${collectionName}`;
await this.app.lockManager.runExclusive(lockKey, async () => {
  await fieldModel.remove(options);
  this.sendSyncMessage({ type: 'removeField', collectionName, fieldName });
});

#开发建议

  • 内存状态一致性:尽量避免在开发中使用内存状态,改用缓存或同步消息保持状态一致。
  • 优先复用内置接口:统一使用 app.cache、app.syncMessageManager 等能力,避免在插件中重复实现跨节点通信逻辑。
  • 关注事务边界:带事务的操作应使用 transaction.afterCommit(syncMessageManager.publish 已内置)以保证数据与消息一致。
  • 制定退避策略:对于队列与广播任务,合理设置 timeout、maxRetries、debounce,防止在异常情况下产生新的流量洪峰。
  • 配套监控与日志:善用应用日志记录通道名称、消息载荷、锁 key 等信息,方便排查集群下的偶发问题。

通过以上能力,插件可以在不同实例间安全共享状态、同步配置、调度任务,满足集群部署场景下的稳定性与一致性要求。