Как установить air flow на windows

Обновлено: 08.07.2024

Версия Airflow, установленная на этот раз, - 1.10, которая должна полагаться на Python и БД. БД, выбранная на этот раз, - Mysql.

Установленные компоненты и версии:

Установка Python

Установите MySQL

Сборка библиотек, сборка пользователей

Название библиотеки - airflow

Имя пользователя - airflow, и все IP-адреса доступны.

Здесь новый пользователь воздушного потока получает все разрешения библиотеки воздушного потока

Установка воздушного потока

Настройка переменной среды Следующий код настраивает переменную среды.Эту переменную среды нужно задавать только как временную переменную и не нужно настраивать как постоянную переменную.

Затем используйте pip для установки воздушного потока.

В настоящее время airflow будет установлен в сторонний пакет под Python, запомните этот адрес, обычно это путь $ /lib/python3.6/sit-packages/airflow.

Изменить переменную среды AIRFLOW_HOME

В этой установке я только устанавливаю переменную HOME на временную переменную, но не устанавливаю постоянную переменную.

Запустите команду воздушного потока

Выполнить в каталоге $ /lib/python3.6/sit-packages/airflow/bin

Этот шаг должен установить поток воздуха в каталог AIRFLOW_HOME, который вы только что установили. Выполнение этой команды может сообщить о некоторых ошибках, вам не нужно обращать слишком много внимания, если вы гарантируете, что файл создан в каталоге AIRFLOW_HOME, это доказывает, что это выполнение было успешным.

Установите модуль Mysql

Здесь мы можем просто сказать, что таким образом могут быть установлены другие компоненты, от которых зависит воздушный поток. Компонент пароля также устанавливается таким образом впоследствии.

Изменить конфигурацию Airflow DB

Формат параметра: mysql: // account: password @ ip: port / db

Инициализировать дб

Создайте новую зависимую от воздушного потока таблицу.

Измените часовой пояс на Район Донгба

Время воздушного потока по умолчанию - GMT, что на 8 часов раньше пекинского времени. Эта схема предназначена для того, чтобы при распределении кластера Airflow по разным часовым поясам время оставалось одинаковым и не возникало проблем с синхронизацией времени.

Но в сцене, к которой я прикоснулся, я обслуживал один узел, и даже после расширения он находился в одном и том же часовом поясе, и все решили изменить часовой пояс на часовой пояс Восточной восьмерки, который является пекинским временем.

Модификация делится на следующие этапы:

Здесь изменяется время планирования расписания, то есть при записи времени планирования вы можете напрямую записывать пекинское время.

2. Измените текущее время в правом верхнем углу интерфейса веб-сервера.


Измените код в разделе комментариев на код в красном поле. Код в комментарии предназначен для преобразования системного времени в время по Гринвичу, а измененный код должен напрямую брать системное время без преобразования часового пояса.

Измененный эффект показан в красной рамке.


3. Изменить время последнего запуска веб-сервера

  • Измените $ /lib/python3.6/site-packages/airflow/models.py, чтобы добавить метод get_last_dagrun

Иллюстрация выглядит следующим образом:


  • Изменить $ /lib/python3.6/site-packages/airflow/www/templates/airflow/dags.html last_run.execution_date..strftime ("% Y-% m-% d% H:% M" ) И last_run.start_date.strftime ("% Y-% m-% d% H:% M"):

Иллюстрация выглядит следующим образом:


* (Необязательная операция) перезапустите веб-сервер

Если вы устанавливаете в порядке, указанном в этой статье, вам не нужно выполнять этот шаг. Если вы запустили поток воздуха до смены часового пояса, вам необходимо перезапустить веб-сервер. Метод запуска будет описан позже, а метод остановки службы может напрямую завершить процесс.

Эффект после модификации выглядит следующим образом:


Аутентификация пользователя

Метод аутентификации пользователя, принятый в этой статье, является методом пароля, и другие методы, такие как LDAP, также поддерживаются, но не будут представлены в этой статье. Автор попытался метод LDAP во время установки, но не удалось.

2. Изменить airflow.cfg

3. Выполните следующий код в среде python, чтобы добавить учетную запись:

Настроить почтовый сервис

Этот параметр конфигурации является отправителем электронного письма, если задание dag не выполнено или повторите попытку. Конфигурация выглядит следующим образом:

Далее просто перечислите Python-код dag для справки:

Настройте Исполнителя

Поскольку в этой статье только один узел, используется режим LocalExecutor.

Изменить адрес журнала

Изменить адрес веб-сервера

Вы можете получить доступ к веб-серверу по указанному выше адресу.

Начать воздушный поток

Краткое описание проблемы

Следующие пункты - это проблемы и решения, с которыми столкнулся автор в процессе установки.

  1. pip install apache-airflow == 1.10 ошибка отчета:

To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install


Это потому, что переменные среды не были добавлены перед установкой, выполните следующий код:

2. Ошибка при инициализации БД:

Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql


Измените файл конфигурации Mysql my.cnf, выполнив следующие действия:

Красное поле на рисунке ниже - это расположение файла my.cnf:


Модифицированный код показан на рисунке:


Примечание: должно быть написано в [mysqld]

  • Перезапустите Mysql
  • Проверьте, вступает ли модификация в силу. Выполните следующий SQL, если значение равно 1, оно эффективно.


3. Ошибка при запуске веб-сервиса:

No such file or directory: 'gunicorn': 'gunicorn'


Увеличьте переменные среды:

По конкретным причинам, пожалуйста, смотрите ссылку ниже, здесь нет дальнейшего обсуждения.

4. pip install ошибка "airflow [mysql]":

mysql_config not found



  • Если mysql-devel не установлен
  • Поиск снова после установки, результат выглядит следующим образом:


5. Выполните ./aiflow и сообщите об ошибке:

cannot import name 'CSRFProtect'


6. Ошибка при запуске веб-сервера:

No module named 'pymysql'


7. Ошибка при запуске веб-сервера:


Установите указанную версию gunicorn.

Версия Airflow 1.10 устанавливает версию 19.4.0:

Airflow 1.8 версия устанавливает версию 19.3.0:

В некоторых случаях после изменения группы доступности базы данных во избежание выполнения задачи до текущей даты можно использовать backfill Заполните задачи за определенный период времени

  • airflow backfill -s START -e END --mark_success DAG_ID

Интеллектуальная рекомендация


WECHAT MILLY WAY POINT CAMENT CARD - СОВРЕМЕННЫЕ СТРАНИЦЫ ПРОГЛЯЮТНОЕ ЗНАЧЕНИЕ (Запрос)

WECHAT Small Pass Pass Value и значение приобретения: 1. Установите способ настройки идентификатора идентифицирует значение параметра, передаваемое после прыжка; 2, используя метод Data-XXXX для идент.


Текущая задача Узел больше экземпляров


[Linux] Программирование сетевых сокетов UDP

Что такое протокол UDP Протокол UDP называетсяПротокол пользовательских датаграмм UDP - протокол транспортного уровня Без установления соединения, ненадежная передача, ориентированная на дейтаграмму П.



Основная идея обработки больших данных - разделяй и властвуй

Разделяй и властвуй - «разделяй и властвуй» Как мы все знаем, компьютеры очень быстрые и используются людьми. Однако независимо от того, насколько быстрым является компьютер, способность о.

PXF, Greenplum и оптимизация SQL-запросов к разным источникам данных

PXF, Greenplum и оптимизация SQL-запросов к разным источникам данных

Комбо Apache Airflow и NiFi для запланированного запуска ETL-конвейеров: практическая инженерия Big Data

Комбо Apache Airflow и NiFi для запланированного запуска ETL-конвейеров: практическая…

Синергия Apache Airflow и Ray для MLOps-конвейеров: инженерия Data Science

Синергия Apache Airflow и Ray для MLOps-конвейеров: инженерия Data Science

Напиши отзыв и выиграй

Новое на сайте
Отзывы на Google

BigDataSchool

Курсы от инженеров и для инженеров. Всё чётко, по делу. Тренеры глубоко знают продукты, о которых читают лекции. read more

Принимал участие в обучении по курсу "KAFKA: Администрирование кластера Kafka". В целом понравилось, но хотелось бы более качественной организации работы с лабгайдами. Когда лектор выполняет лабораторную работу, не совсем удобно выполнять её параллельно - где-то отстаешь, где-то убегаешь вперед. Может будет лучше разделить на более мелкие модули. read more

Прошел Курс Администрирование кластера Hadoop. Подача материала хорошая, размеренная. Преподаватель отвечает на все вопросы, и пытается как можно прозрачней приподнести материал. read more

Обучался на программе HADM. Подача материала доступная. Порадовало соотношение теории и практики 50/50. Отзывчивый преподаватель. Однозначно рекомендую. read more

Заканчиваю прохождения курса "ADH: Администрирование кластера Arenadata Hadoop". Хочу сказать, что выстроен грамотный план обучения, где отслеживается отличное соотношение практики и теории. Преподаватель, Комисаренко Николай, обладает отличным чувством юмора, что позволило не скучать на серьезных темах, и обладает отличным навыком объяснять сложные вещи простыми словами. На курс приходил с большим числом вопросов, на все из которых получил грамотные ответы, после чего все разложилось по полочкам. read more

В декабре 2020 прошел курс "Администрирование кластера Kafka". Курс проводился удаленно. В части организации обучения придраться не к чему. Необходимую информацию прислали заранее, лабораторный стенд и портал обучения работали стабильно. Немного разочаровали лабораторные работы. На месте BigDataSchool я бы их переделал. В документах с лабами нужно сделать нормальное форматирование и нумерацию пунктов. Все пункты, необходимые для выполнения, нужно сделать в виде текста. В лабах много работ по созданию «обвязки» kafka (создание самоподписных сертификатов, развертывание MIT и т.п), которые можно сделать заранее. Это позволит студентам уделять больше времени изучению самой kafka. BigDataSchool идет навстречу и позволяет пользоваться лабораторным стендом гораздо дольше установленных часов обучения. Это очень к стати, если в течении дня Вы вынуждены отвлекаться от обучения. В целом, курс дает хорошую базу по kafka. Преподаватель хорошо подает материал, делает акценты в нужных местах, подробно отвечает на вопросы. read more

С 30 ноября по 4 декабря прошел курс "Администрирование кластера Hadoop". Учитывая, что я обладал довольно поверхностной информацией в данной теме (я CIO) - ушел с курсов просветленным. Многое стало понятным, в процессе обучения наложил знания на существующую инфраструктуру компании, в которой работаю. Рекомендую коллегам руководителям в ИТ - прокачаться на данном курсе, вы поймете куда двигаться в ближайшие 2-3 года. Админам, работающим или стремящимся в BigData- обязательно! Рекомендация - настойчиво, для тех кто "думает, что знает": перед курсом уделите время работе с командной строкой Linux! Total recall - обязательное условие. Много практической работы, и если есть затык в Linux - будете безнадежно отставать при выполнении лабораторных работ. read more

В октябре прошел курс Анализ данных с Apache Spark, это был второй раз, когда я обучался в этом месте. В целом, все хорошо, думаю что не последний. Не могу не подчеркнуть профессионализм преподавателя Королева Михаила, отвечал на поставленные вопросы, делился своим опытом. В общем, рекомендую! read more

Прошел тут курс "NIFI: Кластер Apache NiFi", вёл Комисаренко Николай. Живое и понятное обучение. Преподаватель отвечал на все вопросы от самых глупых, до самых умных и это было приятно. Так же порадовало, что преподаватель не идёт по заранее проложенным рельсам, а проходит весь путь вместе с вами, стараясь привнести, что-то новое. read more

Спасибо за обучение!

Очень крутое место, много практики, понятное объяснение заданной темы. Еще вернусь :) read more

Обучался на курсе HADM администрирование кластера Arenadata Hadoop. Интересный курс, хорошая подача. read more

Обучался на курсе по администрированию Apache Kafka. Хорошая подача материала, интересные практические задачи. Возникающие вопросы доходчиво и ясно объясняют. Остался очень доволен. read more

Был на курсе "Администрирование кластера Hadoop". Отличная подача материала. Очень много практики и технических подробностей. Подробный обзор стека технологий, платформы и инструментов. Рекомендую! read more

Учился на курсе Администрирование Hadoop. Курс вёл Николай Комиссаренко. Отлично подготовленная, продуманная, системная программа курса. Практические занятия организованы так, что у студентов есть возможность познакомиться с реальными особенностями изучаемого продукта. Отключил голову и прощёлкал лабы по книжке - здесь не работает. Преподаватель легко и развёрнуто отвечает на возникающие вопросы не только по теме предмета, но и по смежным. read more

Прошёл курс по администрированию Apache Kafka. Очень понравилась как подача материала, так и структура курса. Только вот времени маловато оказалось. не всё успел доделать, но это уже не к курсу претензии :). Практики было довольно много, и это хорошо read more

Прошёл курс "Hadoop для инженеров данных" у Николая Комиссаренко. Информация очень актуальна и полезна, заставляет задуматься о текущих методах работы с большими данными в нашей компании и, возможно, что-то поменять. Занятия с большим количеством практики, поэтому материал хорошо усваивается. Отдельное спасибо Николаю за то, что некоторые вещи объяснял простым языком, понятным даже для "чайников" в области Hadoop. read more

I did not find any disadvantages in the course. Pluses: + A lot of practice (50% of the time). + The teacher can explain difficult topics easy way. + Announced topics were considered. Besides additional materials were studied. read more

Посетил курс администрирование Hadoop. На курсе устанавливали кластер с нуля на виртуалках в облаке Amazon. Настраивали Kerberos, тестировали выполнение задач на кластере, управление ресурсами кластера. Т.к. кластер развернут в облаке, после завершения занятий можно самостоятельно работать с кластером из дома. Лекции вел Николай Комиссаренко, после обучения предоставил все материалы. На занятиях отвечал на дополнительные вопросы, рассмотрели как решить пару живых задач от студентов. Хороший курс для начала изучения BigData. Update Дополнительно прошел обучения по Airflow и NiFi. Курсы двух дневные упор на занятиях делался на использовании продуктов, администрированию уделялось меньше времени. Т.к. курсы короткие, то перед занятиями желательно почитать обзорные статьи по продуктам, чтобы не терять время на базовое погружение и задавать более предметные вопросы. Перед началом занятий желательно связаться с школой и запросить что больше интересуется на обучении. Может быть предложить свои кейсы, чтобы на лабораторных отработать не только общий функционал. read more

Был на основах хадупа, все материалы описаны доступным языком. В частности хочу отметить преподавателя Николая Комисаренко, как очень квалифицированного преподавателя и специалиста. read more

Отличные курсы по "Администрированию Hadoop" и отличная организация проведения занятий, все по делу и понятно. Очень понравилось, знания получены основательные. Материал подаётся основательно. Постараюсь ещё попасть на другие курсы. read more

Курс по Isilon у Николая Комиссаренко мне тоже понравился. Грамотный и отзывчивый. Возникали вопросы по курсу он отвечал на все вопросы. Спасибо. Успехов ему read more

Посетил курс администрирование Hadoop. На курсе устанавливали кластер с нуля на виртуалках в облаке Amazon. Настраивали Kerberos, тестировали выполнение задач на кластере, управление ресурсами кластера. Т.к. кластер развернут в облаке, после завершения занятий можно самостоятельно работать с кластером из дома. Лекции вел Николай Комиссаренко, после обучения предоставил все материалы. На занятиях отвечал на дополнительные вопросы, рассмотрели как решить пару живых задач от студентов. Хороший курс для начала изучения BigData. read more

Эффективный практический курс. Прошел курс Администрирование Hadoop в октябре 2018. Хорошо наполненный материал, оптимальная длительность курса и все делалось своими руками. Местами было непросто, но преодолимо. Оправдал все ожидания, после курса появилось целостное понимание создания и работы кластера. Николай, большое спасибо read more

Прошёл курс по администрированию Hadoop Cloudera. Отличная "живая" подача материала на "простом" языке. Как плюс работа с кластером построена на платформе AWS. На курсах не скучно, рекомендую! read more

Я узнал много нового посетив курс уважаемого Николая Комиссаренко по айзелону. Очень грамотный специалист обучение было очень полезным и грамотным. Спасибо вам большое read more

Для работы Airflow в prod-режиме рекомендуется использовать PostgreSQL, вместо используемого по умолчанию SQLite. Ничего сложного в этом этапе нет.

  1. Добавляем в /etc/yum.repos.d/ репо-файл для PostgreSQL 13.
  2. Устанавливаем актуальную версию PosgreSQL:
  • Для сборки пакета 'psycopg2' понадобилось установить через yum пакет libpq5-devel . Обходной способ: установка бинарника psycopg2-binary , вместо пакета psycopg2 .

Установка дополнительных пакетов

Устанавливаем пакеты для разработки:

Устанавливаем python-пакет для работы с PostgreSQL:

  • Без пакета 'cryptography' операция airflow initdb отработает, но пожалуется на невозможность шифрования значений.

Установка Airflow-пакетов

Перед установкой Airflow скачиваем свежий ограничительный файл constraints.txt и переносим на целевую машину или, в случае доступа машины к интернету, используем URL.

Устанавливаем Airflow с помощью pip и актуального файла constraints.txt :

Устанавливаем дополнительные airflow-пакеты для работы с "Hadoop":

Пакет 'apache-airflow[kerberos]' не установится из-за зависимостей на древний пакет 'krbV', версии для python3 которого не существует.

Подготовка к запуску Airflow

Создание системного пользователя airflow

Создаём системного пользователя 'airflow' и разрешаем ему беспарольное 'sudo' для имперсонации, то есть запуска скриптов от имени владельца:

Создание каталогов и конфигурационных файлов для работы Airflow

Создаём каталог для работы Apache Airflow и назначаем права:

  • В последней команде мы назначаем default-права на каталог 'POSIX'-группе _airflow_workfolder_access из FreeIPA. Эти права будут наследоваться с режимом 770 для поддиректорий и 660 для файлов.
  • Заметка: 'POSIX ACL' не работает с 'MemberOf'-группами из FreeIPA, но работает с 'POSIX'-группами.

Создаём файл /etc/sysconfig/airflow с переменными, использующимися Airflow при запуске из systemd-юнитов:

  • Без переменных HADOOP_CONF_DIR и HIVE_CONF_DIR , в Airflow не будет работать 'SparkSubmitOperator', так как он использует файлы hive_site.xml и yarn-site.xml .
  • Каталог /etc/hadoop/ , в случае его отсутствия из-за, например, недавнего добавления этого узла в кластер, может быть скопирован с любого кластерного узла. Или выполняем перезагрузку кластера и этот каталог, как и прочие кластерные папки, автоматически появятся на своих местах.

Создаём каталог /etc/airflow для хранения airflow.cfg , сертификатов, keytab'ов и прочих подобных файлов:

Создание systemd-юнитов

Kerberos в Apache Airflow не работает, но приведу здесь этот 'unit' в надежде на светлое будущее:

Не забываем выполнить:

Одноразовый запуск Airflow для создания типового файла /etc/airflow/airflow.cfg , с последующей зачисткой каталога '/data/airflow', в которой появятся ненужные в дальнейшем файлы:

  • Первый запуск Apache Airflow должен закончиться ошибкой, а в каталоге /etc/airflow должен появиться типовой файл airflow.cfg .

Создание базы данных airflow в PostgreSQL

Далее приведены команды psql для создания базы данных:

Так как машина находится в домене FreeIPA, то создаём ключ и сертификаты встроенными средствами FreeIPA.

Выполняем работу из-под root'а.

  1. Сначала получаем kerberos-билет.
  2. Добавляем 'alias' для существующего принципала, соответствующего хосту.
  3. Создаём ключ и запрашиваем сертификат к нему одной командой.
  4. Проверяем, что сертификат добавлен и будет обновляться в срок (auto-renew: yes).
  5. Добавляем CNAME-запись в DNS-зону.

Если понадобится снять сертификат с отслеживания, то выполняем getcert list , выбираем сертификат и даём команду снятия с отслеживания:


В этой статье я собираюсь обсудить Apache Airflow, систему управления рабочим процессом, разработанную Airbnb.

Раньше у меня былообсуждаетсянаписание основных ETL-конвейеров в Bonobo. Bonobo хорош для написания ETL-конвейеров, но мир не только для написания ETL-конвейеров для автоматизации вещей. Существуют и другие варианты использования, в которых вы должны выполнять задачи в определенном порядке один раз или периодически. Например:

  • Мониторинг заданий Cron
  • передача данных из одного места в другое.
  • Автоматизация ваших операций DevOps.
  • Периодически получать данные с веб-сайтов и обновлять базу данных для вашей удивительной системы сравнения цен.
  • Обработка данных для систем на основе рекомендаций.
  • Трубопроводы машинного обучения.

Прежде чем мы продолжим внедрение Airflow в наших системах, давайте обсудим, что на самом деле является Airflow и его терминологии.

Airflow - это платформа для программного создания, планирования и мониторинга рабочих процессов.

Использование воздушного потока для создания рабочих процессов в виде направленных ациклических графов (DAG) задач. Планировщик воздушного потока выполняет ваши задачи на массиве рабочих, следуя указанным зависимостям. Богатые утилиты командной строки делают выполнение сложных операций на DAG проще простого. Богатый пользовательский интерфейс позволяет легко визуализировать конвейеры, работающие на производстве, отслеживать прогресс и устранять проблемы при необходимости.

По сути, это помогает автоматизировать сценарии для выполнения задач. Airflow основан на Python, но вы можете выполнять программу независимо от языка. Например, на первом этапе вашего рабочего процесса необходимо выполнить программу на C ++ для анализа изображений, а затем программу на Python для передачи этой информации в S3. Возможности бесконечны.

В математике и информатике ориентированный ациклический граф (DAG / ˈdæɡ / (Об этом звуке слушайте)) - это конечный ориентированный граф без направленных циклов. То есть он состоит из конечного числа вершин и ребер, причем каждое ребро направлено из одной вершины в другую, так что нет никакого способа начать с любой вершины v и следовать последовательно направленной последовательности ребер, которая в конечном итоге снова возвращается к v. , Эквивалентно, DAG - это ориентированный граф, который имеет топологический порядок, последовательность вершин, так что каждое ребро направлено от более раннего к более позднему в последовательности.

Позвольте мне попытаться объяснить простыми словами: Вы можете быть только сыном своего отца, но не наоборот. Хорошо, это глупо или странно, но не смог найти лучшего примера, чтобы объяснитьнаправленный цикл,


В Airflow все рабочие процессы являются группами DAG. Даг состоит изоператоры.Оператор определяет отдельную задачу, которая должна быть выполнена. Доступны различные типы операторов (как указано на сайте Airflow):

Вы также можете придумать пользовательский оператор в соответствии с вашими потребностями.

Воздушный поток основан на Python. Лучший способ установить это через pip орудие труда.

pip install apache-airflow

Чтобы проверить, установлен ли он, выполните команду: airflow version и это должно напечатать что-то вроде:

Вам нужно будет установить mysqlclient а также включить MySQL в ваши рабочие процессы. Это необязательно, хотя.

pip install mysqlclient

Прежде чем что-либо запускать, создайте папку и установите ее как AIRFLOW_HOME , В моем случае это airflow_home , После создания вы позвоните export Команда, чтобы установить его в пути.

Убедитесь, что вы находитесь в папке выше airflow_home перед запуском export команда. В airflow_home вы создадите еще одну папку для хранения групп DAG. Назови это dags

Если вы установите load_examples=False он не будет загружать примеры по умолчанию в веб-интерфейсе.

Теперь вам нужно позвонить airflow initdb в airflow_home папки. Как только это сделано, это создает airflow.cfg а также unitests.cfg

airflow.db файл SQLite для хранения всей конфигурации, связанной с рабочими процессами airflow.cfg сохранить все начальные настройки, чтобы все работало.

В этом файле вы можете увидеть sql_alchemy_conn параметр со значением ../airflow_home/airflow.db

Вы можете использовать MySQL, если хотите. А пока просто придерживайтесь основных настроек.

Пока все хорошо, теперь, не теряя времени, давайте запустим веб-сервер.

При запуске он показывает экран как:

Теперь, когда вы посещаете 0.0.0.0:8080 это показывает экран как:


Вы можете увидеть кучу записей здесь. Это пример, поставляемый с установкой Airflow. Вы можете отключить их, посетив airflow.cfg файл и набор load_examples в FALSE

DAG Runsскажите, сколько раз был выполнен определенный DAG.Недавние задачисообщает, какая задача из множества задач в группе DAG в настоящее время выполняется и каков ее статус.Графикпохож на тот, который вы использовали бы при планировании Cron, поэтому я не буду сейчас на этом останавливаться Графикотвечает, в какое время должен запускаться этот определенный DAG.


Вот скриншот DAG, который я создал ранее и выполнил. Вы можете видеть прямоугольники, представляющие задачу. Вы также можете видеть различные цветные рамки в правом верхнем углу серого поля с именем:успех,Бег,не удалосьи т.д. Это легенды. На картинке выше вы можете отметить, что все поля имеют зеленую рамку, но если вы не уверены, наведите указатель мыши на легенду успеха, и вы увидите экран, показанный ниже:


Вы, возможно, заметили цвет фона / заливки этих коробок - зеленый и тростниковый. В левом верхнем углу серого поля вы можете видеть, почему они имеют такие цвета, этот цвет фона представляет различные типы операторов, используемых в этой группе обеспечения доступности баз данных. В этом случае мы используемBashOperatorа такжеPythonOperator.

Мы будем работать над базовым примером, чтобы увидеть, как он работает. Я буду объяснять пример. в dags папка, которая была ранее создана в airflow_home/ мы создадим наш первый образец DAG. Итак, я собираюсь создать файл с именем, my_simple_dag.py

Самое первое, что вы собираетесь сделать после импорта, это написать подпрограммы, которые будут служитьзадачидляоператоры, Мы будем использовать смесь BashOperator а также PythonOperator ,

Это две простые процедуры, которые ничего не делают, но возвращают текст. Я скажу вам позже, почему я пишу что-то в текстовом файле. Следующие вещи, которые я собираюсь сделать, это определить default_args и создать DAG экземпляр.

Здесь вы устанавливаете кучу параметров в default_args dict переменная.

start_date говорит с тех пор, когда этот DAG должен начать выполнение рабочего процесса. это start_date может принадлежать прошлому. В моем случае это 22 сентября и 11 утра UTC. Эта дата уже прошла для меня, потому что уже 11:15универсальное глобальное времядля меня. Вы всегда можете изменить этот параметр с помощью airflow.cfg файл и установить свой собственный часовой пояс. Пока UTC мне подходит. Если вам все еще интересно, какое время используется Airflow, проверьте в правом верхнем углу веб-интерфейса Airflow, вы должны увидеть что-то вроде приведенного ниже. Вы можете использовать это как ссылку для планирования ваших задач.


retries Параметр повторяется, чтобы запустить DAGИксколичество раз в случае неудачного выполнения. concurrency Параметр помогает определять количество процессов, которые необходимо использовать для выполнения нескольких групп доступности баз данных. Например, ваша группа обеспечения доступности баз данных должна запустить 4 последних экземпляра, также называемые какзасыпкас интервалом в 10 минут (Я расскажу об этой сложной теме в ближайшее время) и вы установили concurrency в 2 тогда2 DAGбудет работать одновременно и выполнять задачи в нем. Если вы уже реализовали multiprocessing в вашем Python тогда вы должны чувствовать себя здесь как дома.

Сейчас используюДиспетчер контекстамы определяем DAG с его свойствами, первый параметр - это ID DAG, в нашем случае это my_simple_dag второй параметр, который мы уже обсудили, третий параметр - это то, что необходимо обсудить вместе с start_date что упомянуто в default_args ,

В этомДиспетчер контекста,Вы назначаете операторов вместе с идентификаторами задач. В нашем случае эти операторы помечены как: opr_hello opr_greet opr_sleep а также opr_respond , Эти имена затем появляются в прямоугольных прямоугольниках, описанных выше.

Прежде чем двигаться дальше, я лучше обсудитьDAG Runsа такжепланировщики какую роль они играют во всем рабочем процессе.

Что такое планировщик воздушного потока?

Планировщик воздушного потока - это процесс мониторинга, который выполняется постоянно и запускает выполнение задачи на основе schedule_interval а также execution_date.

Что такое DagRun?

DagRunэто экземпляр группы обеспечения доступности баз данных, которая будет выполняться одновременно. Когда он запустится, все задачи внутри него будут выполнены.

Выше диаграмма, которая может помочь выяснить оDAGRun:-)

Предположим, start_date являетсяСентябрь, 24,2018 12:00:00 UTCи вы начали DAG в12:30:00 UTCс schedule_interval из* / 10 * * * * (через каждые 10 минут).Используя тот же default_args обсуждаемые выше параметры, ниже будут записи DAG, которые будут запускаться мгновенно, по одному в нашем случае из-за concurrency является 1 :


Почему это происходит? Ну, вы несете ответственность за это. Воздушный поток дает вам возможность пройти мимо DAG. Процесс прохождения DAG называетсязасыпка, Процесс Backfill фактически позволяет Airflow устанавливать некоторый статус всех групп доступности баз данных с момента его создания. Эта функция была предоставлена ​​для сценариев, в которых вы используете группу обеспечения доступности баз данных, которая запрашивает некоторую базу данных или API, например, Google Analytics, для извлечения предыдущих данных и включения их в рабочий процесс. Даже если прошлых данных нет, Airflow все равно будет их запускать, чтобы сохранить состояние всего рабочего процесса в целости и сохранности.

После запуска прошлых групп доступности базы данных следующий (тот, который вы собираетесь запустить, будет запущен) в12:40:00 UTC.Помните, что независимо от установленного вами графика, DAG запускается ПОСЛЕ того времени, в нашем случае, если он должен работатьпослекаждые 10 минут, он будет работать после 10 минут.

Давай поиграем с этим. я поворачиваю my_simple_dag и затем запустите планировщик


Как только вы запустите, вы увидите экран с меткой:


Некоторые задачи поставлены в очередь. Если вы нажмете на DAG Id, my_simple_dag Вы увидите экран, как показано ниже:


Обратите внимание на отметку времени вИдентификатор запускаколонка. Вы видите образец? Первый исполнен в 10:00, затем 10:10, 10:20. Затем он останавливается, позвольте мне еще раз уточнить, что группа обеспечения доступности баз данных запускается по истечении 10 минут.планировщикначалось в 10:30 утра. так что заполнено прошло3с разницей10 минутинтервала.

DAG, которая была выполнена для10:30:00 UTCна самом деле было сделано в10:40:00 UTC, Последняя запись DAGRun всегда будет на один минус меньше, чем текущее время. В нашем случае время машины было10:40:00 UTC


Если вы наведите курсор на один из кругов, вы можете увидеть отметку времени передБег:это говорит время, когда оно было выполнено. Вы можете видеть, что эти зеленые круги имеют разницу во времени в 10 минут.В виде дерева«Да» немного сложнее, но дает полную картину всего вашего рабочего процесса. В нашем случае он был запущен 4 раза, и все задачи были выполнены успешно, темно-зеленого цвета.

Вы можете избежать обратной засыпки двумя способами: start_date будущего или набора catchup = False в DAG экземпляр. Например, вы можете сделать что-то вроде ниже:

Установив catchup=False тогда не имеет значения, является ли ваш start_date принадлежит прошлому или нет. Он будет выполняться с текущего времени и продолжается. Установив end_date вы можете заставить DAG перестать работать сам.

Строка, которую вы видите выше, говорит о связи между операторами, следовательно, создает весь рабочий процесс. Побитовый оператор здесь говорит о связи между операторами. Вот opr_hello сначала бегает, а потом отдыхает. Поток выполняется слева направо. В наглядной форме это выглядит так:


Если вы измените направление последнего оператора, поток будет выглядеть следующим образом:

respond задача будет выполняться параллельно и sleep будет выполнять в обоих случаях.

В этой статье я рассказал о том, как вы можете внедрить комплексную систему рабочих процессов для планирования и автоматизации ваших рабочих процессов. Во второй части я приведу пример из реальной жизни, чтобы показать, как можно использовать Airflow. Я хотел скрыть это в этом посте, но он уже стал достаточно длинным и объясняющимDAGRunПонятие было необходимо, поскольку мне потребовалось много времени, чтобы понять это.

Читайте также: