Как записать сообщение в очередь rabbit из обработчика?

Для записи сообщения в очередь RabbitMQ из обработчика Symfony вам потребуется использовать RabbitMQ Bundle, который предоставляет удобный интерфейс для работы с RabbitMQ. Разберем шаги, которые необходимо выполнить:

1. Установите RabbitMQ Bundle, выполнив команду composer require enqueue/bundle.
2. Подключите RabbitMQ Bundle, добавив его в файл config/bundles.php вашего проекта:

<?php
// config/bundles.php

return [
    // ...
    EnqueueBundleEnqueueBundle::class => ['all' => true],
    // ...
];

3. Создайте файл config/packages/enqueue.yaml и настройте параметры подключения к RabbitMQ:

# config/packages/enqueue.yaml

enqueue:
    client:
        transport:
            default:
                dsn: "amqp://guest:guest@localhost:5672/%2f"

4. В обработчике, откуда вы хотите отправить сообщение в очередь, внедрите сервис enqueue.client.default:

<?php
// src/EventListener/MyEventSubscriber.php

namespace AppEventListener;

use EnqueueClientClientInterface;
use SymfonyComponentEventDispatcherEventSubscriberInterface;
// ...

class MyEventSubscriber implements EventSubscriberInterface
{
    private $enqueueClient;

    public function __construct(ClientInterface $enqueueClient)
    {
        $this->enqueueClient = $enqueueClient;
    }

    public static function getSubscribedEvents()
    {
        return [
            // ...
        ];
    }

    public function myEventHandler()
    {
        // ...
        // отправка сообщения в очередь
        $this->enqueueClient->sendEvent('my_event', 'Hello, RabbitMQ!', [
            'exchange' => 'my_exchange',
            'routing_key' => 'my_routing_key',
        ]);
    }
}

В примере выше мы внедряем сервис enqueue.client.default и используем его для отправки события в очередь. Метод sendEvent() принимает следующие параметры:
- event - имя события, которое будет использоваться при обработке сообщения в очереди;
- message - само сообщение, которое будет отправлено;
- properties - дополнительные параметры сообщения, такие как exchange и routing_key.

5. Для обработки сообщений из очереди вам необходимо настроить консольный потребитель (consumer). Создайте файл config/packages/enqueue.yaml и настройте вашего консольного потребителя:

# config/packages/enqueue.yaml

enqueue:
    consumers:
        my_consumer:
            processor: 'AppMessageProcessorMyMessageProcessor'
            queue: 'my_queue'

6. Консольный потребитель (MyMessageProcessor в примере выше) должен реализовывать интерфейс EnqueueConsumptionQueueSubscriberInterface и должен обрабатывать сообщения, полученные из очереди:

<?php
// src/MessageProcessor/MyMessageProcessor.php

namespace AppMessageProcessor;

use EnqueueConsumptionQueueSubscriberInterface;
use InteropQueueContext;
use InteropQueueMessage;
use InteropQueueProcessor;

class MyMessageProcessor implements Processor, QueueSubscriberInterface
{
    public function process(Message $message, Context $context)
    {
        // обработка исходящего сообщения
        // ...

        return self::ACK;
    }

    public static function getSubscribedQueues()
    {
        return ['my_queue'];
    }
}

Выше мы создали класс MyMessageProcessor, который реализует интерфейс Processor. Метод process() будет вызываться для каждого сообщения, полученного из очереди. В данном методе вы можете выполнить необходимую обработку сообщения и вернуть одно из следующих значений:
- ACK - подтверждение успешной обработки сообщения;
- REJECT - отклонение сообщения, которое может быть повторно вставлено в очередь или отправлено в другую очередь, предназначенную для отклоненных сообщений;
- REQUEUE - повторная вставка сообщения в очередь для последующей обработки.

7. Запустите команду для обработки сообщений в фоне:

$ bin/console enqueue:consume --setup-broker --no-interaction

После запуска этой команды Symfony будет слушать и обрабатывать сообщения из очереди, используя вашего консольного потребителя (MyMessageProcessor в примере выше).

Таким образом, вы можете использовать RabbitMQ в вашем проекте Symfony для отправки и обработки сообщений в очереди.