Introduction
Perhaps you want to create a personal reminder system through Teams or grab data from SharePoint for monthly reports. Automating these processes saves valuable time, knocking repetitive tasks off of your to-do list. This step-by-step guide will walk through authenticating a graph client, which can be used to automate Microsoft 365 applications. In this example, we will work with Outlook to automate sending an email.
This tutorial will cover:
Creating a graph client
Authenticating manually through a web browser
Authenticating automatically with an access token
Authenticating automatically with a refresh token
The following repository will be installed and used throughout the tutorial:
The final tutorial code can be found here:
Creating a Graph Client
A graph client makes it easy to make calls to Microsoft Graph, the API for Microsoft 365 applications. Install the Office-365-REST-Python-Client repository linked above, and import GraphClient. The following send_email function creates a base for the program, with the majority of the tutorial on creating the get_token_response function used in instantiating the graph client.
from office365.graph_client import GraphClient
def send_email(subject: str, body: str, to_recipients: list, send_from: str):
# instantiate client
client = GraphClient(get_token_response)
# find desired user
user = client.users[send_from]
# send email
message = user.send_mail(
subject=subject,
body=body,
to_recipients=to_recipients,
)
message.execute_query()
Token Response
The token response contains json formatted data necessary to authenticate the graph client, such as the access token and the refresh token. For a deeper explanation of the authentication process, as well as the difference between access and refresh tokens, refer here. The means by which we obtain this token response depends on the current state of the program. There are three possible states:
This is the first time the program is run. Authentication is not yet automatic, and user intervention is required. We will get the access token by opening a web page and manually stating we allow the program to act on our behalf. The token response will be saved for later use.
The program has been run before, and the access token is still valid. We can simply read the access token from the previously saved token response.
The program has been run before, but the access token has expired. We will use a refresh token, read from the previously saved token response, to generate a new access token.
If you have not created an application in Azure Active Directory, do so now: How to Register an AAD App. Add the user you wish to send emails from as an application owner.
We will now create the get_token_response function.
Obtain the application ID:
Log into Microsoft Azure
Navigate to Azure Active Directory
Select App Registrations
Select the desired app and copy its Application (client) ID
Store this value as APP_ID.
Create a list of necessary scopes. These specify what permissions our application has. The scopes used below are specific to reading and writing emails. For other use cases, see all scope options here.
Import msal. Create a SerializableTokenCache, used to store the token response for future runs. Create a PublicClientApplication to link this program with the Azure AD app. Refer here for more information on public client applications and their respective parameters and methods.
import msal
def get_token_response():
APP_ID = "<enter-your-app-id>"
SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]
# create cache and public client app
cache = msal.SerializableTokenCache()
app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)
Web Browser Authentication
Next, we will obtain the token through the web browser. Start this process with msal’s initiate_device_flow function, and print the user_code of the flow. This 9 digit code will later be copied from your terminal and pasted into a browser window.
To open this window automatically, provide the verification_uri of the flow to webbrowser.open().
Finally, get the token response by calling aquire_token_by_device_flow on your application.
import webbrowser
def get_token_response():
APP_ID = "<enter-your-app-id>"
SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]
# create cache and public client app
cache = msal.SerializableTokenCache()
app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)
# authorize through web browser
flow = app.initiate_device_flow(scopes=SCOPES)
# print user code in terminal to copy and paste into web browser
print("user_code: " + flow["user_code"])
webbrowser.open(flow["verification_uri"])
# get token response
token_response = app.acquire_token_by_device_flow(flow)
return token_response
We have now completed the send_email function and the first, most simplistic version of the get_token_response function.
Test the program so far by running the send_email function, changing to_recipients and send_from to your desired email addresses.
send_email(
subject="Testing",
body="This is my test email!",
to_recipients=["test.recipient@gmail.com"],
send_from="test.sender@outlook.com"
)
Running this function should print a code to your terminal, open a web page to paste the code, and send an email to your desired address. If so, congratulations! Your authentication was successful. However, at this point we don’t have much of an automation, as user intervention is required.
Lets save this token response for the program to grab authentication data from in future runs. Open a json file and write the serialized cache to it.
# get token response
token_response = app.acquire_token_by_device_flow(flow)
# write token information to json file
with open("token_information.json", "w") as f:
f.write(cache.serialize())
return token_response
The method of authentication via web browser will now only be used the first time the application is run to obtain the first access token. The token response stored in token_information.json provides us with both an access and a refresh token. With this, we can automatically authenticate via the access token if it is still valid, or from the refresh token if the access token is expired.
Lets abstract authenticating via web browser by creating a separate function.
def acquire_token_by_web(app, scopes):
print("Acquiring token by web")
# authorize through web browser
flow = app.initiate_device_flow(scopes=scopes)
# print user code in terminal to copy and paste into web browser
print("user_code: " + flow["user_code"])
webbrowser.open(flow["verification_uri"])
# get token response
token_response = app.acquire_token_by_device_flow(flow)
return token_response
At this point, the the full program is as follows:
from office365.graph_client import GraphClient
import webbrowser
import msal
import json
def acquire_token_by_web(app, scopes):
# authorize through web browser
flow = app.initiate_device_flow(scopes=scopes)
# print user code in terminal to copy and paste into web browser
print("user_code: " + flow["user_code"])
webbrowser.open(flow["verification_uri"])
# get token response
token_response = app.acquire_token_by_device_flow(flow)
return token_response
def get_token_response():
APP_ID = "<enter-your-app-id>"
SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]
# create cache and public client app
cache = msal.SerializableTokenCache()
app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)
# manually authenticate through web browser
token_response = acquire_token_by_web(app, SCOPES)
# write token information to json file
with open("token_information.json", "w") as f:
f.write(cache.serialize())
return token_response
def send_email(subject: str, body: str, to_recipients: list, send_from: str):
# instantiate client
client: GraphClient = GraphClient(get_token_response)
# find desired user
user = client.users[send_from]
# send email
message = user.send_mail(
subject=subject,
body=body,
to_recipients=to_recipients,
)
message.execute_query()
send_email(
subject="Testing",
body="This is my test email!",
to_recipients=["test.recipient@gmail.com"],
send_from="test.sender@outlook.com"
)
Automate Authentication with Access Token
Use the pathlib library to ensure we have previous token information stored. If the token information is not stored, that indicates this is the first time we have run the program, and we will authenticate manually via web browser instead.
from pathlib import Path
def get_token_response():
APP_ID = "<enter-your-app-id>"
SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]
# create cache and public client app
cache = msal.SerializableTokenCache()
app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)
if Path("token_information.json").exists():
# token information is stored, authenticate via access token
else:
# first time running program - no token info exists, manually
# authenticate through web browser
token_response = acquire_token_by_web(app, SCOPES)
# write token information to json file
with open("token_information.json", "w") as f:
f.write(cache.serialize())
return token_response
If the path exists, deserialize the cache and read the stored token information. To authenticate with the saved access token, we will need the token’s expiration and the account, which can be found in the token_infomation.json file.
import json
if Path("token_information.json").exists():
# token information is stored, authenticate via access token
# deserialize the cache and read relevant information
cache.deserialize(open("token_information.json", "r").read())
with open("token_information.json", "r") as f:
token_information = json.load(f)
access_token_expiration = get_access_token_exp(token_information)
account = get_account(token_information)
Refer to the example of the file contents below to create the helper functions get_access_token_exp and get_account. This is where you will find the access token and account tags.
def get_access_token_exp(token_information):
# read access token expiration from token_information.json
access_token_tag = (
"3c4e7…"
)
access_token_expiration = int(
token_information["AccessToken"][access_token_tag]["expires_on"]
)
return access_token_expiration
def get_account(token_information):
account_tag = (
"3c4e7…"
)
account_info = token_information["Account"]
account = {
"home_account_id": account_info[account_tag]["home_account_id"],
"environment": account_info[account_tag]["environment"],
"username": account_info[account_tag]["username"],
"authority_type": account_info[account_tag]["authority_type"],
"local_account_id": account_info[account_tag]["local_account_id"],
"realm": account_info[account_tag]["realm"],
}
return account
Next, to determine if the access token is still valid, we will need to compare the current time with the access token’s expiration. Import time, and get the current time with time.time().
If the current time is less than the access token's expiration, the access token is still valid, and we can obtain the token response via msal’s acquire_token_silent() method. More information on this method can be found here.
import time
def get_token_response():
APP_ID = "<enter-your-app-id>"
SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]
# create cache and public client app
cache = msal.SerializableTokenCache()
app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)
if Path("token_information.json").exists():
# deserialize the cache and read relevant information
cache.deserialize(open("token_information.json", "r").read())
with open("token_information.json", "r") as f:
token_information = json.load(f)
access_token_expiration = get_access_token_exp(token_information)
account = get_account(token_information)
# get current time
curr_time = time.time()
if curr_time < access_token_expiration:
# access token is still valid
token_response = app.acquire_token_silent(
scopes=SCOPES,
account=account,
)
else:
# access token has expired, manually authenticate through
# web browser
token_response = acquire_token_by_web(app, SCOPES)
else:
# first time running program - no token info exists, manually
# authenticate through web browser
token_response = acquire_token_by_web(app, SCOPES)
# write token information to json file
with open("token_information.json", "w") as f:
f.write(cache.serialize())
return token_response
Automate Authentication with Refresh Token
Once the access token expires (after 60-90 minutes), the refresh token will be used to obtain a new one. To do this, we will need the refresh token’s expiration to ensure it is still valid, as well as its value. Note: the refresh token expires 90 days after the last manual authentication. Modify acquire_token_by_web to store this time.
def acquire_token_by_web(app, scopes):
# update the last time manual authentication occurred
last_manual_auth = {"last_manual_auth": time.time()}
with open("last_manual_auth.json", "w") as f:
json.dump(last_manual_auth, f)
# authorize through web browser
flow = app.initiate_device_flow(scopes=scopes)
# print user code in terminal to copy and paste into web browser
print("user_code: " + flow["user_code"])
webbrowser.open(flow["verification_uri"])
# get token response
token_response = app.acquire_token_by_device_flow(flow)
return token_response
Let’s create helper functions get_refresh_token_exp and get_refresh_token to get the necessary information from token_information.json.
def get_refresh_token_exp():
# ensure file exists
if not Path("last_manual_auth.json").exists():
return 0
# read time of last manual authentication
with ("last_manual_auth.json").open("r") as f:
last_manual_auth = json.load(f)
# calculate refresh token expiration (90 days after last manual authentication)
refresh_token_expiration = (
int(last_manual_auth["last_manual_auth"]) + 7776000
)
return refresh_token_expiration
def get_refresh_token(token_information):
rt_tag = (
"3c4e7…"
)
refresh_token = token_information["RefreshToken"][rt_tag]["secret"]
return refresh_token
Add the following to get_token_response to obtain the necessary values and add a conditional to check if the refresh token is still valid if the access token has expired.
access_token_expiration = get_access_token_exp(token_information)
account = get_account(token_information)
# new lines
refresh_token_expiration = get_refresh_token_exp()
refresh_token = get_refresh_token(token_information)
# get current time
curr_time = time.time()
if curr_time < access_token_expiration:
# access token is still valid
token_response = app.acquire_token_silent(
scopes=SCOPES,
account=account,
)
# new lines
elif curr_time < refresh_token_expiration:
# access token has expired, but refresh token is still valid
token_response = app.acquire_token_by_refresh_token(
scopes=SCOPES, refresh_token=refresh_token
)
else:
# access token and refresh token have both expired, manually
# authenticate through web browser
token_response = acquire_token_by_web(app, SCOPES)
else:
# first time running program - no token info exists, manually
# authenticate through web browser
token_response = acquire_token_by_web(app, SCOPES)
The final program is as follows:
from office365.graph_client import GraphClient
import webbrowser
import msal
import json
from pathlib import Path
import time
def acquire_token_by_web(app, scopes):
# update the last time manual authentication occurred
last_manual_auth = {"last_manual_auth": time.time()}
with open("last_manual_auth.json", "w") as f:
json.dump(last_manual_auth, f)
# authorize through web browser
flow = app.initiate_device_flow(scopes=scopes)
# print user code in terminal to copy and paste into web browser
print("user_code: " + flow["user_code"])
webbrowser.open(flow["verification_uri"])
# get token response
token_response = app.acquire_token_by_device_flow(flow)
return token_response
def get_access_token_exp(token_information):
# read access token expiration from token_information.json
access_token_tag = (
"3c4e7…"
)
access_token_expiration = int(
token_information["AccessToken"][access_token_tag]["expires_on"]
)
return access_token_expiration
def get_account(token_information):
account_tag = (
"3c4e7…"
)
account_info = token_information["Account"]
account = {
"home_account_id": account_info[account_tag]["home_account_id"],
"environment": account_info[account_tag]["environment"],
"username": account_info[account_tag]["username"],
"authority_type": account_info[account_tag]["authority_type"],
"local_account_id": account_info[account_tag]["local_account_id"],
"realm": account_info[account_tag]["realm"],
}
return account
def get_refresh_token_exp():
# ensure file exists
if not Path("last_manual_auth.json").exists():
return 0
# read time of last manual authentication
with ("last_manual_auth.json").open("r") as f:
last_manual_auth = json.load(f)
# calculate refresh token expiration (90 days after last manual authentication)
refresh_token_expiration = (
int(last_manual_auth["last_manual_auth"]) + 7776000
)
return refresh_token_expiration
def get_refresh_token(token_information):
rt_tag = (
"3c4e7…"
)
refresh_token = token_information["RefreshToken"][rt_tag]["secret"]
return refresh_token
def get_token_response():
APP_ID = "<enter-your-app-id>"
SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]
# create cache and public client app
cache = msal.SerializableTokenCache()
app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)
if Path("token_information.json").exists():
# deserialize the cache and read relevant information
cache.deserialize(open("token_information.json", "r").read())
with open("token_information.json", "r") as f:
token_information = json.load(f)
access_token_expiration = get_access_token_exp(token_information)
account = get_account(token_information)
refresh_token_expiration = get_refresh_token_exp()
refresh_token = get_refresh_token(token_information)
# get current time
curr_time = time.time()
if curr_time < access_token_expiration:
# access token is still valid
token_response = app.acquire_token_silent(
scopes=SCOPES,
account=account,
)
elif curr_time < refresh_token_expiration:
# access token has expired, but refresh token is still valid
token_response = app.acquire_token_by_refresh_token(
scopes=SCOPES, refresh_token=refresh_token
)
else:
# access token has expired, manually authenticate through
# web browser
token_response = acquire_token_by_web(app, SCOPES)
else:
# first time running program - no token info exists, manually
# authenticate through web browser
token_response = acquire_token_by_web(app, SCOPES)
# write token information to json file
with open("token_information.json", "w") as f:
f.write(cache.serialize())
return token_response
def send_email(subject: str, body: str, to_recipients: list, send_from: str):
# instantiate client
client: GraphClient = GraphClient(get_token_response)
# find desired user
user = client.users[send_from]
# send email
message = user.send_mail(
subject=subject,
body=body,
to_recipients=to_recipients,
)
message.execute_query()
send_email(
subject="Testing",
body="This is my test email!",
to_recipients=["test.recipient@gmail.com"],
send_from="test.sender@outlook.com"
)
With the ability to authenticate via stored access and refresh tokens, your email sender is fully automated! Using these methods to authenticate a graph client, you can automate an endless number of processes using Microsoft 365 applications.
For the full program, refer to https://github.com/Depot-Analytics/M365-Graph-Client-Authentication.git
Happy automating!
Comments