26
loading...
This website collects cookies to deliver better user experience
from flask import Flask
app = Flask(name__)
@app.route("/")
def home():
return ""Hello, World!""
if name == "__main":
app.run(debug=True)
#imports
from flask import Flask
from celery import Celery
#creates a Flask object
app = Flask(name)
#Configure the redis server
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
#creates a Celery object
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def async_function(arg1, arg2):
#Async task
return result
`async_function(10, 30)`
<html>
<head>
<title>Flask and Celery</title>
</head>
<body>
<h2>Sending Asynchronous Email</h2>
{% for message in get_flashed_messages() %}
<p style="color: red;">{{ message }}</p>
{% endfor %}
<form method="POST">
<p>Send email to: <input type="text" name="email" value="{{ email }}"></p>
<input type="submit" name="submit" value="Send">
</form>
</body>
</html>
Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = '[email protected]'
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
return render_template('index.html', email=session.get('email', ''))
email = request.form['email']
session['email'] = email
# sends this content
email_msg = {
'subject': 'Testing Celery with Flask',
'to': email,
'body': 'Testing background task with Celery'
}
if request.form['submit'] == 'Send':
# sends the email content to the backgraound function
send_email.delay(email_msg)
flash('Sending email to {0}'.format(email))
else:
flash('No Email sent')
return redirect(url_for('index'))
@celery.task
def send_email(email_msg):
#Async function to send an email with Flask-Mail
msg_sub = Message(email_msg['subject'],
email_sender = app.config['MAIL_DEFAULT_SENDER'],
recipient = [email_msg['to']])
msg_sub.body = email_msg['body']
with app.app_context():
mail.send(msg_sub)