DataOps & Airflow

Michelle Mesquita
4 min readMar 28, 2022

--

Ultimamente, venho estudando cada vez mais a integridade dos dados para permitir maior confiabilidade e qualidade para as aplicações. Isso ocorre justamente por conta do papel do Engenheiro de DevSecOps dentro das equipes

Por conta disso, um outro conceito que tenho observado bastante é o DataOps. Então, vale explicar porque eu acredito que o DataOps, DevOps e o Engenheiro de dados podem ter a mesma função

Sabemos que o Engenheiro de Dados possui o papel de distribuir o dado com maior eficiência, além de manter a pipeline de ETL com bastante qualidade. Portanto, ele se preocupa com o a extração, transformação e o carregamento desse dado, que muitas vezes, será armazenado em um banco de dados (aqui, devemos pensar em como será armazenado para garantirmos que os dados permanceram seguros e sem alteração)

O DataOps e o DevOps são culturas que se preocupam com a qualidade do dado. No entanto, o DevOps está mais voltado para o ciclo do software como um todo e não apenas com o dado

Aqui, podemos perceber que os papeis são muito parecidos. Por conta disso, resolvi aprender sobre o Airflow 😄

O Airflow é um orquestrador, ou seja, gerencia as Pipelines e foca justamente no processo de ETL de maneira automatizada. Ele pode orquestrar Pipelines de diferentes linguagens. Aqui utilizaremos Python e Bash, para nosso caso.

E o que são Pipelines?
São esteiras criadas para executar uma automação, onde existem diferentes steps utilizando a DAG

→ Caso queiram instalar, podemos fazer localmente ou com Docker:

Para inicializar o Airflow na porta 8080, utilize esse comando:

➜ airflow webserver -p 8080

Outro conceito importante entender são as DAG (Directed Acyclic Graph) → É um processo que possui uma coleção de tarefas (tasks), que podem ser divididas em pequenos processos/nós, que criamos na Pipeline

Uma forma de criarmos as tasks é separamos por etapas (steps/nós), assim como fazemos com as etapas vistas no ETL

Visto isso, criaremos uma DAG para exemplificar uma pipeline, onde a segunda etapa acontecerá após o primeiro nó terminar de executar

  • Podemos observar a pipeline, conforme 2 nós:

→ Hello_Bash

→ Hello_Python

Um ponto interessante, são as formas de visualização:

  • A visualização de Árvore (tree). Aqui, podemos perceber como Hello_Python espera o Hello_Bash terminar de rodar e podemos observar de acordo com as cores, o status de running e success, conforme a imagem abaixo

Agora, vamos ao código :)

from airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.operators.python_operator import PythonOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'Michelle','depends_on_past' : False,'start_date' : datetime(2020,11,15,15),'email': ['airflow@airflow.com'],'email_on_failure' : False,'email_on_retry': False,"retries": 1,"retry_delay": timedelta(minutes=1)}dag = DAG("first-dag",description="Básico de Bash Operator e Python",default_args=default_args,schedule_interval=timedelta(minutes=2))#tarefashello_bash = BashOperator(task_id="Hello_Bash",bash_command='echo "Hello world :)"',dag=dag)def say_hello():  print("Hello from Py :D")hello_python = PythonOperator(task_id="Hello_Python",python_callable=say_hello,dag=dag)hello_bash >> hello_python
  • Para escolhermos a ordem das tasks, somente precisamos utilizar esse comando >>
hello_bash >> hello_python

Para criarmos um código em Bash, utilizamos o BashOperator e o bash_command. Já para o Python, utilizamos o PythonOperator e o python_callable para chamarmos as funções

Operator serve para criar diferentes tipos de tasks, enquanto que bash_command/python_callable servem para adicionarmos o código referente a essas linguagens.

Outro fator importante, é declararmos a função em Python que queremos chamar . O nome do processo que vemos no Airflow, conforme a primeira imagem, ocorre nessa função: task_id

Assim, podemos ver o log formado para cada etapa, por exemplo no segundo step:

Um ponto adicional é olharmos os outros operadores. Assim, é possível, inclusive, criarmos condições de pipeline (if/else) com o airflow.operators.branch

→ Aqui, podemos ver outros comandos:

→ Caso queiram saber mais sobre DAGs:

Para escolhermos quando iniciar uma pipeline após ter sido criada, utilizamos o comando: schedule_interval .

Assim, podemos utilizar, inclusive, cron job para configurarmos um horário de quando essa pipeline poderá rodar.

  • Aqui está uma dica de calculador para o Crontab job ;)

Espero que o conteúdo possa ajudá-los a iniciar a primeira pipeline de dados de vocês 💜

--

--

Michelle Mesquita
Michelle Mesquita

Written by Michelle Mesquita

DevSecOps & AppSec Engineer & Developer girl 👩‍💻

No responses yet