Shopware 6 | Message queue sinnvoll aufteilen

Wer Froshtools nutzt kennt sicher das Ampelsystem oben links neben der Version. Dies zeigt an wenn Scheduled Tasks oder Message queues lange nicht gelaufen sind. Das passiert besonders dann wenn zum Beispiel größere Produkt Exporte mit einzelnen Messages zum re-indexieren die Abarbeitung anderer Messages wie die vom Scheduled Task blockieren. Abhilfe schafft hier die Nutzung mehrerer Queues.

Oh oh hier läuft was nicht sauber
Mal wieder eine geplante Aufgabe nicht in Time abgearbeitet

Vorraussetzung

Ich gehe in diesem Artikel nicht auf die Einrichtung der Message queue mit einem Provider wie RabbitMQ ein. Diese erfolgt üblicherweise über:

# config/packages/prod/enqueue.yaml
enqueue:
  amqp:
    transport: '%env(AMQP_URL)%'
    client: ~

Ihr solltet also einen AMQP Transport bereits schon laufen haben. Hier müsst Ihr schauen wo Ihr eventuell bereits euren messenger transport definiert habt, bei uns liegt das unter config/packages/prod/framework.yaml – wir wollen diese config logischerweise nicht im dev enviroment haben und unter prod wir diese config nur dort angewandt.

framework.yaml

# config/packages/prod/framework.yaml
framework:
  messenger:
    transports:
      prio_med:
        dsn: '%env(AMQP_URL)%'
        options:
          exchange:
            name: prio_med
            type: direct
            default_publish_routing_key: prio_med
          queues:
            prio_med:
              binding_keys: [ prio_med ]

      prio_high:
        dsn: '%env(AMQP_URL)%'
        options:
          exchange:
            name: prio_high
            type: direct
            default_publish_routing_key: prio_high
          queues:
            prio_high:
              binding_keys: [ prio_high ]
      default:
        dsn: '%env(AMQP_URL)%'
        options:
          exchange:
            name: messages
            type: direct
            default_publish_routing_key: messages
          queues:
            messages:
              binding_keys: [ messages ]

    routing:
      # External plugins      'NetInventors\NetiNextEasyCoupon\Core\Content\Indexer\OrderTransactionIndexer': [ prio_med ]
      'Shopware\Core\Content\ProductExport\ScheduledTask\ProductExportGenerateTask': [ prio_med ]
      'Shopware\Core\Content\Sitemap\ScheduledTask\SitemapGenerateTask': [ prio_med ]

            'Shopware\Core\Content\Category\DataAbstractionLayer\CategoryIndexer': [ prio_high ]
      'Shopware\Core\Framework\Adapter\Cache\InvalidateCacheTask': [ prio_high ]

In dieser Datei definieren wir mehrer Dinge:

Transport Name

Dieser ist wichtig wenn wir per supervisor / systemd oder cli queues comsumen möchten. Tun wir dies nicht, erscheint ein interactives Menü in dem wir auswählen müssen:

messenger:consume interactive

Das wollen wir natürlich gerade bei den Prozessmanagern wie supervisor nicht und das Problem ist erstmal unbemerkt.

Sprich Ihr tragt dort

bin/console messenger:consume default
bin/console messenger:consume prio_high
bin/console messenger:consume prio_med

ein. Unsere Supervisor conf sieht dann ungefähr wie folgt aus:

[program:messenger-consume-default-yonc-at]
command=/usr/bin/php8.0 /var/www/shopware/PROJECT/current/bin/console messenger:consume default --time-limit=60 --memory-limit=512M
user=www-data
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Queue

Den Namen der queue welche Ihr eventuell anlegen müsst. Existiert dieser nicht, wird er einfach angelegt. Das passiert meines wissens bei den Default settings nicht. Ich würde empfehlen erstmal in RabbitMQ oder LavinMQ nichts anzulegen und zu monitoren was passiert.

Exchange

Im Standard gibt es bereits einige Exchanges, diese müssen wie die queues entsprechend um drei erweitert werden:

Routing

Der wohl wichtigste Part der config ist routing, dort legen wir fest welche Messages wohin geleitet werden sollen. Dort rate ich dazu externe Plugins zu markieren / kommentieren. Für uns war in diesem Fall zum Beispiel wichtig dass das Cache invalidieren und die Produktexporte in die Prio High kommen und damit immer abgearbeitet werden.

Wichtig

Alles was Ihr nicht definiert kommt automatisch in die queue default – sprich Ihr solltet hier nur aufnehmen was priorisiert werden muss.

Schön zu sehen ist hier die Cache Invalidierung jede 20sek

Consumer

Wie oben bereits erwähnt braucht Ihr dann natürlich auch 3 Consumer, hier 3 Beispiele für supervisor. Der Name von queue und exchange ist übrigens egal. Ihr könnt diese auch high und med nennen oder auch nur eine hinzufügen (urgent)

[program:messenger-consume-default-shop]
command=/usr/bin/php8.0 /var/www/shopware/SHOP/current/bin/console messenger:consume default --time-limit=60 --memory-limit=512M
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

[program:messenger-consume-high-shop]
command=/usr/bin/php8.0 /var/www/shopware/SHOP/current/bin/console messenger:consume prio_high --time-limit=60 --memory-limit=512M
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

[program:messenger-consume-med-shop]
command=/usr/bin/php8.0 /var/www/shopware/SHOP/current/bin/console messenger:consume prio_med --time-limit=60 --memory-limit=512M
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Fragen an euch

Gerne in die Kommentare 🙂

Nutzt Ihr für die Message queue Redis, RabbitMQ oder andere Transports?

Wie viele Messages verarbeitet Ihr so pro Monat?

Splittet Ihr Eure Messages in verschiedene Queues?

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert