This script is designed to facilitate the copying of files and folders from one Google Drive location to another. The main functionality involves recursively copying folders and their contents, preserving the folder structure, from a specified source location (source_id) to a specified destination location (destination_id) within Google Drive.
The script provides progress updates in the log file, including the number of folders and files copied, any errors encountered, the start and end times, and the average time taken to copy each file.
The outcome of running this script is the successful duplication of files and folders, with detailed information logged in the “log.txt” file, making it a valuable tool for managing and tracking Google Drive data migration or organization tasks.
Prerequisites
To run the script successfully, you will need the following prerequisites:
- Create a new Google Cloud project
- Enable the Drive API
- Create an OAuth consent screen
- Create an OAuth 2.0 Client ID of type “Desktop app”
- Download the JSON file
- Setup Python
$ sudo apt install python3-pip
$ pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
Further information can be found in the Python quickstart guide from Google.
To run the script, rename the JSON file you’ve downloaded to “credentials.json”, place it in the script’s directory, and set the source and destination folder IDs in the script.
Script
import os
import time
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Global counters for copied items.
# These are module-level running totals: copy_file() and create_folder()
# update them via `global` while the folder tree is being copied.
copied_folders = 0  # Folders successfully created in the destination
copied_files = 0  # Files successfully copied
not_copied_files = 0  # Files whose copy attempt raised an error
total_copy_time = 0 # Total time (seconds) spent in file copy calls
# Define the Google Drive API scopes for accessing files and folders
# If modifying these scopes, delete the file token.json
SCOPES = ['https://www.googleapis.com/auth/drive']
# Cached OAuth token; read and (re)written by get_credentials()
TOKEN_PATH = 'token.json'
# OAuth client secrets file used by get_credentials() to start the OAuth flow
CREDENTIALS_PATH = 'credentials.json'
def get_credentials():
    """Return user credentials for the Drive API, authorizing if necessary.

    A cached token is loaded from TOKEN_PATH when that file exists. If the
    cached credentials are valid they are returned as-is. Otherwise they are
    refreshed (when expired and a refresh token is available) or obtained
    through the local-server OAuth flow using CREDENTIALS_PATH, and the
    resulting token is written back to TOKEN_PATH for future runs.
    """
    creds = None
    if os.path.exists(TOKEN_PATH):
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)

    # Fast path: a usable cached token needs no further work.
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        # Expired but refreshable: renew without user interaction.
        creds.refresh(Request())
    else:
        # Nothing usable cached: run the interactive OAuth flow.
        oauth_flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
        creds = oauth_flow.run_local_server(port=0)

    # Persist the new or refreshed token for the next run.
    with open(TOKEN_PATH, 'w') as token_file:
        serialized = creds.to_json()
        token_file.write(serialized)
    return creds
def log_to_file(text, log_file):
    """Echo *text* to the console and append it, newline-terminated, to *log_file*.

    *log_file* is any object with a ``write`` method accepting a string.
    """
    print(text)
    line_terminator = '\n'
    log_file.write(text + line_terminator)
def get_file_metadata(drive_service, file_id):
    """Fetch the ``name`` field of a Drive file or folder by its ID.

    Args:
        drive_service: Drive v3 service object (as returned by ``build``).
        file_id: ID of the file or folder to look up.

    Returns:
        The response of ``files().get`` restricted to ``fields='name'``.
    """
    # supportsAllDrives is required for items that live in shared drives;
    # every other Drive call in this script already sets it.
    return drive_service.files().get(
        fileId=file_id,
        fields='name',
        supportsAllDrives=True,
    ).execute()
def copy_file(drive_service, source_file_id, destination_folder_id, log_file):
    """Copy one file into a destination folder and time the copy call.

    Updates the module-level counters ``copied_files``, ``not_copied_files``
    and ``total_copy_time`` and writes a progress or error line to the log.

    Args:
        drive_service: Drive v3 service object.
        source_file_id: ID of the file to copy.
        destination_folder_id: ID of the folder that receives the copy.
        log_file: Open, writable log file passed on to ``log_to_file``.

    Returns:
        ``(copied_file, copy_time)`` on success, where ``copy_time`` is the
        duration of the copy request in seconds; ``(None, 0)`` on failure.
    """
    global copied_files, not_copied_files, total_copy_time
    try:
        # The name lookup is inside the try so that a failed lookup is
        # counted as a per-file error instead of aborting the whole run.
        source_file_metadata = get_file_metadata(drive_service, source_file_id)
        file_metadata = {
            'name': source_file_metadata.get('name', 'Unknown File'),
            'parents': [destination_folder_id],
        }
        # Monotonic clock: durations are unaffected by system clock changes.
        start_time = time.monotonic()
        copied_file = drive_service.files().copy(
            fileId=source_file_id,
            body=file_metadata,
            supportsAllDrives=True,
        ).execute()
        copy_time = time.monotonic() - start_time
    except Exception as e:
        # Deliberately broad: one failing file must not stop the migration.
        not_copied_files += 1
        log_to_file(f'An error occurred while copying file {source_file_id}: {e}', log_file)
        return None, 0

    total_copy_time += copy_time
    copied_files += 1
    log_to_file(f'Copied Items: Folders {copied_folders} | Files {copied_files} | Errors {not_copied_files}', log_file)
    return copied_file, copy_time
def create_folder(drive_service, name, parent_folder_id, log_file):
    """Create a folder called *name* under *parent_folder_id*.

    On success the module-level ``copied_folders`` counter is incremented and
    a progress line is logged. On any exception the error is logged instead.

    Returns:
        The response of the ``files().create`` request, or ``None`` on error.
    """
    global copied_folders
    folder_body = dict(
        name=name,
        parents=[parent_folder_id],
        mimeType='application/vnd.google-apps.folder',
    )
    try:
        request = drive_service.files().create(body=folder_body, supportsAllDrives=True)
        created = request.execute()
        copied_folders += 1
        progress = f'Copied Items: Folders {copied_folders} | Files {copied_files} | Errors {not_copied_files}'
        log_to_file(progress, log_file)
    except Exception as err:
        log_to_file(f'An error occurred while creating a folder: {str(err)}', log_file)
        return None
    return created
def calculate_average_copy_time():
    """Return the mean copy time per successfully copied file.

    Reads the module-level ``total_copy_time`` and ``copied_files`` counters;
    yields 0 when no file has been copied so far.
    """
    return total_copy_time / copied_files if copied_files > 0 else 0
def _list_folder_children(drive_service, folder_id):
    # Return every non-trashed direct child of folder_id, following all result pages.
    children = []
    page_token = None
    while True:
        results = drive_service.files().list(
            pageSize=1000,  # Maximum number of files returned per API request
            # Exclude trashed items so deleted files are not copied.
            q=f"'{folder_id}' in parents and trashed = false",
            fields="nextPageToken, files(id, name, mimeType)",
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
            pageToken=page_token,
        ).execute()
        children.extend(results.get('files', []))
        # A missing nextPageToken means this was the last page.
        page_token = results.get('nextPageToken')
        if not page_token:
            return children


def copy_folder_recursive(drive_service, source_folder_id, destination_folder_id, log_file):
    """Recursively copy a folder and its contents into a destination folder.

    A new folder with the source folder's name is created inside
    *destination_folder_id*; sub-folders are copied by recursion and every
    other item through ``copy_file``. If the new folder cannot be created,
    nothing below it is copied.

    Args:
        drive_service: Drive v3 service object.
        source_folder_id: ID of the folder to copy.
        destination_folder_id: ID of the folder that receives the copy.
        log_file: Open, writable log file used for progress output.
    """
    # List the source BEFORE creating the new folder: if the destination lies
    # inside the source, the freshly created folder must not show up in the
    # listing, otherwise the copy would recurse into its own output.
    children = _list_folder_children(drive_service, source_folder_id)

    source_folder_name = get_file_metadata(drive_service, source_folder_id)['name']
    new_folder = create_folder(drive_service, source_folder_name, destination_folder_id, log_file)
    if not new_folder:
        return

    for item in children:
        if item['mimeType'] == 'application/vnd.google-apps.folder':
            copy_folder_recursive(drive_service, item['id'], new_folder['id'], log_file)
        else:
            copy_file(drive_service, item['id'], new_folder['id'], log_file)
def main():
    """Copy the configured source folder to the destination and log a summary.

    Opens ``log.txt`` for writing, authorizes against the Drive API, runs the
    recursive copy, and then logs start/end times, total elapsed time and the
    average copy time per file.
    """
    start_time = time.time()  # Record the start time

    # The context manager guarantees the log is flushed and closed even when
    # authorization or copying raises. UTF-8 keeps non-ASCII file names in
    # error messages from failing on platforms with a narrower default.
    with open('log.txt', 'w', encoding='utf-8') as log_file:
        creds = get_credentials()
        service = build('drive', 'v3', credentials=creds)

        # Specify the source and destination folder IDs or shared drive IDs
        source_id = 'Source Folder ID'
        destination_id = 'Destination Folder ID'

        # Copy the folder
        copy_folder_recursive(service, source_id, destination_id, log_file)

        end_time = time.time()  # Record the end time
        elapsed_time = end_time - start_time

        # Calculate the average copy time
        average_copy_time = calculate_average_copy_time()

        # Convert the elapsed time to a human-readable format
        elapsed_time_str = format_time(elapsed_time)

        # Print summary information
        log_to_file(f'Started at: {time.ctime(start_time)}', log_file)
        log_to_file(f'Ended at: {time.ctime(end_time)}', log_file)
        log_to_file(f'Total time: {elapsed_time_str}', log_file)
        log_to_file(f'Avg. time per file: {average_copy_time:.2f} seconds', log_file)
def format_time(seconds):
    """Render a duration given in seconds as a short human-readable string.

    Durations under a minute are shown in seconds, under an hour in minutes,
    and anything longer in hours, always with two decimal places.
    """
    if seconds < 60:
        return f'{seconds:.2f} seconds'
    if seconds < 3600:
        return f'{seconds / 60:.2f} minutes'
    return f'{seconds / 3600:.2f} hours'
# Run the copy only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Be First to Comment