Calling a Python async function from sync code — by Andrey Zhukov
Introduction
This is an example of how to turn sequential requests into parallel ones in legacy code by running an asyncio event loop, for cases where a full refactoring is not an option.
To illustrate the idea, I use a simplified version of the Microsoft Graph API requests that fetch each user's manager by user ID.
The code before
def get_manager(user_id: str) -> Optional[Dict]:
    """Fetch the manager of ``user_id`` from Microsoft Graph (blocking).

    Returns the decoded JSON body when the API answers HTTP 200,
    otherwise ``None``. Exceptions raised by ``requests`` propagate.
    """
    manager_url = f"https://graph.microsoft.com/beta/users/{user_id}/manager"
    response = requests.request(
        "GET",
        url=manager_url,
        headers=get_auth_headers(),
        timeout=5,
    )
    if response.status_code != 200:
        return None
    return response.json()
def get_managers(user_ids: List[str]) -> Dict[str, Dict]:
    """Look up the manager of every user in ``user_ids``, one request at a time.

    Users whose lookup yields a falsy result (e.g. ``None``) are left out
    of the returned mapping of user id -> manager JSON.
    """
    lookups = ((user_id, get_manager(user_id)) for user_id in user_ids)
    return {user_id: manager for user_id, manager in lookups if manager}
The code after
The important part is a `RateLimiter` that throttles outgoing requests so the API does not answer with too many HTTP 429 (Too Many Requests) responses.
class RateLimiter:
    """Token-bucket rate limiter for aiohttp session requests.

    The bucket starts full and refills continuously at ``rate`` tokens per
    second, never holding more than ``rate`` tokens. Every call to
    :meth:`request` consumes one token, waiting (polling every 0.1 s)
    while the bucket is empty.
    """

    RATE = 100  # default number of requests per second

    def __init__(self, session, rate: Optional[float] = None):
        """Wrap ``session`` with rate limiting.

        Args:
            session: object exposing ``request(*args, **kwargs)``, such as an
                ``aiohttp.ClientSession``.
            rate: allowed requests per second, which is also the burst size.
                Defaults to the class attribute ``RATE``.

        Raises:
            ValueError: if ``rate`` is not positive (the limiter would never
                refill and every request would wait forever).
        """
        if rate is None:
            rate = self.RATE
        if rate <= 0:
            raise ValueError(f"rate must be positive, got {rate!r}")
        self.session = session
        self.rate = rate
        self.tokens = rate
        self.updated_at = time.monotonic()

    async def request(self, *args, **kwargs):
        """Wait for a token, then forward the call to ``session.request``.

        The session's return value is handed back un-awaited, so with aiohttp
        the caller writes ``async with await limiter.request(...) as resp``.
        """
        await self.wait_for_token()
        return self.session.request(*args, **kwargs)

    async def wait_for_token(self):
        """Block until at least one token is available, then consume it."""
        while self.tokens < 1:
            self.add_new_tokens()
            if self.tokens >= 1:
                # The refill already produced a token; don't sleep needlessly.
                break
            await asyncio.sleep(0.1)
        self.tokens -= 1

    def add_new_tokens(self):
        """Refill the bucket for the time elapsed since the last refill.

        The timestamp only advances once at least one whole token is
        available, so fractional refills accumulate across polls instead of
        being discarded.
        """
        now = time.monotonic()
        new_tokens = (now - self.updated_at) * self.rate
        if self.tokens + new_tokens >= 1:
            self.tokens = min(self.tokens + new_tokens, self.rate)
            self.updated_at = now
async def get_manager(
    session: aiohttp.ClientSession,
    user_id: str,
    retry_delay: float = 120,
    max_retries: Optional[int] = None,
) -> Optional[Tuple[str, Dict]]:
    """Fetch the manager of ``user_id`` from Microsoft Graph asynchronously.

    Args:
        session: object whose ``request(...)`` result is awaited and then used
            as an async context manager, e.g. an ``aiohttp.ClientSession`` or
            the ``RateLimiter`` wrapper.
        user_id: id of the user whose manager is requested.
        retry_delay: seconds to wait after a 429 (throttled) response before
            retrying; defaults to two minutes.
        max_retries: maximum number of retries after 429 responses;
            ``None`` (the default) retries indefinitely.

    Returns:
        ``(user_id, manager_json)`` on HTTP 200, otherwise ``None``.
    """
    url = f"https://graph.microsoft.com/beta/users/{user_id}/manager"
    timeout = aiohttp.ClientTimeout(sock_connect=2, sock_read=5)
    retries = 0
    while True:
        # Re-read auth headers on every attempt: a long back-off may outlive a token.
        headers = get_auth_headers()
        async with await session.request("GET", url=url, headers=headers, timeout=timeout) as response:
            if response.status == 200:
                return user_id, await response.json()
            throttled = response.status == 429
        # The response is released here, so the back-off below does not
        # keep a pooled connection busy.
        if not throttled or (max_retries is not None and retries >= max_retries):
            return None
        retries += 1
        await asyncio.sleep(retry_delay)
def get_managers(user_ids: List[str]) -> Dict[str, Dict]:
    """Fetch the managers of all ``user_ids`` concurrently from synchronous code.

    Runs one async ``get_manager`` request per user on a private event loop,
    throttled through ``RateLimiter``, and blocks until all of them finish.

    Returns:
        Mapping of user id -> manager JSON for every user whose lookup
        returned a result; users with no result are omitted.

    Raises:
        Any exception raised by an individual request propagates
        (``asyncio.gather`` is used without ``return_exceptions``).
    """

    async def batch_tasks():
        conn = aiohttp.TCPConnector(ttl_dns_cache=300, family=socket.AF_INET)
        async with aiohttp.ClientSession(connector=conn) as session:
            rate_limiter_session = RateLimiter(session)
            # Pass the rate-limited wrapper (not the raw session) so every
            # request waits for a token before it is sent.
            return await asyncio.gather(
                *(get_manager(rate_limiter_session, user_id) for user_id in user_ids)
            )

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        results = loop.run_until_complete(batch_tasks())
        # Wait 250 ms for the underlying SSL connections to close https://github.com/aio-libs/aiohttp/issues/1925
        loop.run_until_complete(asyncio.sleep(0.250))
    finally:
        # Always release the loop, and don't leave a closed loop installed
        # as the thread's current event loop.
        loop.close()
        asyncio.set_event_loop(None)

    return {result[0]: result[1] for result in results if result is not None}