If you have been used to using the Apache Airflow CLI to work with variables, note that some of the Airflow commands may not be available to you. You can check out the summarised list of Airflow commands that work here, and you will notice that this varies between Airflow 1.x and 2.x.
You cannot use the Apache Airflow REST API within MWAA. Instead, use the Apache Airflow CLI, either via the MWAA API or via the Bash and Python operators within a DAG.
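To give a flavour of what invoking the CLI via the MWAA API looks like, here is a minimal sketch (the environment name is a placeholder); the scripts later in this post expand on this same pattern:

import base64
import boto3
import requests

# Request a short-lived CLI token for the environment (name is a placeholder)
client = boto3.client("mwaa")
token = client.create_cli_token(Name="your-mwaa-environment")

# POST the Airflow CLI command as plain text to the aws_mwaa/cli endpoint
response = requests.post(
    "https://{0}/aws_mwaa/cli".format(token["WebServerHostname"]),
    headers={
        "Authorization": "Bearer {0}".format(token["CliToken"]),
        "Content-Type": "text/plain",
    },
    data="variables list",  # use "variables" on Airflow 1.x environments
)

# stdout and stderr come back base64 encoded
print(base64.b64decode(response.json()["stdout"]).decode("utf8"))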
Note! Further reading
There are lots of great blog posts that dive into detail on how to work with variables in Apache Airflow. I can recommend these ones to get started (on top of the documentation you will find at airflow.apache.org)
https://marclamberti.com/blog/variables-with-apache-airflow/
https://www.applydatascience.com/airflow/airflow-variables/
Note! The quickest way of knowing which parameters your operators support in these Jinja templates is to check the documentation of the operator and/or review its source code, looking for the "templated" marker that tells you which parameters are supported.
If we look at the source code for the Bash operator, we can see the following lines:
template_fields = ('bash_command', 'env')
template_fields_renderers = {'bash_command': 'bash', 'env': 'json'}
These show that templating is supported for the bash_command and env arguments of the Bash operator, as follows:
param = BashOperator(
    task_id="demo",
    bash_command="echo {{ params.my_param }}",
    params={"my_param": "my value"},  # illustrative value, completing the truncated snippet
)
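You can also check this programmatically rather than reading the source; a quick sketch that prints the templated fields of the Bash operator:

from airflow.operators.bash_operator import BashOperator

# template_fields lists the arguments that are rendered through Jinja
print(BashOperator.template_fields)  # ('bash_command', 'env')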
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(dag_id="info-mwaa-env", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:

    env_cli_command = BashOperator(
        task_id="env_cli_command",
        bash_command="env"
    )
MWAA uses the standard Apache Airflow roles (Admin, User, Op, etc.). Admin and Op both enable the Admin tab within the UI, but you may not want to give that to all of your Airflow users; instead, provide them with just the level of access needed to view and trigger their workflows (User).
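For example, based on the IAM policy samples in the MWAA documentation, you can scope a user's web login token to the Airflow User role rather than Admin (the region, account ID, and environment name below are placeholders):

{
    "Effect": "Allow",
    "Action": "airflow:CreateWebLoginToken",
    "Resource": "arn:aws:airflow:{your-region}:{your-account-id}:role/{your-environment-name}/User"
}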
{
"cli_test": "newvalue/",
"cli_test2": "12341234",
"cli_test3": "https://grouplens.org/datasets/movielens/latest/"
}
For Airflow 1.x environments:
secrets.backend: airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: '{"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}'
For Airflow 2.x environments:
secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: '{"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}'
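If you are configuring an existing environment, one way to apply these options (a sketch, with a placeholder environment name) is via boto3's update_environment call:

import boto3

client = boto3.client("mwaa")
client.update_environment(
    Name="your-mwaa-environment",  # placeholder
    AirflowConfigurationOptions={
        "secrets.backend": "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend",
        "secrets.backend_kwargs": '{"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}',
    },
)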
Note! In the GitHub repo you will find an example CDK application that deploys an MWAA environment with permissions scoped down to just the variables you want MWAA to access.
Note! Remember to ensure that these are both in the same AWS region, as MWAA will look for those secrets in the region it is deployed into.
Check out the blog post from John Jackson, Move your Apache Airflow connections and variables to AWS Secrets Manager, where he dives deep into this.
{
"cli_test": "newvalue/",
"cli_test2": "12341234",
"cli_test3": "https://grouplens.org/datasets/movielens/latest/"
}
aws secretsmanager create-secret --name airflow/variables/cli_test --description "Cli variable 1" --secret-string "newvalue/" --region {your region}
aws secretsmanager create-secret --name airflow/variables/cli_test2 --description "Cli variable 2" --secret-string "007007" --region {your region}
aws secretsmanager create-secret --name airflow/variables/cli_test3 --description "Cli variable 3" --secret-string "https://grouplens.org/datasets/movielens/latest/" --region {your region}
{
"ARN": "arn:aws:secretsmanager:eu-central-1:704533066374:secret:airflow/variables/cli_test-OsNI9S",
"Name": "airflow/variables/cli_test",
"VersionId": "988f7bd5-b0fb-47ca-ab0b-343c665b0d24"
}
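If you prefer Python, the same secrets can be created with boto3 (a sketch, using the same names and values as above; the region is a placeholder):

import boto3

client = boto3.client("secretsmanager", region_name="{your region}")
client.create_secret(
    Name="airflow/variables/cli_test",
    Description="Cli variable 1",
    SecretString="newvalue/",
)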
aws secretsmanager describe-secret --secret-id airflow/variables/cli_test
{
"ARN": "arn:aws:secretsmanager:eu-central-1:704533066374:secret:airflow/variables/cli_test-OsNI9S",
"Name": "airflow/variables/cli_test",
"Description": "Cli variable 1",
"LastChangedDate": 1627293097.284,
"VersionIdsToStages": {
"988f7bd5-b0fb-47ca-ab0b-343c665b0d24": [
"AWSCURRENT"
]
},
"CreatedDate": 1627293097.25
}
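Under the hood, the Airflow secrets backend is doing the equivalent of a GetSecretValue call using the prefixed name; a minimal sketch of that lookup:

import boto3

client = boto3.client("secretsmanager")
secret = client.get_secret_value(SecretId="airflow/variables/cli_test")
print(secret["SecretString"])  # newvalue/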
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
demo = Variable.get("cli_test", default_var="undefined")
with DAG(
    dag_id="aws_secrets_variables",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_sec1 = BashOperator(
        task_id="disp_aws_secret_variable_1",
        bash_command='echo "The value of the variable is: {var}"'.format(var=demo),
    )

    get_var_sec2 = BashOperator(
        task_id="disp_aws_secret_variable_2",
        bash_command='echo "The value of the second variable is: {{var.value.cli_test2}}"',
    )
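Note that the two tasks retrieve the variable in different ways. Variable.get() at the top of the file runs every time the scheduler parses the DAG file, so the secret is fetched on each parse, while the {{ var.value.cli_test2 }} Jinja template is only resolved when the task actually executes. A minimal sketch of the two styles side by side:

from airflow.models import Variable

# Parse-time lookup - executed on every DAG file parse
demo = Variable.get("cli_test", default_var="undefined")

# Render-time lookup - only resolved when the task runs
templated_value = "{{ var.value.cli_test2 }}"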
[2021-07-26 11:31:40,040] {{bash.py:158}} INFO - Running command: echo "The value of the variable is: newvalue/"
[2021-07-26 11:31:40,078] {{bash.py:169}} INFO - Output:
[2021-07-26 11:31:40,113] {{bash.py:173}} INFO - The value of the variable is: newvalue/
...
...
[2021-07-26 11:31:40,089] {{bash.py:158}} INFO - Running command: echo "The value of the second variable is: 123456"
[2021-07-26 11:31:40,185] {{bash.py:169}} INFO - Output:
[2021-07-26 11:31:40,273] {{bash.py:173}} INFO - The value of the second variable is: 123456
[2021-07-26 11:56:45,329] {{bash.py:158}} INFO - Running command: echo "The value of the second variable is: 123456"
[2021-07-26 11:56:45,402] {{bash.py:173}} INFO - The value of the second variable is: 123456
An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You can't perform this operation on the secret because it was marked for deletion.
get_var_sec2 = BashOperator(
    task_id="disp_aws_secret_variable_2",
    bash_command='echo "The value of the second variable is: {{var.value.cli_test4}}"',
)
KeyError: 'Variable cli_test4 does not exist'
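One way to guard against this when using Variable.get() is to supply a default, as in the earlier example, so a missing variable does not fail the task:

from airflow.models import Variable

# Falls back to "undefined" instead of raising KeyError when the variable is missing
demo = Variable.get("cli_test4", default_var="undefined")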
[2021-07-26 12:28:18,320] {{bash.py:158}} INFO - Running command: echo "The value of the second variable is: I am feeling better now"
[2021-07-26 12:28:18,399] {{bash.py:173}} INFO - The value of the second variable is: I am feeling better now
"{{ dag_run.conf }}"
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(
    dag_id="airflow_variables_atruntime",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="get_var_filename",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test"] if dag_run.conf else "" }}\'"',
    )

    get_var_filename2 = BashOperator(
        task_id="get_var_filename2",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test2"] if dag_run.conf else "" }}\'"',
    )

    get_var_filename3 = BashOperator(
        task_id="get_var_filename3",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test3"] if dag_run.conf else "" }}\'"',
    )
[2021-07-26 12:54:57,278] {{bash.py:158}} INFO - Running command: echo "You are running this DAG with the following variable file: ''"
[2021-07-26 12:54:57,379] {{bash.py:173}} INFO - You are running this DAG with the following variable file: ''
Note! If, when you trigger the DAG and leave the configuration blank, you get errors such as
jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'cli_test'
check to make sure you are using dag_run.conf and not just dag_run. You will see in the examples below the correct format, which allows you to combine a number of different sources for your variables.
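For reference, here are the incorrect and correct forms side by side:

# Incorrect - dag_run is the DagRun object, not the conf dictionary
'{{ dag_run["cli_test"] }}'

# Correct - index into dag_run.conf, guarding against an empty conf
'{{ dag_run.conf["cli_test"] if dag_run.conf else "" }}'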
{
"cli_test": "newvalue/",
"cli_test2": "12341234",
"cli_test3": "https://grouplens.org/datasets/movielens/latest/"
}
[2021-07-26 12:54:57,278] {{bash.py:158}} INFO - Running command: echo "You are running this DAG with the following variable file: '12341234'"
[2021-07-26 12:54:57,379] {{bash.py:173}} INFO - You are running this DAG with the following variable file: '12341234'
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(
    dag_id="airflow_variables_atruntime",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="get_var_filename",
        bash_command="""echo 'You are running this DAG with the following variable file: "{{ dag_run.conf["cli_test"] if dag_run.conf else var.value.cli_test }}"'""",
    )

    get_var_filename2 = BashOperator(
        task_id="get_var_filename2",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test2"] if dag_run.conf else var.value.cli_test2 }}\'"',
    )

    get_var_filename3 = BashOperator(
        task_id="get_var_filename3",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test3"] if dag_run.conf else var.value.cli_test3 }}\'"',
    )
{
"cli_test": "override/",
"cli_test2": "007007",
"cli_test3": "https://bbc.co.uk"
}
[2021-07-26 15:54:34,309] {{bash.py:158}} INFO - Running command: echo 'override/'
[2021-07-26 15:54:34,374] {{bash.py:173}} INFO - override/
[2021-07-26 15:54:04,899] {{bash.py:158}} INFO - Running command: echo 'newvalue/'
[2021-07-26 15:54:04,967] {{bash.py:173}} INFO - newvalue/
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(
    dag_id="airflow_variables_atruntime",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="get_var_filename",
        bash_command="""echo 'You are running this DAG with the following variable file: "{{ dag_run.conf["cli_test"] if dag_run.conf else var.value.cli_test }}"'""",
    )

    get_var_filename2 = BashOperator(
        task_id="get_var_filename2",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test2"] if dag_run.conf else var.value.cli_test4 }}\'"',
    )

    get_var_filename3 = BashOperator(
        task_id="get_var_filename3",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test3"] if dag_run.conf else var.value.cli_test3 }}\'"',
    )
{
"cli_test": "override/"
}
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(
    dag_id="airflow_variables_atruntime",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="get_var_filename",
        bash_command="""echo 'You are running this DAG with the following variable file: "{{ dag_run.conf["cli_test"] if dag_run.conf.get("cli_test") else var.value.cli_test }}"'""",
    )

    get_var_filename2 = BashOperator(
        task_id="get_var_filename2",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test2"] if dag_run.conf.get("cli_test2") else var.value.cli_test3 }}\'"',
    )

    get_var_filename3 = BashOperator(
        task_id="get_var_filename3",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test3"] if dag_run.conf.get("cli_test3") else var.value.cli_test4 }}\'"',
    )
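The difference from the earlier DAG is that dag_run.conf.get("key") guards each key individually, so keys missing from the conf fall back to the Airflow variable, while keys present in the conf override it. With the partial conf above, for example, the first task resolves to "override/" from the conf, while the other two fall back to their var.value lookups.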
airflow variables {command}
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(
    dag_id="import_airflow_variables",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="get_var_filename",
        bash_command='echo "You are importing the following variable file: \'{{ dag_run.conf["var_filename"] if dag_run.conf else "variables.json" }}\'"',
    )

    task1 = BashOperator(
        task_id="create_variables",
        bash_command='airflow variables --import /usr/local/airflow/dags/{{ dag_run.conf["var_filename"] if dag_run.conf else "variables.json"}}'
    )
.
├── import-var-dag.py
└── variables.json
[2021-07-27 08:29:13,721] {{bash_operator.py:146}} INFO - Running command: echo "You are importing the following variable file: 'variables.json'"
[2021-07-27 08:29:13,761] {{bash_operator.py:153}} INFO - Output:
[2021-07-27 08:29:13,790] {{bash_operator.py:157}} INFO - You are importing the following variable file: 'variables.json'
[2021-07-27 08:29:13,818] {{bash_operator.py:161}} INFO - Command exited with return code 0
[2021-07-27 08:29:13,858] {{taskinstance.py:1070}} INFO - Marking task as SUCCESS.dag_id=import_airflow_variables, task_id=get_var_filename, execution_date=20210727T082909, start_date=20210727T082912, end_date=20210727T082913
{"var_filename":"my_vars.json"}
[2021-07-27 08:33:19,467] {{bash_operator.py:146}} INFO - Running command: echo "You are importing the following variable file: 'my_vars.json'"
[2021-07-27 08:33:19,499] {{bash_operator.py:153}} INFO - Output:
[2021-07-27 08:33:19,523] {{bash_operator.py:157}} INFO - You are importing the following variable file: 'my_vars.json'
[2021-07-27 08:33:19,547] {{bash_operator.py:161}} INFO - Command exited with return code 0
[2021-07-27 08:33:19,589] {{taskinstance.py:1070}} INFO - Marking task as SUCCESS.dag_id=import_airflow_variables, task_id=get_var_filename, execution_date=20210727T083312, start_date=20210727T083318, end_date=20210727T083319
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(
    dag_id="import_airflow_variables",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="get_var_filename",
        bash_command='echo "You are importing the following variable file: \'{{ dag_run.conf["var_filename"] if dag_run.conf else "variables.json" }}\'"',
    )

    task1 = BashOperator(
        task_id="create_variables",
        bash_command='airflow variables import /usr/local/airflow/dags/{{ dag_run.conf["var_filename"] if dag_run.conf else "variables.json"}}'
    )
#!/bin/bash
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit
if [[ $2 == "" ]]; then
    dag="variables"
elif [ $2 == "--set" ] || [ $2 == "-s" ] || [ $2 == "--get" ] || [ $2 == "-g" ] || [ $2 == "-x" ] || [ $2 == "--delete" ]; then
    dag="variables $2 $3 $4 $5"
else
    echo "Not a valid command"
    exit 1
fi
CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
--header "Authorization: Bearer $CLI_TOKEN" \
--header "Content-Type: text/plain" \
--data-raw "$dag" ) \
&& echo "Output:" \
&& echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
&& echo "Errors:" \
&& echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
./mwaa-v1-variables.sh ricsue-dublin --delete cli_test
./mwaa-v1-variables.sh ricsue-dublin --set cli_test "test"
./mwaa-v1-variables.sh ricsue-dublin --get cli_test
import boto3
import json
import requests
import base64
import getopt
import sys
argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''
try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file', 'region'])
    if len(opts) != 3:
        print('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt == "-e":
                mwaa_env = arg
            elif opt == "-r":
                aws_region = arg
            elif opt == "-v":
                var_file = arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )

        with open("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables -s {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                },
                data=raw_data
            )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)
except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
python mwaa-v1-vars.py -e ricsue-dublin -r eu-west-1 -v variables.json
cli_test newvalue/
200
cli_test2 12341234
200
cli_test3 https://grouplens.org/datasets/movielens/latest/
200
#!/bin/bash
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit
if [[ $2 == "" ]]; then
    dag="variables list"
elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then
    dag="variables $2 $3 $4 $5"
else
    echo "Not a valid command"
    exit 1
fi
CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
--header "Authorization: Bearer $CLI_TOKEN" \
--header "Content-Type: text/plain" \
--data-raw "$dag" ) \
&& echo "Output:" \
&& echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
&& echo "Errors:" \
&& echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
./mwaa-v2-variables.sh ricsue-dublin delete cli_test
./mwaa-v2-variables.sh ricsue-dublin set cli_test "test"
./mwaa-v2-variables.sh ricsue-dublin get cli_test
raw_data = "variables set {0}".format(val)
python mwaa-v2-vars.py -e ricsue-dublin -r eu-central-1 -v variables.json
cli_test newvalue/
200
Variable cli_test created
cli_test2 12341234
200
Variable cli_test2 created
cli_test3 https://bbc.co.uk
200
Variable cli_test3 created
#!/bin/bash
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit
if [[ $2 == "" ]]; then
    dag="list_dags"
elif [ $2 == "list_tasks" ] && [[ $3 != "" ]]; then
    dag="$2 $3"
elif [ $2 == "list_dags" ] || [ $2 == "version" ]; then
    dag=$2
elif [ $2 == "variables" ]; then
    dag="$2 $3 $4"
else
    echo "Not a valid command"
    exit 1
fi
CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
--header "Authorization: Bearer $CLI_TOKEN" \
--header "Content-Type: text/plain" \
--data-raw "$dag" ) \
&& echo "Output:" \
&& echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
&& echo "Errors:" \
&& echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
Note! If you want to run this in a different region from your AWS CLI's default profile, set the AWS_REGION environment variable to the region you want to run this in (for example, export AWS_REGION=eu-west-1).
./mwaa-cli-v1.sh ricsue-dublin variables
cli_test
cli_test2
cli_test3
./mwaa-cli-v1.sh ricsue-dublin variables --get cli_test
Output:
newvalue/
Errors:
#!/bin/bash
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit
if [[ $2 == "" ]]; then
    dag="cheat-sheet"
elif [ $2 == "version" ]; then
    dag="version"
elif [ $2 == "list_dags" ]; then
    dag="dags list"
elif [ $2 == "variables" ]; then
    dag="$2 $3 $4"
else
    echo "Not a valid command"
    exit 1
fi
CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
--header "Authorization: Bearer $CLI_TOKEN" \
--header "Content-Type: text/plain" \
--data-raw "$dag" ) \
&& echo "Output:" \
&& echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
&& echo "Errors:" \
&& echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
./mwaa-cli-v2.sh ricsue-dublin variables
cli_test
cli_test2
cli_test3
./mwaa-cli-v2.sh ricsue-dublin variables get cli_test
Output:
newvalue/
Errors:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
with DAG(
    dag_id="airflow_variables_atruntime",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="get_var_filename",
        bash_command="""echo 'You are running this DAG with the following variable file: "{{ dag_run.conf["cli_test"] if dag_run.conf else var.value.cli_test }}"'""",
    )

    get_var_filename2 = BashOperator(
        task_id="get_var_filename2",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test2"] if dag_run.conf else var.value.cli_test2 }}\'"',
    )

    get_var_filename3 = BashOperator(
        task_id="get_var_filename3",
        bash_command='echo "You are running this DAG with the following variable file: \'{{ dag_run.conf["cli_test3"] if dag_run.conf else var.value.cli_test3 }}\'"',
    )
#!/bin/bash
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name, name of the DAG and runtime parameters " && exit
if [[ $2 == "" ]]; then
    dag="list_dags"
elif [ $2 == "trigger_dag" ] && [[ $3 != "" ]]; then
    dag="$2 $3 $4 $5"
elif [ $2 == "list_dags" ] || [ $2 == "version" ]; then
    dag=$2
elif [ $2 == "variables" ]; then
    dag="$2 $3 $4 $5"
else
    echo "Not a valid command"
    exit 1
fi
echo $dag
CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
--header "Authorization: Bearer $CLI_TOKEN" \
--header "Content-Type: text/plain" \
--data-raw "$dag" ) \
&& echo "Output:" \
&& echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
&& echo "Errors:" \
&& echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
./mwaa-v1-run-param.sh ricsue-dublin trigger_dag -c '{\"cli_test3\":\"http://aws.amazon.com\"}' airflow_variables_atruntime
[2021-07-27 12:01:07,421] {{bash_operator.py:146}} INFO - Running command: echo "You are running this DAG with the following variable file: 'http://aws.amazon.com'"
[2021-07-27 12:01:07,461] {{bash_operator.py:153}} INFO - Output:
[2021-07-27 12:01:07,494] {{bash_operator.py:157}} INFO - You are running this DAG with the following variable file: 'http://aws.amazon.com'
#!/bin/bash
AWS_REGION=eu-central-1
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name, name of the DAG and runtime parameters " && exit
if [[ $2 == "" ]]; then
    dag="dags list"
elif [ $2 == "trigger_dag" ] && [[ $3 != "" ]]; then
    dag="dags trigger $3 $4 $5"
elif [ $2 == "list_dags" ] || [ $2 == "version" ]; then
    dag=$2
elif [ $2 == "variables" ]; then
    dag="$2 $3 $4 $5"
else
    echo "Not a valid command"
    exit 1
fi
echo $dag
CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
--header "Authorization: Bearer $CLI_TOKEN" \
--header "Content-Type: text/plain" \
--data-raw "$dag" ) \
&& echo "Output:" \
&& echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
&& echo "Errors:" \
&& echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
./mwaa-v2-run-param.sh ricsue-dublin trigger_dag -c '{\"cli_test3\":\"http://aws.amazon.com\"}' airflow_variables_atruntime
[2021-07-27 12:17:57,315] {{__init__.py:38}} INFO - Loaded API auth backend: <module 'airflow.api.auth.backend.deny_all' from '/usr/local/lib/python3.7/site-packages/airflow/api/auth/backend/deny_all.py'>
Created <DagRun airflow_variables_atruntime @ 2021-07-27 12:17:57+00:00: manual__2021-07-27T12:17:57+00:00, externally triggered: True>
Errors:
[2021-07-27 12:32:25,428] {{bash.py:158}} INFO - Running command: echo 'http://aws.amazon.com'
[2021-07-27 12:32:25,501] {{bash.py:173}} INFO - http://aws.amazon.com
phases:
  build:
    commands:
      - aws s3 sync . s3://{your airflow s3 bucket}/dags/ --delete
demo = "{variable_placeholder}"
version: 0.2

env:
  secrets-manager:
    VARIABLE: "arn:aws:secretsmanager:eu-central-1:704533066374:secret:blog/airflow/variables/codebuild-iIvHYf:airflow_secret_variable"

phases:
  build:
    commands:
      - for file in *.py; do sed -i "s|{variable_placeholder}|${VARIABLE}|g" $file ; done
      - aws s3 sync . s3://{your-airflow-s3}/dags/ --delete
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
demo = "{variable_placeholder}"
with DAG(
    dag_id="display_changed_variables",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1)) as dag:

    get_var_filename = BashOperator(
        task_id="disp_variable",
        bash_command='echo "The value of the variable is: {var}"'.format(var=demo),
    )
...
from airflow.models import Variable
demo = "airflow_secret_value"
with DAG(
...
[2021-07-23 16:35:34,205] {{bash.py:158}} INFO - Running command: echo "The value of the variable is: airflow_secret_value"
[2021-07-23 16:35:34,246] {{bash.py:169}} INFO - Output:
[2021-07-23 16:35:34,276] {{bash.py:173}} INFO - The value of the variable is: airflow_secret_value
Note! If you get the following error when trying this approach:
[Container] 2021/07/23 14:44:33 Phase context status code: Secrets Manager Error Message: AccessDeniedException: User: arn:aws:sts::704533066374:assumed-role/codebuild-deploy-dags-service-role/AWSCodeBuild-7a8ab216-7c74-4cc6-91d4-f48bcdb99a09 is not authorized to perform: secretsmanager:GetSecretValue on resource: arn:aws:secretsmanager:eu-central-1:704533066374:secret:blog/airflow/variables/codebuild-awsaws
Remember to add the secretsmanager:GetSecretValue permission to the CodeBuild execution role. In the above example, this is what I added (yours will be similar, but with different resource names):
{
    "Effect": "Allow",
    "Resource": [
        "arn:aws:secretsmanager:eu-central-1:704533066374:secret:blog/airflow/variables*"
    ],
    "Action": [
        "secretsmanager:GetSecretValue"
    ]
},