mkdir -p streaming-dashboard/app
# streaming-dashboard
# └── app
version: "3"
volumes:
questdb_data: {}
services:
redis:
image: "redis:latest"
ports:
- "6379:6379"
questdb:
image: "questdb/questdb:latest"
volumes:
- questdb_data:/root/.questdb/db
ports:
- "9000:9000"
- "8812:8812"
By running docker-compose up, QuestDB and Redis will fire up. After starting the services, we can access QuestDB's interactive console at http://127.0.0.1:9000. Using the console, create the table that will store the quotes:

CREATE TABLE
quotes(stock_symbol SYMBOL CAPACITY 5 CACHE INDEX, -- we only track 3 symbols, but leave some headroom
current_price DOUBLE,
high_price DOUBLE,
low_price DOUBLE,
open_price DOUBLE,
percent_change DOUBLE,
tradets TIMESTAMP, -- timestamp of the trade
ts TIMESTAMP) -- time of insert in our table
timestamp(ts)
PARTITION BY DAY;
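To confirm the table exists, you can run a quick sanity check in the same console; it will return no rows until the worker starts inserting:

SELECT * FROM quotes LIMIT 5;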
finnhub-python==2.4.5 # The official Finnhub Python client
pydantic[dotenv]==1.8.2 # We will use Pydantic to create data models
celery[redis]==5.1.2 # Celery will be the periodic task executor
psycopg2==2.9.1 # We are using QuestDB's PostgreSQL connector
sqlalchemy==1.4.2 # SQLAlchemy will help us execute SQL queries
dash==2.0.0 # Dash is used for building data apps
pandas==1.3.4 # Pandas will handle the data frames from QuestDB
plotly==5.3.1 # Plotly will help us with beautiful charts
$ virtualenv -p python3.8 virtualenv
$ source virtualenv/bin/activate
$ pip install -r requirements.txt
Create a new file in the app package, called db.py. This file contains the SQLAlchemy engine that will serve as the base for our connections:

from sqlalchemy import create_engine
from app.settings import settings
engine = create_engine(
settings.database_url, pool_size=settings.database_pool_size, pool_pre_ping=True
)
Next, create a settings.py file in the app package. We will use pydantic's BaseSettings to define the configuration. This helps us read the settings from a .env file or from environment variables, and prefix them if needed. The prefix will be SMD, which stands for "stock market dashboard", our application. Below you can see the settings file:

from typing import List
from pydantic import BaseSettings
class Settings(BaseSettings):
"""
Settings of the application, used by workers and dashboard.
"""
# Celery settings
celery_broker: str = "redis://127.0.0.1:6379/0"
# Database settings
database_url: str = "postgresql://admin:[email protected]:8812/qdb"
database_pool_size: int = 3
# Finnhub settings
api_key: str = ""
frequency: int = 5 # default stock data fetch frequency in seconds
symbols: List[str] = list()
# Dash/Plotly
debug: bool = True
graph_interval: int = 10
class Config:
"""
Meta configuration of the settings parser.
"""
env_file = ".env"
# Prefix the environment variable not to mix up with other variables
# used by the OS or other software.
env_prefix = "SMD_" # SMD stands for Stock Market Dashboard
settings = Settings()
You may have noticed the celery_broker and database_url settings with unusual default values; the missing pieces will come from a .env file. One of the most significant advantages of pydantic-based settings is that they can read environment variables from .env files. Create a .env file in the project root, next to docker-compose.yml:

SMD_API_KEY = "<YOUR SANDBOX API KEY>"
SMD_FREQUENCY = 10
SMD_SYMBOLS = ["AAPL","DOCN","EBAY"]
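To make sure the settings are really read from the .env file, you can run a throwaway check from the project root; it should print something like ['AAPL', 'DOCN', 'EBAY'] 10:

$ PYTHONPATH=. python -c "from app.settings import settings; print(settings.symbols, settings.frequency)"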
import finnhub
from celery import Celery
from sqlalchemy import text
from app.db import engine
from app.settings import settings
client = finnhub.Client(api_key=settings.api_key)
celery_app = Celery(broker=settings.celery_broker)
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
"""
Setup a periodic task for every symbol defined in the settings.
"""
for symbol in settings.symbols:
sender.add_periodic_task(settings.frequency, fetch.s(symbol))
@celery_app.task
def fetch(symbol: str):
"""
Fetch the stock info for a given symbol from Finnhub and load it into QuestDB.
"""
quote: dict = client.quote(symbol)
# https://finnhub.io/docs/api/quote
# quote = {'c': 148.96, 'd': -0.84, 'dp': -0.5607, 'h': 149.7, 'l': 147.8, 'o': 148.985, 'pc': 149.8, 't': 1635796803}
# c: Current price
# d: Change
# dp: Percent change
# h: High price of the day
# l: Low price of the day
# o: Open price of the day
# pc: Previous close price
# t: when it was traded (UNIX timestamp in seconds)
query = f"""
INSERT INTO quotes(stock_symbol, current_price, high_price, low_price, open_price, percent_change, tradets, ts)
VALUES(
'{symbol}',
{quote["c"]},
{quote["h"]},
{quote["l"]},
{quote["o"]},
{quote["pc"]},
{quote["t"]} * 1000000,
systimestamp()
);
"""
with engine.connect() as conn:
conn.execute(text(query))
Let's go through the worker piece by piece. The imports come first:

import finnhub
from celery import Celery
from sqlalchemy import text
from app.db import engine
from app.settings import settings
# [...]

Then we create the Finnhub client and the Celery application:

# [...]
client = finnhub.Client(api_key=settings.api_key)
celery_app = Celery(broker=settings.celery_broker)
# [...]

Next, we register a periodic task for every configured symbol:

# [...]
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
"""
Setup a periodic task for every symbol defined in the settings.
"""
for symbol in settings.symbols:
sender.add_periodic_task(settings.frequency, fetch.s(symbol))
# [...]
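For reference, add_periodic_task builds the schedule dynamically at configuration time; a roughly equivalent static configuration would be a beat_schedule dict. A sketch (the entry names are illustrative, and "app.worker.fetch" assumes Celery's default task naming):

# Hypothetical static equivalent of the loop above.
celery_app.conf.beat_schedule = {
    f"fetch-{symbol}": {
        "task": "app.worker.fetch",  # default task name: <module>.<function>
        "schedule": settings.frequency,  # run every N seconds
        "args": (symbol,),
    }
    for symbol in settings.symbols
}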
Finally, the fetch task does the majority of the work:

# [...]
@celery_app.task
def fetch(symbol: str):
"""
Fetch the stock info for a given symbol from Finnhub and load it into QuestDB.
"""
quote: dict = client.quote(symbol)
# https://finnhub.io/docs/api/quote
# quote = {'c': 148.96, 'd': -0.84, 'dp': -0.5607, 'h': 149.7, 'l': 147.8, 'o': 148.985, 'pc': 149.8, 't': 1635796803}
# c: Current price
# d: Change
# dp: Percent change
# h: High price of the day
# l: Low price of the day
# o: Open price of the day
# pc: Previous close price
# t: when it was traded (UNIX timestamp in seconds)
query = f"""
INSERT INTO quotes(stock_symbol, current_price, high_price, low_price, open_price, percent_change, tradets, ts)
VALUES(
'{symbol}',
{quote["c"]},
{quote["h"]},
{quote["l"]},
{quote["o"]},
{quote["pc"]},
{quote["t"]} * 1000000,
systimestamp()
);
"""
with engine.connect() as conn:
conn.execute(text(query))
Using the Finnhub client, we get a quote for the given symbol. After the quote is retrieved successfully, we prepare a SQL query to insert it into the database. At the end of the function, as the last step, we open a connection to QuestDB and insert the new quote. Start the worker, together with the embedded beat scheduler, by running:

python -m celery --app app.worker.celery_app worker --beat -l info -c 1
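A side note: building the INSERT with an f-string is fine for a demo, but bound parameters are safer against quoting issues. Here is a minimal sketch of the same insert using SQLAlchemy named parameters (the :placeholders are my own naming, and it assumes QuestDB's PostgreSQL wire protocol accepts a bound parameter inside the :t * 1000000 expression):

query = text(
    """
    INSERT INTO quotes(stock_symbol, current_price, high_price, low_price,
                       open_price, percent_change, tradets, ts)
    VALUES(:symbol, :c, :h, :l, :o, :dp, :t * 1000000, systimestamp());
    """
)

with engine.connect() as conn:
    # Keys must match the :placeholders in the statement above.
    conn.execute(query, {
        "symbol": symbol,
        "c": quote["c"],
        "h": quote["h"],
        "l": quote["l"],
        "o": quote["o"],
        "dp": quote["dp"],
        "t": quote["t"],
    })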
So far we have created:

- a docker-compose.yml file to manage the related services,
- app/settings.py that handles our application configuration,
- app/db.py configuring the database engine, and
- app/worker.py that does the hard work: it fetches and stores the data.

The project tree now looks like this:

├── app
│ ├── __init__.py
│ ├── db.py
│ ├── settings.py
│ └── worker.py
└── docker-compose.yml
Next, create an assets directory next to the app package, giving us the structure below:

├── app
│ ├── __init__.py
│ ├── db.py
│ ├── settings.py
│ └── worker.py
├── assets
├── .env
└── docker-compose.yml
The style.css file will define the styling for our application. As mentioned above, Dash saves us from boilerplate code, so the assets directory and the stylesheet in it are picked up by default. Download the style.css file into the assets directory; this can be done using curl:

curl -s -Lo ./assets/style.css https://raw.githubusercontent.com/gabor-boros/questdb-stock-market-dashboard/main/assets/style.css
Now, create a main.py file in the app package, and let's begin with the imports:

from datetime import datetime, timedelta
import dash
import pandas
from dash import dcc, html
from dash.dependencies import Input, Output
from plotly import graph_objects
from app.db import engine
from app.settings import settings
# [...]

Right after the imports come the module-level constants and two helper functions:

# [...]
GRAPH_INTERVAL = settings.graph_interval * 1000
TIME_DELTA = 5 # look back at the last TIME_DELTA hours of data, based on insert time
COLORS = [
"#1e88e5",
"#7cb342",
"#fbc02d",
"#ab47bc",
"#26a69a",
"#5d8aa8",
]
def now() -> datetime:
return datetime.utcnow()
def get_stock_data(start: datetime, end: datetime, stock_symbol: str):
def format_date(dt: datetime) -> str:
return dt.isoformat(timespec="microseconds") + "Z"
query = f"quotes WHERE ts BETWEEN '{format_date(start)}' AND '{format_date(end)}'"
if stock_symbol:
query += f" AND stock_symbol = '{stock_symbol}' "
with engine.connect() as conn:
return pandas.read_sql_query(query, conn)
# [...]
We define the graph refresh interval in milliseconds (GRAPH_INTERVAL) and the colors that will be used for coloring the graphs (COLORS). Then come the helpers, now and get_stock_data. While now is responsible only for getting the current time in UTC (as Finnhub returns the date in UTC too), get_stock_data does more. It is the core of the dashboard: it queries QuestDB for the quotes inserted between start and end, optionally filtered for a single stock symbol. Note that the query does not begin with SELECT *: QuestDB lets us start a query with the table name, which implies SELECT * FROM quotes.

# [...]
df = get_stock_data(now() - timedelta(hours=TIME_DELTA), now(), "")
app = dash.Dash(
__name__,
title="Real-time stock market changes",
assets_folder="../assets",
meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}],
)
# [...]
The initial data frame (df) will contain the latest 5 hours of data. This is needed to pre-populate the application with the data we already have. The application definition (app) describes the application's title, assets folder, and some HTML meta tags used during rendering. Next comes the layout:

# [...]
app.layout = html.Div(
[
html.Div(
[
html.Div(
[
html.H4("Stock market changes", className="app__header__title"),
html.P(
"Continually query QuestDB and display live changes of the specified stocks.",
className="app__header__subtitle",
),
],
className="app__header__desc",
),
],
className="app__header",
),
html.Div(
[
html.P("Select a stock symbol"),
dcc.Dropdown(
id="stock-symbol",
searchable=True,
options=[
{"label": symbol, "value": symbol}
for symbol in df["stock_symbol"].unique()
],
),
],
className="app__selector",
),
html.Div(
[
html.Div(
[
html.Div(
[html.H6("Current price changes", className="graph__title")]
),
dcc.Graph(id="stock-graph"),
],
className="one-half column",
),
html.Div(
[
html.Div(
[html.H6("Percent changes", className="graph__title")]
),
dcc.Graph(id="stock-graph-percent-change"),
],
className="one-half column",
),
],
className="app__content",
),
dcc.Interval(
id="stock-graph-update",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
),
],
className="app__container",
)
# [...]
The key component for live updates is dcc.Interval: the interval is used to set up the periodic graph refresh. Now we define the callback that redraws the price graph whenever a new symbol is selected or the interval fires:

# [...]
@app.callback(
Output("stock-graph", "figure"),
[Input("stock-symbol", "value"), Input("stock-graph-update", "n_intervals")],
)
def generate_stock_graph(selected_symbol, _):
data = []
filtered_df = get_stock_data(now() - timedelta(hours=TIME_DELTA), now(), selected_symbol)
groups = filtered_df.groupby(by="stock_symbol")
for group, data_frame in groups:
data_frame = data_frame.sort_values(by=["ts"])
trace = graph_objects.Scatter(
x=data_frame.ts.tolist(),
y=data_frame.current_price.tolist(),
marker=dict(color=COLORS[len(data)]),
name=group,
)
data.append(trace)
layout = graph_objects.Layout(
xaxis={"title": "Time"},
yaxis={"title": "Price"},
margin={"l": 70, "b": 70, "t": 70, "r": 70},
hovermode="closest",
plot_bgcolor="#282a36",
paper_bgcolor="#282a36",
font={"color": "#aaa"},
)
figure = graph_objects.Figure(data=data, layout=layout)
return figure
# [...]
The second callback is nearly identical; it plots the percent change instead of the current price:

# [...]
@app.callback(
Output("stock-graph-percent-change", "figure"),
[
Input("stock-symbol", "value"),
Input("stock-graph-update", "n_intervals"),
],
)
def generate_stock_graph_percentage(selected_symbol, _):
data = []
filtered_df = get_stock_data(now() - timedelta(hours=TIME_DELTA), now(), selected_symbol)
groups = filtered_df.groupby(by="stock_symbol")
for group, data_frame in groups:
data_frame = data_frame.sort_values(by=["ts"])
trace = graph_objects.Scatter(
x=data_frame.ts.tolist(),
y=data_frame.percent_change.tolist(),
marker=dict(color=COLORS[len(data)]),
name=group,
)
data.append(trace)
layout = graph_objects.Layout(
xaxis={"title": "Time"},
yaxis={"title": "Percent change"},
margin={"l": 70, "b": 70, "t": 70, "r": 70},
hovermode="closest",
plot_bgcolor="#282a36",
paper_bgcolor="#282a36",
font={"color": "#aaa"},
)
figure = graph_objects.Figure(data=data, layout=layout)
return figure
# [...]
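Since the two callbacks differ only in the plotted column and the y-axis title, the shared logic could live in one helper. A possible refactoring sketch (generate_graph is my naming, not part of the original code):

def generate_graph(selected_symbol, column: str, y_title: str):
    """Build a scatter figure plotting the given quotes column per symbol."""
    data = []
    filtered_df = get_stock_data(now() - timedelta(hours=TIME_DELTA), now(), selected_symbol)
    for group, data_frame in filtered_df.groupby(by="stock_symbol"):
        data_frame = data_frame.sort_values(by=["ts"])
        data.append(
            graph_objects.Scatter(
                x=data_frame.ts.tolist(),
                y=data_frame[column].tolist(),
                # Wrap around so more than len(COLORS) symbols cannot crash.
                marker=dict(color=COLORS[len(data) % len(COLORS)]),
                name=group,
            )
        )
    layout = graph_objects.Layout(
        xaxis={"title": "Time"},
        yaxis={"title": y_title},
        margin={"l": 70, "b": 70, "t": 70, "r": 70},
        hovermode="closest",
        plot_bgcolor="#282a36",
        paper_bgcolor="#282a36",
        font={"color": "#aaa"},
    )
    return graph_objects.Figure(data=data, layout=layout)

Each callback body then reduces to a single line, for example return generate_graph(selected_symbol, "current_price", "Price").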
Last but not least, we call run_server on the app object when the script is called from the CLI:

# [...]
if __name__ == "__main__":
app.run_server(host="0.0.0.0", debug=settings.debug)
Start the application by running the following from the project root:

$ PYTHONPATH=. python app/main.py
Dash is running on http://0.0.0.0:8050/
* Tip: There are .env or .flaskenv files present. Do "pip install python-dotenv" to use them.
* Serving Flask app 'main' (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
* Running on all addresses.
WARNING: This is a development server. Do not use it in a production deployment.
* Running on http://192.168.0.14:8050/ (Press CTRL+C to quit)
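As the log warns, the built-in Flask server is for development only. Should you want a production WSGI server later, one option is gunicorn; a sketch, assuming you expose Dash's underlying Flask instance as server in app/main.py:

# app/main.py -- addition at module level
server = app.server  # the underlying Flask app, consumed by WSGI servers

$ pip install gunicorn
$ gunicorn --workers 2 --bind 0.0.0.0:8050 app.main:server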