Реализация consumer'а RabbitMQ в микросервисе на Go с использованием gRPC может происходить следующим образом:
1. Установите необходимые зависимости:
go get github.com/streadway/amqp go get google.golang.org/grpc
2. Создайте файл со структурой и методами для вашего gRPC сервиса. Например, в файле service.pb.go
содержится описание сервиса и методов:
package main import ( "context" "log" ) type MessageServiceServer struct{} func (s *MessageServiceServer) PublishMessage(ctx context.Context, request *MessageRequest) (*MessageResponse, error) { // Обработка запроса и отправка сообщения в RabbitMQ err := publishMessage(request.Body) if err != nil { return nil, err } return &MessageResponse{Success: true}, nil } func publishMessage(message string) error { // Логика для отправки сообщения в RabbitMQ // Используйте библиотеку `github.com/streadway/amqp` return nil }
3. Создайте файл для запуска главного сервера gRPC. Назовем его, например, server.go
:
package main import ( "log" "net" "google.golang.org/grpc" ) const ( port = ":50051" // Порт для gRPC сервера ) func main() { // Создание нового gRPC сервера lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() // Регистрация сервиса pb.RegisterMessageServiceServer(s, &MessageServiceServer{}) log.Printf("Server listening on port %v", port) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
4. Создайте файл для consumer'а RabbitMQ, который будет слушать очередь сообщений и обрабатывать полученные сообщения. Например, назовем его consumer.go
:
package main import ( "log" "github.com/streadway/amqp" ) const ( rabbitMQURL = "amqp://guest:guest@localhost:5672/" // URL RabbitMQ queueName = "my_queue" // Имя очереди RabbitMQ ) func main() { // Подключение к RabbitMQ conn, err := amqp.Dial(rabbitMQURL) if err != nil { log.Fatalf("failed to connect to RabbitMQ: %v", err) } defer conn.Close() // Создание канала ch, err := conn.Channel() if err != nil { log.Fatalf("failed to open a channel: %v", err) } defer ch.Close() // Объявление очереди q, err := ch.QueueDeclare( queueName, // Имя очереди false, // Очередь устойчивая к сбою RabbitMQ false, // Удалять очередь, когда последний подписчик отключается false, // Исключается уведомление о подписке на очередь false, // Поддержка аргументов nil, // Аргументы для объявления очереди ) if err != nil { log.Fatalf("failed to declare a queue: %v", err) } // Получение сообщений из очереди msgs, err := ch.Consume( q.Name, // Имя очереди "", // Имя подписчика true, // Auto Acknowledgement (автоматическое подтверждение получения сообщения) false, // Exclusive (один подписчик на очередь) false, // No Local (отключение локальной доставки) false, // No Wait (блокирование ожидания подписчика) nil, // Аргументы ) if err != nil { log.Fatalf("failed to register a consumer: %v", err) } // Обработка полученных сообщений for msg := range msgs { log.Printf("Received a message: %s", msg.Body) // Обработка сообщения err := processMessage(msg.Body) if err != nil { // Возможна обработка ошибки } } } func processMessage(message []byte) error { // Обработка полученного сообщения return nil }
5. Запустите главный сервер gRPC, используя команду go run server.go
, чтобы он слушал запросы.
6. В другом терминале запустите consumer'а, используя команду go run consumer.go
, чтобы он начал слушать очередь RabbitMQ и обрабатывать полученные сообщения.
Теперь, когда ваш gRPC микросервис получает запрос на публикацию сообщения, он будет отправлять его в RabbitMQ, где consumer будет слушать очередь и обрабатывать полученные сообщения параллельно.