Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'sunny',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'load_nyc_taxi_data',
    default_args=default_args,
    description='DAG to load NYC Taxi data to Hive',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['sunny', 'sample'],
) as dag:
The first task creates the database by running a create database query in Hive. But I'm not going to log in to the Hive shell and then execute this query. Instead, I have written a .hql file that I'll submit to the hive command, which will in turn execute the query that's in the file. The .hql itself is pretty simple:

create database if not exists nyc;

I have saved this query in a file called create_database.hql. And how exactly do I submit this to hive? That's pretty simple too:

hive -f /mnt/d/code/poc/airflow/create_database.hql

And here's the Airflow task that runs this command:
create_database = BashOperator(
    task_id='create_database',
    bash_command='hive -f /mnt/d/code/poc/airflow/create_database.hql',
    dag=dag
)
Every BashOperator takes a task_id. This is the ID that will uniquely identify the task in the given DAG, so make sure you don't have duplicate task IDs. The bash_command is the actual bash command that will be executed when Airflow triggers this task. That's pretty much it for this task. Let's move to the next one.

For creating the table, I have another .hql file called create_table.hql. The query itself is as follows:

create table if not exists nyc.nyc_yellow_taxi_trips (
    VendorID int,
    tpep_pickup_datetime timestamp,
    tpep_dropoff_datetime timestamp,
    passenger_count int,
    trip_distance double,
    RatecodeID int,
    store_and_fwd_flag string,
    PULocationID int,
    DOLocationID int,
    payment_type int,
    fare_amount double,
    extra double,
    mta_tax double,
    tip_amount double,
    tolls_amount double,
    improvement_surcharge double,
    total_amount double,
    congestion_surcharge double
)
COMMENT 'NYC Yellow Taxi Trips'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
Submitting this file to hive works the same way as before:

hive -f /mnt/d/code/poc/airflow/create_table.hql

And the corresponding task definition:
create_table = BashOperator(
    task_id='create_table',
    bash_command='hive -f /mnt/d/code/poc/airflow/create_table.hql',
    dag=dag
)
Next, to download the dataset, I'm using the wget command to get the .csv file and place it in a path that I have already created. You won't have to create the path every time because, in most cases, the path stays constant. Anyway, the wget command itself is straightforward:

wget https://nyc-tlc.s3.amazonaws.com/trip+data/yellow_tripdata_2020-12.csv -P /mnt/d/Backup/hive_sample_datasets/nyc_taxi_trips/nyc_yellow_taxi_trips/
I have put this command in a shell script called download_dataset.sh. Now you might ask: since this is a bash command, and we have the BashOperator which can take a bash command directly, why am I creating a shell script to run it? Valid question. Because we have a file for each of the queries, I thought we'd continue with that trend and create a file for this as well. You can use the bash command directly in the BashOperator itself; that should work the same way. Anyway, the task definition for this is:

download_nyc_yellow_taxi_data = BashOperator(
    task_id='download_nyc_yellow_taxi_data',
    bash_command='/mnt/d/code/poc/airflow/download_dataset.sh ',
    dag=dag
)
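Two side notes here. First, the trailing space after download_dataset.sh in bash_command is deliberate: when a bash_command ends in .sh, the BashOperator tries to render that file as a Jinja template and fails with a TemplateNotFound error, and the trailing space is the usual workaround. Second, since the script body isn't shown above, here's a minimal sketch of what download_dataset.sh could look like, assuming it just wraps the wget command (the shebang and comments are mine):

#!/bin/bash
# Download the December 2020 yellow taxi data into the dataset directory.
# This simply wraps the wget command shown earlier; adjust the path to your setup.
wget https://nyc-tlc.s3.amazonaws.com/trip+data/yellow_tripdata_2020-12.csv -P /mnt/d/Backup/hive_sample_datasets/nyc_taxi_trips/nyc_yellow_taxi_trips/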
Finally, to load the downloaded data into the table, I have one more .hql file called load_data_to_table.hql. The query itself is as follows:

load data local inpath '/mnt/d/Backup/hive_sample_datasets/nyc_taxi_trips/nyc_yellow_taxi_trips/yellow_tripdata_2020-12.csv' into table nyc.nyc_yellow_taxi_trips;

And the task for it:
load_data_to_table = BashOperator(
    task_id='load_data_to_table',
    bash_command='hive -f /mnt/d/code/poc/airflow/load_data_to_table.hql',
    dag=dag
)
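Once the DAG has run end to end, a quick sanity check (not part of the DAG itself, just something I'd run by hand) is to count the rows that landed in the table:

hive -e 'select count(*) from nyc.nyc_yellow_taxi_trips;'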
Now that all the tasks are defined, we need to chain them in the right order. For this, Airflow provides the >> operator to chain two tasks together. For example:

task1 >> task2

Here, task1 will be executed first. Once that's complete, task2 will be executed. This will happen in sequence. But what if you have another task, task3, that you want to trigger after task1 along with task2? You can do that in parallel like this:

task1 >> task2
task1 >> task3
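As a side note, the right-hand side of >> can also be a list, so the same fan-out can be written in a single line, equivalent to the two lines above:

task1 >> [task2, task3]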
Once task1 is complete, tasks task2 and task3 will be executed in parallel. We don't need this parallelism in our example though. For us, the sequence is as follows:

start >> create_database >> create_table >> download_nyc_yellow_taxi_data >> load_data_to_table >> end
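The start and end tasks aren't defined in the snippets above; in my DAG they are just no-op placeholders that mark the boundaries of the pipeline. If you want to reproduce this, a minimal way to declare them (using DummyOperator, which is my assumption here, not something shown in the snippets) would be:

from airflow.operators.dummy import DummyOperator

# No-op boundary tasks so the graph has a clear start and end.
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

With that in place, dropping the DAG file into your dags folder and triggering it with airflow dags trigger load_nyc_taxi_data should run the whole pipeline: create the database, create the table, download the CSV, and load it into Hive.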