Изящно перезапустить Потребительский поток Reactive-Kafka при сбое

Rabzu спросил: 28 апреля 2018 в 09:16 в: scala

Проблема Когда я перезапускаю / завершаю / поток STOP, старый потребитель не умирает / выключается:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.

Описание Я создаю службу, которая получает сообщение из темы Kafka и отправляет сообщение внешней службе через HTTP-запрос.

  1. Связь с внешней службой может быть нарушена, а моя службе требуется повторить запрос.

  2. Кроме того, если в потоке есть ошибка, весь поток необходимо перезагрузить.

  3. Наконец, иногда мне не нужен поток и его соответствующий пользователь Kafka, и я бы хотел закрыть весь поток.

Итак, у меня есть Stream:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)
  .run

Запрос Http отправляется в sourceFunction

Я следил за новыми инструкциями Kafka Consumer Restart в новой документации

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 ) { () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() {
                case (consumerControl, streamComplete) =>
                  logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
                  streamComplete
                    .flatMap { _ =>
                      consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
                    }
                    .recoverWith {
                      case _ =>
                        consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
                    }
             }
            .flatMapConcat(sourceFunction)
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.left)
      .run

Открывается проблема, которая обсуждает этот неиспользуемый потребитель в сложном потоке Akka, но решения пока нет.

Есть ли способ обхода, который заставляет Kafka Consumer term

1 ответ

Bennie Krijger ответил: 29 апреля 2018 в 12:40

Как насчет обертывания потребителя актером и регистрации KillSwitch, см. https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling

Затем в методе postStop Actor вы можете завершить поток. Завернув Actor в BackoffSupervisor, вы получите экспоненциальное отсрочку.

Пример: https://github.com/tradecloud/ Кафка-Акка-расширение / блобо / ведущий / SRC / основной / Scala / нл / tradecloud / Kafka / KafkaSubscriberActor.scala # L27

Rabzu ответил: 29 апреля 2018 в 12:48
KillSwitch также не работает. Приведенный выше пример будет работать только с простым потребительским потоком, и он не сможет убить kafka ConsumerActor, если Stream содержит вложенные потоки. Но я попробую это в любом случае