Module base
Classes
class BaserowApi (database_url: str,
token: str | None = None,
token_path: str | None = None,
jwt_token: bool = False)
Expand source code
class BaserowApi:
    """BaserowAPI class: A wrapper around the Baserow API."""

    # Relative URL paths of the Baserow row and field endpoints.
    table_path = "api/database/rows/table"
    fields_path = "api/database/fields/table"

    def __init__(
        self,
        database_url: str,
        token: Optional[str] = None,
        token_path: Optional[str] = None,
        jwt_token: bool = False,
    ):
        """Initialize the BaserowApi class.

        This class is a wrapper around the Baserow API.

        Args:
            database_url (str): URL of the Baserow database.
            token (Optional[str], optional): Token-String for Baserow access.
                Defaults to None.
            token_path (Optional[str], optional): Path to file containing the
                Token-String. Defaults to None.
            jwt_token (bool, optional): Whether JWT-Token is used instead of
                Token-String. Defaults to False.

        Raises:
            ValueError: If neither token nor token_path is provided.
        """
        self._database_url = database_url
        if token_path:
            # A token file takes precedence over an explicitly passed token.
            with open(token_path) as tokenfile:
                self._token = tokenfile.readline().strip()
        elif token:
            self._token = token
        else:
            # Fail fast: previously a missing credential only surfaced later
            # as an AttributeError on the first API call.
            raise ValueError("Either token or token_path must be provided.")
        self._token_mode = "JWT" if jwt_token else "Token"

    def _auth_headers(self, content_type: Optional[str] = None) -> dict:
        """Build request headers with authorization (and optional content type)."""
        headers = {"Authorization": f"{self._token_mode} {self._token}"}
        if content_type:
            headers["Content-Type"] = content_type
        return headers

    def get_fields(self, table_id: int) -> list[dict]:
        """Get all fields / column specifications for a table.

        Args:
            table_id (int): ID of the table of interest.

        Returns:
            list[dict]: List of column specifications (dict of fields)
        """
        get_fields_url = f"{self._database_url}/{self.fields_path}/{table_id}/"
        resp = requests.get(get_fields_url, headers=self._auth_headers())
        resp.raise_for_status()
        return resp.json()

    def _get_rows_data(
        self,
        url: Optional[str] = None,
        table_id: Optional[int] = None,
        row_id: Optional[int] = None,
        user_field_names: bool = False,
        paginated: bool = False,
        include: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None,
    ) -> dict:
        """Get rows data from a table.

        Args:
            url (str, optional): URL to lookup data. Defaults to None.
            table_id (int, optional): ID of table of interest. Defaults to None.
            row_id (int, optional): ID of entry of interest. Defaults to None.
                If provided, only the entry is returned. Else, all entries
                are returned.
            user_field_names (bool, optional): Whether to use field names or
                field IDs. Defaults to False.
            paginated (bool, optional): Whether to load multiple pages of
                data. Defaults to False.
            include (list[str], optional): List of fields to include in the
                response. Defaults to None (all fields).
            exclude (list[str], optional): List of fields to exclude from the
                response. Defaults to None (no fields excluded).

        Returns:
            dict: The single row (if row_id is given) or the list of row
            dicts.

        Raises:
            RuntimeError: On invalid argument combinations or a malformed
                query response.
        """
        if (not table_id and not url) or (table_id and url):
            raise RuntimeError(
                "Either table_id or url must be provided, " "but not both."
            )
        if row_id and not table_id:
            raise RuntimeError("row_id can only be provided with table_id.")
        if row_id and paginated:
            warnings.warn("row_id is not paginated.")
            paginated = False

        query_params = []
        if url:
            # Follow-up page URLs already carry their query parameters.
            get_rows_url = url
        else:
            get_rows_url = f"{self._database_url}/{self.table_path}/{table_id}/"
            if row_id:
                get_rows_url += f"{row_id}/"
            if user_field_names:
                query_params.append("user_field_names=true")
        if include:
            query_params.append(f"include={','.join(include)}")
        if exclude:
            query_params.append(f"exclude={','.join(exclude)}")
        if query_params:
            get_rows_url += "?" + "&".join(query_params)

        resp = requests.get(get_rows_url, headers=self._auth_headers())
        resp.raise_for_status()
        data = resp.json()

        # A: specific entry requested - the payload is the row itself.
        if row_id:
            return data
        # B: all entries - rows are nested under "results".
        if "results" not in data:
            raise RuntimeError(f"Could not get query result data from {get_rows_url}")
        results = data["results"]
        # Recursively fetch all remaining pages, if requested.
        if paginated and data["next"]:
            return results + self._get_rows_data(
                url=data["next"], paginated=paginated
            )
        return results

    def _create_row(
        self, table_id: int, data: dict, user_field_names: bool = False
    ) -> int:
        """Create a row in a table.

        Args:
            table_id (int): ID of the table of interest.
            data (dict): Data to add to the table.
            user_field_names (bool, optional): Whether to use field names of
                field IDs. Defaults to False.

        Returns:
            int: Row ID.

        Raises:
            RuntimeError: If the response is malformed.
        """
        create_row_url = f"{self._database_url}/{self.table_path}/{table_id}/"
        if user_field_names:
            create_row_url += "?user_field_names=true"
        resp = requests.post(
            create_row_url,
            headers=self._auth_headers("application/json"),
            json=data,
        )
        resp.raise_for_status()
        resp_data = resp.json()
        if "id" not in resp_data:
            raise RuntimeError(f"Malformed response {resp_data}")
        return resp_data["id"]

    def _create_rows(
        self, table_id: int, datas: list[dict], user_field_names: bool = False
    ) -> list[int]:
        """Create multiple rows in a table via the batch endpoint.

        Args:
            table_id (int): ID of the table of interest.
            datas (list[dict]): Row payloads to create.
            user_field_names (bool, optional): Whether to use field names or
                field IDs. Defaults to False.

        Returns:
            list[int]: IDs of the created rows.
        """
        create_rows_url = f"{self._database_url}/{self.table_path}/{table_id}/batch/"
        if user_field_names:
            create_rows_url += "?user_field_names=true"
        resp = requests.post(
            create_rows_url,
            headers=self._auth_headers("application/json"),
            json={"items": datas},
        )
        resp.raise_for_status()
        return [entry["id"] for entry in resp.json()["items"]]

    def _update_row(
        self, table_id: int, data: dict, user_field_names: bool = False
    ) -> int:
        """Update a row in a table.

        Args:
            table_id (int): ID of the table of interest.
            data (dict): Data to update (has to include the row ID as id).
            user_field_names (bool, optional): Whether to use field names or
                field IDs for the data keys. Defaults to False.

        Returns:
            int: Row ID of the updated row.

        Raises:
            RuntimeError: If the response is malformed.
        """
        # Work on a copy so the caller's dict keeps its "id" key; the
        # previous implementation popped "id" out of the caller's data.
        payload = dict(data)
        row_id = payload.pop("id")
        update_row_url = f"{self._database_url}/{self.table_path}/{table_id}/{row_id}/"
        if user_field_names:
            update_row_url += "?user_field_names=true"
        resp = requests.patch(
            update_row_url,
            headers=self._auth_headers("application/json"),
            json=payload,
        )
        resp.raise_for_status()
        resp_data = resp.json()
        if "id" not in resp_data:
            raise RuntimeError(f"Malformed response {resp_data}")
        return resp_data["id"]

    def _update_rows(
        self, table_id: int, datas: list[dict], user_field_names: bool = False
    ) -> list[int]:
        """Update multiple rows in a table via the batch endpoint.

        Args:
            table_id (int): ID of the table of interest.
            datas (list[dict]): Row payloads; each must include its "id".
            user_field_names (bool, optional): Whether to use field names or
                field IDs. Defaults to False.

        Returns:
            list[int]: IDs of the updated rows.
        """
        update_rows_url = f"{self._database_url}/{self.table_path}/{table_id}/batch/"
        if user_field_names:
            update_rows_url += "?user_field_names=true"
        resp = requests.patch(
            update_rows_url,
            headers=self._auth_headers("application/json"),
            json={"items": datas},
        )
        resp.raise_for_status()
        return [entry["id"] for entry in resp.json()["items"]]

    def _delete_row(self, table_id: int, row_id: int) -> None:
        """Delete a single row from a table.

        Args:
            table_id (int): ID of the table of interest.
            row_id (int): ID of the row to delete.
        """
        delete_row_url = f"{self._database_url}/{self.table_path}/{table_id}/{row_id}/"
        resp = requests.delete(delete_row_url, headers=self._auth_headers())
        resp.raise_for_status()

    def _convert_selects(self, data: dict, fields: list[dict]) -> dict:
        """
        Convert the values in a dataset to their corresponding IDs based on
        field definitions.

        Example:
            data = {"status": "active", "tags": ["urgent", "important"]}
            fields = [
                {"name": "status", "type": "single_select",
                 "select_options": [{"value": "active", "id": 1},
                                    {"value": "inactive", "id": 2}],
                 "read_only": False},
                {"name": "tags", "type": "multiple_select",
                 "select_options": [{"value": "urgent", "id": 1},
                                    {"value": "important", "id": 2}],
                 "read_only": False},
            ]
            converted_data = self._convert_selects(data, fields)
            # converted_data would be {"status": 1, "tags": [1, 2]}
        """
        data_conv = deepcopy(data)

        def convert_option(v, opts):
            """
            Return the id of the option with value v (ints pass through
            unchanged).
            """
            if isinstance(v, int):
                return v
            for opt in opts:
                if opt["value"] == v:
                    return opt["id"]
            raise RuntimeError(f"Could not convert {v} to any of {opts}")

        for field in fields:
            if field["read_only"] or field["name"] not in data_conv:
                continue
            cur_value = data_conv[field["name"]]
            # Leave empty values untouched.
            if cur_value is None or cur_value == []:
                continue
            if field["type"] == "single_select":
                data_conv[field["name"]] = convert_option(
                    cur_value, field["select_options"]
                )
            elif field["type"] == "multiple_select":
                data_conv[field["name"]] = [
                    convert_option(single_value, field["select_options"])
                    for single_value in cur_value
                ]
        return data_conv

    def get_writable_fields(self, table_id: int) -> list[dict]:
        """Get all writable fields in a table.

        Args:
            table_id (int): ID of the table of interest.

        Returns:
            list[dict]: List of writable fields.
        """
        fields = self.get_fields(table_id)
        return [field for field in fields if not field["read_only"]]

    def get_data(
        self,
        table_id: int,
        writable_only: bool = True,
        user_field_names: bool = True,
        paginated: bool = True,
        include: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None,
    ) -> dict[int, dict[str, Any]]:
        """Get all data from a table.

        Args:
            table_id (int): ID of the table of interest.
            writable_only (bool, optional): Only return fields which can be
                written to. This excludes all formula and computed fields.
                Defaults to True (only writable fields).
            user_field_names (bool, optional): Whether to reference columns
                by name or ID. Defaults to True (use names).
            paginated (bool, optional): Whether to load multiple pages of
                data. Defaults to True.
            include (list[str], optional): List of fields to include in the
                response. Defaults to None (all fields).
            exclude (list[str], optional): List of fields to exclude from the
                response. Defaults to None (no fields excluded).

        Returns:
            dict[int, dict[str, Any]]: dictionary of data in the table.
        """
        if writable_only:
            fields = self.get_writable_fields(table_id)
        else:
            fields = self.get_fields(table_id)
        # Map response keys (names or "field_<id>") to field specifications.
        if user_field_names:
            names = {f["name"]: f for f in fields}
        else:
            names = {f'field_{f["id"]}': f for f in fields}
        rows = self._get_rows_data(
            table_id=table_id,
            user_field_names=user_field_names,
            paginated=paginated,
            include=include,
            exclude=exclude,
        )
        # Collect rows with their field names and formatted values.
        return {
            row["id"]: {
                key: _format_value(value, names[key])
                for key, value in row.items()
                if key in names
            }
            for row in rows
        }

    def get_entry(
        self,
        table_id: int,
        row_id: int,
        linked: bool = False,
        seen_tables: Optional[list] = None,
        user_field_names: bool = True,
        include: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None,
    ) -> dict:
        """Get a single entry from a table.

        Args:
            table_id (int): ID of the table of interest.
            row_id (int): Entry ID for the entry of interest.
            linked (bool, optional): Whether to fully hydrate the output with
                linked tables. Defaults to False (no data of linked tables is
                loaded).
            seen_tables (list, optional): List of already linked tables.
                These are not loaded again. Defaults to None.
            user_field_names (bool, optional): Whether to reference columns
                by name or ID. Defaults to True (use names).
            include (list[str], optional): List of fields to include in the
                response. Defaults to None (all fields).
            exclude (list[str], optional): List of fields to exclude from the
                response. Defaults to None (no fields excluded).

        Returns:
            dict: Entry data.
        """
        data = self._get_rows_data(
            table_id=table_id,
            row_id=row_id,
            paginated=False,
            user_field_names=user_field_names,
            include=include,
            exclude=exclude,
        )
        fields = self.get_fields(table_id)
        # If include or exclude are provided, filter the fields accordingly.
        if include:
            fields = [f for f in fields if f["name"] in include]
        if exclude:
            fields = [f for f in fields if f["name"] not in exclude]
        # Accept both key schemes (user field names and "field_<id>").
        names = {f["name"]: f for f in fields}
        names = names | {f'field_{f["id"]}': f for f in fields}
        formatted_data = {
            k: _format_value(v, names[k]) for k, v in data.items() if k in names
        }
        # Copy so the caller's seen_tables list is not mutated; the previous
        # implementation appended to the caller-supplied list in place.
        seen_tables_next = list(seen_tables) if seen_tables else []
        seen_tables_next.append(table_id)
        # fully hydrate with linked data
        # --> recursively get data from linked tables
        if linked:
            link_fields = [f for f in fields if f["type"] == "link_row"]
            for field in link_fields:
                linked_table_id = field["link_row_table_id"]
                if not seen_tables or linked_table_id not in seen_tables:
                    if ids := data.get(field["name"]):
                        formatted_data[field["name"]] = [
                            self.get_entry(
                                linked_table_id,
                                e_id["id"],
                                linked=False,
                                seen_tables=seen_tables_next,
                                user_field_names=user_field_names,
                                include=include,
                                exclude=exclude,
                            )
                            for e_id in ids
                        ]
        return formatted_data

    def add_data(
        self,
        table_id: int,
        data: dict,
        row_id: Optional[int] = None,
        user_field_names: bool = True,
    ) -> int:
        """Add/Change data to a table.

        Args:
            table_id (int): Table ID.
            data (dict): Data to add/change.
            row_id (int, optional): Row ID where to enter the data.
                Defaults to None.
            user_field_names (bool, optional): Whether to reference columns
                by name or ID. Defaults to True.

        Returns:
            int: Row ID.
        """
        fields = self.get_fields(table_id)
        data_conv = self._convert_selects(data, fields)
        if row_id:
            data_conv["id"] = row_id
            self._update_row(table_id, data_conv, user_field_names=user_field_names)
        else:
            row_id = self._create_row(
                table_id, data_conv, user_field_names=user_field_names
            )
        return row_id

    def add_data_batch(
        self,
        table_id: int,
        entries: list[dict],
        user_field_names: bool = True,
        fail_on_error: bool = False,
    ) -> tuple[list, list]:
        """Add/Change data (multiple rows) to a table.

        Entries containing an "id" are updated, all others are created.

        Args:
            table_id (int): ID of the table of interest.
            entries (list[dict]): List of entries to add/change.
            user_field_names (bool, optional): Whether to use field names or
                field IDs. Defaults to True.
            fail_on_error (bool, optional): Whether to fail if error appears.
                Defaults to False.

        Returns:
            tuple[list, list]: List of touched IDs and list of errors.

        Raises:
            RuntimeError: If errors occurred and fail_on_error is True.
        """

        def process_entries(input_entries, batch_operation, single_operation):
            """Helper function to process entries for create or update."""
            processed_ids = []
            try:
                processed_ids += batch_operation(
                    table_id, input_entries, user_field_names=user_field_names
                )
            except requests.HTTPError as err:
                if err.response.status_code != 504:
                    raise
                # Gateway Timeout: wait, then retry each entry individually.
                warnings.warn(
                    f"Gateway Timeout: {err.response.text}. Retrying after 60 "
                    f"seconds with single operations."
                )
                time.sleep(60)
                for entry in input_entries:
                    processed_ids.append(
                        single_operation(
                            table_id,
                            entry,
                            user_field_names=user_field_names,
                        )
                    )
            return processed_ids

        # Split entries into new ones and updates (those carrying an "id").
        entries_update, entries_new, errors, touched_ids = [], [], [], []
        for entry in entries:
            if entry.get("id") is not None:
                entries_update.append(entry)
            else:
                entries_new.append(entry)

        if entries_new:
            try:
                touched_ids += process_entries(
                    entries_new, self._create_rows, self._create_row
                )
            except requests.HTTPError as err:
                errors.append(
                    f"{self._create_rows.__name__} rows "
                    f"({len(entries_new)}): {err.response.text}"
                )
        if entries_update:
            try:
                touched_ids += process_entries(
                    entries_update, self._update_rows, self._update_row
                )
            except requests.HTTPError as err:
                errors.append(
                    f"{self._update_rows.__name__} rows "
                    f"({len(entries_update)}): {err.response.text}"
                )
        if errors and fail_on_error:
            raise RuntimeError(errors)
        return touched_ids, errors
BaserowAPI class: A wrapper around the Baserow API.
Initialize the BaserowApi class. This class is a wrapper around the Baserow API.
Args
database_url
:str
- URL of the Baserow database.
token
:Optional[str]
, optional- Token-String for Baserow access. Defaults to None.
token_path
:Optional[str]
, optional- Path to file containing the Token-String. Defaults to None.
jwt_token
:bool
, optional- Whether JWT-Token is used instead of Token-String. Defaults to False.
Class variables
var fields_path
-
Relative URL path of the Baserow fields endpoint ("api/database/fields/table").
var table_path
-
Relative URL path of the Baserow rows endpoint ("api/database/rows/table").
Methods
def add_data(self,
table_id: int,
data: dict,
row_id: int | None = None,
user_field_names: bool = True) ‑> int-
Expand source code
def add_data( self, table_id: int, data: dict, row_id: Optional[int] = None, user_field_names: bool = True, ) -> int: """Add/Change data to a table. Args: table_id (int): Table ID. data (dict): Data to add/change. row_id (int, optional): Row ID where to enter the data. Defaults to None. user_field_names (bool, optional): Whether to reference columns by name or ID. Defaults to True. Returns: int: Row ID. """ fields = self.get_fields(table_id) data_conv = self._convert_selects(data, fields) if row_id: data_conv["id"] = row_id self._update_row(table_id, data_conv, user_field_names=user_field_names) else: row_id = self._create_row( table_id, data_conv, user_field_names=user_field_names ) return row_id
Add/Change data to a table.
Args
table_id
:int
- Table ID.
data
:dict
- Data to add/change.
row_id
:int
, optional- Row ID where to enter the data. Defaults to None.
user_field_names
:bool
, optional- Whether to reference columns by name or ID. Defaults to True.
Returns
int
- Row ID.
def add_data_batch(self,
table_id: int,
entries: list[dict],
user_field_names: bool = True,
fail_on_error: bool = False) ‑> tuple[list, list]-
Expand source code
def add_data_batch( self, table_id: int, entries: list[dict], user_field_names: bool = True, fail_on_error: bool = False, ) -> tuple[list, list]: """Add/Change data (multiple rows) to a table. Args: table_id (int): ID of the table of interest. entries (list[dict]): List of entries to add/change. user_field_names (bool, optional): Whether to use field names or field IDs. Defaults to True. fail_on_error (bool, optional): Whether to fail if error appears. Defaults to False. Returns: tuple[list, list]: List of touched IDs and list of errors. """ def process_entries(input_entries, batch_operation, single_operation): """Helper function to process entries for create or update.""" processed_ids = [] try: processed_ids += batch_operation( table_id, input_entries, user_field_names=user_field_names ) except requests.HTTPError as err: if err.response.status_code == 504: # Handle Gateway Timeout # Sleep for 60 seconds and retry warnings.warn( f"Gateway Timeout: {err.response.text}. Retrying after 60 " f"seconds with single operations." 
) time.sleep(60) # Retry the batch operation for entry in input_entries: # Process each entry individually processed_ids.append( single_operation( table_id, entry, user_field_names=user_field_names, ) ) else: raise err return processed_ids # Split entries into new and update entries_update, entries_new, errors, touched_ids = [], [], [], [] for entry in entries: if entry.get("id") is not None: entries_update.append(entry) else: entries_new.append(entry) if entries_new: try: touched_ids += process_entries( entries_new, self._create_rows, self._create_row ) except requests.HTTPError as err: errors.append( f"{self._create_rows.__name__} rows ({len(entries_new)}):\ {err.response.text}" ) if entries_update: try: touched_ids += process_entries( entries_update, self._update_rows, self._update_row ) except requests.HTTPError as err: errors.append( f"{self._update_rows.__name__} rows ({len(entries_update)}):\ {err.response.text}" ) if errors and fail_on_error: raise RuntimeError(errors) else: return touched_ids, errors
Add/Change data (multiple rows) to a table.
Args
table_id
:int
- ID of the table of interest.
entries
:list[dict]
- List of entries to add/change.
user_field_names
:bool
, optional- Whether to use field names or field IDs. Defaults to True.
fail_on_error
:bool
, optional- Whether to fail if error appears. Defaults to False.
Returns
tuple[list, list]
- List of touched IDs and list of errors.
def get_data(self,
table_id: int,
writable_only: bool = True,
user_field_names: bool = True,
paginated: bool = True,
include: list[str] | None = None,
exclude: list[str] | None = None) ‑> dict[int, dict[str, typing.Any]]-
Expand source code
def get_data( self, table_id: int, writable_only: bool = True, user_field_names: bool = True, paginated: bool = True, include: Optional[list[str]] = None, exclude: Optional[list[str]] = None, ) -> dict[int, dict[str, Any]]: """Get all data from a table. Args: table_id (int): ID of the table of interest. writable_only (bool, optional): Only return fields which can be written to. This excludes all formula and computed fields. Defaults to True (only writable fields). user_field_names (bool, optional): Whether to reference columns by name or ID. Defaults to True (use names). paginated (bool, optional): Whether to load multiple pages of data. Defaults to True. include (list[str], optional): List of fields to include in the response. Defaults to None (all fields). exclude (list[str], optional): List of fields to exclude from the response. Defaults to None (no fields excluded). Returns: dict[int, dict[str, Any]]: dictionary of data in the table. """ if writable_only: fields = self.get_writable_fields(table_id) else: fields = self.get_fields(table_id) if user_field_names: names = {f["name"]: f for f in fields} else: names = {f'field_{f["id"]}': f for f in fields} data = self._get_rows_data( table_id=table_id, user_field_names=user_field_names, paginated=paginated, include=include, exclude=exclude, ) # Collect rows with their field names and values writable_data = { d["id"]: {k: _format_value(v, names[k]) for k, v in d.items() if k in names} for d in data } return writable_data
Get all data from a table.
Args
table_id
:int
- ID of the table of interest.
writable_only
:bool
, optional- Only return fields which can be written to. This excludes all formula and computed fields. Defaults to True (only writable fields).
user_field_names
:bool
, optional- Whether to reference columns by name or ID. Defaults to True (use names).
paginated
:bool
, optional- Whether to load multiple pages of data. Defaults to True.
include
:list[str]
, optional- List of fields to include in the response. Defaults to None (all fields).
exclude
:list[str]
, optional- List of fields to exclude from the response. Defaults to None (no fields excluded).
Returns
dict[int, dict[str, Any]]
- dictionary of data in the table.
def get_entry(self,
table_id: int,
row_id: int,
linked: bool = False,
seen_tables: list | None = None,
user_field_names: bool = True,
include: list[str] | None = None,
exclude: list[str] | None = None) ‑> dict-
Expand source code
def get_entry( self, table_id: int, row_id: int, linked: bool = False, seen_tables: Optional[list] = None, user_field_names: bool = True, include: Optional[list[str]] = None, exclude: Optional[list[str]] = None, ) -> dict: """Get a single entry from a table. Args: table_id (int): ID of the table of interest. row_id (int): Entry ID for the entry of interest. linked (bool, optional): Whether to fully hydrate the output with linked tables. Defaults to False (no data of linked tables is loaded). seen_tables (list, optional): List of already linked tables. These are not loaded again. Defaults to None. user_field_names (bool, optional): Whether to reference columns by name or ID. Defaults to True (use names). include (list[str], optional): List of fields to include in the response. Defaults to None (all fields). exclude (list[str], optional): List of fields to exclude from the response. Defaults to None (no fields excluded). Returns: dict: Entry data. """ data = self._get_rows_data( table_id=table_id, row_id=row_id, paginated=False, user_field_names=user_field_names, include=include, exclude=exclude, ) fields = self.get_fields(table_id) # If include or exclude are provided, filter the fields if include: fields = [f for f in fields if f["name"] in include] if exclude: fields = [f for f in fields if f["name"] not in exclude] names = {f["name"]: f for f in fields} names = names | {f'field_{f["id"]}': f for f in fields} formatted_data = { k: _format_value(v, names[k]) for k, v in data.items() if k in names } seen_tables_next = seen_tables or [] seen_tables_next.append(table_id) # fully hydrate with linked data # --> recursively get data from linked tables if linked: link_fields = [f for f in fields if f["type"] == "link_row"] for field in link_fields: linked_table_id = field["link_row_table_id"] if not seen_tables or linked_table_id not in seen_tables: if ids := data.get(field["name"]): formatted_data[field["name"]] = [ self.get_entry( linked_table_id, e_id["id"], 
linked=False, seen_tables=seen_tables_next, user_field_names=user_field_names, include=include, exclude=exclude, ) for e_id in ids ] return formatted_data
Get a single entry from a table.
Args
table_id
:int
- ID of the table of interest.
row_id
:int
- Entry ID for the entry of interest.
linked
:bool
, optional- Whether to fully hydrate the output with linked tables. Defaults to False (no data of linked tables is loaded).
seen_tables
:list
, optional- List of already linked tables. These are not loaded again. Defaults to None.
user_field_names
:bool
, optional- Whether to reference columns by name or ID. Defaults to True (use names).
include
:list[str]
, optional- List of fields to include in the response. Defaults to None (all fields).
exclude
:list[str]
, optional- List of fields to exclude from the response. Defaults to None (no fields excluded).
Returns
dict
- Entry data.
def get_fields(self, table_id: int) ‑> list[dict]
-
Expand source code
def get_fields(self, table_id: int) -> list[dict]: """Get all fields / column specifications for a table. Args: table_id (int): ID of the table of interest. Returns: list[dict]: List of column specifications (dict of fields) """ get_fields_url = f"{self._database_url}/{self.fields_path}/{table_id}/" resp = requests.get( get_fields_url, headers={"Authorization": f"{self._token_mode} {self._token}"}, ) resp.raise_for_status() data = resp.json() return data
Get all fields / column specifications for a table.
Args
table_id
:int
- ID of the table of interest.
Returns
list[dict]
- List of column specifications (dict of fields)
def get_writable_fields(self, table_id: int) ‑> list[dict]
-
Expand source code
def get_writable_fields(self, table_id: int) -> list[dict]: """Get all writable fields in a table. Args: table_id (int): ID of the table of interest. Returns: list[dict]: List of writable fields. """ fields = self.get_fields(table_id) writable_fields = [field for field in fields if not field["read_only"]] return writable_fields
Get all writable fields in a table.
Args
table_id
:int
- ID of the table of interest.
Returns
list[dict]
- List of writable fields.