top of page
Writer's pictureRebecca Hampton

Automate Office 365 - Graph Client Authentication in Python

Introduction

Perhaps you want to create a personal reminder system through Teams or grab data from SharePoint for monthly reports. Automating these processes saves valuable time, knocking repetitive tasks off of your to-do list. This step-by-step guide will walk through authenticating a graph client, which can be used to automate Microsoft 365 applications. In this example, we will work with Outlook to automate sending an email.


This tutorial will cover:

  • Creating a graph client

  • Authenticating manually through a web browser

  • Authenticating automatically with an access token

  • Authenticating automatically with a refresh token


The following repository will be installed and used throughout the tutorial:


The final tutorial code can be found here:


Creating a Graph Client

A graph client makes it easy to make calls to Microsoft Graph, the API for Microsoft 365 applications. Install the Office-365-REST-Python-Client repository linked above, and import GraphClient. The following send_email function creates a base for the program, with the majority of the tutorial on creating the get_token_response function used in instantiating the graph client.


from office365.graph_client import GraphClient

def send_email(subject: str, body: str, to_recipients: list, send_from: str):

    # instantiate client
    client = GraphClient(get_token_response)

    # find desired user
    user = client.users[send_from]

    # send email
    message = user.send_mail(
        subject=subject,
        body=body,
        to_recipients=to_recipients,
    )
    message.execute_query()

Token Response

The token response contains json formatted data necessary to authenticate the graph client, such as the access token and the refresh token. For a deeper explanation of the authentication process, as well as the difference between access and refresh tokens, refer here. The means by which we obtain this token response depends on the current state of the program. There are three possible states:

  1. This is the first time the program is run. Authentication is not yet automatic, and user intervention is required. We will get the access token by opening a web page and manually stating we allow the program to act on our behalf. The token response will be saved for later use.

  2. The program has been run before, and the access token is still valid. We can simply read the access token from the previously saved token response.

  3. The program has been run before, but the access token has expired. We will use a refresh token, read from the previously saved token response, to generate a new access token.


If you have not created an application in Azure Active Directory, do so now: How to Register an AAD App. Add the user you wish to send emails from as an application owner.


We will now create the get_token_response function.


Obtain the application ID:

  1. Log into Microsoft Azure

  2. Navigate to Azure Active Directory

  3. Select App Registrations

  4. Select the desired app and copy its Application (client) ID

Store this value as APP_ID.


Create a list of necessary scopes. These specify what permissions our application has. The scopes used below are specific to reading and writing emails. For other use cases, see all scope options here.


Import msal. Create a SerializableTokenCache, used to store the token response for future runs. Create a PublicClientApplication to link this program with the Azure AD app. Refer here for more information on public client applications and their respective parameters and methods.


import msal

def get_token_response():

    APP_ID = "<enter-your-app-id>"
    SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]

    # create cache and public client app
    cache = msal.SerializableTokenCache()
    app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)

Web Browser Authentication

Next, we will obtain the token through the web browser. Start this process with msal’s initiate_device_flow function, and print the user_code of the flow. This 9 digit code will later be copied from your terminal and pasted into a browser window.


To open this window automatically, provide the verification_uri of the flow to webbrowser.open().


Finally, get the token response by calling aquire_token_by_device_flow on your application.

import webbrowser

def get_token_response():

    APP_ID = "<enter-your-app-id>"
    SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]

    # create cache and public client app
    cache = msal.SerializableTokenCache()
    app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)

    # authorize through web browser
    flow = app.initiate_device_flow(scopes=SCOPES)
    # print user code in terminal to copy and paste into web browser
    print("user_code: " + flow["user_code"])
    webbrowser.open(flow["verification_uri"])

    # get token response
    token_response = app.acquire_token_by_device_flow(flow)

    return token_response

We have now completed the send_email function and the first, most simplistic version of the get_token_response function.


Test the program so far by running the send_email function, changing to_recipients and send_from to your desired email addresses.

send_email(
subject="Testing", 
body="This is my test email!", 
to_recipients=["test.recipient@gmail.com"],  
send_from="test.sender@outlook.com"
)

Running this function should print a code to your terminal, open a web page to paste the code, and send an email to your desired address. If so, congratulations! Your authentication was successful. However, at this point we don’t have much of an automation, as user intervention is required.


Lets save this token response for the program to grab authentication data from in future runs. Open a json file and write the serialized cache to it.

    # get token response
    token_response = app.acquire_token_by_device_flow(flow)

    # write token information to json file
    with open("token_information.json", "w") as f:
        f.write(cache.serialize())

    return token_response

The method of authentication via web browser will now only be used the first time the application is run to obtain the first access token. The token response stored in token_information.json provides us with both an access and a refresh token. With this, we can automatically authenticate via the access token if it is still valid, or from the refresh token if the access token is expired.


Lets abstract authenticating via web browser by creating a separate function.

def acquire_token_by_web(app, scopes):
    print("Acquiring token by web")
 
    # authorize through web browser
    flow = app.initiate_device_flow(scopes=scopes)
    # print user code in terminal to copy and paste into web browser
    print("user_code: " + flow["user_code"])
    webbrowser.open(flow["verification_uri"])

    # get token response
    token_response = app.acquire_token_by_device_flow(flow)

    return token_response

At this point, the the full program is as follows:

from office365.graph_client import GraphClient
import webbrowser
import msal
import json

def acquire_token_by_web(app, scopes):
 
   # authorize through web browser
   flow = app.initiate_device_flow(scopes=scopes)
   # print user code in terminal to copy and paste into web browser
   print("user_code: " + flow["user_code"])
   webbrowser.open(flow["verification_uri"])

   # get token response
   token_response = app.acquire_token_by_device_flow(flow)

   return token_response


def get_token_response():

    APP_ID = "<enter-your-app-id>"
    SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]

    # create cache and public client app
    cache = msal.SerializableTokenCache()
    app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)

    # manually authenticate through web browser
    token_response = acquire_token_by_web(app, SCOPES)

    # write token information to json file
    with open("token_information.json", "w") as f:
        f.write(cache.serialize())

    return token_response


def send_email(subject: str, body: str, to_recipients: list, send_from: str):

    # instantiate client
    client: GraphClient = GraphClient(get_token_response)

    # find desired user
    user = client.users[send_from]

    # send email
    message = user.send_mail(
        subject=subject,
        body=body,
        to_recipients=to_recipients,
    )

    message.execute_query()

send_email(
subject="Testing", 
body="This is my test email!", 
to_recipients=["test.recipient@gmail.com"],  
send_from="test.sender@outlook.com"
)

Automate Authentication with Access Token

Use the pathlib library to ensure we have previous token information stored. If the token information is not stored, that indicates this is the first time we have run the program, and we will authenticate manually via web browser instead.


from pathlib import Path

def get_token_response():

    APP_ID = "<enter-your-app-id>"
    SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]

    # create cache and public client app
    cache = msal.SerializableTokenCache()
    app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)

    if Path("token_information.json").exists():
        # token information is stored, authenticate via access token	
    else:
        # first time running program - no token info exists, manually 
  # authenticate through web browser
        token_response = acquire_token_by_web(app, SCOPES)

    # write token information to json file
    with open("token_information.json", "w") as f:
        f.write(cache.serialize())

    return token_response

If the path exists, deserialize the cache and read the stored token information. To authenticate with the saved access token, we will need the token’s expiration and the account, which can be found in the token_infomation.json file.

import json

if Path("token_information.json").exists():
	  # token information is stored, authenticate via access token
        # deserialize the cache and read relevant information
        cache.deserialize(open("token_information.json", "r").read())
        with open("token_information.json", "r") as f:
            token_information = json.load(f)

        access_token_expiration = get_access_token_exp(token_information)
        account = get_account(token_information)

Refer to the example of the file contents below to create the helper functions get_access_token_exp and get_account. This is where you will find the access token and account tags.



def get_access_token_exp(token_information):
    # read access token expiration from token_information.json
    access_token_tag = (
        "3c4e7…"
    )
    access_token_expiration = int(
        token_information["AccessToken"][access_token_tag]["expires_on"]
    )

    return access_token_expiration


def get_account(token_information):
    account_tag = (
        "3c4e7…"
    )
    account_info = token_information["Account"]
    account = {
        "home_account_id": account_info[account_tag]["home_account_id"],
        "environment": account_info[account_tag]["environment"],
        "username": account_info[account_tag]["username"],
        "authority_type": account_info[account_tag]["authority_type"],
        "local_account_id": account_info[account_tag]["local_account_id"],
        "realm": account_info[account_tag]["realm"],
    }

    return account

Next, to determine if the access token is still valid, we will need to compare the current time with the access token’s expiration. Import time, and get the current time with time.time().


If the current time is less than the access token's expiration, the access token is still valid, and we can obtain the token response via msal’s acquire_token_silent() method. More information on this method can be found here.

import time

def get_token_response():

    APP_ID = "<enter-your-app-id>"
    SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]

    # create cache and public client app
    cache = msal.SerializableTokenCache()
    app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)

    if Path("token_information.json").exists():

        # deserialize the cache and read relevant information
        cache.deserialize(open("token_information.json", "r").read())
        with open("token_information.json", "r") as f:
            token_information = json.load(f)

        access_token_expiration = get_access_token_exp(token_information)
        account = get_account(token_information)

        # get current time
        curr_time = time.time()

        if curr_time < access_token_expiration:
            # access token is still valid
            token_response = app.acquire_token_silent(
                scopes=SCOPES,
                account=account,
            )
        else:
            # access token has expired, manually authenticate through 		
      # web browser
            token_response = acquire_token_by_web(app, SCOPES)
    else:
        # first time running program - no token info exists, manually 
  # authenticate through web browser
        token_response = acquire_token_by_web(app, SCOPES)

    # write token information to json file
    with open("token_information.json", "w") as f:
        f.write(cache.serialize())

    return token_response

Automate Authentication with Refresh Token

Once the access token expires (after 60-90 minutes), the refresh token will be used to obtain a new one. To do this, we will need the refresh token’s expiration to ensure it is still valid, as well as its value. Note: the refresh token expires 90 days after the last manual authentication. Modify acquire_token_by_web to store this time.

def acquire_token_by_web(app, scopes):

    # update the last time manual authentication occurred
    last_manual_auth = {"last_manual_auth": time.time()}
    with open("last_manual_auth.json", "w") as f:
        json.dump(last_manual_auth, f)

    # authorize through web browser
    flow = app.initiate_device_flow(scopes=scopes)
    # print user code in terminal to copy and paste into web browser
    print("user_code: " + flow["user_code"])
    webbrowser.open(flow["verification_uri"])

    # get token response
    token_response = app.acquire_token_by_device_flow(flow)

    return token_response

Let’s create helper functions get_refresh_token_exp and get_refresh_token to get the necessary information from token_information.json.

def get_refresh_token_exp():
    # ensure file exists
        if not Path("last_manual_auth.json").exists():
        return 0

    # read time of last manual authentication
    with ("last_manual_auth.json").open("r") as f:
        last_manual_auth = json.load(f)

    # calculate refresh token expiration (90 days after last manual authentication)
    refresh_token_expiration = (
        int(last_manual_auth["last_manual_auth"]) + 7776000
    )

    return refresh_token_expiration


def get_refresh_token(token_information):
    rt_tag = (
        "3c4e7…"
    )
    refresh_token = token_information["RefreshToken"][rt_tag]["secret"]

    return refresh_token

Add the following to get_token_response to obtain the necessary values and add a conditional to check if the refresh token is still valid if the access token has expired.

        access_token_expiration = get_access_token_exp(token_information)
        account = get_account(token_information)
        # new lines
        refresh_token_expiration = get_refresh_token_exp()
        refresh_token = get_refresh_token(token_information)

        # get current time
        curr_time = time.time()

        if curr_time < access_token_expiration:
            # access token is still valid
            token_response = app.acquire_token_silent(
                scopes=SCOPES,
                account=account,
            )
        # new lines
        elif curr_time < refresh_token_expiration:
            # access token has expired, but refresh token is still valid
            token_response = app.acquire_token_by_refresh_token(
                scopes=SCOPES, refresh_token=refresh_token
            )
        else:
            # access token and refresh token have both expired, manually
            # authenticate through web browser
            token_response = acquire_token_by_web(app, SCOPES)
    else:
        # first time running program - no token info exists, manually 
        #  authenticate through web browser
        token_response = acquire_token_by_web(app, SCOPES)

The final program is as follows:

from office365.graph_client import GraphClient
import webbrowser
import msal
import json
from pathlib import Path
import time

def acquire_token_by_web(app, scopes):
   
   # update the last time manual authentication occurred
   last_manual_auth = {"last_manual_auth": time.time()}
   with open("last_manual_auth.json", "w") as f:
       json.dump(last_manual_auth, f)
 
   # authorize through web browser
   flow = app.initiate_device_flow(scopes=scopes)
   # print user code in terminal to copy and paste into web browser
   print("user_code: " + flow["user_code"])
   webbrowser.open(flow["verification_uri"])

   # get token response
   token_response = app.acquire_token_by_device_flow(flow)

   return token_response


def get_access_token_exp(token_information):
    # read access token expiration from token_information.json
    access_token_tag = (
        "3c4e7…"
    )
    access_token_expiration = int(
        token_information["AccessToken"][access_token_tag]["expires_on"]
    )

    return access_token_expiration


def get_account(token_information):
    account_tag = (
        "3c4e7…"
    )
    account_info = token_information["Account"]
    account = {
        "home_account_id": account_info[account_tag]["home_account_id"],
        "environment": account_info[account_tag]["environment"],
        "username": account_info[account_tag]["username"],
        "authority_type": account_info[account_tag]["authority_type"],
        "local_account_id": account_info[account_tag]["local_account_id"],
        "realm": account_info[account_tag]["realm"],
    }

    return account


def get_refresh_token_exp():
    # ensure file exists
    if not Path("last_manual_auth.json").exists():
        return 0

    # read time of last manual authentication
    with ("last_manual_auth.json").open("r") as f:
        last_manual_auth = json.load(f)

    # calculate refresh token expiration (90 days after last manual authentication)
    refresh_token_expiration = (
        int(last_manual_auth["last_manual_auth"]) + 7776000
    )

    return refresh_token_expiration


def get_refresh_token(token_information):
    rt_tag = (
        "3c4e7…"
    )
    refresh_token = token_information["RefreshToken"][rt_tag]["secret"]

    return refresh_token


def get_token_response():

    APP_ID = "<enter-your-app-id>"
    SCOPES = ["User.read", "Mail.ReadWrite", "Mail.Send", "Mail.Send.Shared"]

    # create cache and public client app
    cache = msal.SerializableTokenCache()
    app = msal.PublicClientApplication(client_id=APP_ID, token_cache=cache)

    if Path("token_information.json").exists():

        # deserialize the cache and read relevant information
        cache.deserialize(open("token_information.json", "r").read())
        with open("token_information.json", "r") as f:
            token_information = json.load(f)

        access_token_expiration = get_access_token_exp(token_information)
        account = get_account(token_information)
        refresh_token_expiration = get_refresh_token_exp()
        refresh_token = get_refresh_token(token_information)

        # get current time
        curr_time = time.time()

        if curr_time < access_token_expiration:
            # access token is still valid
            token_response = app.acquire_token_silent(
                scopes=SCOPES,
                account=account,
            )
        elif curr_time < refresh_token_expiration:
            # access token has expired, but refresh token is still valid
            token_response = app.acquire_token_by_refresh_token(
                scopes=SCOPES, refresh_token=refresh_token
            )
        else:
            # access token has expired, manually authenticate through      
      # web browser
            token_response = acquire_token_by_web(app, SCOPES)
    else:
        # first time running program - no token info exists, manually
        # authenticate through web browser
        token_response = acquire_token_by_web(app, SCOPES)

    # write token information to json file
    with open("token_information.json", "w") as f:
        f.write(cache.serialize())

    return token_response


def send_email(subject: str, body: str, to_recipients: list, send_from: str):

    # instantiate client
    client: GraphClient = GraphClient(get_token_response)

    # find desired user
    user = client.users[send_from]

    # send email
    message = user.send_mail(
        subject=subject,
        body=body,
        to_recipients=to_recipients,
    )

    message.execute_query()


send_email(
subject="Testing",
body="This is my test email!",
to_recipients=["test.recipient@gmail.com"],  
send_from="test.sender@outlook.com"
)

With the ability to authenticate via stored access and refresh tokens, your email sender is fully automated! Using these methods to authenticate a graph client, you can automate an endless number of processes using Microsoft 365 applications.



Happy automating!


Comments


bottom of page