from logging import getLogger, Formatter, StreamHandler
from typing import Dict, List, Union, Pattern
from sys import stdout
import pygsheets
import re
[docs]class GoogleSheets:
sheet: 'pygsheets.Spreadsheet' = None
ws: 'pygsheets.Worksheet' = None
ws_range_format: Pattern = re.compile(r'(?P<worksheet>[a-zA-Z0-9_]+)!(?P<start_range>[A-Z0-9]+):(?P<end_range>[A-Z0-9]+)')
def __init__(self, drive_folder_id=None, logging_level: str='INFO', service_account_file: str=None, credentials=None) -> None:
"""Google Sheets class initializer"""
# Disable sub-logging from the `googleapiclient` discovery.py
getLogger('googleapiclient.discovery').setLevel('WARNING')
self.log = getLogger(__name__)
self.log.setLevel(logging_level)
formatter = Formatter('{message:<80} {asctime}:{levelname:<9} - {module}:{funcName}:{lineno}', "%H:%M:%S", '{')
stdout_handler = StreamHandler(stdout)
stdout_handler.setFormatter(formatter)
self.log.addHandler(stdout_handler)
self.folder = drive_folder_id
scopes = [
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/drive'
]
if service_account_file:
self.client = pygsheets.authorize(service_account_file=service_account_file, scopes=scopes)
elif credentials:
self.client = pygsheets.authorize(custom_credentials=credentials)
else:
self.client = pygsheets.authorize(credentials_directory=None, scopes=scopes)
[docs] def list_sheets(self) -> List[str]:
"""Lists the available Google Spreadsheets under `self.folder`
Returns:
A list of Google Spreadsheet titles
"""
if self.folder:
return self.client.spreadsheet_titles(query=f'"{self.folder}" in parents')
else:
return self.client.spreadsheet_titles()
[docs] def create_sheet(self, title: str) -> 'GoogleSheets':
"""Creates and activates a Google Spreadsheet within `drive_folder_id`
https://pygsheets.readthedocs.io/en/stable/reference.html#pygsheets.Client.create
Args:
title (str): Title of the spreadsheet to create
Returns:
A copy of the current object; this allows call chaining
"""
self.sheet = self.client.create(title, folder=self.folder)
return self
[docs] def delete_sheet(self, *, title: str=None, key: str=None, url: str=None, ignore_errors: bool=False) -> 'GoogleSheets':
"""Deletes the specified Google Spreadsheet by ID only
Args:
title (str, optional): The title of the spreadsheet to delete
key (str, optional): The key ID of the spreadsheet to delete
url (str, optional): The URL of the spreadsheet to delete
ignore_errors (bool, optional): Can optionally ignore any errors like missing spreadsheets
Returns:
A copy of the current object; this allows call chaining
Raises:
KeyError: If the specified Spreadsheet does not exist
"""
prev_sheet = self.sheet
try:
self.set_sheet(title=title, key=key, url=url)
self.sheet.delete()
self.sheet = prev_sheet
except (pygsheets.SpreadsheetNotFound, KeyError) as e:
self.log.info(f'Spreadsheet not found by {title if title else key if key else url}: {e}')
if not ignore_errors:
raise(KeyError('Spreadsheet not found'))
return self
[docs] def set_sheet(self, *, title: str=None, key: str=None, url: str=None) -> 'GoogleSheets':
"""Activates the current Google Spreadsheet that subsequent actions will be performed on
At least one parameter must be specified. If more that one is specified, then only one will be used
https://pygsheets.readthedocs.io/en/stable/reference.html#pygsheets.Client.open
Sets the worksheet to index: 0 by default
Args:
title (str, optional): The title of the spreadsheet to activate
key (str, optional): The key ID of the spreadsheet to activate
url (str, optional): The URL of the spreadsheet to activate
Returns:
A copy of the current object; this allows call chaining
Raises:
TypeError: If no keyword arguments are specified
KeyError: If the specified Spreadsheet does not exist
"""
if not any([title is not None, key is not None, url is not None]):
raise(ValueError('At least one of `title`, `key`, or `url` keyword arguments must be specified'))
try:
if title:
self.sheet = self.client.open(title)
elif key:
self.sheet = self.client.open_by_key(key)
elif url:
self.sheet = self.client.open_by_url(url)
except (pygsheets.SpreadsheetNotFound, IndexError) as e:
self.log.info(f'Spreadsheet not found by {title if title else key if key else url}: {e}')
raise(KeyError('Spreadsheet not found'))
self.set_ws(index=0)
return self
[docs] def set_or_create_sheet(self, title: str) -> 'GoogleSheets':
"""Activates the Google Spreadsheet by title if it exists, otherwise create it
Args:
title (str): The title of the Google Spreadsheet to set or create
Returns:
A copy of the current object; this allows call chaining
"""
if title in self.list_sheets():
self.set_sheet(title=title)
else:
self.create_sheet(title)
return self
[docs] def list_ws(self) -> List[pygsheets.Worksheet]:
"""Lists all worksheets in the active Google Spreadsheet
https://pygsheets.readthedocs.io/en/stable/spreadsheet.html#pygsheets.Spreadsheet.worksheets
Returns:
List of all pygsheets Worksheet objects in active Spreadsheet
Raises:
TypeError: If a Spreadsheet has not been activated
"""
if not self.sheet:
raise(TypeError('You must activate a Spreadsheet before listing worksheets'))
return self.sheet.worksheets()
[docs] def set_ws(self, *, title: str=None, index: int=None, ws_id: str=None) -> 'GoogleSheets':
"""Activates the worksheet that subsequent actions will be performed on
At least one parameter must be specified. If more that one is specified, then only one will be used
https://pygsheets.readthedocs.io/en/stable/spreadsheet.html#pygsheets.Spreadsheet.worksheet
Args:
title (str, optional): The title of the worksheet to activate
index (int, optional): The index of the worksheet to activate
ws_id (str, optional): The ID of the worksheet to activate
Returns:
A copy of the current object; this allows call chaining
Raises:
ValueError: If a Spreadsheet has not been activated
TypeError: If no keyword arguments are specified
KeyError: If the specified worksheet does not exist
"""
if not self.sheet:
raise(ValueError('You must activate a Spreadsheet before activating a worksheet'))
try:
if title is not None:
self.ws = self.sheet.worksheets('title', title, force_fetch=True)[0]
elif index is not None:
self.ws = self.sheet.worksheets('index', index, force_fetch=True)[0]
elif ws_id is not None:
self.ws = self.sheet.worksheets('id', ws_id, force_fetch=True)[0]
except pygsheets.WorksheetNotFound as e:
self.log.info(f'Worksheet not found by {title if title is not None else index if index is not None else ws_id}: {e}')
raise(KeyError('Worksheet not found'))
return self
[docs] def create_ws(self, title: str) -> 'GoogleSheets':
"""Creates and activates a worksheet within the active Google Spreadsheet,
https://pygsheets.readthedocs.io/en/stable/spreadsheet.html#pygsheets.Spreadsheet.add_worksheet
Args:
title (str): Title of the worksheet to create
Returns:
A copy of the current object; this allows call chaining
Raises:
TypeError: If a Spreadsheet has not been activated
"""
if not self.sheet:
raise(TypeError('You must activate a Spreadsheet before activating a worksheet'))
self.ws = self.sheet.add_worksheet(title)
return self
[docs] def set_or_create_ws(self, title: str) -> 'GoogleSheets':
"""Activates the worksheet by title if it exists, otherwise create it
Args:
title (str): The title of the worksheet to set or create
Returns:
A copy of the current object; this allows call chaining
Raises:
TypeError: If a Spreadsheet has not been activated
"""
if not self.sheet:
raise(TypeError('You must activate a Spreadsheet before activating/creating a worksheet'))
if title in [ws.title for ws in self.list_ws()]:
self.set_ws(title=title)
else:
self.create_ws(title)
return self
[docs] def delete_ws(self, ws_id: str) -> 'GoogleSheets':
"""Deletes the specified worksheet by ID only
Args:
ws_id (str): The worksheet ID to delete
Returns:
A copy of the current object; this allows call chaining
Raises:
TypeError: If a Spreadsheet has not been activated
KeyError: If the specified worksheet does not exist
"""
if not self.ws:
raise(TypeError('You must activate a Spreadsheet before deleting a worksheet'))
try:
if self.ws.id == ws_id:
self.sheet.del_worksheet(self.ws)
self.ws = None
else:
current_ws = self.ws
self.sheet.del_worksheet(self.set_ws(ws_id=ws_id).ws)
self.ws = current_ws
except pygsheets.WorksheetNotFound as e:
self.log.info(f'Worksheet not found by {ws_id}: {e}')
raise(KeyError('Worksheet not found'))
return self
[docs] def get_row(self, index: int) -> List:
"""Get all values in row `index` from the active worksheet
https://pygsheets.readthedocs.io/en/stable/worksheet.html#pygsheets.Worksheet.get_row
Args:
index (int): The integer index of the row to access
Returns:
List of values from row
Raises:
TypeError: If a worksheet has not been activated
"""
if not self.ws:
raise(TypeError('You must activate a worksheet before getting a row'))
return self.ws.get_row(index, include_tailing_empty=False)
[docs] def get_column(self, index: int) -> List:
"""Get all values in column `index` from the active worksheet
https://pygsheets.readthedocs.io/en/stable/worksheet.html#pygsheets.Worksheet.get_col
Args:
index (int): The integer index of the column to access
Returns:
List of values from column
Raises:
TypeError: If a worksheet has not been activated
"""
if not self.ws:
raise(TypeError('You must activate a worksheet before getting a column'))
return self.ws.get_col(index, include_tailing_empty=False)
def _last_dimension(self, dimension: str) -> int:
"""Provides the index of last non-empty row or column
Args:
dimension (str): String representation of whether to find the last row or column
Returns:
Integer row or column index of last non-empty row or column
"""
# Get all non-empty values from row or column 1, assume that the length == # of rows or columns
# (It's backwards, list the values in the opposite index to find length of desired index)
if dimension == 'ROWS':
return len(self.get_column(1))
elif dimension == 'COLUMNS':
return len(self.get_row(1))
else:
raise(ValueError('You must specify the dimension, either `ROWS` or `COLUMNS`'))
[docs] def update_row_by_index(self, values: List[List], row_offset: int, column_offset: int=1) -> bool:
"""Update values in a row based on the numerical index. (row_offset, column_offset) specifies the starting
point in the worksheet for where to start updating values. Will add additional rows to fit provided data
Args:
values (List of lists): Adds values in order at the specified `row_offset`
Additional lists in list will update the next row(s)
row_offset (int): Adds row at specified position in the worksheet
column_offset (int, optional): Starts the row at specified column, defaults to first column (far left)
Returns:
True on successful update; false otherwise
Raises:
TypeError: If a worksheet has not been activated
"""
if not self.ws:
raise(TypeError(f'You must activate a worksheet before updating row values'))
try:
if not isinstance(values[0], list):
values = [values]
self.ws.update_values(crange=(row_offset, column_offset), values=values, extend=True, majordim='ROWS')
return True
except Exception:
return False
[docs] def update_column_by_index(self, values: List[List], column_offset: int, row_offset: int=1) -> bool:
"""Update values in a column based on the numerical index. (row_offset, column_offset) specifies the starting
point in the worksheet for where to start updating values. Will add additional columns to fit provided data
Args:
values (List of lists): Adds values in order at the specified `column_offset`
Additional lists in list will update the next column(s)
column_offset (int): Adds column at specified position in the worksheet
row_offset (int, optional): Starts the column at specified row, defaults to first row (top)
Returns:
True on successful update; false otherwise
Raises:
TypeError: If a worksheet has not been activated
"""
if not self.ws:
raise(TypeError(f'You must activate a worksheet before updating column values'))
try:
if not isinstance(values[0], list):
values = [values]
self.ws.update_values(crange=(row_offset, column_offset), values=values, extend=True, majordim='COLUMNS')
return True
except Exception:
return False
[docs] def update_column_by_header(self, values: List[Dict], column_offset: int, header_column: int=1,
case_sensitive: bool=True) -> bool:
"""Update values in a column based on the specified header keys. Will add additional columns to fit provided data
.. seealso:: blabla
VersionAdded:
1.2
Args:
values (List of dicts): Adds values starting at the `column_offset` based on position
of matching key in `header_column`. Additional dicts in list will update the next column(s)
column_offset (int): Adds column at specified position in the worksheet
header_column (int, optional): Specifies which column to use for matching headers, defaults to the first column
case_sensitive (bool, optional): Specifies whether the header-key matching is case sensitive, it is by default
Returns:
bool: True on successful update; false otherwise
Raises:
TypeError: If a worksheet has not been activated
"""
if not self.ws:
raise(TypeError(f'You must activate a worksheet before updating column values'))
return self._update_dimension_by_header('COLUMNS', values, column_offset, header_column, case_sensitive)
[docs] def add_data_to_ws_rows(self, worksheet: str, data: List[Dict], preserve_blanks: bool=False) -> str:
"""Add data to an existing or new worksheet by header value
Args:
worksheet (str): The name of the worksheet to add to or create
data (List of dicts): Adds values to specified `worksheet`
preserve_blanks (bool, optional): Whether or not to preserve empty strings ('') when exporting data
Returns:
str: `<worksheet>!<starting range>:<ending range>`
"""
self.set_or_create_ws(worksheet)
height = len(self.get_column(1))
height = 2 if height == 0 else height + 1
if preserve_blanks:
for i, d in enumerate(data):
for k, v in d.items():
if v == '':
data[i][k] = '<blank>'
success = self.update_row_by_header(data, height)
if success:
end_range = self.format_addr((height + len(data) - 1, len(self.get_row(1))))
return f'{worksheet}!A{height}:{end_range}'
else:
return ''
[docs] def get_data_from_ws_range(self, ws_range: Union[str, Pattern]) -> List[Dict]:
"""Recursively retrieves data from other ranges in other worksheets of the same Spreadsheet
Args:
ws_range (str or Pattern): A `<worksheet>!<starting range>:<ending range>` formatted range string or a `re` regex
parsed object
Returns:
List of dicts with header values as the key
"""
if isinstance(ws_range, str):
ws_range = self.ws_range_format.match(ws_range)
if ws_range is None:
return []
previous_ws = self.ws.title
self.set_ws(title=ws_range.group('worksheet'))
starting_point = self.format_addr(ws_range.group('start_range'))
ending_point = self.format_addr(ws_range.group('end_range'))
headers = self.get_row(1)
range_list = []
for row in range(starting_point[0], ending_point[0] + 1):
row_list = self.get_row(row)
row_dict = {}
for column, cell in enumerate(row_list):
match = self.ws_range_format.match(cell)
if match is not None:
row_list[column] = self.get_data_from_ws_range(match)
else:
converted_cell = self._convert_cell_str_to_python(cell)
row_list[column] = converted_cell
if cell == '<blank>' or row_list[column] not in ['', [], {}, ()]:
row_dict[headers[column]] = row_list[column]
if row_dict:
range_list.append(row_dict)
self.set_ws(title=previous_ws)
return range_list
def _convert_cell_str_to_python(self, cell_str: str) -> object:
"""Takes a string retrieved from a worksheet cell and converts it to the matching python object
Args:
cell_str (str): The string value retrieved from a worksheet cell
Returns:
The converted python object, returns an empty string ('') if the value is blank
"""
python_obj = ''
if cell_str == 'TRUE':
python_obj = True
elif cell_str == 'FALSE':
python_obj = False
elif cell_str == 'None':
python_obj = None
elif cell_str == '<blank>':
python_obj = ''
elif cell_str and cell_str[0] == '[' and cell_str[-1] == ']':
cell_list = cell_str[1:][:-1].split(',')
if cell_list:
for index, item in enumerate(cell_list):
cell_list[index] = self._convert_cell_str_to_python(item)
# prune empty objects
cell_list = [item for item in cell_list if item not in ['', [], {}, ()]]
if cell_list:
python_obj = cell_list
else:
try:
python_obj = int(cell_str)
except (TypeError, ValueError):
python_obj = cell_str
return python_obj
def _update_dimension_by_header(self, dimension: str, values: List[Dict], dimension_offset: int, header_index: int,
case_sensitive: bool=True) -> bool:
"""Update values in either a row or a column
Args:
dimension (str): String representation of whether to update values in `ROWS` or `COLUMNS`
values (List of dicts): Adds values starting at the `dimension_offset` based on position of matching key
in `header_index`
Additional dicts in list will update the next dimension(s)
Missing headers will be added at the end of `dimension`
A value of `None` within the list will cause that dimension to be skipped
dimension_offset (int, optional): Adds dimension at specified position in the worksheet
header_index (int, optional): Specifies what dimension index to use as headers when updating values
case_sensitive (bool, optional): Specifies whether the header-key matching is case sensitive, it is by default
Returns:
True on successful update; false otherwise
"""
if dimension == 'ROWS':
headers = self.get_row(header_index)
update_range = (dimension_offset, 1)
elif dimension == 'COLUMNS':
headers = self.get_column(header_index)
update_range = (1, dimension_offset)
if not case_sensitive:
headers = [header.lower() for header in headers]
value_matrix = []
for item in values:
ordered_items = {}
if item is None:
value_matrix.append([])
continue
# Match keys to header index
for key, value in item.items():
try:
if not case_sensitive:
cell_index = headers.index(key.lower()) + 1
else:
cell_index = headers.index(key) + 1
except ValueError:
# Add missing header to `header_index` dimension
headers.append(key)
if dimension == 'ROWS':
if len(headers) > self.ws.cols:
self.add_column()
self.ws.update_value((header_index, len(headers)), key)
elif dimension == 'COLUMNS':
if len(headers) > self.ws.rows:
self.add_row()
self.ws.update_value((len(headers), header_index), key)
cell_index = len(headers)
# Save key for updating the headers later
ordered_items[cell_index] = str(value)
# Add blank values for any missing indices
for index in range(1, max(ordered_items)):
if index not in ordered_items:
ordered_items[index] = None
value_matrix.append([ordered_items[key] for key in sorted(ordered_items.keys())])
try:
self.ws.update_values(crange=update_range, values=value_matrix, extend=True, majordim=dimension)
return True
except Exception:
return False
[docs] def add_row(self, at_row: int=-1) -> 'GoogleSheets':
"""Adds a row to the active worksheet at position `at_row`
https://pygsheets.readthedocs.io/en/stable/worksheet.html#pygsheets.Worksheet.insert_rows
Args:
at_row (int, optional): Adds row at specified position in the worksheet, -1 (default) specifies end
of active rows
Returns:
A copy of the current object; this allows call chaining
Raises:
TypeError: If a worksheet has not been activated
"""
if not self.ws:
raise(TypeError('You must activate a worksheet before adding a row'))
if at_row < 0:
at_row = self.ws.rows
else:
at_row -= 1
self.ws.insert_rows(at_row, inherit=True)
return self
[docs] def add_column(self, at_column: int=-1) -> 'GoogleSheets':
"""Adds a columns to the active worksheet at position `at_column`
https://pygsheets.readthedocs.io/en/stable/worksheet.html#pygsheets.Worksheet.insert_cols
Args:
at_column (int, optional): Adds column at specified position in the worksheet, -1 (default) specifies end
of active columns
Returns:
A copy of the current object; this allows call chaining
Raises:
TypeError: If a worksheet has not been activated
"""
if not self.ws:
raise(TypeError('You must activate a worksheet before adding a column'))
if at_column < 0:
at_column = self.ws.cols
else:
at_column -= 1
self.ws.insert_cols(at_column, inherit=True)
return self
[docs] def find_cells(self, value, match_case: bool=True, match_entire_cell: bool=True) -> List[pygsheets.Cell]:
"""Finds all cells that contains `value` across all worksheets
https://pygsheets.readthedocs.io/en/stable/worksheet.html#pygsheets.Worksheet.find
Args:
value: Value to find in active worksheet (supports a compiled regular expression)
match_case (bool, optional): Whether or not match cells based on case. Default is case sensitive
match_entire_cell (bool, optional): Whether or not match the entire value of the cell. Default is full cell matching
Returns:
A list of all pygsheets Cell objects that contains `value`
https://pygsheets.readthedocs.io/en/stable/cell.html
"""
cells = []
for worksheet in self.list_ws():
cells += worksheet.find(value, matchCase=match_case, matchEntireCell=match_entire_cell)
return cells
[docs] def replace_value(self, find_value: str, replacement: str) -> 'GoogleSheets':
"""Replace the `find_value` with `replacement` across all worksheets. This performs only a replacement on
the specified `find_value`, leaving the rest of the cell contents intact.
https://pygsheets.readthedocs.io/en/stable/worksheet.html#pygsheets.Worksheet.find
Args:
find_value: The string or regex value to find within the worksheet
replacement (str): The string used to replace all found values
Returns:
A copy of the current object; this allows call chaining
"""
for worksheet in self.list_ws():
worksheet.replace(str(find_value), str(replacement))
return self