This Python script is designed to automate the organization of emails in Gmail by applying labels based on their date. It extracts the date from each email, and creates or applies labels accordingly.
The script categorizes emails into yearly and monthly labels within a root label named “Yearly Overview.”
It also tracks the progress of labeling and provides detailed information such as start and end times, duration, and the average time taken per message.
Prerequisites
To run the script successfully, you will need the following prerequisites:
- Create a new Google Project
- Enable the Gmail API
- Create an OAuth consent screen
- Create OAuth 2.0 Client IDs – Desktop App
- Download the JSON file
- Setup Python
$ sudo apt install python3-pip
$ pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
Further information can be found in the Python quickstart guide from Google.
To run the script, rename the JSON file you downloaded to “credentials.json” and place it in the same directory as the script.
Script
import os
import re
import datetime
import time
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
# Define OAuth scopes for accessing Gmail API
# If modifying these scopes, delete the file token.json
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']
# File in which get_credentials() caches the authorized user's token between runs
TOKEN_PATH = 'token.json'
# OAuth client secrets file (the JSON downloaded in the prerequisites step)
CREDENTIALS_PATH = 'credentials.json'
# Value passed as userId to every Gmail API call in this script
user_id = 'me'
# Function to retrieve or prompt for user credentials
def get_credentials():
    """Return usable Gmail API credentials.

    Cached credentials from TOKEN_PATH are reused when still valid. Expired
    credentials that carry a refresh token are refreshed; otherwise the
    browser-based OAuth flow is started from CREDENTIALS_PATH. Newly obtained
    or refreshed credentials are written back to TOKEN_PATH.
    """
    creds = None
    if os.path.exists(TOKEN_PATH):
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)

    # Nothing more to do when the cached credentials are still good
    if creds and creds.valid:
        return creds

    refreshable = creds and creds.expired and creds.refresh_token
    if refreshable:
        creds.refresh(Request())
    else:
        # No usable credentials: ask the user to authorize in the browser
        flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
        creds = flow.run_local_server(port=0)

    # Persist the credentials so the next run can skip authorization
    with open(TOKEN_PATH, 'w') as token_file:
        token_file.write(creds.to_json())
    return creds
# Function to create a Gmail label if it doesn't exist
def create_label_if_not_exists(service, user_id, label_name):
    """Create the label named label_name unless a label with that name exists.

    The existence check is done by name through get_label_id(). Gmail's
    labels().get() endpoint expects a label *ID*, so looking a label up there
    by its display name never finds user-created labels.

    Args:
        service: Gmail API service object.
        user_id: Gmail user ID the labels belong to.
        label_name: Display name of the label to ensure.
    """
    try:
        label_id = get_label_id(service, user_id, label_name)
    except HttpError as e:
        print('Error checking label:', e)
        return

    if label_id:
        print('Label already exists:', label_name)
    else:
        # create_label() reports its own API errors (including conflicts)
        create_label(service, user_id, label_name)
# Function to create a Gmail label
def create_label(service, user_id, label_name):
    """Create a Gmail label called label_name and print the outcome.

    The label is created as visible in both the label list and the message
    list. API errors are printed rather than raised.
    """
    request_body = {
        'name': label_name,
        'messageListVisibility': 'show',
        'labelListVisibility': 'labelShow',
    }
    try:
        response = service.users().labels().create(userId=user_id, body=request_body).execute()
    except HttpError as e:
        print('Error creating label:', e)
    else:
        print('Label created:', response['name'])
# Function to apply a label to an email with retry logic
def apply_label_with_retry(service, user_id, message_id, label_id, label_name, counter, total_messages, max_retries=3, delay=2):
    """Add a label to one message, retrying with exponential backoff.

    Args:
        service: Gmail API service object.
        user_id: Gmail user ID owning the message.
        message_id: ID of the message to label.
        label_id: ID of the label to add.
        label_name: Label name shown in the progress output.
        counter: 1-based position of this message, for the progress output.
        total_messages: Total number of messages, for the progress output.
        max_retries: Maximum number of attempts.
        delay: Seconds to wait before the first retry; doubled after each failure.

    Returns:
        True if the label was applied, False if every attempt failed.
    """
    body = {'addLabelIds': [label_id]}
    for attempt in range(max_retries):
        try:
            service.users().messages().modify(userId=user_id, id=message_id, body=body).execute()
        except Exception as e:
            # Deliberately broad: transport errors should be retried like API errors
            print(f"Error applying label to email (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2
            else:
                print("Maximum retry attempts reached. Aborting.")
        else:
            # Progress is reported outside the try block so that a formatting
            # problem (e.g. total_messages == 0) is not mistaken for an API
            # failure and does not trigger a repeated modify call.
            percent = (counter / total_messages * 100) if total_messages else 0.0
            print(f'Label applied: {label_name} | {counter} of {total_messages} | {percent:.2f}%') # Print progress
            return True
    return False
# Function to extract year and month from a date string
def extract_year_month(date_str):
    """Return the "YYYY-MM" label name for an email Date header value.

    The header is parsed with the standard library's RFC 2822 date parser,
    which accepts the variants seen in real mail: optional weekday,
    single-digit day or hour, missing seconds, numeric or named time zones
    and trailing comments such as "(PST)". The year and month are taken as
    written in the header, without converting to another time zone.

    Args:
        date_str: Raw value of the message's Date header.

    Returns:
        A string such as "2023-01", or None when the header cannot be parsed
        (a message is printed in that case).
    """
    # Imported locally so this function does not depend on the file's import block
    from email.utils import parsedate_to_datetime

    try:
        date_obj = parsedate_to_datetime(date_str)
    except (TypeError, ValueError, IndexError):
        # Older Python versions raise TypeError for unparseable input, newer ones ValueError
        date_obj = None

    if date_obj is None:
        print("Failed to parse the date header:", date_str)
        return None
    return f"{date_obj.year}-{date_obj.month:02d}"
# Function to get label ID by label name
def get_label_id(service, user_id, label_name):
    """Return the ID of the label whose name equals label_name, or None if absent."""
    response = service.users().labels().list(userId=user_id).execute()
    matching_ids = (
        entry['id']
        for entry in response.get('labels', [])
        if entry['name'] == label_name
    )
    # First match wins; None when no label carries that name
    return next(matching_ids, None)
# Function to retrieve emails matching a query
def get_emails(service, user_id, query):
    """Return every message entry matching query, following result pagination."""
    collected = []
    next_token = None
    has_more = True
    while has_more:
        page = service.users().messages().list(userId=user_id, q=query, pageToken=next_token).execute()
        collected += page.get('messages', [])
        # A missing nextPageToken means this was the last page
        next_token = page.get('nextPageToken')
        has_more = bool(next_token)
    return collected
# Function to apply or create label for an email
def apply_or_create_label(service, user_id, message_id, label_name, counter=1, total_messages=1):
    """Apply the label named label_name to a message, creating the label if needed.

    Args:
        service: Gmail API service object.
        user_id: Gmail user ID owning the message.
        message_id: ID of the message to label.
        label_name: Display name of the label to apply.
        counter: 1-based position of this message, forwarded for progress output.
        total_messages: Total number of messages, forwarded for progress output.

    Errors are printed rather than raised.
    """
    try:
        label_id = get_label_id(service, user_id, label_name)
        if not label_id:
            # Label is missing: create it, then look up the ID it was given
            create_label(service, user_id, label_name)
            label_id = get_label_id(service, user_id, label_name)

        if label_id:
            # counter and total_messages are required by apply_label_with_retry;
            # omitting them raised a TypeError that was swallowed below.
            apply_label_with_retry(service, user_id, message_id, label_id, label_name, counter, total_messages)
        else:
            print("Failed to apply label:", label_name)
    except Exception as e:
        print("Error applying or creating label:", e)
# Helper: resolve a label name to its ID, creating the label when it is missing
def _ensure_label(service, user_id, label_name, label_ids):
    """Return the ID for label_name, creating the label if necessary.

    label_ids is a dict cache (name -> ID) shared across calls so the label
    list is not re-fetched from the API for every message.
    Returns None when the label could not be found or created.
    """
    label_id = label_ids.get(label_name)
    if label_id:
        return label_id
    label_id = get_label_id(service, user_id, label_name)
    if not label_id:
        create_label(service, user_id, label_name)
        label_id = get_label_id(service, user_id, label_name)
    if label_id:
        label_ids[label_name] = label_id
    return label_id


# Main function
def main():
    """Label every message in "All Mail" with "Yearly Overview/<year>/<month>".

    The year and month come from each message's Date header. Messages whose
    date cannot be determined receive the label most recently applied
    (initially the "Yearly Overview" root label). Timing statistics are
    printed at the end.
    """
    start_time = int(time.time())

    # Get credentials
    creds = get_credentials()
    # Build Gmail service
    service = build('gmail', 'v1', credentials=creds)

    label_ids = {}  # cache: label name -> label ID

    # Check if "Yearly Overview" label exists, create if it doesn't
    root_label_name = "Yearly Overview"
    root_label_id = _ensure_label(service, user_id, root_label_name, label_ids)
    if not root_label_id:
        print("Failed to apply label:", root_label_name)
        return

    # Process emails in "All Mail"
    query = 'in:all'
    messages = get_emails(service, user_id, query)
    # Progress is measured against the messages actually being processed
    total_messages = len(messages)

    previous_label_id = root_label_id
    previous_label_name = root_label_name
    for counter, message in enumerate(messages, start=1):
        try:
            # Only the Date header is needed, so skip downloading message bodies
            msg = service.users().messages().get(
                userId=user_id, id=message['id'],
                format='metadata', metadataHeaders=['Date'],
            ).execute()
        except HttpError as e:
            # One unreadable message should not abort the whole run
            print("Error applying or creating label:", e)
            continue

        headers = msg.get('payload', {}).get('headers', [])
        # Header names are case-insensitive
        date_header = next((header['value'] for header in headers if header['name'].lower() == 'date'), None)

        # Default to the most recently applied label; replaced below on success
        label_id, label_name = previous_label_id, previous_label_name
        if not date_header:
            print("Date header not found in email, applying previous label...")
        else:
            year_month = extract_year_month(date_header)
            if not year_month:
                print("Failed to determine monthly label, applying previous label...")
            else:
                year, month = year_month.split('-')
                # Create the year label first so the month label nests under it
                _ensure_label(service, user_id, f"Yearly Overview/{year}", label_ids)
                month_label_name = f"Yearly Overview/{year}/{month}"
                month_label_id = _ensure_label(service, user_id, month_label_name, label_ids)
                if month_label_id:
                    # Apply THIS message's month label (not the previous message's)
                    label_id, label_name = month_label_id, year_month
                    previous_label_id, previous_label_name = month_label_id, year_month
                else:
                    print("Failed to determine monthly label, applying previous label...")

        apply_label_with_retry(service, user_id, message['id'], label_id, label_name, counter, total_messages)

    end_time = int(time.time())
    print("Start Time:", datetime.datetime.fromtimestamp(start_time).strftime("%H:%M:%S"))
    print("End Time:", datetime.datetime.fromtimestamp(end_time).strftime("%H:%M:%S"))
    duration_seconds = end_time - start_time
    print("Duration:", datetime.timedelta(seconds=round(duration_seconds))) #shows days and removes the decimal seconds
    print("Total Messages:", total_messages)
    if total_messages > 0:
        seconds_per_message = duration_seconds / total_messages
        seconds_per_message = "{:.3f}".format(seconds_per_message)
        print("Seconds per Message:", seconds_per_message)
# Run the labelling job only when this file is executed as a script
if __name__ == '__main__':
    main()
Be First to Comment