Kafka streams - NullPointerException при подключении KStream с KTable leftjoin ()

Stefan Repcek спросил: 28 марта 2018 в 04:13 в: java

Я использую leftJoin для преобразования потока сообщения (транзакции) с использованием значений из таблиц. Я объединяю несколько объединений. После каждого присоединения я выбираю новый ключ. Я получаю NullPointerException в моем последнем соединении. Фильтрация null не помогает.

Exception in thread "account-transactions-e0a66751-f65e-4ba0-947c-024b5d32f7c2-StreamThread-1" java.lang.NullPointerException at org.namematching.kafkastreams.transactiontransformation.TransactionTransformationStream.lambda$launchStream$14(TransactionTransformationStream.java:124)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:58)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

Вот моя логика соединения:

KStream<String,DoiTAccountTransaction> transactionsWithAccounts =
            transactionStream.leftJoin(accountTable,(v1,v2) -> { v1.setBeneficiaryBankAccount(v2.getOriginId()); return v1; }, Joined.with(Serdes.String(), transactionSerde, accountSerde))
            .selectKey((k,v) -> v.getTransactionCode());KStream<String,DoiTAccountTransaction> transactionsWithTransactionCodes =
            transactionsWithAccounts.leftJoin(transactionCodesTable, (v1,v2) -> {v1.setTransactionCode(v2.getUid()); return v1; }, Joined.with(Serdes.String(), transactionSerde, transactionCodesSerde))
            .selectKey((k,v) -> v.getCurrency());KStream<String,DoiTAccountTransaction> transactionsWithCurrencies =
            transactionsWithTransactionCodes.leftJoin(currenciesTable, (v1,v2) -> {v1.setCurrency(v2.getUid()); return v1;}, Joined.with(Serdes.String(), transactionSerde, currenciesSerde))
            .selectKey((k,v) -> v.getOriginalBeneficiary1());KStream<String, DoiTAccountTransaction> transactionsWithPersons =
            transactionsWithCurrencies.leftJoin(personsTable,(v1,v2) -> {v1.setOriginalBeneficiary1(v2.getUid()); return v1;}, Joined.with(Serdes.String(), transactionSerde, personsSerde));transactionsWithPersons.to("account-transactions-processed", Produced.with(Serdes.String(),transactionSerde));

Последнее соединение не выполняется , Фильтрация нулевых ключей не помогает.


1 ответ

Michal Borowiecki ответил: 31 марта 2018 в 03:16

Возможно ли, что строка в peopleTable для этого ключа не присутствовала (в то время), и NPE исходил из выражения v2.getUid ()?

Было бы безопаснее сделать это проверка на ненулевое значение всех правосторонних объектов в объединениях перед доступом к их полям (всем v2).

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams + Регистрация + Семантика # KafkaStreamsJoinSemantics-KStream-KTableJoin