Calling Python async functions from sync code, by Andrey Zhukov

Introduction

This is an example of how to turn sequential requests into parallel ones in legacy synchronous code by running a private asyncio event loop, for cases where refactoring the code to be fully async is not an option.

To illustrate the idea, I use a simplified version of Microsoft Graph API requests that fetch each user's manager by user ID.

The code before

 1 def get_manager(user_id: str) -> Optional[Dict]:
 2     headers = get_auth_headers()
 3     url = f"https://graph.microsoft.com/beta/users/{user_id}/manager"
 4     response = requests.request("GET", url=url, headers=headers, timeout=5)
 5     if response.status_code == 200:
 6         return response.json()
 7     return None
 8
 9
def get_managers(user_ids: List[str]) -> Dict[str, Dict]:
    """Fetch managers one user at a time.

    Users whose lookup returns a falsy value (e.g. None) are left out of the
    resulting mapping of user_id -> manager record.
    """
    lookups = ((user_id, get_manager(user_id)) for user_id in user_ids)
    return {user_id: manager for user_id, manager in lookups if manager}

The code after

The important part is a RateLimiter, which throttles the outgoing request rate so the API returns fewer 429 (Too Many Requests) responses.

 1 class RateLimiter:
 2     """Rate limits for aiohttp session requests."""
 3
 4     RATE = 100  # number of requests per second
 5
 6     def __init__(self, session):
 7         self.session = session
 8         self.tokens = self.RATE
 9         self.updated_at = time.monotonic()
10
11     async def request(self, *args, **kwargs):
12         await self.wait_for_token()
13         return self.session.request(*args, **kwargs)
14
15     async def wait_for_token(self):
16         while self.tokens < 1:
17             self.add_new_tokens()
18             await asyncio.sleep(0.1)
19         self.tokens -= 1
20
21     def add_new_tokens(self):
22         now = time.monotonic()
23         time_since_update = now - self.updated_at
24         new_tokens = time_since_update * self.RATE
25         if self.tokens + new_tokens >= 1:
26             self.tokens = min(self.tokens + new_tokens, self.RATE)
27             self.updated_at = now
28
29
async def get_manager(session: aiohttp.ClientSession, user_id: str) -> Optional[Tuple[str, Dict]]:
    """Fetch the manager of ``user_id`` from Microsoft Graph.

    Args:
        session: an aiohttp session, or an object with an awaitable
            ``request`` method taking the same arguments (e.g. RateLimiter).
        user_id: the Graph user id to look up.

    Returns:
        ``(user_id, manager_json)`` on HTTP 200, otherwise None.

    On HTTP 429 (throttling) the request is retried after a two-minute pause,
    repeatedly, until a non-429 status is received.
    """
    url = f"https://graph.microsoft.com/beta/users/{user_id}/manager"
    timeout = aiohttp.ClientTimeout(sock_connect=2, sock_read=5)
    while True:
        # Fetch headers per attempt so a retry uses whatever get_auth_headers()
        # returns at that moment (relevant if auth tokens can expire during the wait).
        headers = get_auth_headers()
        async with await session.request("GET", url=url, headers=headers, timeout=timeout) as response:
            if response.status == 200:
                return user_id, await response.json()
            if response.status != 429:
                return None
        # Throttled. Sleep only after leaving ``async with`` so the response and its
        # connection are released instead of being held for the whole back-off.
        await asyncio.sleep(120)  # wait two minutes


def get_managers(user_ids: List[str]) -> Dict[str, Dict]:
    """Fetch managers for many users concurrently, callable from sync code.

    Runs a private asyncio event loop, issues one rate-limited Graph request
    per user id, and blocks until all of them finish.

    Returns:
        Mapping of user_id -> manager JSON. Users for whom ``get_manager``
        returned None are omitted.
    """

    async def batch_tasks():
        conn = aiohttp.TCPConnector(ttl_dns_cache=300, family=socket.AF_INET)
        async with aiohttp.ClientSession(connector=conn) as session:
            # Hand get_manager the limiter rather than the raw session so every
            # request waits for a token (RateLimiter.RATE per second).
            rate_limiter_session = RateLimiter(session)
            tasks = [get_manager(rate_limiter_session, user_id) for user_id in user_ids]
            return await asyncio.gather(*tasks)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        results = loop.run_until_complete(batch_tasks())
        # Wait 250 ms for the underlying SSL connections to close https://github.com/aio-libs/aiohttp/issues/1925
        loop.run_until_complete(asyncio.sleep(0.250))
    finally:
        # Close the private loop even when a request raised, so it is not leaked.
        loop.close()
    return {result[0]: result[1] for result in results if result is not None}