"""
Module for everything related to storage.
"""
from __future__ import annotations
import hashlib
import json
import logging
from pathlib import Path
from typing import Any, Callable, Dict
import pandas as pd
import requests
import keyring
from pytupli.schema import (
ArtifactMetadata,
ArtifactMetadataItem,
BaseFilter,
Benchmark,
BenchmarkHeader,
BenchmarkQuery,
User,
UserOut,
UserRole,
Episode,
EpisodeHeader,
EpisodeItem,
FilterType,
)
# Module-level logger, named after this module via ``__name__``; used by the
# storage backends below for informational and warning messages.
logger = logging.getLogger(__name__)
class TupliStorageError(Exception):
    """
    Raised when a storage operation fails.

    The storage backends in this module raise it for failed HTTP requests,
    missing or unreadable files, invalid stored data and authentication
    problems, so callers only need to catch this one exception type.
    """
class TupliStorage:
    """
    Base interface for storage backends of benchmarks, artifacts and episodes.

    Concrete backends subclass this and override every method. The base
    implementations, including ``__init__``, only raise
    ``NotImplementedError``, so this class cannot be used directly.
    """

    def __init__(self) -> None:
        raise NotImplementedError

    # Benchmarks

    def store_benchmark(self, benchmark_query: BenchmarkQuery) -> BenchmarkHeader:
        """
        Saves a benchmark to the storage.

        Args:
            benchmark_query (BenchmarkQuery): The serialized benchmark object to be saved.

        Returns:
            BenchmarkHeader: The header of the saved benchmark.
        """
        raise NotImplementedError

    def load_benchmark(
        self,
        uri: str,
    ) -> Benchmark:
        """
        Loads a benchmark from the storage.

        Args:
            uri (str): The URI/ID of the benchmark to load.

        Returns:
            Benchmark: The loaded benchmark object.
        """
        raise NotImplementedError

    def list_benchmarks(self, filter: BaseFilter | None = None) -> list[BenchmarkHeader]:
        """
        Lists all benchmarks in the storage that match the specified filter.

        Args:
            filter (BaseFilter, optional): The filter to apply when listing
                benchmarks. ``None`` means no filtering.

        Returns:
            list[BenchmarkHeader]: A list of benchmark headers that match the filter.
        """
        raise NotImplementedError

    def delete_benchmark(self, uri: str) -> None:
        """
        Deletes the specified benchmark from the storage.

        Args:
            uri (str): The URI/ID of the benchmark to delete.
        """
        raise NotImplementedError

    def publish_benchmark(self, uri: str) -> None:
        """
        Publishes the specified benchmark.

        Args:
            uri (str): The URI/ID of the benchmark to publish.
        """
        raise NotImplementedError

    # Artifacts

    def store_artifact(self, artifact: bytes, metadata: ArtifactMetadata) -> ArtifactMetadataItem:
        """
        Stores the artifact in the storage.

        Args:
            artifact (bytes): The artifact to store.
            metadata (ArtifactMetadata): Metadata for the artifact.

        Returns:
            ArtifactMetadataItem: Metadata item for the stored artifact.
        """
        raise NotImplementedError

    def load_artifact(self, uri: str, **kwargs) -> bytes:
        """
        Loads the artifact from the storage.

        Args:
            uri (str): The URI/ID of the artifact to load.
            kwargs: Additional, backend-specific arguments for loading the artifact.

        Returns:
            bytes: The raw artifact data.
        """
        raise NotImplementedError

    def list_artifacts(self, filter: BaseFilter | None = None) -> list[ArtifactMetadataItem]:
        """
        Lists all artifacts in the storage that match the specified filter.

        Args:
            filter (BaseFilter, optional): The filter to apply when listing
                artifacts. ``None`` means no filtering.

        Returns:
            list[ArtifactMetadataItem]: A list of artifacts that match the filter.
        """
        raise NotImplementedError

    def delete_artifact(self, uri: str) -> None:
        """
        Deletes the specified artifact from the storage.

        Args:
            uri (str): The URI/ID of the artifact to delete.
        """
        raise NotImplementedError

    def publish_artifact(self, uri: str) -> None:
        """
        Publishes the specified artifact.

        Args:
            uri (str): The URI/ID of the artifact to publish.
        """
        raise NotImplementedError

    # Episodes

    def record_episode(self, episode: Episode) -> EpisodeHeader:
        """
        Records an episode in the storage.

        Args:
            episode (Episode): The episode to record.

        Returns:
            EpisodeHeader: The header of the recorded episode.
        """
        raise NotImplementedError

    def publish_episode(self, uri: str) -> None:
        """
        Publishes the specified episode in the storage.

        Args:
            uri (str): The URI/ID of the episode to publish.
        """
        raise NotImplementedError

    def list_episodes(
        self, filter: BaseFilter | None = None, include_tuples: bool = False
    ) -> list[EpisodeHeader] | list[EpisodeItem]:
        """
        Lists all episodes in the storage that match the specified filter.

        Args:
            filter (BaseFilter, optional): The filter to apply when listing
                episodes. ``None`` means no filtering.
            include_tuples (bool, optional): Whether to include tuples in the episode data.

        Returns:
            list[EpisodeHeader] | list[EpisodeItem]: A list of episode headers,
            or full episode items with tuples when ``include_tuples`` is True.
        """
        raise NotImplementedError

    def delete_episode(self, uri: str) -> None:
        """
        Deletes the specified episode from the storage.

        Args:
            uri (str): The URI/ID of the episode to delete.
        """
        raise NotImplementedError
class TupliAPIClient(TupliStorage):
    """
    Storage backend that talks to the Tupli REST API.

    The API base URL and the access/refresh tokens are persisted in the system
    keyring under the service name ``'pytupli'``. Authenticated requests that
    are answered with 401 Unauthorized are retried once with a freshly
    refreshed access token.

    Methods:
        User Management:
            signup(username: str, password: str) -> User
                Creates a new user account.
            login(username: str, password: str, url: str | None = None) -> None
                Authenticates with the API and stores access tokens.
            list_users() -> list[User]
                Lists all users.
            list_roles() -> list[UserRole]
                Lists all available user roles.
            change_password(username: str, new_password: str) -> None
                Changes a user's password.
            change_roles(username: str, roles: list[str]) -> None
                Changes a user's roles.
            delete_user(username: str) -> None
                Deletes a user and their content.

        Benchmark Operations:
            store_benchmark(benchmark_query: BenchmarkQuery) -> BenchmarkHeader
            load_benchmark(uri: str) -> Benchmark
            list_benchmarks(filter: BaseFilter = None) -> list[BenchmarkHeader]
            delete_benchmark(uri: str) -> None
            publish_benchmark(uri: str) -> None

        Artifact Operations:
            store_artifact(artifact: bytes, metadata: ArtifactMetadata) -> ArtifactMetadataItem
            load_artifact(uri: str, **kwargs) -> bytes
            list_artifacts(filter: BaseFilter = None) -> list[ArtifactMetadataItem]
            delete_artifact(uri: str) -> None
            publish_artifact(uri: str) -> None

        Episode Operations:
            record_episode(episode: Episode) -> EpisodeHeader
            publish_episode(uri: str) -> None
            list_episodes(filter: BaseFilter = None, include_tuples: bool = False)
                -> list[EpisodeHeader] | list[EpisodeItem]
            delete_episode(uri: str) -> None

        Configuration:
            set_url(url: str) -> None
                Sets (and persists) the base URL for the API.
    """

    def __init__(self) -> None:
        # Reuse the URL persisted by set_url()/login(); fall back to a local server.
        self.base_url = keyring.get_password('pytupli', 'base_url') or 'http://localhost:8080'

    # Authentication helpers

    @staticmethod
    def _delete_stored_token(name: str) -> None:
        """
        Removes a token from the keyring, tolerating entries that are absent.

        Args:
            name (str): Keyring entry name, e.g. ``'access_token'``.
        """
        from keyring.errors import PasswordDeleteError

        try:
            keyring.delete_password('pytupli', name)
        except PasswordDeleteError:
            # Typically raised when the entry does not exist. This runs during
            # error cleanup, so raising here would hide the original error.
            pass

    def _get_bearer_token(self) -> dict:
        """
        Builds the Authorization header for API requests.

        Uses the stored access token if there is one. Otherwise it obtains a
        new access token from the stored refresh token. An expired stored
        token is not detected here; ``_authenticated_request`` handles that
        when the server answers 401.

        Returns:
            dict: Headers with the bearer token.

        Raises:
            TupliStorageError: If no token is stored and refreshing fails.
        """
        access_token = keyring.get_password('pytupli', 'access_token')
        if not access_token:
            return self._refresh_token()
        return {'Authorization': f'Bearer {access_token}'}

    def _refresh_token(self) -> dict:
        """
        Obtains a new access token using the stored refresh token.

        If the refresh fails, both stored tokens are removed so that the next
        call asks the user to log in again.

        Returns:
            dict: Headers with the refreshed bearer token.

        Raises:
            TupliStorageError: If no refresh token is stored or the refresh fails.
        """
        refresh_token = keyring.get_password('pytupli', 'refresh_token')
        if not refresh_token:
            raise TupliStorageError('No refresh token available. Please login first.')
        try:
            response = requests.post(
                f'{self.base_url}/access/refresh-token',
                headers={'Authorization': f'Bearer {refresh_token}'},
            )
            response.raise_for_status()
            # NOTE(review): login() reads tokens as data['access_token']['token'],
            # while this reads data['access_token'] directly. Confirm that the
            # refresh endpoint returns a plain token string.
            new_access_token = response.json()['access_token']
            keyring.set_password('pytupli', 'access_token', new_access_token)
        except Exception as e:
            # If the refresh fails, both tokens might be invalid.
            self._delete_stored_token('access_token')
            self._delete_stored_token('refresh_token')
            raise TupliStorageError(f'Token refresh failed: {str(e)}. Please login again.') from e
        return {'Authorization': f'Bearer {new_access_token}'}

    @staticmethod
    def _send_request(method: str, url: str, auth_headers: dict, **kwargs) -> requests.Response:
        """
        Sends one HTTP request with the Authorization header merged in.

        Caller-supplied ``headers`` are copied rather than mutated, and
        ``auth_headers`` take precedence over them.

        Raises:
            requests.HTTPError: If the response has an error status code.
        """
        headers = {**(kwargs.pop('headers', None) or {}), **auth_headers}
        response = getattr(requests, method.lower())(url, headers=headers, **kwargs)
        response.raise_for_status()
        return response

    def _authenticated_request(self, method, url, **kwargs) -> requests.Response:
        """
        Executes an authenticated request to the API.

        On a 401 Unauthorized response, the access token is refreshed and the
        request is retried once.

        Args:
            method (str): HTTP method (get, post, put, delete).
            url (str): URL for the request.
            **kwargs: Additional arguments for ``requests`` (json, params, files, ...).

        Returns:
            Response: The successful response.

        Raises:
            TupliStorageError: If the request fails or the token refresh fails.
        """
        try:
            return self._send_request(method, url, self._get_bearer_token(), **kwargs)
        except TupliStorageError:
            # Raised while obtaining a token; the message is already meaningful.
            raise
        except requests.HTTPError as e:
            if e.response is None or e.response.status_code != 401:
                raise TupliStorageError(f'API request failed: {str(e)}') from e
            # 401 Unauthorized: the token might be expired. Fall through to refresh and retry.
        except Exception as e:
            raise TupliStorageError(f'Request failed: {str(e)}') from e

        # _refresh_token raises TupliStorageError itself if no new token can be obtained.
        auth_headers = self._refresh_token()
        try:
            return self._send_request(method, url, auth_headers, **kwargs)
        except requests.HTTPError as e:
            raise TupliStorageError(f'API request failed: {str(e)}') from e
        except Exception as e:
            raise TupliStorageError(f'Request failed: {str(e)}') from e

    # Configuration

    def set_url(self, url: str) -> None:
        """
        Sets the base URL for the API and persists it in the keyring.

        Args:
            url (str): The base URL for the API.
        """
        self.base_url = url
        keyring.set_password('pytupli', 'base_url', url)

    # User management methods

    def signup(self, username: str, password: str) -> User:
        """
        Creates a new user account.

        An authenticated request is tried first. If it fails, for example
        because no valid tokens are stored, the signup is retried without
        authentication.

        Args:
            username (str): The username for the new account.
            password (str): The password for the new account.

        Returns:
            User: The created user object (as returned by the API, a ``UserOut``).

        Raises:
            TupliStorageError: If the signup fails.
        """
        url = f'{self.base_url}/access/signup'
        credentials = {'username': username, 'password': password}
        try:
            response = self._authenticated_request('post', url, json=credentials)
        except TupliStorageError:
            try:
                response = requests.post(url, json=credentials)
                response.raise_for_status()
            except requests.RequestException as e:
                raise TupliStorageError(f'Signup failed: {str(e)}') from e
        return UserOut(**response.json())

    def login(self, username: str, password: str, url: str | None = None) -> None:
        """
        Authenticates with the API and stores the access and refresh tokens.

        Args:
            username (str): The username for the API.
            password (str): The password for the API.
            url (str, optional): The base URL for the API. If not provided,
                the stored URL is used. If provided, it is stored as the new
                base URL.

        Raises:
            TupliStorageError: If the login request fails.
        """
        if url:
            self.set_url(url)
        try:
            response = requests.post(
                f'{self.base_url}/access/token',
                json={'username': username, 'password': password},
            )
            response.raise_for_status()
        except requests.RequestException as e:
            raise TupliStorageError(f'Login failed: {str(e)}') from e
        data = response.json()
        keyring.set_password('pytupli', 'access_token', data['access_token']['token'])
        keyring.set_password('pytupli', 'refresh_token', data['refresh_token']['token'])

    def list_users(self) -> list[User]:
        """
        Lists all users.

        Returns:
            list[User]: A list of all users.
        """
        response = self._authenticated_request('get', f'{self.base_url}/access/list-users')
        return [UserOut(**user) for user in response.json()]

    def list_roles(self) -> list[UserRole]:
        """
        Lists all user roles.

        Returns:
            list[UserRole]: A list of all user roles.
        """
        response = self._authenticated_request('get', f'{self.base_url}/access/list-roles')
        return [UserRole(**role) for role in response.json()]

    def change_password(self, username: str, new_password: str) -> None:
        """
        Changes a user's password.

        Args:
            username (str): The username of the account to change.
            new_password (str): The new password.
        """
        self._authenticated_request(
            'put',
            f'{self.base_url}/access/change-password',
            json={'username': username, 'password': new_password},
        )

    def change_roles(self, username: str, roles: list[str]) -> None:
        """
        Changes a user's roles.

        Args:
            username (str): The username of the account to change.
            roles (list[str]): The list of new roles.
        """
        self._authenticated_request(
            'put',
            f'{self.base_url}/access/change-roles',
            json={'username': username, 'roles': roles},
        )

    def delete_user(self, username: str) -> None:
        """
        Deletes a user and all their content.

        Args:
            username (str): The username of the account to delete.
        """
        self._authenticated_request(
            'delete', f'{self.base_url}/access/delete-user', params={'username': username}
        )

    # Benchmark methods
    # Query parameters are passed via ``params=`` so requests URL-encodes them.

    def store_benchmark(self, benchmark_query: BenchmarkQuery) -> BenchmarkHeader:
        """
        Saves the serialized benchmark to the API.

        Args:
            benchmark_query (BenchmarkQuery): The serialized benchmark object
                to be saved, together with its metadata.

        Returns:
            BenchmarkHeader: The header of the saved benchmark.
        """
        response = self._authenticated_request(
            'post', f'{self.base_url}/benchmarks/create', json=benchmark_query.model_dump()
        )
        return BenchmarkHeader(**response.json())

    def load_benchmark(self, uri: str) -> Benchmark:
        """
        Loads a serialized benchmark from the API.

        Args:
            uri (str): ID/hash of the benchmark to be loaded.

        Returns:
            Benchmark: The loaded benchmark object.
        """
        response = self._authenticated_request(
            'get', f'{self.base_url}/benchmarks/load', params={'benchmark_id': uri}
        )
        return Benchmark(**response.json())

    def publish_benchmark(self, uri: str) -> None:
        """
        Publishes the benchmark in the API.

        Args:
            uri (str): ID/hash of the benchmark to be published.
        """
        self._authenticated_request(
            'put', f'{self.base_url}/benchmarks/publish', params={'benchmark_id': uri}
        )

    def list_benchmarks(self, filter: BaseFilter | None = None) -> list[BenchmarkHeader]:
        """
        Lists all benchmarks in the API that match the specified filter.

        Args:
            filter (BaseFilter, optional): The filter to apply when listing benchmarks.

        Returns:
            list[BenchmarkHeader]: A list of benchmark headers that match the filter.
        """
        params = {'filter': filter.model_dump_json()} if filter is not None else {}
        response = self._authenticated_request(
            'get', f'{self.base_url}/benchmarks/list', params=params
        )
        return [BenchmarkHeader(**benchmark) for benchmark in response.json()]

    def delete_benchmark(self, uri: str) -> None:
        """
        Deletes the specified benchmark from the API.

        Args:
            uri (str): ID/hash of the benchmark to be deleted.
        """
        self._authenticated_request(
            'delete', f'{self.base_url}/benchmarks/delete', params={'benchmark_id': uri}
        )

    # Artifact methods

    def store_artifact(self, artifact: bytes, metadata: ArtifactMetadata) -> ArtifactMetadataItem:
        """
        Uploads the artifact to the API.

        Args:
            artifact (bytes): The artifact to store.
            metadata (ArtifactMetadata): Metadata for the artifact, sent as a
                JSON form field alongside the file upload.

        Returns:
            ArtifactMetadataItem: Metadata item for the stored artifact.
        """
        response = self._authenticated_request(
            'post',
            f'{self.base_url}/artifacts/upload',
            files={'data': artifact},
            data={'metadata': metadata.model_dump_json(serialize_as_any=True)},
        )
        return ArtifactMetadataItem(**response.json())

    def load_artifact(self, uri: str, **kwargs) -> bytes:
        """
        Downloads an artifact from the API.

        Args:
            uri (str): ID/hash of the artifact to be loaded.
            kwargs: Accepted for interface compatibility; ignored.

        Returns:
            bytes: The raw artifact data.
        """
        response = self._authenticated_request(
            'get', f'{self.base_url}/artifacts/download', params={'artifact_id': uri}
        )
        return response.content

    def publish_artifact(self, uri: str) -> None:
        """
        Publishes the artifact in the API.

        Args:
            uri (str): ID/hash of the artifact to be published.
        """
        self._authenticated_request(
            'put', f'{self.base_url}/artifacts/publish', params={'artifact_id': uri}
        )

    def list_artifacts(self, filter: BaseFilter | None = None) -> list[ArtifactMetadataItem]:
        """
        Lists all artifacts in the API that match the specified filter.

        Args:
            filter (BaseFilter, optional): The filter to apply when listing artifacts.

        Returns:
            list[ArtifactMetadataItem]: A list of artifacts that match the filter.
        """
        params = {'filter': filter.model_dump_json()} if filter is not None else {}
        response = self._authenticated_request(
            'get', f'{self.base_url}/artifacts/list', params=params
        )
        return [ArtifactMetadataItem(**artifact) for artifact in response.json()]

    def delete_artifact(self, uri: str) -> None:
        """
        Deletes the specified artifact from the API.

        Args:
            uri (str): ID/hash of the artifact to be deleted.
        """
        self._authenticated_request(
            'delete', f'{self.base_url}/artifacts/delete', params={'artifact_id': uri}
        )

    # Episode methods

    def record_episode(self, episode: Episode) -> EpisodeHeader:
        """
        Records an episode in the API.

        Args:
            episode (Episode): The episode to record.

        Returns:
            EpisodeHeader: The header of the recorded episode.
        """
        response = self._authenticated_request(
            'post',
            f'{self.base_url}/episodes/record',
            json=episode.model_dump(),
        )
        return EpisodeHeader(**response.json())

    def publish_episode(self, uri: str) -> None:
        """
        Publishes an episode in the API.

        Args:
            uri (str): The ID of the episode to publish.
        """
        self._authenticated_request(
            'put', f'{self.base_url}/episodes/publish', params={'episode_id': uri}
        )

    def list_episodes(
        self, filter: BaseFilter | None = None, include_tuples: bool = False
    ) -> list[EpisodeHeader] | list[EpisodeItem]:
        """
        Lists all episodes in the API that match the specified filter.

        Args:
            filter (BaseFilter, optional): The filter to apply when listing episodes.
            include_tuples (bool, optional): Whether to include tuples in the episode data.

        Returns:
            list[EpisodeHeader] | list[EpisodeItem]: Episode headers, or full
            episode items when ``include_tuples`` is True.
        """
        params = {'include_tuples': str(include_tuples).lower()}
        if filter is not None:
            params['filter'] = filter.model_dump_json()
        response = self._authenticated_request(
            'get',
            f'{self.base_url}/episodes/list',
            params=params,
        )
        model = EpisodeItem if include_tuples else EpisodeHeader
        return [model(**episode) for episode in response.json()]

    def delete_episode(self, uri: str) -> None:
        """
        Deletes an episode from the API.

        Args:
            uri (str): The ID of the episode to delete.
        """
        self._authenticated_request(
            'delete', f'{self.base_url}/episodes/delete', params={'episode_id': uri}
        )
[docs]
class FileStorage(TupliStorage):
"""
Storage class for saving and loading benchmarks to/from files.
"""
def __init__(
self,
storage_base_dir: str = '_local_storage',
) -> FileStorage:
"""
Initializes the FileStorage with the specified base directory.
Args:
storage_base_dir (str): The base directory for storage.
Returns:
FileStorage: The initialized FileStorage object.
Raises:
TupliStorageError: If the base directory cannot be created.
"""
self.storage_dir = Path(storage_base_dir)
# Create base storage directory if it doesn't exist
try:
self.storage_dir.mkdir(exist_ok=True)
except Exception as e:
raise TupliStorageError(f'Failed to create storage directory: {str(e)}')
[docs]
def store_benchmark(self, benchmark_query: BenchmarkQuery) -> BenchmarkHeader:
"""
Saves the benchmark object to a file.
Args:
benchmark_query (BenchmarkQuery): The benchmark query to be saved.
Returns:
BenchmarkHeader: The header of the saved benchmark.
Raises:
TupliStorageError: If the benchmark cannot be saved or serialized.
"""
# Create a directory if it doesn't exist
directory = self.storage_dir / 'benchmarks'
try:
directory.mkdir(exist_ok=True, parents=True)
except Exception as e:
raise TupliStorageError(f'Failed to create benchmarks directory: {str(e)}')
# Create a benchmark from the query
benchmark = Benchmark.create_new(**benchmark_query.model_dump(), created_by='local_storage')
# Check if benchmark with the same ID already exists
for existing_file in directory.glob('*.json'):
try:
with open(existing_file, 'r', encoding='UTF-8') as f:
existing_benchmark = json.loads(f.read())
if existing_benchmark['hash'] == benchmark.hash:
logger.info('Benchmark with hash %s already exists', benchmark.hash)
return BenchmarkHeader(**existing_benchmark)
except Exception as e:
logger.warning(f'Error reading benchmark file {existing_file}: {str(e)}')
continue
# Create filename based on benchmark ID
file_name = f'{benchmark.id}.json'
file_path = directory / file_name
if file_path.exists():
raise TupliStorageError(
f'The file {file_path} already exists and will not be overwritten.'
)
# Serialize the benchmark to JSON
try:
serialized_object = json.dumps(benchmark.model_dump(), indent=2)
except Exception as e:
raise TupliStorageError(f'Failed to serialize benchmark: {str(e)}')
try:
with open(file_path, 'w', encoding='UTF-8') as f:
f.write(serialized_object)
# Check if the file was saved correctly
if not file_path.exists():
raise TupliStorageError(f'Failed to save benchmark to {file_path}')
else:
logger.info('Saved benchmark to %s', file_path)
except Exception as e:
raise TupliStorageError(f'Failed to write benchmark to file: {str(e)}')
# Return the benchmark header
return BenchmarkHeader(**benchmark.model_dump())
[docs]
def load_benchmark(self, uri: str) -> Benchmark:
"""
Loads a benchmark from the file using the benchmark ID.
Args:
uri (str): The ID of the benchmark to be loaded.
Returns:
Benchmark: The loaded benchmark object.
Raises:
TupliStorageError: If the benchmark cannot be loaded or parsed.
"""
# Construct the file path from the benchmark ID
file_path = self.storage_dir / 'benchmarks' / f'{uri}.json'
if not file_path.exists():
raise TupliStorageError(f'Benchmark with ID {uri} does not exist.')
try:
with open(file_path, 'r', encoding='UTF-8') as f:
benchmark_dict = json.loads(f.read())
except json.JSONDecodeError as e:
raise TupliStorageError(f'Failed to parse JSON from benchmark {uri}: {str(e)}')
except Exception as e:
raise TupliStorageError(f'Failed to read benchmark file for {uri}: {str(e)}')
try:
# Create and return a Benchmark object from the loaded JSON
return Benchmark(**benchmark_dict)
except Exception as e:
raise TupliStorageError(f'Invalid benchmark data for {uri}: {str(e)}')
# Helper methods for artifacts
[docs]
def store_artifact(self, artifact: bytes, metadata: ArtifactMetadata) -> ArtifactMetadataItem:
"""
Stores an artifact as a file and returns its metadata.
Args:
artifact (bytes): The artifact data to store.
metadata (ArtifactMetadata): Metadata for the artifact.
Returns:
ArtifactMetadataItem: Metadata for the stored artifact.
Raises:
TupliStorageError: If the artifact cannot be stored or metadata cannot be serialized.
"""
# Create a directory for artifacts if it doesn't exist
data_dir = self.storage_dir / 'artifacts'
try:
data_dir.mkdir(exist_ok=True, parents=True)
except Exception as e:
raise TupliStorageError(f'Failed to create artifacts directory: {str(e)}')
# Generate hash for the artifact
artifact_hash = hashlib.sha256(artifact).hexdigest()
# Check if artifact with the same hash already exists
for metadata_path in data_dir.glob('*.metadata.json'):
try:
with open(metadata_path, 'r', encoding='UTF-8') as f:
existing_metadata = json.loads(f.read())
if existing_metadata.get('hash') == artifact_hash:
logger.info('Artifact with hash %s already exists', artifact_hash)
return ArtifactMetadataItem(**existing_metadata)
except Exception as e:
logger.warning(f'Error reading metadata file {metadata_path}: {str(e)}')
continue
metadata_item = ArtifactMetadataItem.create_new(
hash=artifact_hash,
created_by='local_storage',
**metadata.model_dump(),
)
# Create file path using the artifact ID
file_path = data_dir / f'{metadata_item.id}'
if file_path.exists():
raise TupliStorageError(
f'The file {file_path} already exists and will not be overwritten.'
)
try:
# Write the artifact data to the file
with open(file_path, 'wb') as f:
f.write(artifact)
# Store the metadata
metadata_path = file_path.with_suffix('.metadata.json')
with open(metadata_path, 'w', encoding='UTF-8') as f:
json.dump(metadata_item.model_dump(serialize_as_any=True), f, indent=2)
logger.info('Stored artifact to %s with metadata', file_path)
return metadata_item
except Exception as e:
raise TupliStorageError(f'Failed to store artifact: {str(e)}')
[docs]
def load_artifact(self, uri: str, **kwargs) -> bytes:
"""
Loads an artifact from a file.
Args:
uri (str): The ID of the artifact to load.
kwargs: Additional arguments (ignored in file storage)
Returns:
bytes: The raw artifact data
Raises:
TupliStorageError: If the artifact cannot be loaded.
"""
# Construct file path from artifact ID
file_path = self.storage_dir / 'artifacts' / uri
if not file_path.exists():
raise TupliStorageError(f'Artifact with ID {uri} does not exist.')
try:
# Read the file as bytes
with open(file_path, 'rb') as f:
data = f.read()
return data
except Exception as e:
raise TupliStorageError(f'Failed to load artifact {uri}: {str(e)}')
[docs]
def convert_filter_to_function(
self, filter_obj: BaseFilter
) -> Callable[[Dict[str, Any]], bool]:
"""
Convert a BaseFilter object to a filter function that can be applied to dictionaries.
Supports nested dictionary access with keys in the form of "a.b.key".
Args:
filter_obj (BaseFilter): The filter object to convert.
Returns:
Callable[[Dict[str, Any]], bool]: A function that takes a dictionary and returns True if the dictionary matches the filter.
Raises:
TupliStorageError: If the filter type is unknown.
"""
if filter_obj is None:
return lambda item: True
def get_nested_value(item: Dict[str, Any], key_path: str) -> Any:
"""Get value from nested dictionary using dot notation."""
keys = key_path.split('.')
value = item
for k in keys:
if not isinstance(value, dict) or k not in value:
return None
value = value[k]
return value
def key_exists(item: Dict[str, Any], key_path: str) -> bool:
"""Check if a key path exists in nested dictionary."""
keys = key_path.split('.')
value = item
for k in keys:
if not isinstance(value, dict) or k not in value:
return False
value = value[k]
return True
match filter_obj.type:
case FilterType.AND:
sub_filters = [self.convert_filter_to_function(f) for f in filter_obj.filters]
return lambda item: all(f(item) for f in sub_filters)
case FilterType.OR:
sub_filters = [self.convert_filter_to_function(f) for f in filter_obj.filters]
return lambda item: any(f(item) for f in sub_filters)
case FilterType.EQ:
return (
lambda item: key_exists(item, filter_obj.key)
and get_nested_value(item, filter_obj.key) == filter_obj.value
)
case FilterType.GEQ:
return (
lambda item: key_exists(item, filter_obj.key)
and get_nested_value(item, filter_obj.key) >= filter_obj.value
)
case FilterType.LEQ:
return (
lambda item: key_exists(item, filter_obj.key)
and get_nested_value(item, filter_obj.key) <= filter_obj.value
)
case FilterType.GT:
return (
lambda item: key_exists(item, filter_obj.key)
and get_nested_value(item, filter_obj.key) > filter_obj.value
)
case FilterType.LT:
return (
lambda item: key_exists(item, filter_obj.key)
and get_nested_value(item, filter_obj.key) < filter_obj.value
)
case FilterType.NE:
return (
lambda item: key_exists(item, filter_obj.key)
and get_nested_value(item, filter_obj.key) != filter_obj.value
)
case _:
raise TupliStorageError(f'Unknown filter type: {filter_obj.type}')
[docs]
def list_benchmarks(self, filter: BaseFilter = None) -> list[BenchmarkHeader]:
"""
Lists all benchmarks in the storage that match the specified filter.
Args:
filter (BaseFilter, optional): The filter to apply when listing benchmarks.
Returns:
list[BenchmarkHeader]: A list of benchmark headers that match the filter.
"""
benchmark_dir = self.storage_dir / 'benchmarks'
if not benchmark_dir.exists():
return []
results = []
filter_func = self.convert_filter_to_function(filter)
for file_path in benchmark_dir.glob('*.json'):
try:
with open(file_path, 'r', encoding='UTF-8') as f:
benchmark_dict = json.loads(f.read())
# Apply filter function
if filter_func(benchmark_dict):
# Create header from benchmark data
header = BenchmarkHeader(**benchmark_dict)
results.append(header)
except Exception as e:
logger.info('Error loading benchmark header from %s: %s', file_path, str(e))
continue
return results
[docs]
def list_artifacts(self, filter: BaseFilter = None) -> list[ArtifactMetadataItem]:
"""
Lists all artifacts in the storage that match the specified filter.
Args:
filter (BaseFilter, optional): The filter to apply when listing artifacts.
Returns:
list[ArtifactMetadataItem]: A list of artifacts that match the filter.
"""
artifacts_dir = self.storage_dir / 'artifacts'
if not artifacts_dir.exists():
return []
results = []
filter_func = self.convert_filter_to_function(filter)
# Look for files that have an accompanying metadata file
for metadata_path in artifacts_dir.glob('*.metadata.json'):
try:
# Read metadata from JSON file
with open(metadata_path, 'r', encoding='UTF-8') as f:
metadata_dict = json.loads(f.read())
# Apply filter
if filter_func(metadata_dict):
metadata_item = ArtifactMetadataItem(**metadata_dict)
results.append(metadata_item)
except Exception as e:
logger.info('Error loading artifact metadata from %s: %s', metadata_path, str(e))
continue
return results
[docs]
def delete_benchmark(self, uri: str) -> None:
"""
Deletes the specified benchmark from the storage.
Args:
uri (str): The ID of the benchmark to delete.
"""
# Construct file path from benchmark ID
file_path = self.storage_dir / 'benchmarks' / f'{uri}.json'
if not file_path.exists():
raise TupliStorageError(f'Benchmark with ID {uri} does not exist.')
try:
file_path.unlink()
logger.info('Deleted benchmark with ID %s', uri)
except Exception as e:
raise TupliStorageError(f'Failed to delete benchmark {uri}: {str(e)}')
[docs]
def delete_artifact(self, uri: str) -> None:
"""
Deletes the specified artifact from the storage.
Args:
uri (str): The ID of the artifact to delete.
"""
# Construct file path from artifact ID
file_path = self.storage_dir / 'artifacts' / uri
metadata_path = file_path.with_suffix('.metadata.json')
if not file_path.exists():
raise TupliStorageError(f'Artifact with ID {uri} does not exist.')
try:
file_path.unlink()
if metadata_path.exists():
metadata_path.unlink()
logger.info('Deleted artifact with ID %s', uri)
except Exception as e:
raise TupliStorageError(f'Failed to delete artifact {uri}: {str(e)}')
[docs]
def publish_benchmark(self, uri: str) -> None:
"""
Publishing functionality is not available in FileStorage.
This is a placeholder to implement the interface.
Args:
uri (str): The URI of the benchmark to publish.
Returns:
str: The URI of the benchmark.
"""
logger.info('Publishing functionality is not available in FileStorage')
[docs]
def publish_artifact(self, uri: str) -> None:
"""
Publishing functionality is not available in FileStorage.
This is a placeholder to implement the interface.
Args:
uri (str): The URI of the artifact to publish.
"""
logger.info('Publishing functionality is not available in FileStorage')
# Episode-related methods
[docs]
def record_episode(self, episode: Episode) -> EpisodeHeader:
"""
Records an episode in the local file storage.
Args:
episode (Episode): The episode to record.
Returns:
EpisodeHeader: The header of the recorded episode.
Raises:
TupliStorageError: If the episode cannot be saved or serialized.
"""
# Create a directory for episodes if it doesn't exist
episodes_dir = self.storage_dir / 'episodes'
try:
episodes_dir.mkdir(exist_ok=True, parents=True)
except Exception as e:
raise TupliStorageError(f'Failed to create episodes directory: {str(e)}')
# Check if the referenced benchmark exists
benchmark_dir = self.storage_dir / 'benchmarks'
benchmark_found = False
for file_path in benchmark_dir.glob('*.json'):
try:
with open(file_path, 'r', encoding='UTF-8') as f:
benchmark_dict = json.loads(f.read())
if benchmark_dict['id'] == episode.benchmark_id:
benchmark_found = True
break
except Exception:
continue
if not benchmark_found:
raise TupliStorageError(
f'Referenced benchmark with ID {episode.benchmark_id} does not exist'
)
# Create a full EpisodeItem with metadata
episode_item = EpisodeItem.create_new(**episode.model_dump(), created_by='local_storage')
# Create filename based on episode ID
file_name = f'{episode_item.id}.json'
file_path = episodes_dir / file_name
if file_path.exists():
raise TupliStorageError(
f'The file {file_path} already exists and will not be overwritten.'
)
# Serialize the episode to JSON
try:
serialized_object = json.dumps(episode_item.model_dump(), indent=2)
except Exception as e:
raise TupliStorageError(f'Failed to serialize episode: {str(e)}')
try:
with open(file_path, 'w', encoding='UTF-8') as f:
f.write(serialized_object)
# Check if the file was saved correctly
if not file_path.exists():
raise TupliStorageError(f'Failed to save episode to {file_path}')
else:
logger.info('Saved episode to %s', file_path)
except Exception as e:
raise TupliStorageError(f'Failed to write episode to file: {str(e)}')
# Return the episode header
return EpisodeHeader(
**{k: v for k, v in episode_item.model_dump().items() if k != 'tuples'}
)
[docs]
def publish_episode(self, uri: str) -> None:
"""
Sets the is_public flag of an episode to True in local file storage.
This is a placeholder to implement the interface.
Args:
uri (str): The ID of the episode to publish.
"""
logger.info('Publishing functionality is not available in FileStorage')
[docs]
def list_episodes(
self, filter: BaseFilter = None, include_tuples: bool = False
) -> list[EpisodeHeader] | list[EpisodeItem]:
"""
Lists all episodes in the local file storage that match the specified filter.
Args:
filter (BaseFilter, optional): The filter to apply when listing episodes.
include_tuples (bool, optional): Whether to include tuples in the episode data.
Returns:
list[EpisodeHeader] | list[EpisodeItem]: A list of episode headers or full episode items.
"""
episodes_dir = self.storage_dir / 'episodes'
if not episodes_dir.exists():
return []
results = []
filter_func = self.convert_filter_to_function(filter)
for file_path in episodes_dir.glob('*.json'):
try:
with open(file_path, 'r', encoding='UTF-8') as f:
episode_dict = json.loads(f.read())
# Apply filter function
if filter_func(episode_dict):
# If we don't need to include tuples, create an EpisodeHeader
if not include_tuples:
# Create episode header object without tuples
header_dict = {k: v for k, v in episode_dict.items() if k != 'tuples'}
results.append(EpisodeHeader(**header_dict))
else:
# Include full episode with tuples
results.append(EpisodeItem(**episode_dict))
except Exception as e:
logger.info('Error loading episode from %s: %s', file_path, str(e))
continue
return results
[docs]
def delete_episode(self, uri: str) -> None:
"""
Deletes an episode from the local file storage.
Args:
uri (str): The ID of the episode to delete.
Raises:
TupliStorageError: If the episode cannot be deleted.
"""
# Construct file path from episode ID
file_path = self.storage_dir / 'episodes' / f'{uri}.json'
if not file_path.exists():
raise TupliStorageError(f'Episode with ID {uri} does not exist.')
try:
file_path.unlink()
logger.info('Deleted episode with ID %s', uri)
except Exception as e:
raise TupliStorageError(f'Failed to delete episode {uri}: {str(e)}')