Apache Flink - доступ к внутреннему буферу WindowedStream из другой карты MapFunction

Sebastian спросил: 28 марта 2018 в 04:04 в: apache-flink

У меня есть потоковое приложение на основе Apache Flink со следующей настройкой:

  • Источник данных: генерирует данные каждую минуту.
  • Оконный поток с использованием CountWindow с размером = 100, slide = 1 (окно подсчета количества скобок).
  • ProcessWindowFunction применит некоторое вычисление (скажем, F (x)) к данным в окне.
  • Приемник данных для потребления выходного потока

Это прекрасно работает. Теперь я хочу, чтобы пользователи предоставляли функцию G (x) и применяли ее к текущим данным в окне и отправляли результат пользователю в режиме реального времени

Я не спрашиваю о как применить произвольную функцию G (x) - для этого я использую динамические скрипты. Я спрашиваю, как получить доступ к буферизованным данным в окне из функции отображения другого потока.

Некоторый код для уточнения

DataStream<Foo> in  = .... // source data produced every minute
    in
       .keyBy(new MyKeySelector())
       .countWindow(100, 1)
       .process(new MyProcessFunction())
       .addSink(new MySinkFunction())// The part above is working fine. Note that windowed stream created by countWindow() function above has to maintain internal buffer. Now the new requirementDataStream<Function> userRequest  = .... // request function from useruserRequest.map(new MapFunction<Function, FunctionResult>(){
   public FunctionResult map(Function Gx) throws Exception {
         Iterable<Foo> windowedDataFromAbove = // HOW TO GET THIS???
         FunctionResult result = Gx.apply(windowedDataFromAbove);
         return result;   }

})

3 ответа

Есть решение
kkrugler ответил: 29 марта 2018 в 01:32

Соедините два потока, затем используйте функцию CoProcessFunction. Вызов метода, который получает поток функций, может применить их к тому, что находится в окне вызова другого метода.

Если вы хотите транслировать функции, то вам нужно будет либо использовать Flink 1.5 (который поддерживает подключение ключевые и широковещательные потоки) или использовать некоторые вертолетные трюки для создания единого потока, который может содержать оба типа: Foo и Function, с соответствующей репликацией функций (и генераций ключей) для имитации широковещательной передачи.

Sebastian ответил: 29 марта 2018 в 06:51
Это была моя первая мысль. Но API не поддерживает соединение с WindowedStream ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/…
kkrugler ответил: 29 марта 2018 в 07:07
В (Co) ProcessFunction вы можете создавать свои собственные окна, контролируя время срабатывания таймеров.
David Anderson ответил: 29 марта 2018 в 10:11
В любом случае реализация вашего собственного оконного управления будет намного более эффективной - с этими скользящими окнами подсчета каждое событие копируется в 100 оконных объектов.
Sebastian ответил: 29 марта 2018 в 12:41
Это действительно так? Зачем Флинк делать 100 копий? Скользящее окно может быть реализовано с буфером фиксированного размера.
Nizar ответил: 04 апреля 2018 в 11:55

Предполагая, что Fx агрегирует входящие foos на лету, а Gx обрабатывает ценность foos окна, вы должны быть в состоянии достичь того, что вы хотите, следующим образом:

DataStream<Function> userRequest  = .... // request function from user
Iterator<Function> iter = DataStreamUtils.collect(userRequest);
Function Gx = iter.next();DataStream<Foo> in  = .... // source data
 .keyBy(new MyKeySelector())
 .countWindow(100, 1)
 .fold(new ArrayList<>(), new MyFoldFunc(), new MyProcessorFunc(Gx))
 .addSink(new MySinkFunction())

Функция Fold (работает для входящих данных, как только они поступят) можно определить следующим образом:

private static class MyFoldFunc implements FoldFunction<foo, Tuple2<Integer, List<foo>>> {
    @Override
    public Tuple2<Integer, List<foo>> fold(Tuple2<Integer, List<foo>> acc, foo f) {
        acc.f0 = acc.f0 + 1; // if Fx is a simple aggregation (count)
        acc.f1.add(foo);
        return acc;
    }
}

Функция процессора может выглядеть примерно так:

public class MyProcessorFunc
    extends ProcessWindowFunction<Tuple2<Integer, List<foo>>, Tuple2<Integer, FunctionResult>, String, TimeWindow> {    public MyProcessorFunc(Function Gx) {
        super();
        this.Gx = Gx;
    }    @Override
    public void process(String key, Context context,
                        Iterable<Tuple2<Integer, List<foo>> accIt,
                        Collector<Tuple2<Integer, FunctionResult>> out) {
        Tuple2<Integer, List<foo> acc = accIt.iterator().next();
        out.collect(new Tuple2<Integer, FunctionResult>(
            acc.f0, // your Fx aggregation
            Gx.apply(acc.f1), // your Gx results
        ));
    }
}

Обратите внимание, что функции сложения \ уменьшения по умолчанию не содержат внутренних элементов буфера. Здесь мы используем сложение для вычисления метрик "на лету", а также для создания списка элементов окна.

Если вы заинтересованы в применении Gx к опрокидывающимся окнам (не скользящим), вы можете использовать опрокидывающиеся окна в своем конвейере. , Чтобы вычислить скользящее число тоже , у вас может быть другая ветвь вашего конвейера, которая вычисляет только скользящее число (не применяется Gx). Таким образом, вам не нужно хранить 100 списков в каждом окне.

Примечание. Для использования DataStreamUtils может потребоваться добавить следующую зависимость:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib</artifactId>
    <version>0.10.2</version>
</dependency>
Nizar ответил: 04 апреля 2018 в 12:25
@Sebastian Предполагая, что у вас есть функция от пользователя, всегда доступная в начале программы, вы можете преобразовать ее в итератор и предоставить ее в качестве входных данных для MyProcessorFunc, как показано в обновленном ответе. Вы также можете собрать выходные данные 1-го потока (списки) в памяти и подождать, пока пользовательский запрос не придет, но это не подходит для работы Flink, поскольку вы заблокируете вычисления первого потока (и сохраните его вывод в памяти), пока не поступит пользовательский запрос!
Sebastian ответил: 05 апреля 2018 в 10:37
Функция от пользователя (Gx) недоступна в начале программы. Он отправляется через REST API конечными пользователями - который преобразуется в поток и отправляется на Flink для обработки. Однако существуют встроенные функции (Fx), которые обрабатываются на оконных данных. Вопрос здесь, как использовать оконные данные, которые уже находятся в памяти (так как они используются для вычисления Fx), для вычисления Gx
Sebastian ответил: 04 апреля 2018 в 05:33
Gx исходит от конечных пользователей в своем собственном DataStream. В приведенном выше примере кода Gx объявлен в качестве входного параметра для конструктора MyProcessorFunc, но код вызывающей стороны не показывает, как Gx передается в конструктор.