29
loading...
This website collects cookies to deliver better user experience
python --version
fastapi[uvicorn]
ariadne
ASGI has two components:
async def hello_world():
    """Return the greeting string.

    This is a coroutine function: calling it yields a coroutine that must be
    awaited (or run on an event loop) to obtain the value.
    """
    greeting = "Hello world"
    return greeting
await
, we would call our asynchronous function as follows:
obytes = await hello_world()
# GraphQL SDL for the blog API.
# Comments in GraphQL start with "#"; "//" is not valid SDL and makes the
# schema fail to parse.

type User {
  id: ID!            # This is the id of the user
  email: String!     # This is the email of the user
  password: String!  # This is the password of the user
}

type blog {
  id: ID!               # This is the id of the blog
  title: String!        # This is the title of the blog
  description: String!  # This is the description of the blog
  completed: Boolean!   # This is the completed status of the blog
  ownerId: ID!          # This is the id of the owner of the blog
}

type blogResult {
  errors: [String]  # This is the list of errors
  blog: blog        # This is the blog
}

type blogsResult {
  errors: [String]
  blogs: [blog]  # This is the list of blogs
}

# NOTE(review): the Python resolvers in this tutorial return "success",
# "user" and "blog" keys that are not declared on the result types here;
# confirm which shape is intended.
type InsertResult {
  errors: [String]
  id: ID  # This is the id of the inserted blog
}

type TokenResult {
  errors: [String]
  token: String  # This is the token
}

schema {
  query: Query                # This is the query type
  mutation: Mutation          # This is the mutation type
  subscription: Subscription  # This is the subscription type
}

type Query {
  blogs: blogsResult!             # This is the list of blogs
  blog(blogId: ID!): blogResult!  # This is the blog
}

type Mutation {
  createblog(title: String!, description: String!): InsertResult!  # This is the blog
  createUser(email: String!, password: String!): InsertResult!     # This is the user
  createToken(email: String!, password: String!): TokenResult!     # This is the token
}

type Subscription {
  reviewblog(token: String!): InsertResult!  # This is the blog
}
app.py
and add the code below:
from ariadne import QueryType, make_executable_schema, load_schema_from_path
from ariadne.asgi import GraphQL
def resolve_hello(*_):
    """Resolve the ``hello`` query field; every resolver argument is ignored."""
    return "Hello world!"


# Load the SDL from disk and bind the single query resolver to it.
type_defs = load_schema_from_path("schema.graphql")
query = QueryType()
query.set_field("hello", resolve_hello)
schema = make_executable_schema(type_defs, query)

# ASGI application object; this is the ``app`` in ``uvicorn app:app``.
app = GraphQL(schema, debug=True)
schema.graphql
file and added a simple query called hello that we will use to test that our server is running. Our server is now ready to accept requests.
uvicorn app:app --reload
query {
hello
}
resolve_hello
function from app.py
and delete the hello query in the type Query section of schema.graphql
.store.py.
Initialize users to an empty dict, and blogs & queues to empty lists.
users = {}
# In-memory list of created blog dicts. mutations.py imports this name as
# ``blogs`` (``from store import users, blogs``), so it must be plural.
blogs = []
# presumably holds subscriber queues; not used by the visible snippets
queues = []
createUser
resolver to mutations.py.
from ariadne import ObjectType, convert_kwargs_to_snake_case
from store import users, blogs
# Bindable for the schema's ``Mutation`` type; resolvers below attach to it
# through ``@mutation.field(...)``.
mutation = ObjectType("Mutation")
@mutation.field("createUser")
@convert_kwargs_to_snake_case
async def resolve_create_user(obj, info, email, password):
    """Register a new user in the in-memory ``users`` store, keyed by email.

    Args:
        obj: Parent value passed by the GraphQL executor (unused).
        info: GraphQL resolve info (unused).
        email: Email identifying the user; must not already be stored.
        password: Password stored on the user record exactly as given.

    Returns:
        ``{"success": True, "user": user}`` when the user was created, or
        ``{"success": False, "errors": [...]}`` when the email is already
        registered or an unexpected error occurred.
    """
    try:
        # The resolver only receives ``email`` and ``password``; the previous
        # lookup used an undefined ``username`` and therefore always failed.
        if users.get(email):
            return {
                "success": False,
                "errors": ["Email is taken"],
            }

        # NOTE(review): the password is stored in plain text; hash it before
        # using this outside of a demo.
        user = {
            "id": len(users) + 1,
            "email": email,
            "password": password,
        }
        users[email] = user
        return {
            "success": True,
            "user": user,
        }
    except Exception as error:
        # Resolver boundary: report the failure in the payload instead of
        # raising, matching the other mutation resolvers.
        return {
            "success": False,
            "errors": [str(error)],
        }
ObjectType
and convert_kwargs_to_snake_case
from the Ariadne package. ObjectType
is used to define the mutation resolver, and convert_kwargs_to_snake_case
recursively converts arguments case from camelCase
to snake_case
@mutation.field("createblog")
@convert_kwargs_to_snake_case
async def resolve_create_blog(obj, info, content, title, description, completed, ownerId):
    """Create a blog record and append it to the in-memory ``blogs`` list.

    Args:
        obj: Parent value passed by the GraphQL executor (unused).
        info: GraphQL resolve info (unused).
        content: Accepted but not stored.
        title: Title of the blog.
        description: Description of the blog.
        completed: Completed status of the blog.
        ownerId: Id of the user owning the blog.

    Returns:
        ``{"success": True, "blog": blog}`` on success, otherwise
        ``{"success": False, "errors": [...]}``.

    NOTE(review): the schema's ``createblog`` field only declares ``title``
    and ``description``, and ``convert_kwargs_to_snake_case`` would deliver a
    camelCase ``ownerId`` argument as ``owner_id``; confirm the signature
    against the schema.
    """
    try:
        blog = {
            # Sequential id, mirroring createUser; the previous code stored
            # the builtin ``id`` function under an "ID" key.
            "id": len(blogs) + 1,
            "title": title,
            "description": description,
            "completed": completed,
            "ownerId": ownerId,
        }
        blogs.append(blog)
        return {
            "success": True,
            "blog": blog,
        }
    except Exception as error:
        # Resolver boundary: report the failure in the payload instead of
        # raising.
        return {
            "success": False,
            "errors": [str(error)],
        }
resolve_create_blog
, we create a dictionary that stores the attributes of the blog. We append it to the blogs list and return the created blog. If successful, we set success to True and return success and the created blog object. If there was an error, we set success to False and return success and the error message.
from mutations import mutation
make_executable_schema:
schema = make_executable_schema(type_defs, query, mutation)
get_blogs
resolver.
# This example uses the `databases` package with PostgreSQL to talk to the database.
# Install psycopg2, databases[postgresql] and asyncpg.
async def get_blogs(
    skip: Optional[int] = 0, limit: Optional[int] = 100
) -> Optional[list]:
    """Fetch one page of blogs from the database.

    Args:
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        A list with one dict per row, or ``None`` when no rows were found.
    """
    # Use the generative ``offset()``/``limit()`` methods rather than
    # passing them as keyword arguments to ``select()``; this matches the
    # ``select().where(...)`` form used by ``get_blog``.
    query = blog.select().offset(skip).limit(limit)
    rows = await database.fetch_all(query=query)
    return [dict(row) for row in rows] if rows else None
async def get_blog(blog_id: int) -> Optional[Dict]:
    """Fetch a single blog by id.

    Args:
        blog_id: Id of the blog; coerced with ``int()`` before comparison.

    Returns:
        The row as a dict, or ``None`` when no row matches.
    """
    lookup = blog.select().where(blog.c.id == int(blog_id))
    row = await database.fetch_one(query=lookup)
    if not row:
        return None
    return dict(row)
from typing import Optional
from ariadne import QueryType, convert_kwargs_to_snake_case
from crud import get_blogs, get_blog # Create a file called crud.py and add the get_blogs function
from schemas.error import MyGraphQLError
@convert_kwargs_to_snake_case
async def resolve_blogs(obj, info, skip: Optional[int] = 0, limit: Optional[int] = 100):
    """Resolve the ``blogs`` query field.

    Delegates to ``get_blogs`` with the given ``skip``/``limit`` and wraps
    whatever it returns under the ``"blogs"`` key.
    """
    return {"blogs": await get_blogs(skip=skip, limit=limit)}
@convert_kwargs_to_snake_case
async def resolve_blog(obj, info, blog_id):
    """Resolve the ``blog`` query field.

    Returns:
        ``{"success": True, "blog": blog}`` when ``get_blog`` finds the blog.

    Raises:
        MyGraphQLError: with code 404 when ``get_blog`` returns nothing.
    """
    found = await get_blog(blog_id=blog_id)
    if found:
        return {"success": True, "blog": found}
    raise MyGraphQLError(code=404, message=f"blog id {blog_id} not found")
# Bindable for the schema's Query type; attach the two resolvers defined
# above to the ``blogs`` and ``blog`` fields.
query = QueryType()
query.set_field("blogs", resolve_blogs)
query.set_field("blog", resolve_blog)
from ariadne import SubscriptionType, convert_kwargs_to_snake_case
from graphql.type import GraphQLResolveInfo
# Bindable for subscriptions; the ``reviewblog`` resolver and source below
# are registered on it.
subscription = SubscriptionType()
# ``@subscription.field`` must be the outermost decorator so that the
# function it registers is the one already wrapped by
# ``convert_kwargs_to_snake_case`` (same order as the mutation resolvers).
@subscription.field("reviewblog")
@convert_kwargs_to_snake_case
async def review_blog_resolver(review_blog, info: GraphQLResolveInfo, token: str):
    """Resolve one ``reviewblog`` subscription event.

    Args:
        review_blog: Payload of the event; returned as the ``id`` of the
            result.
        info: GraphQL resolve info (unused).
        token: Subscription argument (unused here).

    Returns:
        ``{"id": review_blog}``.
    """
    return {"id": review_blog}
RabbitMQ is a message broker that allows you to publish messages and subscribe to them.
# This is an example from my project FastQL.
# ``@subscription.source`` must be the outermost decorator so that the
# generator it registers is the one already wrapped by
# ``convert_kwargs_to_snake_case`` (same order as the mutation resolvers).
@subscription.source("reviewblog")
@convert_kwargs_to_snake_case
async def review_blog_source(obj, info: GraphQLResolveInfo, token: str):
    """Async generator feeding the ``reviewblog`` subscription.

    Args:
        obj: Root value passed by the GraphQL executor (unused).
        info: GraphQL resolve info (unused).
        token: Passed to ``security.get_current_user_by_auth_header`` to
            look up the current user.

    Yields:
        Each truthy value returned by ``rabbit.consumeblog()``.

    Raises:
        MyGraphQLError: with code 401 when no user is found for ``token``.
    """
    user = await security.get_current_user_by_auth_header(token)
    if not user:
        raise MyGraphQLError(code=401, message="User not authenticated")

    while True:
        blog_id = await rabbit.consumeblog()
        if not blog_id:
            # A falsy result ends the subscription stream.
            return
        yield blog_id
user_one
and user_two
. Paste the following mutation and hit play.mutation {
createUser(
email:"[email protected]"
password:"admin"
) {
success
user {
userId
email
password
}
}
}
user_one
to user_two
and hit play again to create the second user.createBlog
mutation expects us to provide senderId
and recipientId
. If you looked at the responses from the createUser
mutations you already know what IDs were assigned to them, but let’s assume we only know the usernames, so we will use the userId
query to retrieve the IDs.query {
userId(
// User One
email: "[email protected]"
password: "admin"
)
}
createBlog
mutation.
mutation {
createBlog(
senderId: "1",
recipientId: "2",
title:"Blog number1"
description:"This is the first blog"
ownerId: "1"
) {
success
blog {
title
description
ownerId
recipientId
senderId
}
}
}
asyncio.Queue
.