Organizing emails into structured labels based on their dates with Python

This Python script is designed to automate the organization of emails in Gmail by applying labels based on their date. It extracts the date from each email, and creates or applies labels accordingly.

The script categorizes emails into yearly and monthly labels within a root label named “Yearly Overview.”

It also tracks the progress of labeling and provides detailed information such as start and end times, duration, and the average time taken per message.

Prerequisites

To run the script successfully, you will need the following prerequisites:

  1. Create a new Google Project
  2. Enable the Gmail API
  3. Create an OAuth consent screen
  4. Create OAuth 2.0 Client IDs – Desktop App
  5. Download the JSON file
  6. Setup Python
    $ sudo apt install python3-pip
    $ pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

Further information can be found in the Python quickstart guide from Google.

To run the script, rename the JSON file you’ve downloaded to “credentials.json” and place it in the same directory as the script.

Script

import os
import re
import datetime
import time
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow

# OAuth scopes requested when authorizing against the Gmail API.
# The cached token is tied to these scopes: delete token.json after editing them.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']

# Client secrets downloaded from the Google project, and the cached user token
CREDENTIALS_PATH = 'credentials.json'
TOKEN_PATH = 'token.json'

# Value passed as userId to every Gmail API call in this script
user_id = 'me'

# Function to load cached user credentials or run the OAuth flow
def get_credentials():
    """Return credentials for SCOPES, reusing the token cached in TOKEN_PATH.

    A cached token that is still valid is returned as-is. An expired token
    that carries a refresh token is refreshed; in every other case the
    local-server OAuth flow is started from CREDENTIALS_PATH. Refreshed or
    newly obtained credentials are written back to TOKEN_PATH.
    """
    creds = None
    if os.path.exists(TOKEN_PATH):
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)

    # Nothing to do when the cached token is still usable
    if creds and creds.valid:
        return creds

    can_refresh = creds and creds.expired and creds.refresh_token
    if can_refresh:
        creds.refresh(Request())
    else:
        # No usable token: ask the user to authorize in the browser
        flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
        creds = flow.run_local_server(port=0)

    # Persist the token so the next run can skip the authorization step
    with open(TOKEN_PATH, 'w') as token_file:
        token_file.write(creds.to_json())

    return creds

# Function to create a Gmail label if it doesn't exist
def create_label_if_not_exists(service, user_id, label_name):
    """Create the label called label_name unless one with that name exists.

    Existence is checked by comparing label_name against the names returned
    by labels().list(). The previous implementation passed the name as the
    `id` argument of labels().get(), which looks labels up by ID rather than
    by display name.

    Args:
        service: Gmail API service object.
        user_id: Value passed as userId to the API.
        label_name: Display name of the label, e.g. "Yearly Overview/2024".

    Returns:
        None. The outcome is reported on stdout.
    """
    try:
        response = service.users().labels().list(userId=user_id).execute()
    except HttpError as e:
        print('Error checking label:', e)
        return

    existing_names = {label['name'] for label in response.get('labels', [])}
    if label_name in existing_names:
        print('Label already exists:', label_name)
    else:
        create_label(service, user_id, label_name)

# Function to create a Gmail label
def create_label(service, user_id, label_name):
    """Ask the API to create a label named label_name and report the result.

    The label is requested with messageListVisibility 'show' and
    labelListVisibility 'labelShow'. An HttpError from the API is printed
    rather than raised; nothing is returned either way.
    """
    request_body = {
        'name': label_name,
        'messageListVisibility': 'show',
        'labelListVisibility': 'labelShow',
    }
    try:
        response = service.users().labels().create(userId=user_id, body=request_body).execute()
    except HttpError as e:
        print('Error creating label:', e)
    else:
        print('Label created:', response['name'])

# Function to apply a label to an email with retry logic
def apply_label_with_retry(service, user_id, message_id, label_id, label_name, counter=None, total_messages=None, max_retries=3, delay=2):
    """Add label_id to a message, retrying with a doubling delay on failure.

    Args:
        service: Gmail API service object.
        user_id: Value passed as userId to the API.
        message_id: ID of the message to modify.
        label_id: ID of the label to add.
        label_name: Name shown in the progress output.
        counter: Optional 1-based position of this message in the run.
        total_messages: Optional number of messages in the run. The
            percentage is printed only when counter is given and this is
            non-zero.
        max_retries: Maximum number of modify attempts.
        delay: Seconds to wait before the second attempt; doubled after
            each further failure.

    Returns:
        True once the label has been applied, False if every attempt failed.
    """
    body = {'addLabelIds': [label_id]}
    for attempt in range(max_retries):
        # Only the API call is guarded: an error while printing progress must
        # not be mistaken for a failed request and trigger another modify call.
        try:
            service.users().messages().modify(userId=user_id, id=message_id, body=body).execute()
        except Exception as e:
            print(f"Error applying label to email (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2
            else:
                print("Maximum retry attempts reached. Aborting.")
        else:
            if counter is not None and total_messages:
                print(f'Label applied: {label_name} | {counter} of {total_messages} | {(counter / total_messages * 100):.2f}%') # Print progress
            else:
                # No usable totals (omitted or zero): skip the percentage
                print('Label applied:', label_name)
            return True
    return False

# Function to extract year and month from a date string
def extract_year_month(date_str):
    """Turn an email Date header value into a "YYYY-MM" string.

    The header is parsed with the standard library's RFC 2822 date parser
    instead of fixed strptime patterns, so values without a weekday, without
    seconds, or with a trailing zone comment are accepted. Year and month are
    taken from the time as written in the header; no timezone conversion is
    applied.

    Args:
        date_str: Raw value of the Date header.

    Returns:
        The "YYYY-MM" string, or None (after printing a message) when the
        value cannot be parsed.
    """
    # Local import so this function does not depend on a file-level import
    from email.utils import parsedate_to_datetime

    try:
        date_obj = parsedate_to_datetime(date_str)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError for unparsable input,
        # newer ones raise ValueError
        print("Failed to parse the date header:", date_str)
        return None

    return f"{date_obj.year}-{date_obj.month:02d}"

# Function to get label ID by label name
def get_label_id(service, user_id, label_name):
    """Return the ID of the first listed label named label_name, else None.

    Fetches the label list from the API on every call and compares names
    exactly.
    """
    response = service.users().labels().list(userId=user_id).execute()
    matching_ids = (
        entry['id']
        for entry in response.get('labels', [])
        if entry['name'] == label_name
    )
    return next(matching_ids, None)

# Function to retrieve emails matching a query
def get_emails(service, user_id, query):
    """Collect every message entry that matches query, across all pages.

    Requests pages from messages().list() until a response carries no
    nextPageToken, and returns the 'messages' entries of all pages in order.
    """
    collected = []
    next_token = None
    has_more = True
    while has_more:
        request = service.users().messages().list(userId=user_id, q=query, pageToken=next_token)
        page = request.execute()
        collected += page.get('messages', [])
        next_token = page.get('nextPageToken')
        # A missing or empty token marks the last page
        has_more = bool(next_token)
    return collected

# Function to apply or create label for an email
def apply_or_create_label(service, user_id, message_id, label_name, counter=1, total_messages=1):
    """Apply the label named label_name to a message, creating it if needed.

    The previous version called apply_label_with_retry without the counter
    and total_messages arguments, so every call raised a TypeError that the
    broad except below reported as a labeling error. Both values are now
    accepted here and passed through.

    Args:
        service: Gmail API service object.
        user_id: Value passed as userId to the API.
        message_id: ID of the message to label.
        label_name: Display name of the label.
        counter: Position of this message, used for the progress output.
        total_messages: Number of messages, used for the progress output.
    """
    try:
        label_id = get_label_id(service, user_id, label_name)
        if not label_id:
            # Unknown label: create it, then look up the ID it was given
            create_label(service, user_id, label_name)
            label_id = get_label_id(service, user_id, label_name)

        if not label_id:
            print("Failed to apply label:", label_name)
            return

        apply_label_with_retry(service, user_id, message_id, label_id, label_name, counter, total_messages)
    except Exception as e:
        # Best effort: one message that cannot be labeled must not stop the caller
        print("Error applying or creating label:", e)

# Helper to resolve a label ID once and remember it for later messages
def _ensure_label(service, user_id, label_name, label_cache):
    """Return the ID of label_name, creating the label when it is missing.

    IDs that were already resolved are served from label_cache (name -> ID),
    so the label list is not re-fetched for every message. Returns None when
    the label can neither be found nor created.
    """
    label_id = label_cache.get(label_name)
    if label_id:
        return label_id

    label_id = get_label_id(service, user_id, label_name)
    if not label_id:
        create_label(service, user_id, label_name)
        label_id = get_label_id(service, user_id, label_name)

    if label_id:
        label_cache[label_name] = label_id
    return label_id

# Main function
def main():
    """Label every message matching 'in:all' by the month of its Date header.

    Each message receives the label "Yearly Overview/<year>/<month>"; the
    root, year and month labels are created on first use. A message whose
    date is missing or cannot be parsed receives the label applied to the
    previous message (initially the root label). Progress and the summary
    are based on the number of messages actually returned by the query.
    """
    start_time = int(time.time())

    # Get credentials
    creds = get_credentials()

    # Build Gmail service
    service = build('gmail', 'v1', credentials=creds)

    # Make sure the root label exists; it is the fallback for undated messages
    label_cache = {}
    root_label_name = "Yearly Overview"
    root_label_id = _ensure_label(service, user_id, root_label_name, label_cache)

    # Process emails in "All Mail"
    query = 'in:all'
    messages = get_emails(service, user_id, query)
    total_messages = len(messages)

    previous_label_id = root_label_id
    previous_label_name = root_label_name

    for counter, message in enumerate(messages, start=1):
        try:
            # Only the headers are needed, so skip downloading the body
            msg = service.users().messages().get(
                userId=user_id, id=message['id'], format='metadata'
            ).execute()
        except HttpError as e:
            # e.g. the message was deleted after it was listed
            print("Error fetching message:", e)
            continue

        headers = msg.get('payload', {}).get('headers', [])
        # Header names are case-insensitive, so accept "date" as well as "Date"
        date_header = next(
            (header['value'] for header in headers if header['name'].lower() == 'date'),
            None,
        )

        target_label_id = None
        target_label_name = None
        if date_header:
            label_name = extract_year_month(date_header)
            if label_name:
                year, month = label_name.split('-')
                # The year label is created so the month label has its parent
                _ensure_label(service, user_id, f"Yearly Overview/{year}", label_cache)
                target_label_id = _ensure_label(
                    service, user_id, f"Yearly Overview/{year}/{month}", label_cache
                )
                target_label_name = label_name
            if not target_label_id:
                print("Failed to determine monthly label, applying previous label...")
        else:
            print("Date header not found in email, applying previous label...")

        if not target_label_id:
            target_label_id = previous_label_id
            target_label_name = previous_label_name
        if not target_label_id:
            # Not even the root label could be resolved
            print("No label available for message, skipping:", message['id'])
            continue

        # Apply the label resolved for THIS message, then remember it as the
        # fallback for the next message without a usable date
        apply_label_with_retry(service, user_id, message['id'], target_label_id, target_label_name, counter, total_messages)
        previous_label_id = target_label_id
        previous_label_name = target_label_name

    end_time = int(time.time())

    print("Start Time:", datetime.datetime.fromtimestamp(start_time).strftime("%H:%M:%S"))
    print("End Time:", datetime.datetime.fromtimestamp(end_time).strftime("%H:%M:%S"))
    duration_seconds = end_time - start_time
    print("Duration:", datetime.timedelta(seconds=round(duration_seconds))) #shows days and removes the decimal seconds

    print("Total Messages:", total_messages)

    if total_messages > 0:
        seconds_per_message = duration_seconds / total_messages
        seconds_per_message = "{:.3f}".format(seconds_per_message)
        print("Seconds per Message:", seconds_per_message)

if __name__ == '__main__':
    main()
Final Yearly Label Output
Final output with detailed information after finishing the labeling process.
asterix Written by:

Be First to Comment

Leave a Reply

Your email address will not be published. Required fields are marked *