Apache airflow - автоматизация - как запустить spark отправить работу с параметром

Nick спросил: 28 марта 2018 в 02:01 в: apache-spark

Я новичок в искре и воздушном потоке, пытаясь понять, как я могу использовать воздушный поток, чтобы начать работу вместе с параметрами, необходимыми для работы. Я использую следующую команду spark-submit для запуска определенного задания для конкретных дат в пограничном узле, как показано ниже,

EXECUTORS_MEM=4G
EXECUTORS_NUM=300
STARTDAY=20180401
ENDDAY=20180401
QUEUE=m
jobname=x/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client $EXECUTORS_NUM  --executor-memory $EXECUTORS_MEM --executor-cores 1 --driver-memory 8G  --queue $QUEUE --class test.core.Driver --jars $JARS2 abc.jar --config=/a/b/c/test.config --appName=abc --sparkMaster=yarnclient --job=$jobname --days=$STARTDAY,$ENDDAY

Так что, пожалуйста, дайте мне знать, создаю ли .py что-то похожее к приведенному ниже коду, чтобы запустить работу в воздушном потоке? Это то, как вы должны выполнять задание и amp; pass options?

Как передать параметр, как я сделал для запуска задания в пограничном узле?

Если я автоматизирую задание для ежедневной работы, я бы хотел, чтобы дата начала была" t-7", поэтому, если сегодняшняя дата - 4/20/2018, дата начала работы должна быть 4/13/2018. Как это сделать?

###############.py file example ##############
**********************************************    import BashOperator    import os
    import sys    os.environ['SPARK_HOME'] = '/path/to/spark/root'
    sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))    import os
    import sys    os.environ['SPARK_HOME'] = '/home/spark/spark-2.1.0-bin-hadoop2.6/bin/'
    sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
    and add operator:    spark_task = BashOperator(
        task_id='spark_java',
        bash_command='spark-submit --class test.core.Driver abc.jar',
        params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE' :'m' , 'jobname' : 'x'},
        dag=dag)################### EOF ######################
**********************************************

Новый .py-файл - пожалуйста, исправьте меня, если что-то не так.

  • Как передать params для запуска версии искры, которая в другом пути?
  • Передайте банку, которая в другом пути
  • Правильный способ передать параметры, как показано ниже?
  • Можно ли передать некоторый старт & конец даты вручную для выполнения задания?

from airflow import DAGfrom airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezoneDEFAULT_DATE = timezone.datetime(2017, 1, 1)args = {
    'owner': 'airflow',
    'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)_config = {
    'config' : '/a/b/c/d/prod.config' 
    'master' : 'yarn'
    'deploy-mode' : 'client'
    'sparkMaster' : 'yarnclient'
    'class' : 'core.Driver' 
    'driver_classpath': 'parquet.jar',
    'jars': '/a/b/c/d/test.jar',
    'total_executor_cores': 4,
    'executor_cores': 1,
    'EXECUTORS_MEM': '8G',
    'EXECUTORS_NUM': 500,
    'executor-cores' : '1',
    'driver-memory' : '8G',
    'JOB_NAME' : ' ',
    'QUEUE' : ' ',
    'verbose' : ' '
    'start_date' : ' '
    'end_date' : ' '
    ]
}operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config
)

1 ответ

Taylor Edmiston ответил: 28 марта 2018 в 02:22

Дата начала - это то, что вы устанавливаете один раз, и она должна быть установлена ​​абсолютно, не относительно текущего дня.

Примерно так:

from airflow import DAGdag = DAG(
    ...
    start_date=datetime.datetime(2018, 4, 13),
)

Можно установить начальную дату в виде дельты, например datetime.timedelta(days=7), но это не рекомендуется, так как это изменит дату начала, если вы удалите DAG (включая все ссылки, такие как прогоны DAG, экземпляры задач и т. д.) и запустите его снова с нуля в другой день. Рекомендуется, чтобы группы обеспечения доступности баз данных были идемпотентными.

Для отправки задания в Spark существует SparkSubmitOperator, заключающий в себе команду оболочки spark-submit. Это было бы предпочтительным вариантом. Тем не менее, вы можете делать с BashOperator все, что угодно, так что это также работоспособная альтернатива.

Связанный код для SparkSubmitOperator хорошо документирован для каждого аргумента, который он принимает. Вы можете указать свой файл .jar с помощью application kwarg, передать конфигурацию Spark с помощью conf. Есть также kwargs для передачи информации, такой как ядра и память исполнителя. Вы можете использовать application_args для передачи списка произвольных аргументов в ваше приложение Spark.

Вот пример использования SparkSubmitOperator, скопированного и немного упрощенного из модульных тестов. для этого в Airflow. Обратите внимание, что он использует **, чтобы взорвать kwargs из dict для инициализации оператора Spark, но именно так устроен тест. Вы также можете легко передать каждое значение конфигурации как kwarg.

from airflow import DAGfrom airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezoneDEFAULT_DATE = timezone.datetime(2017, 1, 1)args = {
    'owner': 'airflow',
    'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)_config = {
    'conf': {
        'parquet.compression': 'SNAPPY'
    },
    'files': 'hive-site.xml',
    'py_files': 'sample_library.py',
    'driver_classpath': 'parquet.jar',
    'jars': 'parquet.jar',
    'packages': 'com.databricks:spark-avro_2.11:3.2.0',
    'exclude_packages': 'org.bad.dependency:1.0.0',
    'repositories': 'http://myrepo.org',
    'total_executor_cores': 4,
    'executor_cores': 4,
    'executor_memory': '22g',
    'keytab': 'privileged_user.keytab',
    'principal': 'user/spark@airflow.org',
    'name': '{{ task_instance.task_id }}',
    'num_executors': 10,
    'verbose': True,
    'application': 'test_application.py',
    'driver_memory': '3g',
    'java_class': 'com.foo.bar.AppMain',
    'application_args': [
        '-f', 'foo',
        '--bar', 'bar',
        '--start', '{{ macros.ds_add(ds, -1)}}',
        '--end', '{{ ds }}',
        '--with-spaces', 'args should keep embdedded spaces',
    ]
}operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config
)

Источник: https://github.com/apache/incubator-airflow/blob/f520990fe0b7a70f80bec68cb5c3f0d41e3e984d/tests/contrib /operators/test_spark_submit_operator.py