Как в Reactor Netty обрабатывать ошибки внутри Flux?

Reactor Netty - это фреймворк, основанный на реактивных потоках из проекта Reactor, который предоставляет реактивные возможности для создания серверных приложений. В Reactor Netty есть несколько способов обработки ошибок внутри Flux, и мы рассмотрим их подробнее.

1. Метод onErrorResume: Этот метод позволяет заменить ошибку на альтернативное значение или альтернативный Flux. Пример использования:

Flux<String> flux = Flux.just("A", "B", "C")
    .concatWith(Mono.error(new RuntimeException("Ошибка!")))
    .onErrorResume(error -> {
        log.error("Произошла ошибка: {}", error.getMessage());
        return Flux.just("D", "E", "F");
    });

flux.subscribe(System.out::println);

В этом примере, если возникает ошибка, мы логируем ее и заменяем Flux на новый Flux, в котором есть альтернативные значения "D", "E", "F". Результатом выполнения кода будет:

A
B
C
D
E
F

2. Метод onErrorReturn: Этот метод позволяет заменить ошибку на заданное значение. Пример использования:

Flux<String> flux = Flux.just("A", "B", "C")
    .concatWith(Mono.error(new RuntimeException("Ошибка!")))
    .onErrorReturn("Значение по умолчанию");

flux.subscribe(System.out::println);

В этом примере, если возникает ошибка, мы заменяем ее на значение "Значение по умолчанию". Результатом выполнения кода будет:

A
B
C
Значение по умолчанию

3. Метод onErrorMap: Этот метод позволяет преобразовать ошибку в другую ошибку. Пример использования:

Flux<String> flux = Flux.just("A", "B", "C")
    .concatWith(Mono.error(new RuntimeException("Ошибка!")))
    .onErrorMap(error -> new CustomException("Произошла кастомная ошибка", error));

flux.subscribe(System.out::println, error -> {
    if (error instanceof CustomException) {
        CustomException customError = (CustomException) error;
        log.error("Произошла кастомная ошибка: {}", customError.getMessage());
    } else {
        log.error("Произошла ошибка: {}", error.getMessage());
    }
});

В этом примере, если возникает ошибка, мы преобразуем ее в CustomException и логируем кастомное сообщение об ошибке. Если ошибка не является CustomException, мы просто логируем ее. Результатом выполнения кода будет:

A
B
C
Произошла кастомная ошибка: Произошла кастомная ошибка

4. Методы doOnError и doFinally: Эти методы позволяют выполнить определенные действия при возникновении ошибки или завершении Flux. Пример использования:

Flux<String> flux = Flux.just("A", "B", "C")
    .concatWith(Mono.error(new RuntimeException("Ошибка!")))
    .doOnError(error -> log.error("Произошла ошибка: {}", error.getMessage()))
    .doFinally(signalType -> log.info("Flux завершен: {}", signalType));

flux.subscribe(System.out::println);

В этом примере, если возникает ошибка, мы логируем ее. После завершения Flux (успешно или с ошибкой), мы логируем это. Результатом выполнения кода будет:

A
B
C
Произошла ошибка: Ошибка!
Flux завершен: onError

В Reactor Netty есть и другие методы для работы с ошибками, такие как onErrorContinue и onErrorStop, которые позволяют обрабатывать ошибки с полным или частичным пропуском, и onErrorResumeWith, который позволяет создать альтернативный Flux на основе ошибки. Все эти методы предоставляют различные возможности для обработки ошибок внутри Flux в зависимости от ваших потребностей.