Потоки Kafka передают строковые проблемы с KTable при группировке и агрегации

jawwe спросил: 28 апреля 2018 в 08:41 в: java

У меня есть поток Kafka с входящими сообщениями, которые выглядят как sensor_code: x, time: 1526978768, address: Y Я хочу создать KTable, который хранит каждый уникальный адрес при каждом коде датчика.

KTable b>

KTable<String, Long> numCount = streams
            .map(kvm1)
            .groupByKey(Serialized.with(stringSerde, stringSerde))
            .count()
            .groupBy(kvm2, Serialized.with(stringSerde, longSerde))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));

Где kvm1 и kvm2 - мой собственный KeyValueMappers. Моя идея состояла в том, чтобы заменить существующий ключ на sensor_code=x, address=y, выполнить groupByKey() и count(). Затем другой groupBy(kvm2, Serialized.with(stringSerde, longSerde)), где kvm2 изменяет существующий key, чтобы содержать sensor_code, а затем значение будет его счетчиком. Но поскольку он не работает, возможно, я делаю это неправильно ... Он пытается использовать его как Long и выдает исключение, потому что он ищет строку. Я хочу, чтобы счетчик был Long, правильно?

Вот первый KeyValueMapper, который я использую с его соответствующей функцией справки:

    private static String getKeySensorIdAddress(String o) {
    String x = "sensor_id=\"x\", address=\"y\""; 
    try {
        WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
        x = x.replace("x", event.getSensor_code());
        x = x.replace("y", event.getAddress());
        return x;
    } catch(Exception ex) {
        System.out.println("Error... " + ex);
        return "Error";
    }
}
        //KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 = 
    new KeyValueMapper<String, String, KeyValue<String, String>>() {
         public KeyValue<String, String> apply(String key, String value) {
             return new KeyValue<>(getKeySensorIdAddress(value), value);
         }
    };

Вот второй KeyValueMapper и его справочная функция.

    private static String getKeySensorId(String o) {
    int a = o.indexOf(",");
    return o.substring(0,a);
}        //KeyValueMapper2 
    KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 = 
    new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
         public KeyValue<String, Long> apply(String key, Long value) {
             return new KeyValue<>(getKeySensorId(key), value);
         }
    };

Вот исключение и ошибка, которые возвращаются, когда я пытаюсь запустить код.

[2018-05-29 15: 28: 40,119] Сообщение об ошибке ERROR [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] Не удалось обработать задачу потока 2_0 из-за следующей ошибки: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks: 105) java.lang.ClassCastException: java.lang.Long нельзя передать в java.lang.Stringat org.apache.kafka. common.serialization.StringSerializer.serialize (StringSerializer.java:28) на org.apache.kafka.streams.state.StateSerdes.rawValue (StateSerdes.java:178) на org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1 .innerValue (MeteredKeyValueBytesStore.java:66) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.inn erValue (MeteredKeyValueBytesStore.java:57) в org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put (InnerMeteredKeyValueStore.java:198) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put (MeteredKeyValueBytesStore. java: 117) в org.apache.kafka.streams.kstream.internals.KTableAggregate $ KTableAggregateProcessor.process (KTableAggregate.java:95) в org.apache.kafka.streams.kstream.internals.KTableAggregate $ KTableAggregateProcessor.process (KTableAggregate. java: 56)

Обратите внимание на ошибку java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String.

Любые идеи, почему я получаю эту ошибку и как я могу ее исправить или совет, как я может отредактировать код, чтобы достичь желаемого результата, как я уже упомянул?

Большое спасибо заранее!

EDIT: Сделал серьезный пересмотр моего вопроса, так как я отказались от одного из подходов.

1 ответ

Michal Borowiecki ответил: 29 апреля 2018 в 08:55

В первом случае, если вы хотите использовать HashMap в качестве типа значения, вам нужно определить его для него и передать его с помощью Materialized.withValueSerde.

Во втором случае я могу скажем, не видя тип возврата из вашего KeyValueMappers и точного сообщения об ошибке: пытается ли он передать String в long или наоборот?

EDIT: Спасибо, что предоставили дополнительную информацию.

Я думаю, что во втором случае вам нужно указать значение serde во второй операции подсчета. Кажется, что существует несогласованность между count () на KGroupedStream и KGroupedTable, поскольку первая автоматически устанавливает значение serde в LongSerde:

https://github.com/apache/kafka/blob/ 1.1 / streams / src / main / java / org / apache / kafka / streams / kstream / internals / KGroupedStreamImpl.java # L281-L283

, но в KGroupedTable нет:

https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java#L253

Кажется уже были исправлены на магистрали, но еще не выпущены:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams /kstream/internals/KGroupedTableImpl.java#L158-L160

jawwe ответил: 28 апреля 2018 в 10:09
Спасибо, я рассмотрю первый сценарий. Я также добавил код моего KeyValueMappers к моему вопросу. Как вы упомянули, это похожее сообщение об ошибке, но Sting и Long: java.util.Long cannot be cast to java.lang.String
Michal Borowiecki ответил: 29 апреля 2018 в 08:57
Я сделал кое-что в исходном коде, и я обновил свой ответ. Короче говоря, я считаю, что вам нужно указать LongSerde во второй операции подсчета явно.
jawwe ответил: 05 мая 2018 в 04:44
Спасибо за ответ! Я попытался копать глубже во вторую попытку, для меня это имеет смысл, но я все еще получаю ошибки ... Я сделал редактирование и обновил новый код. Не возражаете ли вы быстро взглянуть на него?
jawwe ответил: 12 мая 2018 в 09:31
Я пересмотрел свой вопрос дальше, сосредоточившись исключительно на подходе без хэш-карты. Я был бы очень признателен, если бы у вас было время взглянуть на него.