logologo
Початок
Посібник
Розробка
Плагіни
API
English
简体中文
日本語
한국어
Deutsch
Français
Español
Português
Русский
Italiano
Türkçe
Українська
Tiếng Việt
Bahasa Indonesia
ไทย
Polski
Nederlands
Čeština
العربية
עברית
हिन्दी
Svenska
Початок
Посібник
Розробка
Плагіни
API
logologo
Кластерний режим
Огляд
Підготовка
Розгортання в Kubernetes
Процес експлуатації
Розподіл сервісів
Довідник розробника
Previous PageРозподіл сервісів
Повідомлення про переклад ШІ

Ця документація була автоматично перекладена штучним інтелектом.

#Розробка плагінів

#Передумови

У середовищі з одним вузлом плагіни зазвичай можуть виконувати свої завдання за допомогою внутрішньопроцесного стану, подій або завдань. Однак у кластерному режимі той самий плагін може одночасно працювати на кількох екземплярах, стикаючись з такими типовими проблемами:

  • Узгодженість стану: Якщо дані конфігурації або дані під час виконання зберігаються лише в пам'яті, їх важко синхронізувати між екземплярами, що може призвести до "брудного читання" або повторного виконання.
  • Планування завдань: Довготривалі завдання без чіткого механізму черги та підтвердження можуть виконуватися кількома екземплярами одночасно.
  • Умови конкуренції: Операції, що стосуються зміни схеми або розподілу ресурсів, потребують серіалізації, щоб уникнути конфліктів, спричинених паралельним записом.

Ядро NocoBase надає на рівні застосунку різні інтерфейси проміжного програмного забезпечення, щоб допомогти плагінам використовувати єдині можливості в кластерному середовищі. Нижче, з посиланням на вихідний код, ми розглянемо використання та найкращі практики кешування, синхронних повідомлень, черг повідомлень та розподілених блокувань.

#Рішення

#Компонент кешування (Cache)

Для даних, які потрібно зберігати в пам'яті, рекомендується використовувати вбудований компонент кешування системи.

  • Отримайте екземпляр кешу за замовчуванням через app.cache.
  • Cache надає базові операції, такі як set/get/del/reset, а також підтримує wrap та wrapWithCondition для інкапсуляції логіки кешування, а також пакетні методи, такі як mset/mget/mdel.
  • При розгортанні в кластері рекомендується розміщувати спільні дані в постійному сховищі (наприклад, Redis) та встановлювати відповідний ttl, щоб уникнути втрати кешу при перезапуску екземпляра.

Приклад: Ініціалізація та використання кешу в plugin-auth

Створення
// packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts
async load() {
  this.cache = await this.app.cacheManager.createCache({
    name: 'auth',
    prefix: 'auth',
    store: 'redis',
  });

  await this.cache.wrap('token:config', async () => {
    const repo = this.app.db.getRepository('tokenPolicies');
    return repo.findOne({ filterByTk: 'default' });
  }, 60 * 1000);
}

#Менеджер синхронних повідомлень (SyncMessageManager)

Якщо стан у пам'яті не може бути керований за допомогою розподіленого кешу (наприклад, його неможливо серіалізувати), то при зміні стану внаслідок дій користувача необхідно сповіщати про ці зміни інші екземпляри за допомогою синхронного сигналу, щоб підтримувати узгодженість стану.

  • Базовий клас плагіна вже реалізував sendSyncMessage, який внутрішньо викликає app.syncMessageManager.publish та автоматично додає префікс рівня застосунку до каналу, щоб уникнути конфліктів каналів.
  • publish може вказувати transaction; повідомлення буде надіслано після фіксації транзакції бази даних, забезпечуючи синхронізацію стану та повідомлень.
  • Використовуйте handleSyncMessage для обробки повідомлень, що надходять від інших екземплярів. Підписка на етапі beforeLoad дуже підходить для сценаріїв, таких як зміни конфігурації та синхронізація схеми.

Приклад: plugin-data-source-main використовує синхронні повідомлення для підтримки узгодженості схеми між кількома вузлами

Синхронізація
export class PluginDataSourceMainServer extends Plugin {
  async handleSyncMessage(message) {
    if (message.type === 'syncCollection') {
      await this.app.db.getRepository('collections').load(message.collectionName);
    }
  }

  private sendSchemaChange(data, options) {
    this.sendSyncMessage(data, options); // Автоматично викликає app.syncMessageManager.publish
  }
}

#Менеджер широкомовних повідомлень (PubSubManager)

Широкомовна розсилка повідомлень є базовим компонентом синхронних сигналів і також підтримує пряме використання. Коли вам потрібно транслювати повідомлення між екземплярами, ви можете використовувати цей компонент.

  • app.pubSubManager.subscribe(channel, handler, { debounce }) можна використовувати для підписки на канал між екземплярами; опція debounce використовується для усунення деренчання (debounce), щоб уникнути частих зворотних викликів, спричинених повторними трансляціями.
  • publish підтримує skipSelf (за замовчуванням true) та onlySelf для контролю того, чи надсилається повідомлення назад до поточного екземпляра.
  • Перед запуском застосунку необхідно налаштувати адаптер (наприклад, Redis, RabbitMQ тощо), інакше за замовчуванням він не підключатиметься до зовнішньої системи обміну повідомленнями.

Приклад: plugin-async-task-manager використовує PubSub для трансляції подій скасування завдань

Трансляція
const channel = `${plugin.name}.task.cancel`;

await this.app.pubSubManager.subscribe(channel, async ({ id }) => {
  this.logger.info(`Task ${id} cancelled on other node`);
  await this.stopLocalTask(id);
});

await this.app.pubSubManager.publish(channel, { id: taskId }, { skipSelf: true });

#Компонент черги подій (EventQueue)

Черга повідомлень використовується для планування асинхронних завдань, що підходить для обробки довготривалих або операцій, які можна повторити.

  • Оголосіть споживача за допомогою app.eventQueue.subscribe(channel, { idle, process, concurrency }). process повертає Promise, і ви можете використовувати AbortSignal.timeout для контролю часу очікування.
  • publish автоматично додає префікс імені застосунку та підтримує такі опції, як timeout і maxRetries. За замовчуванням він адаптований до черги в пам'яті, але за потреби його можна переключити на розширені адаптери, такі як RabbitMQ.
  • У кластері переконайтеся, що всі вузли використовують один і той же адаптер, щоб уникнути фрагментації завдань між вузлами.

Приклад: plugin-async-task-manager використовує EventQueue для планування завдань

Розподіл
this.app.eventQueue.subscribe(`${plugin.name}.task`, {
  concurrency: this.concurrency,
  idle: this.idle,
  process: async (payload, { signal }) => {
    await this.runTask(payload.id, { signal });
  },
});

await this.app.eventQueue.publish(`${plugin.name}.task`, { id: taskId }, { maxRetries: 3 });

#Менеджер розподілених блокувань (LockManager)

Коли потрібно уникнути умов конкуренції, можна використовувати розподілене блокування для серіалізації доступу до ресурсу.

  • За замовчуванням надається local адаптер на основі процесу. Ви можете зареєструвати розподілені реалізації, такі як Redis; керуйте паралелізмом за допомогою app.lockManager.runExclusive(key, fn, ttl) або acquire/tryAcquire.
  • ttl використовується як запобіжник для звільнення блокування, запобігаючи його постійному утриманню у виняткових випадках.
  • Типові сценарії включають: зміни схеми, запобігання дублюванню завдань, обмеження швидкості тощо.

Приклад: plugin-data-source-main використовує розподілене блокування для захисту процесу видалення полів

Серіалізація
const lockKey = `${this.name}:fields.beforeDestroy:${collectionName}`;
await this.app.lockManager.runExclusive(lockKey, async () => {
  await fieldModel.remove(options);
  this.sendSyncMessage({ type: 'removeField', collectionName, fieldName });
});

#Рекомендації щодо розробки

  • Узгодженість стану в пам'яті: Намагайтеся уникати використання стану в пам'яті під час розробки. Замість цього використовуйте кешування або синхронні повідомлення для підтримки узгодженості стану.
  • Пріоритет повторного використання вбудованих інтерфейсів: Використовуйте єдині можливості, такі як app.cache та app.syncMessageManager, щоб уникнути повторної реалізації логіки міжвузлової комунікації в плагінах.
  • Звертайте увагу на межі транзакцій: Операції з транзакціями повинні використовувати transaction.afterCommit (який вже інтегрований у syncMessageManager.publish), щоб забезпечити узгодженість даних та повідомлень.
  • Розробіть стратегію відступу (backoff strategy): Для завдань у черзі та широкомовних завдань встановлюйте відповідні значення timeout, maxRetries та debounce, щоб запобігти виникненню нових піків трафіку у виняткових ситуаціях.
  • Використовуйте супутній моніторинг та журналювання: Ефективно використовуйте журнали застосунку для запису імен каналів, корисних навантажень повідомлень, ключів блокувань тощо, щоб полегшити усунення періодичних проблем у кластері.

Завдяки цим можливостям плагіни можуть безпечно обмінюватися станом, синхронізувати конфігурації та планувати завдання між різними екземплярами, відповідаючи вимогам стабільності та узгодженості в сценаріях розгортання кластера.