logologo
Начало
Руководство
Разработка
Плагины
API
English
简体中文
日本語
한국어
Deutsch
Français
Español
Português
Русский
Italiano
Türkçe
Українська
Tiếng Việt
Bahasa Indonesia
ไทย
Polski
Nederlands
Čeština
العربية
עברית
हिन्दी
Svenska
Начало
Руководство
Разработка
Плагины
API
logologo
Кластерный режим
Обзор
Подготовка
Развертывание в Kubernetes
Процессы эксплуатации
Разделение сервисов
Справочник для разработчиков
Previous PageРазделение сервисов
Уведомление о переводе ИИ

Эта документация была автоматически переведена ИИ.

#Разработка плагинов

#Проблематика

В одноузловой среде плагины обычно могут выполнять свои задачи, используя внутрипроцессное состояние, события или задачи. Однако в кластерном режиме один и тот же плагин может одновременно работать на нескольких экземплярах, сталкиваясь со следующими типичными проблемами:

  • Согласованность состояния: Если данные конфигурации или данные времени выполнения хранятся только в памяти, их сложно синхронизировать между экземплярами, что может привести к «грязному чтению» или повторному выполнению операций.
  • Планирование задач: Без четкого механизма постановки в очередь и подтверждения длительные задачи могут выполняться одновременно несколькими экземплярами.
  • Состояния гонки: Операции, связанные с изменением схемы или распределением ресурсов, требуют сериализации для предотвращения конфликтов, вызванных одновременной записью.

Ядро NocoBase предоставляет на уровне приложения различные интерфейсы промежуточного ПО, которые помогают плагинам использовать унифицированные возможности в кластерной среде. Ниже мы рассмотрим использование и лучшие практики для кэширования, синхронных сообщений, очередей сообщений и распределенных блокировок, с примерами из исходного кода.

#Решения

#Компонент кэширования (Cache)

Для данных, которые необходимо хранить в памяти, рекомендуется использовать встроенный компонент кэширования системы.

  • Получите экземпляр кэша по умолчанию через app.cache.
  • Cache предоставляет базовые операции, такие как set/get/del/reset, а также поддерживает wrap и wrapWithCondition для инкапсуляции логики кэширования, и пакетные методы, такие как mset/mget/mdel.
  • При развертывании в кластере рекомендуется размещать общие данные в постоянном хранилище (например, Redis) и устанавливать разумное значение ttl, чтобы предотвратить потерю кэша при перезапуске экземпляра.

Пример: Инициализация и использование кэша в plugin-auth

Создание
// packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts
async load() {
  this.cache = await this.app.cacheManager.createCache({
    name: 'auth',
    prefix: 'auth',
    store: 'redis',
  });

  await this.cache.wrap('token:config', async () => {
    const repo = this.app.db.getRepository('tokenPolicies');
    return repo.findOne({ filterByTk: 'default' });
  }, 60 * 1000);
}

#Менеджер синхронных сообщений (SyncMessageManager)

Если состояние в памяти не может быть управляемо с помощью распределенного кэша (например, его невозможно сериализовать), то при изменении состояния в результате действий пользователя необходимо уведомить об этих изменениях другие экземпляры с помощью синхронного сигнала для поддержания согласованности состояния.

  • Базовый класс плагина уже реализует sendSyncMessage, который внутренне вызывает app.syncMessageManager.publish и автоматически добавляет префикс уровня приложения к каналу, чтобы избежать конфликтов каналов.
  • Метод publish может указывать transaction, и сообщение будет отправлено после фиксации транзакции базы данных, что гарантирует синхронизацию состояния и сообщения.
  • Обрабатывайте сообщения, поступающие от других экземпляров, с помощью handleSyncMessage. Подписка на сообщения на этапе beforeLoad очень подходит для сценариев, таких как изменение конфигурации и синхронизация схемы.

Пример: plugin-data-source-main использует синхронные сообщения для поддержания согласованности схемы между несколькими узлами

Синхронизация
export class PluginDataSourceMainServer extends Plugin {
  async handleSyncMessage(message) {
    if (message.type === 'syncCollection') {
      await this.app.db.getRepository('collections').load(message.collectionName);
    }
  }

  private sendSchemaChange(data, options) {
    this.sendSyncMessage(data, options); // Автоматически вызывает app.syncMessageManager.publish
  }
}

#Менеджер публикации/подписки (PubSubManager)

Широковещательная рассылка сообщений является базовым компонентом синхронных сигналов и также может использоваться напрямую. Когда вам нужно транслировать сообщения между экземплярами, вы можете реализовать это с помощью данного компонента.

  • app.pubSubManager.subscribe(channel, handler, { debounce }) позволяет подписываться на канал между экземплярами; опция debounce используется для устранения дребезга, предотвращая частые обратные вызовы, вызванные повторной трансляцией.
  • Метод publish поддерживает skipSelf (по умолчанию true) и onlySelf для управления тем, будет ли сообщение отправлено обратно текущему экземпляру.
  • Адаптер (например, Redis, RabbitMQ и т. д.) должен быть настроен до запуска приложения; в противном случае по умолчанию не будет установлено соединение с внешней системой обмена сообщениями.

Пример: plugin-async-task-manager использует PubSub для трансляции событий отмены задач

Трансляция
const channel = `${plugin.name}.task.cancel`;

await this.app.pubSubManager.subscribe(channel, async ({ id }) => {
  this.logger.info(`Task ${id} cancelled on other node`);
  await this.stopLocalTask(id);
});

await this.app.pubSubManager.publish(channel, { id: taskId }, { skipSelf: true });

#Компонент очереди событий (EventQueue)

Очередь сообщений используется для планирования асинхронных задач и подходит для обработки длительных или повторяемых операций.

  • Объявите потребителя с помощью app.eventQueue.subscribe(channel, { idle, process, concurrency }). Метод process возвращает Promise, и вы можете использовать AbortSignal.timeout для контроля времени ожидания.
  • Метод publish автоматически добавляет префикс имени приложения и поддерживает такие опции, как timeout и maxRetries. По умолчанию он использует адаптер очереди в памяти, но при необходимости может быть переключен на расширенные адаптеры, такие как RabbitMQ.
  • В кластере убедитесь, что все узлы используют один и тот же адаптер, чтобы избежать фрагментации задач между узлами.

Пример: plugin-async-task-manager использует EventQueue для планирования задач

Распределение
this.app.eventQueue.subscribe(`${plugin.name}.task`, {
  concurrency: this.concurrency,
  idle: this.idle,
  process: async (payload, { signal }) => {
    await this.runTask(payload.id, { signal });
  },
});

await this.app.eventQueue.publish(`${plugin.name}.task`, { id: taskId }, { maxRetries: 3 });

#Менеджер распределенных блокировок (LockManager)

Когда необходимо избежать состояний гонки, вы можете использовать распределенную блокировку для сериализации доступа к ресурсу.

  • По умолчанию предоставляется адаптер local на основе процессов. Вы можете зарегистрировать распределенные реализации, такие как Redis; управляйте параллелизмом с помощью app.lockManager.runExclusive(key, fn, ttl) или acquire/tryAcquire.
  • ttl используется для гарантированного снятия блокировки, предотвращая ее постоянное удержание в исключительных ситуациях.
  • Типичные сценарии включают: изменение схемы, предотвращение дублирования задач, ограничение скорости запросов и т. д.

Пример: plugin-data-source-main использует распределенную блокировку для защиты процесса удаления полей

Сериализация
const lockKey = `${this.name}:fields.beforeDestroy:${collectionName}`;
await this.app.lockManager.runExclusive(lockKey, async () => {
  await fieldModel.remove(options);
  this.sendSyncMessage({ type: 'removeField', collectionName, fieldName });
});

#Рекомендации по разработке

  • Согласованность состояния в памяти: Старайтесь избегать использования состояния в памяти при разработке. Вместо этого используйте кэширование или синхронные сообщения для поддержания согласованности состояния.
  • Приоритет повторного использования встроенных интерфейсов: Используйте унифицированные возможности, такие как app.cache, app.syncMessageManager и другие, чтобы избежать повторной реализации логики межузлового взаимодействия в плагинах.
  • Внимание к границам транзакций: Операции с транзакциями должны использовать transaction.afterCommit (встроенный в syncMessageManager.publish) для обеспечения согласованности данных и сообщений.
  • Разработайте стратегию отката (backoff): Для задач в очередях и широковещательных задач разумно устанавливайте значения timeout, maxRetries и debounce, чтобы предотвратить новые пики трафика в исключительных ситуациях.
  • Сопутствующий мониторинг и логирование: Эффективно используйте журналы приложения для записи имен каналов, содержимого сообщений, ключей блокировок и другой информации, чтобы упростить устранение периодически возникающих проблем в кластере.

Благодаря этим возможностям плагины могут безопасно обмениваться состоянием, синхронизировать конфигурации и планировать задачи между различными экземплярами, отвечая требованиям стабильности и согласованности в сценариях кластерного развертывания.