Я не являюсь экспертом Spark SQL API или базового RDD.
Но, зная об оптимизации Catalyst, я ожидал, что Spark попытается свести к минимуму усилия в памяти.
Это моя ситуация: я имею, скажем, две таблицы
TABLE GenericOperation (ID, CommonFields...)
TABLE SpecificOperation (OperationID, SpecificFields...)
Они оба довольно огромные (~ 500M, not big , но нецелесообразно иметь в целом в памяти на стандартном сервере приложений).
Тем не менее, я должен получить с помощью Spark (часть большего использования) все SpecificOperation
, которые соответствуют определенному условию для полей, принадлежащих GenericOperation
.
Это код, который я использую:
val gOps = spark.read.jdbc(db.connection, "GenericOperation", db.properties)
val sOps = spark.read.jdbc(db.connection, "SpecificOperation", db.properties)
val joined = sOps.join(gOps).where("ID = OperationID")
joined.where("CommonField= 'SomeValue'").select("SpecificField").show()
Проблема в том, что, когда дело доходит до выполнения вышеуказанного, я вижу из SQL Profiler, что Spark не выполняет соединение в базе данных, а скорее извлекает все OperationID
из SpecificOperation
, и тогда я предполагаю, что он будет запускать все слияние в памяти. Поскольку фильтр SpecificOperation
не применим ни к одному фильтру, такое извлечение принесет много, слишком много , данных в конечную систему.
Можно ли написать выше, так что соединение требуется непосредственно для dbms? Или это зависит от какой-то волшебной конфигурации Spark, о которой я не знаю?
Конечно, я мог бы просто жестко кодировать соединение в качестве подзапроса при извлечении, но это в моем случае это невозможно: утверждения, которые должны быть созданы во время выполнения, начиная с простых строительных блоков. Следовательно, мне нужно реализовать это, начиная с двух уже созданных spark.sql.DataFrame
В качестве побочного примечания я запускаю это с помощью Spark 2.3.0 для Scala 2.11, против SQL Server 2016 экземпляр базы данных.
Исключение статически сгенерированных запросов ( В Apache Spark 2.0.0 возможно ли получить запрос из внешней базы данных (а не захватить всю таблицу)? ), Spark не поддерживает push-код
join
. Только предикаты и выбор могут быть делегированы источнику.Нет никакой волшебной конфигурации или кода, которые могли бы даже поддерживать этот тип процесса.
В общем случае, если сервер может обрабатывать соединение, данные обычно недостаточно велика, чтобы извлечь выгоду из искры.