The table is exported as a gzip-compressed CSV file (compression, export_format). Just to create a set of assumptions, I do not want to have the header row (print_header), and I use a comma (,) as the text file delimiter (field_delimiter). The partitions are created with the wildcard (*) character at the end of the file name (destination_cloud_storage_uris).

from airflow.models import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
    BigQueryToGCSOperator)
from airflow.utils.dates import days_ago

# Define configuration global variables

with DAG(
    'gcp_dag',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    bigquery_to_gcs = BigQueryToGCSOperator(
        gcp_conn_id='gcp_connection_id',
        task_id='bigquery_to_gcs',
        compression='GZIP',
        export_format='CSV',
        field_delimiter=',',
        print_header=False,
        source_project_dataset_table=f'{DATASET_NAME}.{TABLE}',
        destination_cloud_storage_uris=[
            f'gs://{DATA_EXPORT_BUCKET_NAME}/{EXPECTED_FILE_NAME}-*.csv.gz',
        ],
    )

    # Define other operations
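For reference, the "# Define configuration global variables" comment stands in for a few module-level constants used throughout the snippets. The values below are only illustrative placeholders of mine, so substitute your own project resources; BigQuery replaces the wildcard in the destination URI with a zero-padded shard number, so the partitions end up named like daily_export-000000000000.csv.gz.

# Illustrative placeholder values only; substitute your own resources.
DATASET_NAME = 'my_dataset'
TABLE = 'my_table'
DATA_EXPORT_BUCKET_NAME = 'my-export-bucket'
EXPECTED_FILE_NAME = 'daily_export'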
The partition files are then combined by a PythonOperator that calls a custom function (compose_files_into_one) with bucket_name, source_object_prefix, destination_object, and gcp_conn_id parameters. Those parameters are provided as keys in the PythonOperator's op_kwargs argument.

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Define configuration global variables

with DAG(
    'gcp_dag',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    # Previous operations

    compose_files = PythonOperator(
        task_id='gcs_compose',
        python_callable=compose_files_into_one,
        op_kwargs={
            'bucket_name': DATA_EXPORT_BUCKET_NAME,
            'source_object_prefix': EXPECTED_FILE_NAME,
            'destination_object': f'{EXPECTED_FILE_NAME}.csv.gz',
            'gcp_conn_id': 'gcp_connection_id'
        },
    )

    # Any other operations
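As a side note, on Airflow 2 the same task could also be written with the TaskFlow API instead of an explicit PythonOperator. The sketch below is only an equivalent illustration (the decorator-based gcs_compose task is my own naming), not the approach used in the rest of this post.

from airflow.decorators import task
from airflow.models import DAG
from airflow.utils.dates import days_ago

# Define configuration global variables

with DAG(
    'gcp_dag',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['example'],
) as dag:

    @task(task_id='gcs_compose')
    def gcs_compose() -> None:
        # Same parameters that the op_kwargs dictionary passes above.
        compose_files_into_one(
            bucket_name=DATA_EXPORT_BUCKET_NAME,
            source_object_prefix=EXPECTED_FILE_NAME,
            destination_object=f'{EXPECTED_FILE_NAME}.csv.gz',
            gcp_conn_id='gcp_connection_id',
        )

    compose_files = gcs_compose()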
As for compose_files_into_one, all the magic happens there. compose_files_into_one is a function that contains all the hook logic. It uses the GCSHook as a client to list all the objects with the given prefix. Then it composes the partition files into one gzip file.

from airflow.providers.google.cloud.hooks.gcs import GCSHook
def compose_files_into_one(bucket_name: str,
                           source_object_prefix: str,
                           destination_object: str,
                           gcp_conn_id: str) -> None:
    '''Composes wildcarded files into one in the given destination'''
    gcs_hook = GCSHook(
        gcp_conn_id=gcp_conn_id
    )
    list_of_objects = gcs_hook.list(
        bucket_name,
        prefix=source_object_prefix
    )
    gcs_hook.compose(
        bucket_name,
        source_objects=list_of_objects,
        destination_object=destination_object
    )
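One caveat worth keeping in mind: a single Cloud Storage compose request accepts at most 32 source objects. If the export produces more partitions than that, the composition has to be done in batches. The following is a minimal sketch of how that could look with the same GCSHook calls; the compose_many_files_into_one name and the batching scheme are my own assumptions, not part of the pipeline above.

from airflow.providers.google.cloud.hooks.gcs import GCSHook

# GCS allows at most 32 source objects per compose request.
COMPOSE_LIMIT = 32


def compose_many_files_into_one(bucket_name: str,
                                source_object_prefix: str,
                                destination_object: str,
                                gcp_conn_id: str) -> None:
    '''Composes any number of partition files by folding them in batches.'''
    gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id)
    objects = gcs_hook.list(bucket_name, prefix=source_object_prefix)

    # First batch goes straight into the destination object.
    gcs_hook.compose(
        bucket_name,
        source_objects=objects[:COMPOSE_LIMIT],
        destination_object=destination_object
    )
    # Fold the remaining partitions into the destination; the destination
    # itself counts as one of the 32 sources, hence batches of 31.
    for start in range(COMPOSE_LIMIT, len(objects), COMPOSE_LIMIT - 1):
        batch = [destination_object] + objects[start:start + COMPOSE_LIMIT - 1]
        gcs_hook.compose(
            bucket_name,
            source_objects=batch,
            destination_object=destination_object
        )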
To clean up the partitions afterwards, the GCSDeleteObjectsOperator takes a bucket_name, a gcp_conn_id, and a prefix parameter. With the prefix parameter, we can filter out the objects to be deleted. So, if the partitions start with <defined_text_file_name>-<partition_number>, then we can use <defined_text_file_name>- to filter out all the partitions to be deleted.

from airflow.models import DAG
from airflow.providers.google.cloud.operators.gcs import (
    GCSDeleteObjectsOperator)
from airflow.utils.dates import days_ago

with DAG(
    'gcp_dag',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    # Previous operations

    delete_combined_objects = GCSDeleteObjectsOperator(
        task_id='gcs_combined_files_delete',
        gcp_conn_id='gcp_connection_id',
        bucket_name=DATA_EXPORT_BUCKET_NAME,
        prefix=f'{EXPECTED_FILE_NAME}-'
    )

    # DAG definition (dependency)
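    # One plausible ordering (an assumption, adjust to your own pipeline):
    # export the table, merge the partition files, then delete the partitions.
    bigquery_to_gcs >> compose_files >> delete_combined_objects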