Identifying Microsoft Teams Storage Usage with Python and Graph API
I usually have a hard time learning something when the information is completely abstract; I much prefer a practical application. I don't want to expose the internal workings of any of my clients, so this is something I have run with 'fictitious' data. In this case we'll use the example of a school that has run out of storage and needs to find out how much space each team is using. We'll compare synchronous and asynchronous code against the Microsoft Graph API to get a list of all the Teams in the organization and then fetch the storage usage for each one.
Prerequisites
Before starting, ensure you have the following:
- Python 3.7+
- The following Python libraries: `requests`, `msal`, `pandas`, `aiohttp`, `nest_asyncio`. Install them via pip:
pip install requests msal pandas aiohttp nest_asyncio
- An Azure Active Directory (Azure AD) application registration with:
- Client ID
- Tenant ID
- A generated Client Secret
- The following Microsoft Graph API Application Permissions (admin consent granted):
- `Group.Read.All` (to list Teams)
- `Sites.Read.All` (to access drive storage information)
Security Note: Securely manage your `client_secret`. Avoid hardcoding it in scripts for production environments. Consider using an environment variable, as in the sketch below.
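For instance, a minimal sketch of loading the secret from an environment variable (the variable name GRAPH_CLIENT_SECRET here is just an example, not something the script below requires):

import os

# Hypothetical variable name; set it first, e.g. export GRAPH_CLIENT_SECRET='...'
client_secret = os.environ.get("GRAPH_CLIENT_SECRET")
if not client_secret:
    raise RuntimeError("GRAPH_CLIENT_SECRET environment variable is not set.")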
Python Script Walkthrough
Part 1: Authentication with MSAL
We use the Microsoft Authentication Library (MSAL) for Python to obtain an access token for the Graph API. The token is good for an hour, which is plenty of time for what we are doing here. I like to wrap it in a function anyway; you never know when you will need it again. Which is the whole point of functions, right?
import json
import requests
from msal import ConfidentialClientApplication
import pandas as pd
from datetime import datetime, timedelta
import time
from time import sleep  # Optional for deliberate delays

# --- Credentials - Replace with your actual values ---
client_id = 'YOUR_CLIENT_ID'
tenant_id = 'YOUR_TENANT_ID'
client_secret = 'YOUR_CLIENT_SECRET'
# --- End Credentials ---

msal_authority = f"https://login.microsoftonline.com/{tenant_id}"
msal_scope = ["https://graph.microsoft.com/.default"]

msal_app = ConfidentialClientApplication(
    client_id=client_id,
    client_credential=client_secret,
    authority=msal_authority,
)

def update_headers(current_msal_app, current_msal_scope):
    """Acquires a Graph API token and returns request headers."""
    result = current_msal_app.acquire_token_silent(
        scopes=current_msal_scope,
        account=None,
    )
    if not result:
        # print("No token in cache, acquiring new token for client...")  # Optional logging
        result = current_msal_app.acquire_token_for_client(scopes=current_msal_scope)
    if "access_token" in result:
        access_token = result["access_token"]
        # print('Token acquired successfully.')  # Optional logging
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        return headers
    else:
        error_description = result.get("error_description", "No error description provided.")
        print(f"Error acquiring token: {error_description}")
        raise Exception("No Access Token found or error in acquiring token.")

headers = update_headers(msal_app, msal_scope)
The `update_headers` function handles token acquisition.
Part 2: Fetching All Microsoft Teams
Teams are Microsoft 365 Groups provisioned with a 'Team' resource, so we query the Graph API for those groups. We run this synchronously; it returns the list relatively quickly, in about 3 seconds.
teams_url = "https://graph.microsoft.com/v1.0/groups?$filter=resourceProvisioningOptions/Any(x:x eq 'Team')&$select=id,displayName"

print("\nFetching list of all Teams...")
start_fetch_teams_time = time.time()

all_teams_list = []
current_url = teams_url
while current_url:
    response = requests.get(current_url, headers=headers)
    response.raise_for_status()
    teams_data = response.json()
    all_teams_list.extend(teams_data['value'])
    current_url = teams_data.get('@odata.nextLink', None)
    # if current_url:  # Optional logging
    #     print("Fetching next page of teams...")

df_teams = pd.DataFrame(all_teams_list)
df_unique_teams = df_teams.drop_duplicates(subset=['id'], keep='first')

end_fetch_teams_time = time.time()
elapsed_fetch_teams_time = end_fetch_teams_time - start_fetch_teams_time

print(f"\nFound {df_unique_teams.shape[0]} unique Teams.")
print(f"Time to fetch Teams list: {elapsed_fetch_teams_time:.2f} seconds")

team_ids_list = df_unique_teams['id'].tolist()
team_names_list = df_unique_teams['displayName'].tolist()
After running this code, you'll get a list of teams like the one below. We really only need the id, but the displayName is nice to have, and it shows us right away that one of the names is repeated with a different id. This is a good example of why you should always check for duplicates when working with data. Seems like a strong hint, but I think we all already know who is responsible:
# Example of teams data from Graph API:
[
    {'id': '87w3uhq2ev99r123o5mwi55s6nqp2bk4zlwf', 'displayName': 'Shell Cottage'},
    {'id': 'qq9dqz35wgdq8yqjrn5wt4sqzzgpuihbbvas', 'displayName': 'The Gryffindor Common Room'},
    {'id': 'ly09hvq87rvxh0n1rxh83d2vnaraqy7zpxuu', 'displayName': 'Hufflepuff House'},
    {'id': '38tcpukusneqdhnwconcflwt2dqsgiobrbj4', 'displayName': 'The Black Lake'},
    {'id': 'tsafap1hyai38knuc3p8sgmzafdt7pqq1o9u', 'displayName': 'The Dungeons'},
    {'id': 'z98u31nua35b499cbqi8hy3gzyytad4z9mxb', 'displayName': 'The Library'},
    {'id': 's50u5ipnrukdp4pnrgk0g1fk4bpq0mzkpmms', 'displayName': 'Azkaban Prison'},
    {'id': 'dsos1aq9517gwnavr0c8miz5xrzhc9iq4jce', 'displayName': 'The Library'},
    {'id': 'usrpwr65bj18zmrk8hqqmyaryglyop8dh0x6', 'displayName': 'The Hufflepuff Common Room'},
    {'id': 'no12n4fddxtoq5sddz7c0ej0q35nctjreyv7', 'displayName': "St. Mungo's Hospital"}
]
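If you'd rather not rely on eyeballing the list, a quick pandas check on the df_teams frame from above (a sketch, not part of the original script) will flag the repeated names:

# Flag display names that appear more than once (same name, different ids)
dupes = df_teams[df_teams.duplicated(subset=['displayName'], keep=False)]
print(dupes.sort_values('displayName'))

With the fictitious data above, this prints the two 'The Library' rows side by side.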
Part 3: Fetching Drive Storage (Synchronous Method)
This initial approach fetches storage information for each Team sequentially. It can be slow, especially if you have a large number of teams.
print("\nFetching drive storage for each team (Synchronous approach)...")
sync_start_time = time.time()
all_teams_drive_data_sync = []
# Consider token refresh strategy for very long running synchronous operations.
# headers = update_headers(msal_app, msal_scope)
for i, teamid in enumerate(team_ids_list):
if i > 0 and i % 50 == 0: # Progress indicator, skip first
print(f"Processing team {i+1}/{len(team_ids_list)}: {team_names_list[i]}")
drive_url = f'https://graph.microsoft.com/v1.0/groups/{teamid}/drive'
try:
response = requests.get(drive_url, headers=headers)
response.raise_for_status()
drive_info = response.json()
team_frame = pd.json_normalize(drive_info)
team_frame['team_id'] = teamid
team_frame['teamName'] = team_names_list[i]
all_teams_drive_data_sync.append(team_frame)
except requests.exceptions.HTTPError as e:
print(f"Error fetching drive for team ID {teamid} ({team_names_list[i]}): {e.response.status_code}")
except json.JSONDecodeError:
print(f"Error decoding JSON for team ID {teamid} ({team_names_list[i]})")
except Exception as e:
print(f"An unexpected error for team ID {teamid} ({team_names_list[i]}): {e}")
# time.sleep(0.05) # Optional small delay for very basic throttling avoidance
if all_teams_drive_data_sync:
space_teams_sync_df = pd.coI didn't dive into thencat(all_teams_drive_data_sync, ignore_index=True)
print(f"\nSynchronous fetching processed {len(space_teams_sync_df)} team drives.")
else:
print("\nNo drive data fetched synchronously.")
space_teams_sync_df = pd.DataFrame()
sync_end_time = time.time()
sync_elapsed_time = sync_end_time - sync_start_time
print(f"Execution time for synchronous drive fetching: {sync_elapsed_time:.2f} seconds")
# print(space_teams_sync_df.head() if not space_teams_sync_df.empty else "No sync data to show.")
Part 4: Asynchronous Data Fetching with aiohttp
To improve performance, we use aiohttp and asyncio for concurrent API calls. nest_asyncio is only necessary if you are running this in a Jupyter notebook, and you might not even need it if you are running Jupyter 7.
import asyncio
import aiohttp
import nest_asyncio

nest_asyncio.apply()

async def fetch_drive_async(session, teamid, team_name, auth_headers):
    """Asynchronously fetches drive information for a single team."""
    drive_url = f'https://graph.microsoft.com/v1.0/groups/{teamid}/drive'
    try:
        async with session.get(drive_url, headers=auth_headers) as response:
            if response.status == 200:
                data = await response.json()
                if 'quota' in data and 'used' in data['quota']:
                    team_frame = pd.json_normalize(data)
                    team_frame['team_id'] = teamid
                    team_frame['teamName'] = team_name
                    return team_frame
                else:
                    # print(f"Warning: 'quota.used' not found for team {team_name} ({teamid}).")  # Optional
                    return pd.DataFrame({'team_id': [teamid], 'teamName': [team_name], 'quota.used': [None], 'error_detail': ['Missing quota info']})
            else:
                # print(f"Error (async) for team {team_name} ({teamid}): {response.status}")  # Optional
                return pd.DataFrame({'team_id': [teamid], 'teamName': [team_name], 'error_status': [response.status]})
    except Exception as e:
        # print(f"Exception (async) for team {team_name} ({teamid}): {e}")  # Optional
        return pd.DataFrame({'team_id': [teamid], 'teamName': [team_name], 'exception': [str(e)]})

async def main_async_fetch(team_ids, team_names_list_async, auth_headers):
    """Main async function to gather drive information for all teams."""
    all_teams_data = []
    # For very large tenants, consider an asyncio.Semaphore to limit concurrency
    # (see the sketch after this block).
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, teamid in enumerate(team_ids):
            if i > 0 and i % 50 == 0:  # Progress indicator
                print(f"Queueing async fetch for team {i+1}/{len(team_ids)}: {team_names_list_async[i]}")
            tasks.append(fetch_drive_async(session, teamid, team_names_list_async[i], auth_headers))
        results = await asyncio.gather(*tasks, return_exceptions=True)
    for res in results:
        if isinstance(res, pd.DataFrame):
            all_teams_data.append(res)
        else:
            print(f"A task failed with exception: {res}")
    return pd.concat(all_teams_data, ignore_index=True) if all_teams_data else pd.DataFrame()

print("\nFetching drive storage for each team (Asynchronous approach)...")
current_headers = update_headers(msal_app, msal_scope)  # Refresh token
async_start_time = time.time()
space_teams_async_df = asyncio.run(main_async_fetch(team_ids_list, team_names_list, current_headers))
async_end_time = time.time()
async_elapsed_time = async_end_time - async_start_time
print("\nAsynchronous fetching complete.")
print(f"Execution time for asynchronous drive fetching: {async_elapsed_time:.2f} seconds")
# print(space_teams_async_df.head() if not space_teams_async_df.empty else "No async data to show.")
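A note on the Semaphore hint in the comments above: as written it is only a placeholder. A minimal sketch of a working version, reusing the fetch_drive_async function unchanged, might look like this:

sem = asyncio.Semaphore(10)  # At most 10 requests in flight at once

async def fetch_drive_limited(session, teamid, team_name, auth_headers):
    # Each task waits here until one of the 10 slots frees up
    async with sem:
        return await fetch_drive_async(session, teamid, team_name, auth_headers)

You would then queue fetch_drive_limited in place of fetch_drive_async inside main_async_fetch. Graph throttles overly aggressive clients with 429 responses, so a cap like this is cheap insurance on large tenants.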
Synchronous approach results:

Synchronous fetching processed 535 team drives.
Data shape: (535, 10)
Execution time for synchronous drive fetching: 186.42 seconds

Asynchronous approach results:

Asynchronous fetching complete.
Data shape: (535, 10)
Execution time for asynchronous drive fetching: 2.94 seconds
So there you go: the async approach returns the same data, just more than 60 times faster. We didn't dive into the data this time, so I'll tell you it was The Library that was using the most space. I almost feel bad for thinking it was Slytherin; lazy thinking on my part.
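If you want to find the culprit yourself, a sketch like this against the async results will rank the teams, assuming the quota.used column came back in bytes (which is how Graph reports drive quotas):

# Rank teams by storage used, largest first, converting bytes to GB
usage = space_teams_async_df[['teamName', 'quota.used']].copy()
usage['used_GB'] = usage['quota.used'] / (1024 ** 3)
print(usage.sort_values('used_GB', ascending=False).head(10))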