47
loading...
This website uses cookies to deliver a better user experience.
{
"totalPassengers": 19675,
"totalPages": 1968,
"data": [
{
"_id": "5ff393986feae02b6c22b251",
"name": "Deepanss",
"trips": 0,
"airline": [
{
"id": 3,
"name": "Cathay Pacific",
"country": "Hong Kong",
"logo": "https://upload.wikimedia.org/wikipedia/en/thumb/1/17/Cathay_Pacific_logo.svg/300px-Cathay_Pacific_logo.svg.png",
"slogan": "Move Beyond",
"head_quaters": "Cathay City, Hong Kong International Airport, Chek Lap Kok, Hong Kong",
"website": "www.cathaypacific.com",
"established": "1946"
}
],
"__v": 0
},...
]
}
import asyncio
import typing as t
import httpx
async def get_passengers(
    page: int = 0,
    size: int = 10,
) -> httpx.Response:
    """Fetch a single page of passengers from the instantwebtools API.

    Args:
        page: Page index, sent as the ``page`` query parameter.
        size: Number of passengers per page, sent as the ``size`` query
            parameter.

    Returns:
        The ``httpx.Response`` for the request; callers decode it with
        ``.json()``.

    Raises:
        httpx.HTTPStatusError: If the server replies with an error status
            (raised by ``raise_for_status``).
    """
    print(f"Getting page n°{page}")
    # ``httpx.get`` is a synchronous call: inside a coroutine it blocks the
    # event loop for the whole request. ``AsyncClient`` performs the request
    # with a real ``await`` so other tasks can run while waiting.
    async with httpx.AsyncClient() as client:
        response: httpx.Response = await client.get(
            "https://api.instantwebtools.net/v1/passenger",
            params={"page": page, "size": size},
        )
    response.raise_for_status()
    return response
async def get_all_passengers() -> None:
    """Download every page of passengers and print how many were collected.

    Pages are requested sequentially, starting at page 0, until the
    ``totalPages`` count reported in the response has been covered. The
    passengers of all pages are accumulated in a single list whose length
    is printed at the end.
    """
    page = 0
    result = []
    while True:
        response = await get_passengers(page=page)
        response_data = response.json()
        # Tolerate a null ``data`` field instead of failing on ``list += None``.
        result += response_data["data"] or []
        # Requests start at page 0, so with ``totalPages`` pages the last
        # index is ``totalPages - 1`` (assumes zero-indexed pages). Comparing
        # ``page`` itself would trigger one extra request past the last page.
        if page + 1 >= response_data["totalPages"]:
            break
        page += 1
    print(f"total result:{len(result)} ")
if __name__ == "__main__":
    # Script entry point: drive the paginated download to completion.
    asyncio.run(get_all_passengers())
Getting page n°0
Getting page n°1
Getting page n°2
Getting page n°3
Getting page n°4
Getting page n°5
Getting page n°6
Getting page n°7
Getting page n°8
Getting page n°9
Getting page n°10
total result:110
import asyncio
import typing as t
import httpx
async def get_passengers(
    page: int = 0,
    size: int = 10,
) -> httpx.Response:
    """Fetch a single page of passengers from the instantwebtools API.

    Args:
        page: Page index, sent as the ``page`` query parameter.
        size: Number of passengers per page, sent as the ``size`` query
            parameter.

    Returns:
        The ``httpx.Response`` for the request; callers decode it with
        ``.json()``.

    Raises:
        httpx.HTTPStatusError: If the server replies with an error status
            (raised by ``raise_for_status``).
    """
    print(f"Getting page n°{page}")
    # ``httpx.get`` is a synchronous call: inside a coroutine it blocks the
    # event loop for the whole request. ``AsyncClient`` performs the request
    # with a real ``await`` so other tasks can run while waiting.
    async with httpx.AsyncClient() as client:
        response: httpx.Response = await client.get(
            "https://api.instantwebtools.net/v1/passenger",
            params={"page": page, "size": size},
        )
    response.raise_for_status()
    return response
async def get_all_passengers() -> t.AsyncGenerator[t.Any, None]:
    """Yield passengers one at a time, fetching pages as they are consumed.

    Pages are requested sequentially, starting at page 0. Each passenger in
    a page's ``data`` list is yielded before the next page is requested, so
    a consumer that stops early never triggers the remaining requests.

    Yields:
        Each entry of the ``data`` list of every page, in page order.
    """
    page = 0
    while True:
        response = await get_passengers(page=page)
        response_data = response.json()
        # ``or []`` tolerates a null ``data`` field on an empty page.
        for passenger in response_data["data"] or []:
            yield passenger
        # Requests start at page 0, so with ``totalPages`` pages the last
        # index is ``totalPages - 1`` (assumes zero-indexed pages). Comparing
        # ``page`` itself would trigger one extra request past the last page.
        if page + 1 >= response_data["totalPages"]:
            break
        page += 1
async def get_all_passengers_async_for() -> None:
    """Drain the passenger async generator and print the number of items."""
    collected = []
    # Explicit ``async for`` form of the async list comprehension.
    async for passenger in get_all_passengers():
        collected.append(passenger)
    print(f"total result:{len(collected)} ")
if __name__ == "__main__":
    # Script entry point: consume the async generator and report the total.
    asyncio.run(get_all_passengers_async_for())
>>> mock = MagicMock() # AsyncMock also works here
>>> mock.__aiter__.return_value = [1, 2, 3]
>>> async def main():
... return [i async for i in mock]
...
>>> asyncio.run(main())
[1, 2, 3]