Monday • 07.11.22 • 15:07
10 minutes for reading
Yevhenii Trishyn
Magento Senior Full-Stack Developer | IT Delight
Back-End, Magento

Using Redis as Message Broker in Magento 2

In this post, I would like to discuss managing message queues in Magento 2. Rather than walking through the documentation, we will look at how message brokers work by adding new functionality: using Redis as a message broker in Magento 2. As you know, Magento supports MySQL- and RabbitMQ-based message queues out of the box. MySQL serves as the fallback because RabbitMQ is not a hard dependency for Magento 2 (the way Elasticsearch currently is). This topic may interest many developers, since queue management is increasingly being integrated into Magento 2 core modules.

For this purpose, a new module will soon be released that lets you add a redis section to the queue section of the env.php file, use the redis connection type in the queue configuration files, and use Redis as a message broker. To implement this functionality, we will use colinmollenhour/credis, which already ships with Magento 2 and is used for cache and session storage.

As a reference implementation, you can take either the Magento_MysqlMq or the Magento_Amqp module (the Magento_AmqpStore module also provides functionality for message queue processing). In this case, Magento_MysqlMq is the better fit, since Redis does not implement the AMQP protocol.

Let’s jump right into the code. We will create a module called ITDelight\RedisQueue that contains the Redis-specific implementation. As mentioned above, we will use the Redis client that is already available in Magento 2, the colinmollenhour/credis library, which Magento 2 uses for both caching and session storage. On top of it, the module adds a client that implements the methods for working with Redis-based queues. This client is outside the scope of this article because it is not specific to Magento 2, so its code is not shown here; you can explore it in the GitHub repository listed at the end of the post.
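Since the client itself is not shown, here is a rough idea of the contract the rest of the module relies on. This is only a sketch under my own assumptions: the class name InMemoryQueueClient is hypothetical, a PHP array stands in for Redis, and only the method names (createQueue, enqueue, dequeue, acknowledge) are taken from the calls used in the classes of this post. The real client in the repository talks to Redis through credis and may behave differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for the Redis-backed queue client.
 * Storage is a PHP array, not Redis; only the method names follow the article.
 */
final class InMemoryQueueClient
{
    /** @var array<string, string[]> pending messages per queue */
    private array $pending = [];

    /** @var array<string, string> last message handed out but not yet acknowledged */
    private array $inFlight = [];

    public function createQueue(string $queueName): void
    {
        $this->pending[$queueName] ??= [];
    }

    public function enqueue(string $queueName, string $message): void
    {
        // With Redis this could be a list push such as RPUSH (assumption).
        $this->pending[$queueName][] = $message;
    }

    public function dequeue(string $queueName): ?string
    {
        if (empty($this->pending[$queueName])) {
            return null;
        }
        // With Redis this could be a list pop such as LPOP (assumption).
        $message = array_shift($this->pending[$queueName]);
        $this->inFlight[$queueName] = $message;

        return $message;
    }

    public function acknowledge(string $queueName): void
    {
        // Forget the in-flight message once the handler has finished with it.
        unset($this->inFlight[$queueName]);
    }
}

$client = new InMemoryQueueClient();
$client->createQueue('itd_test_queue');
$client->enqueue('itd_test_queue', 'first');
$client->enqueue('itd_test_queue', 'second');
$first = $client->dequeue('itd_test_queue');
$client->acknowledge('itd_test_queue');
```

Messages come out in the order they went in, and dequeue returns null once the queue is empty, which is the behavior the Queue class later in this post depends on.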

GitHub repository

As for the Magento 2-specific functionality, the first thing to add is a class that implements \Magento\Framework\MessageQueue\ConnectionTypeResolverInterface. This class returns the connection type for a given connection name, or null if the name is not one it handles. It looks as follows:

<?php
declare(strict_types=1);

namespace ITDelight\RedisQueue\Model;

use Magento\Framework\MessageQueue\ConnectionTypeResolverInterface;

class ConnectionTypeResolver implements ConnectionTypeResolverInterface
{
   /**
    * DB connection names.
    *
    * @var string[]
    */
   private array $dbConnectionNames;

   /**
    * Initialize dependencies.
    *
    * @param string[] $dbConnectionNames
    */
   public function __construct(array $dbConnectionNames = [])
   {
       $this->dbConnectionNames = $dbConnectionNames;
       $this->dbConnectionNames[] = 'redis';
   }

   /**
    * @param string $connectionName
    *
    * @return string|null
    */
   public function getConnectionType($connectionName): ?string
   {
       return in_array($connectionName, $this->dbConnectionNames) ? 'redis' : null;
   }
}

At this stage, this class needs to be integrated into Magento 2. To do this, we pass it via the di.xml file to the resolvers argument of Magento\Framework\MessageQueue\ConnectionTypeResolver.

<type name="Magento\Framework\MessageQueue\ConnectionTypeResolver">
   <arguments>
       <argument name="resolvers" xsi:type="array">
           <item name="redis" xsi:type="object">ITDelight\RedisQueue\Model\ConnectionTypeResolver</item>
       </argument>
   </arguments>
</type>
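To illustrate why our resolver returns null for names it does not know, here is a simplified, self-contained sketch of how a composite resolver can combine several resolvers. The interface and class names (TypeResolver, FixedTypeResolver, CompositeTypeResolver) are hypothetical stand-ins so the example runs outside Magento; the framework's own class may differ in detail.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for ConnectionTypeResolverInterface (not the framework interface).
interface TypeResolver
{
    public function getConnectionType(string $connectionName): ?string;
}

// Resolver that maps a fixed list of connection names to one connection type.
final class FixedTypeResolver implements TypeResolver
{
    /** @param string[] $names */
    public function __construct(private string $type, private array $names)
    {
    }

    public function getConnectionType(string $connectionName): ?string
    {
        return in_array($connectionName, $this->names, true) ? $this->type : null;
    }
}

// Composite: asks each resolver in turn and returns the first non-null answer.
final class CompositeTypeResolver implements TypeResolver
{
    /** @param TypeResolver[] $resolvers */
    public function __construct(private array $resolvers)
    {
    }

    public function getConnectionType(string $connectionName): ?string
    {
        foreach ($this->resolvers as $resolver) {
            $type = $resolver->getConnectionType($connectionName);
            if ($type !== null) {
                return $type;
            }
        }

        throw new LogicException('Unknown connection name ' . $connectionName);
    }
}

$resolver = new CompositeTypeResolver([
    'db' => new FixedTypeResolver('db', ['db']),
    'redis' => new FixedTypeResolver('redis', ['redis']),
]);
$type = $resolver->getConnectionType('redis');
```

Because each resolver answers only for its own names, adding the redis item does not change how the existing db or amqp connections are resolved.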

After that, you need to add the Exchange class. It invokes the Redis queue client directly, passing the queue name and the message as arguments. It must implement the Magento\Framework\MessageQueue\ExchangeInterface interface.

The class looks like this:

class Exchange implements ExchangeInterface
{
   /**
    * Exchange constructor.
    * @param MessageQueueConfig $queueConfig
    * @param ConnectionTypeResolver $connectionTypeResolver
    * @param Queue $queueClient
    */
   public function __construct(
       private MessageQueueConfig $queueConfig,
       private ConnectionTypeResolver $connectionTypeResolver,
       private Queue $queueClient
   ) {
   }

   /**
    * @param string $topic
    * @param EnvelopeInterface $envelope
    * @return null|array
    */
   public function enqueue($topic, EnvelopeInterface $envelope): ?array
   {
       $queueNames = [];
       $exchanges = $this->queueConfig->getExchanges();
       foreach ($exchanges as $exchange) {
           $connection = $exchange->getConnection();
           // Only handle exchanges that belong to the redis connection type.
           if ($this->connectionTypeResolver->getConnectionType($connection) === 'redis') {
               foreach ($exchange->getBindings() as $binding) {
                   if ($binding->getTopic() === $topic) {
                       $queueNames[] = $binding->getDestination();
                   }
               }
           }
       }
       $data = [
           'body' => $envelope->getBody(),
           'properties' => $envelope->getProperties()
       ];
       foreach ($queueNames as $queueName) {
           $this->queueClient->enqueue($queueName, json_encode($data));
       }

       return null;
   }
}

This class implements the enqueue method, which accepts two arguments:

  • $topic is the name of the topic, which is used to find the required queue;
  • $envelope is an object that implements Magento\Framework\MessageQueue\EnvelopeInterface and carries the message body and properties that are written to the queue.
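The lookup performed by enqueue can be tried in isolation. The sketch below uses plain arrays in place of the topology config objects; the function name resolveQueueNames and the array shape are my assumptions for illustration, while the body/properties payload mirrors what the Exchange class serializes.

```php
<?php
declare(strict_types=1);

/**
 * Collects the destination queues bound to a topic.
 * $exchanges is a plain-array stand-in (assumed shape) for the topology config objects.
 *
 * @param array<int, array{connection: string, bindings: array<int, array{topic: string, destination: string}>}> $exchanges
 * @return string[]
 */
function resolveQueueNames(array $exchanges, string $topic, string $connection = 'redis'): array
{
    $queueNames = [];
    foreach ($exchanges as $exchange) {
        if ($exchange['connection'] !== $connection) {
            continue; // skip exchanges that belong to other brokers
        }
        foreach ($exchange['bindings'] as $binding) {
            if ($binding['topic'] === $topic) {
                $queueNames[] = $binding['destination'];
            }
        }
    }

    return $queueNames;
}

$exchanges = [
    ['connection' => 'redis', 'bindings' => [
        ['topic' => 'itd_test_queue', 'destination' => 'itd_test_queue'],
    ]],
    ['connection' => 'amqp', 'bindings' => [
        ['topic' => 'itd_test_queue', 'destination' => 'other_queue'],
    ]],
];

$queues = resolveQueueNames($exchanges, 'itd_test_queue');

// The message is stored as one JSON string holding the body and the properties.
$payload = json_encode([
    'body' => 'Test message content',
    'properties' => ['topic_name' => 'itd_test_queue'],
], JSON_THROW_ON_ERROR);
```

Note that the topic is matched by exact comparison here, as in the Exchange class; wildcard topic patterns would need extra handling.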

To integrate this Exchange object into the Magento 2 logic, you need to define an ExchangeFactory that takes part in object instantiation. It is a simple implementation of the factory method pattern and looks as follows:

class ExchangeFactory implements ExchangeFactoryInterface
{
   /**
    * ExchangeFactory constructor.
    * @param ObjectManagerInterface $objectManager
    * @param string $exchangeModelClass
    */
   public function __construct(
       private ObjectManagerInterface $objectManager,
       private string $exchangeModelClass = Exchange::class
   ) {
   }

   /**
    * @param string $connectionName
    * @param array $data
    * @return ExchangeInterface
    */
   public function create($connectionName, array $data = []): ExchangeInterface
   {
       return $this->objectManager->create($this->exchangeModelClass, $data);
   }
}

To plug this class into the standard Magento 2 mechanism, add it to the exchangeFactories argument of the \Magento\Framework\MessageQueue\ExchangeFactory class. This can also be done via di.xml:

<type name="Magento\Framework\MessageQueue\ExchangeFactory">
   <arguments>
       <argument name="exchangeFactories" xsi:type="array">
           <item name="redis" xsi:type="object">ITDelight\RedisQueue\Model\Driver\ExchangeFactory</item>
       </argument>
   </arguments>
</type>

Just to be clear, we use the concept of an exchange because Magento 2 queues were originally developed with a focus on the AMQP protocol (and specifically RabbitMQ). It has no direct counterpart among Redis concepts.

Now, to tie all of the module’s components together, we have to add the Publisher class, which implements Magento\Framework\MessageQueue\PublisherInterface.

The class looks like this:

class Publisher implements PublisherInterface
{
   /**
    * Publisher constructor.
    * @param MessageValidator $messageValidator
    * @param MessageEncoder $messageEncoder
    * @param EnvelopeFactory $envelopeFactory
    * @param PublisherConfig $publisherConfig
    * @param ExchangeRepository $exchangeRepository
    */
   public function __construct(
       private MessageValidator $messageValidator,
       private MessageEncoder $messageEncoder,
       private EnvelopeFactory $envelopeFactory,
       private PublisherConfig $publisherConfig,
       private ExchangeRepository $exchangeRepository
   ) {
   }

   /**
    * @param string $topicName
    * @param mixed $data
    * @throws LocalizedException
    */
   public function publish($topicName, $data): void
   {
       $this->messageValidator->validate($topicName, $data);
       $data = $this->messageEncoder->encode($topicName, $data);
       $envelope = $this->envelopeFactory->create([
           'body' => $data,
           'properties' => [
               'topic_name' => $topicName,
               'delivery_mode' => 2,
               'message_id' => md5(uniqid($topicName))
           ]
       ]);
       $publisher = $this->publisherConfig->getPublisher($topicName);

       $connectionName = $publisher->getConnection()->getName();
       $exchange = $this->exchangeRepository->getByConnectionName($connectionName);
       $exchange->enqueue($topicName, $envelope);
   }
}

Let’s take a closer look at this class. It uses the following Magento framework base classes:

– Message validator – Magento\Framework\MessageQueue\MessageValidator

– Encoder – Magento\Framework\MessageQueue\MessageEncoder

– Envelope factory – Magento\Framework\MessageQueue\EnvelopeFactory

These classes allow us to validate the message, convert its data to a string, and create an Envelope object that carries the data on to the client.

Next, the $publisher variable contains an object that implements \Magento\Framework\MessageQueue\Publisher\Config\PublisherConfigItemInterface. We get this object from Magento\Framework\MessageQueue\Publisher\ConfigInterface.

From this object, we get the name of the connection to use for this message. Then, using the Magento\Framework\MessageQueue\ExchangeRepository class, we get the $exchange object, which is an instance of the \ITDelight\RedisQueue\Model\Driver\Exchange class that we defined earlier. ExchangeRepository uses the Magento\Framework\MessageQueue\ExchangeFactory factory; since we registered our own factory with it through di.xml, a connection of type redis now yields an object of the class we need. After that, the public enqueue method is called, which adds the message to the queue.
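The chain from repository to factory to exchange can be sketched without Magento. All names below (SketchExchange, RecordingExchange, SketchExchangeFactory, SketchExchangeRepository) are hypothetical, and for brevity the connection name is used directly as the connection type, which the real framework resolves through the connection type resolver.

```php
<?php
declare(strict_types=1);

// Hypothetical minimal exchange contract for this sketch.
interface SketchExchange
{
    public function enqueue(string $topic, string $message): void;
}

// Exchange that only records what it was asked to send.
final class RecordingExchange implements SketchExchange
{
    /** @var string[] */
    public array $sent = [];

    public function __construct(public string $label)
    {
    }

    public function enqueue(string $topic, string $message): void
    {
        $this->sent[] = $topic . ':' . $message;
    }
}

// Composite factory: one creator per connection type, keyed like the di.xml items.
final class SketchExchangeFactory
{
    /** @param array<string, callable(): SketchExchange> $factories */
    public function __construct(private array $factories)
    {
    }

    public function create(string $connectionType): SketchExchange
    {
        if (!isset($this->factories[$connectionType])) {
            throw new LogicException("No exchange factory for '{$connectionType}'");
        }

        return ($this->factories[$connectionType])();
    }
}

// Repository: creates an exchange on first use and reuses it afterwards.
final class SketchExchangeRepository
{
    /** @var array<string, SketchExchange> */
    private array $exchanges = [];

    public function __construct(private SketchExchangeFactory $factory)
    {
    }

    public function getByConnectionName(string $connectionName): SketchExchange
    {
        return $this->exchanges[$connectionName] ??= $this->factory->create($connectionName);
    }
}

$repository = new SketchExchangeRepository(new SketchExchangeFactory([
    'db' => fn (): SketchExchange => new RecordingExchange('db'),
    'redis' => fn (): SketchExchange => new RecordingExchange('redis'),
]));
$exchange = $repository->getByConnectionName('redis');
$exchange->enqueue('itd_test_queue', 'hello');
```

The key used in the factory map plays the same role as the item name in di.xml: it decides which driver handles a given connection.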

Now we have a ready implementation that allows us to add messages to the queue – let’s add functionality to read messages from the queue.

To do this, we need to create the \ITDelight\RedisQueue\Model\Driver\Queue class, which implements the Magento\Framework\MessageQueue\QueueInterface interface. This class is used whenever a queue is requested, for example when we run the queue:consumers:start console command.

Here is an example implementation of this class:

class Queue implements QueueInterface
{
   /**
    * Queue constructor.
    * @param Client $client
    * @param string $queueName
    * @param MessageEncoder $messageEncoder
    * @param EnvelopeFactory $envelopeFactory
    * @param LoggerInterface $logger
    * @param int $interval
    */
   public function __construct(
       private Client $client,
       private string $queueName,
       private MessageEncoder $messageEncoder,
       private EnvelopeFactory $envelopeFactory,
       private LoggerInterface $logger,
       private int $interval = 5
   ) {
   }

   /**
    * @return EnvelopeInterface|null
    */
   public function dequeue(): ?EnvelopeInterface
   {
       $envelope = null;
       $message = $this->client->dequeue($this->queueName);
       if ($message) {
           $message = json_decode($message, true);
           $envelope = $this->envelopeFactory->create([
               'body' => $message['body'],
               'properties' => $message['properties']
           ]);
       }

       return $envelope;
   }

   /**
    * @param EnvelopeInterface $envelope
    */
   public function acknowledge(EnvelopeInterface $envelope): void
   {
       $this->client->acknowledge($this->queueName);
   }

   /**
    * @param EnvelopeInterface $envelope
    */
   public function push(EnvelopeInterface $envelope): void
   {
       $this->client->enqueue($this->queueName, json_encode([
           'body' => $envelope->getBody(),
           'properties' => $envelope->getProperties()
       ]));
   }

   /**
    * @param EnvelopeInterface $envelope
    * @param bool $requeue
    * @param string|null $rejectionMessage
    */
   public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null): void
   {
       $this->logger->debug('Message from queue ' . $this->queueName . ' was rejected');
   }

   /**
    * @param array|callable $callback
    */
   public function subscribe($callback): void
   {
       while (true) {
           while ($envelope = $this->dequeue()) {
               try {
                   // phpcs:ignore Magento2.Functions.DiscouragedFunction
                   call_user_func($callback, $envelope);
               } catch (\Exception $e) {
                   $this->reject($envelope);
               }
           }
           // phpcs:ignore Magento2.Functions.DiscouragedFunction
           sleep($this->interval);
       }
   }
}

In this class, the subscribe method is used to receive messages from the queue. If a message exists, it is converted and passed to the handler class specified in the configuration, namely in the queue_consumer.xml file. The subscribe method is called in the \Magento\Framework\MessageQueue\Consumer::process method. Keep in mind, however, that subscribe is called only when the max-messages parameter is not specified when starting the \Magento\MessageQueue\Console\StartConsumerCommand console command. If the parameter is specified, \Magento\Framework\MessageQueue\Consumer::process instead calls \Magento\Framework\MessageQueue\CallbackInvokerInterface::invoke, which takes an instance of our Queue class, the number of messages to process, the callback function that we specified in the configuration, and a few more optional parameters as arguments. In its loop, this method also calls the dequeue method of our Queue class and passes the converted message to the callback function.
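The difference between the endless subscribe loop and the limited mode can be shown with a small sketch. ArrayQueue and consumeLimited are hypothetical names, and the loop is a simplification: the framework's callback invoker has additional behavior (such as optionally waiting for new messages) that is not modeled here.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory queue exposing only what this sketch needs.
final class ArrayQueue
{
    /** @param string[] $messages */
    public function __construct(private array $messages)
    {
    }

    public function dequeue(): ?string
    {
        return array_shift($this->messages); // null once the queue is empty
    }
}

/**
 * Processes at most $maxMessages messages and then returns,
 * instead of polling forever like subscribe().
 * Returns the number of messages actually handled.
 */
function consumeLimited(ArrayQueue $queue, callable $callback, int $maxMessages): int
{
    $handled = 0;
    while ($handled < $maxMessages && ($message = $queue->dequeue()) !== null) {
        $callback($message);
        $handled++;
    }

    return $handled;
}

$seen = [];
$queue = new ArrayQueue(['a', 'b', 'c']);
$handled = consumeLimited($queue, function (string $message) use (&$seen): void {
    $seen[] = $message;
}, 2);
```

With a limit of 2, the third message stays in the queue for the next run, which is what makes the limited mode suitable for cron-driven consumers.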

The \ITDelight\RedisQueue\Model\Driver\Queue::acknowledge method is called when a message from the queue is successfully processed. 

The \ITDelight\RedisQueue\Model\Driver\Queue::reject method is called when message processing fails. Here you can return the message to the queue if your functionality requires it.
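As an illustration of returning a rejected message to the queue, here is a minimal sketch. SketchClient and rejectMessage are hypothetical names, a PHP array replaces Redis, and the requeue strategy (append to the end of the same queue) is just one possible choice, not what the module does.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the queue client; a PHP array replaces Redis.
final class SketchClient
{
    /** @var array<string, string[]> */
    public array $queues = [];

    public function enqueue(string $queueName, string $message): void
    {
        $this->queues[$queueName][] = $message;
    }
}

/**
 * Sketch of a reject() that honours $requeue by pushing the message back
 * to the end of the queue; otherwise the message is dropped.
 */
function rejectMessage(
    SketchClient $client,
    string $queueName,
    string $body,
    array $properties,
    bool $requeue = true
): void {
    if (!$requeue) {
        return;
    }
    $client->enqueue($queueName, json_encode(
        ['body' => $body, 'properties' => $properties],
        JSON_THROW_ON_ERROR
    ));
}

$client = new SketchClient();
rejectMessage($client, 'itd_test_queue', 'payload', ['topic_name' => 'itd_test_queue']);
rejectMessage($client, 'itd_test_queue', 'dropped', [], false);
```

Requeueing unconditionally can keep a message that always fails circulating forever, so a real implementation would probably want a retry counter in the properties.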

The \ITDelight\RedisQueue\Model\Driver\Queue::push method is used to add a message to the queue.

Now we need to plug our \ITDelight\RedisQueue\Model\Driver\Queue class into Magento 2. To do this, we also need a QueueFactory class. It might look like this:

class QueueFactory implements QueueFactoryInterface
{
   /**
    * QueueFactory constructor.
    * @param ObjectManagerInterface $objectManager
    * @param string $className
    */
   public function __construct(
       private ObjectManagerInterface $objectManager,
       private string $className = Queue::class
   ) {
   }

   /**
    * @param string $queueName
    * @param string $connectionName
    * @return QueueInterface
    */
   public function create($queueName, $connectionName): QueueInterface
   {
       return $this->objectManager->create(
           $this->className,
           [
               'queueName' => $queueName,
               'connectionName' => $connectionName
           ]
       );
   }
}

As with the Exchange class, this factory simply implements the factory method pattern and is needed to comply with the Magento 2 contracts. Next, the factory is added to the queueFactories argument of the Magento\Framework\MessageQueue\QueueFactory composite factory.

<type name="Magento\Framework\MessageQueue\QueueFactory">
   <arguments>
       <argument name="queueFactories" xsi:type="array">
           <item name="redis" xsi:type="object">ITDelight\RedisQueue\Model\Driver\QueueFactory</item>
       </argument>
   </arguments>
</type>

Here, we also specify the key ‘redis’, since the connection name is a common key in the queue functionality.

Magento 2 also has bulk functionality, so we need to provide an implementation for it as well. This can also be wired up through di.xml. As in the regular implementation, you need an Exchange class, but for bulk it should implement Magento\Framework\MessageQueue\Bulk\ExchangeInterface. The difference is that its enqueue method takes an array of envelopes instead of a single one: public function enqueue($topic, array $envelopes).

<virtualType name="ITDelight\RedisQueue\Model\Driver\Bulk\ExchangeFactory"
            type="ITDelight\RedisQueue\Model\Driver\ExchangeFactory">
   <arguments>
       <argument name="exchangeModelClass" xsi:type="string">ITDelight\RedisQueue\Model\Driver\Bulk\Exchange</argument>
   </arguments>
</virtualType>
<type name="Magento\Framework\MessageQueue\Bulk\ExchangeFactory">
   <arguments>
       <argument name="exchangeFactories" xsi:type="array">
           <item name="redis" xsi:type="object">ITDelight\RedisQueue\Model\Driver\Bulk\ExchangeFactory</item>
       </argument>
   </arguments>
</type>
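The bulk exchange itself is not listed in this post. As a rough sketch of what its enqueue method has to do, the example below serializes every envelope the same way the single-message Exchange does. SketchEnvelope, enqueueBulk, and the $push callable are hypothetical names chosen so the example runs on its own.

```php
<?php
declare(strict_types=1);

// Hypothetical envelope value object (stand-in for EnvelopeInterface).
final class SketchEnvelope
{
    public function __construct(private string $body, private array $properties = [])
    {
    }

    public function getBody(): string
    {
        return $this->body;
    }

    public function getProperties(): array
    {
        return $this->properties;
    }
}

/**
 * Bulk variant of enqueue: the same serialization as the single-message
 * exchange, applied to every envelope. $push is a hypothetical callable
 * that writes one serialized message to one queue.
 *
 * @param string[] $queueNames
 * @param SketchEnvelope[] $envelopes
 * @return int number of writes performed
 */
function enqueueBulk(array $queueNames, array $envelopes, callable $push): int
{
    $count = 0;
    foreach ($envelopes as $envelope) {
        $data = json_encode([
            'body' => $envelope->getBody(),
            'properties' => $envelope->getProperties(),
        ], JSON_THROW_ON_ERROR);
        foreach ($queueNames as $queueName) {
            $push($queueName, $data);
            $count++;
        }
    }

    return $count;
}

$stored = [];
$written = enqueueBulk(
    ['itd_test_queue'],
    [new SketchEnvelope('first'), new SketchEnvelope('second')],
    function (string $queue, string $data) use (&$stored): void {
        $stored[$queue][] = $data;
    }
);
```

With Redis, the inner writes could likely be batched into a single round trip, but that optimization is outside this sketch.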

For integration into the bulk functionality, you also need to add a separate publisher class, similar to the one in the main functionality.

<type name="Magento\Framework\MessageQueue\Bulk\PublisherPool">
   <arguments>
       <argument name="publishers" xsi:type="array">
           <item name="async" xsi:type="array">
               <item name="redis" xsi:type="object">ITDelight\RedisQueue\Model\Publisher\Bulk</item>
           </item>
       </argument>
   </arguments>
</type>

Finally, we need to create the Recurring class, which runs during the setup:upgrade console command and creates the queues in the broker. To do this, we add the \ITDelight\RedisQueue\Setup\Recurring class to our module:

class Recurring implements InstallSchemaInterface
{
   /**
    * Recurring constructor.
    * @param MessageQueueConfig $messageQueueConfig
    * @param Queue $client
    * @param LoggerInterface $logger
    */
   public function __construct(
       private MessageQueueConfig $messageQueueConfig,
       private Queue $client,
       private LoggerInterface $logger
   ) {
   }

   /**
    * @param SchemaSetupInterface $setup
    * @param ModuleContextInterface $context
    */
   public function install(SchemaSetupInterface $setup, ModuleContextInterface $context): void
   {
       $queues = [];
       foreach ($this->messageQueueConfig->getQueues() as $queue) {
           $queues[] = $queue->getName();
       }

       foreach ($queues as $queue) {
           try {
               $this->client->createQueue($queue);
           } catch (LocalizedException $e) {
               $this->logger->debug($e->getMessage());
           }
       }
   }
}

Our functionality for working with the queue is now ready and integrated into the system. Let’s look at an example of how to use it. For this, we are going to add a new module with minimal functionality that demonstrates publishing and receiving messages using Redis.

Let’s first talk a little about how queues work in general.

If you want to add queue operations to your module, inject a Magento\Framework\MessageQueue\PublisherInterface object into your class and call its publish method:

$publisher->publish($topic, $message);

You also need to add some configuration:

  • add the connection configuration to the env.php file (it can be DB, AMQP, or, in our case, Redis);
  • in the module, add the queue_consumer.xml file indicating the class that will process the messages from the queue;
  • in the module, add the queue_publisher.xml file indicating the connection type and exchange for the topic;
  • in the module, add the queue_topology.xml file that binds the topic to a queue;
  • in the module, add the communication.xml file, where we specify the type of data transmitted in the request.

Now with all this configuration, we can start using queues in a Magento2 application.
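To make the publishing side concrete, here is a small sketch of a service that hands work to a queue. So that it runs outside Magento, it uses a local SketchPublisher interface with the same publish() shape as PublisherInterface; ExportRequester, RecordingPublisher, and the topic name example.topic are hypothetical, and the topic is assumed to be declared with a string request type in communication.xml.

```php
<?php
declare(strict_types=1);

// Local stand-in with the same publish() shape as PublisherInterface,
// so the sketch runs outside Magento.
interface SketchPublisher
{
    public function publish(string $topicName, mixed $data): void;
}

// Hypothetical service that hands work to the queue instead of doing it inline.
final class ExportRequester
{
    private const TOPIC_NAME = 'example.topic';

    public function __construct(private SketchPublisher $publisher)
    {
    }

    public function requestExport(int $orderId): void
    {
        // The topic is assumed to accept a string, so the ID is cast before publishing.
        $this->publisher->publish(self::TOPIC_NAME, (string) $orderId);
    }
}

// Recording publisher used only to show what would be sent.
final class RecordingPublisher implements SketchPublisher
{
    /** @var array<int, array{0: string, 1: mixed}> */
    public array $published = [];

    public function publish(string $topicName, mixed $data): void
    {
        $this->published[] = [$topicName, $data];
    }
}

$publisher = new RecordingPublisher();
(new ExportRequester($publisher))->requestExport(42);
```

In a real module, the constructor would type-hint Magento\Framework\MessageQueue\PublisherInterface and dependency injection would supply the publisher.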

Let’s take a look at an example of creating a module to see how we can use our new Redis-based queuing functionality.

Let’s create the ITDelight\QueueExample module. This module will contain the code that will use our new functionality. First, let’s add the configuration to the env.php file in the queue section.

'redis' => [
   'host' => 'redis',
   'port' => '6379',
   'db' => '0'
]

Now let’s add a communication.xml file to our new module. Its content may be as follows:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
   <topic name="itd_test_queue" request="string"/>
</config>

Here we declare the topic and the type of message that will be sent to our queue.

Now let’s add the queue_consumer.xml file:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_consumer.xsd">
   <consumer
       name="itd_test_queue"
       queue="itd_test_queue"
       connection="redis"
       handler="ITDelight\QueueExample\Model\MessageQueues\Consumer::processMessage"/>
</config>

Thus, we specify the class that will receive messages from the queue.

Now let’s add the queue_publisher.xml file.

In this file, pay attention to the topic: it later determines the required queue and the name of the Redis connection. The redis connection type is specific to our case.
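For reference, a queue_publisher.xml for this topic could look roughly like the sketch below. It is my reconstruction rather than the module's actual file: it assumes the standard publisher schema and reuses the topic, exchange, and connection names that appear in the other configuration files of this example.

```xml
<?xml version="1.0"?>
<!-- Sketch only: names are taken from the other config files in this example. -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
   <publisher topic="itd_test_queue">
       <connection name="redis" exchange="itd_test_queue"/>
   </publisher>
</config>
```

The connection name is what the Publisher class reads through getConnection()->getName() to pick the Redis exchange.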

Let’s add the configuration file queue_topology.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_topology.xsd">
   <exchange name="itd_test_queue" type="topic" connection="redis">
       <binding id="itd_test_queue" topic="itd_test_queue"
                destinationType="queue" destination="itd_test_queue"/>
   </exchange>
</config>

As you can see above, this configuration is used in the Exchange class to find the correct queue for the message.

Now all the configuration files are ready and you can add a message to the queue.

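So, we will add a console command that uses the PublisherInterface object to queue a message. The code for this class is:

<?php
declare(strict_types=1);

namespace ITDelight\QueueExample\Console\Command;

use Magento\Framework\Console\Cli;
use Magento\Framework\MessageQueue\PublisherInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Console command that publishes a test message to the Redis-backed queue.
 */
class PublishMessage extends Command
{
   private const TOPIC_NAME = 'itd_test_queue';

   public function __construct(
       private PublisherInterface $publisher,
       ?string $name = null
   ) {
       parent::__construct($name);
   }

   protected function configure(): void
   {
       $this->setName('itd:queue:test');
       $this->setDescription('Test redis queue functionality');
   }

   protected function execute(InputInterface $input, OutputInterface $output): int
   {
       // The topic is declared with request="string", so a plain string is sent.
       $this->publisher->publish(self::TOPIC_NAME, 'Test message content');

       return Cli::RETURN_SUCCESS;
   }
}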

This console command must also be registered in the system via di.xml:

<type name="Magento\Framework\Console\CommandListInterface">
   <arguments>
       <argument name="commands" xsi:type="array">
           <item name="test_queue" xsi:type="object">ITDelight\QueueExample\Console\Command\PublishMessage</item>
       </argument>
   </arguments>
</type>

Now we only have to run the setup:upgrade command, which creates the queue in Redis, and then we can run the itd:queue:test command, which adds the message to the queue.

You can view messages in the queue using redis-cli, for example.

Let’s add a class to receive our messages (consumer). It should be like this:

<?php
declare(strict_types=1);

namespace ITDelight\QueueExample\Model\MessageQueues;

class Consumer
{
   public function processMessage(string $message): void
   {

   }
}

In this method, we simply receive our message. To start the consumer, run the queue:consumers:start command with the consumer name, in our case itd_test_queue.

At this stage, we can say that our functionality is completed. I hope that the article was helpful to you. This post can be considered as a kind of excursion into the logic of the Magento2 queue functionality and as a kind of tutorial when you want to add your custom message broker.

Once again, please note that this post is for instructional purposes only. Although I am no stranger to using Redis as the main message broker in real projects, in Magento 2 you should first of all consider RabbitMQ as the main message broker option.
