Source code for tq42.dataset

from __future__ import annotations

import base64
import hashlib
import os.path
from pathlib import Path
from typing import Optional, List
from urllib.error import HTTPError

from google.protobuf.json_format import MessageToJson
from tqdm import tqdm
import requests
import validators

from tq42.client import TQ42Client
from tq42.utils.exception_handling import handle_generic_sdk_errors

# Important: these are intentionally re-exported from this module!
from com.terraquantum.storage.v1alpha1.storage_pb2 import (
    DatasetSensitivityProto,
    StorageProto,
    StorageType,
)
from com.terraquantum.storage.v1alpha1.create_storage_from_file_pb2 import (
    CreateStorageFromFileRequest,
    CreateStorageFromFileResponse,
)

from com.terraquantum.storage.v1alpha1.delete_storage_pb2 import (
    DeleteStorageRequest,
)
from com.terraquantum.storage.v1alpha1.create_storage_from_external_pb2 import (
    CreateStorageFromExternalBucketRequest,
)
from com.terraquantum.storage.v1alpha1.get_storage_request_pb2 import GetStorageRequest
from com.terraquantum.storage.v1alpha1.list_storages_pb2 import (
    ListStoragesRequest,
    ListStoragesResponse,
)
from com.terraquantum.storage.v1alpha1.export_storage_pb2 import (
    ExportStorageRequest,
    ExportStorageResponse,
)

from tq42.utils.pretty_list import PrettyList


class Dataset:
    """
    Reference an existing dataset.

    :param client: a client instance
    :param id: the id of the existing dataset
    :param data: only used internally
    """

    id: str
    """ID of the dataset"""

    data: StorageProto
    """Object containing all attributes of the dataset"""

    _client: TQ42Client

    def __init__(
        self, client: TQ42Client, id: str, data: Optional[StorageProto] = None
    ):
        self._client = client
        self.id = id

        if data:
            self.data = data
        else:
            self.data = self._get()

    def __repr__(self) -> str:
        return f"<Dataset Id={self.id} Name={self.data.name}>"

    def __str__(self) -> str:
        return f"Dataset: {MessageToJson(self.data, preserving_proto_field_name=True)}"

    @handle_generic_sdk_errors
    def _get(self) -> StorageProto:
        get_storage_request = GetStorageRequest(storage_id=self.id)
        storage_data: StorageProto = self._client.storage_client.GetStorage(
            request=get_storage_request, metadata=self._client.metadata
        )
        return storage_data

    def _refresh(self) -> None:
        self.data = self._get()

    @staticmethod
    def from_proto(client: TQ42Client, msg: StorageProto) -> Dataset:
        """
        Creates a Dataset instance from a protobuf message.

        :meta private:
        """
        return Dataset(client=client, id=msg.id, data=msg)
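    # Illustrative usage sketch (not part of the SDK source): assuming `client`
    # is an already-constructed, authenticated TQ42Client and the id below is a
    # placeholder, an existing dataset can be referenced and inspected like so:
    #
    #     dataset = Dataset(client=client, id="<your-dataset-id>")
    #     print(dataset)            # JSON representation of the dataset attributes
    #     print(dataset.data.name)  # individual fields via the underlying StorageProto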
    @staticmethod
    @handle_generic_sdk_errors
    def create(
        client: TQ42Client,
        project_id: str,
        name: str,
        description: str,
        sensitivity: DatasetSensitivityProto,
        file: Optional[str] = None,
        url: Optional[str] = None,
    ) -> Dataset:
        """
        Create a dataset for a project.

        :param client: a client instance
        :param project_id: the id of the project the dataset should be created in
        :param name: name for the dataset
        :param description: description for the dataset
        :param sensitivity: sensitivity of the dataset (e.g. `DatasetSensitivityProto.SENSITIVE` for a sensitive dataset)
        :param file: path to a local file that should be uploaded to the dataset
        :param url: url to a remote file that should be uploaded to the dataset
        :returns: the created dataset

        Only one of `url` or `file` can be specified.
        """
        if (file and url) or (not file and not url):
            raise ValueError("Please provide (only) one of: file or url")

        if url:
            res = Dataset._create_from_external_bucket(
                client=client,
                project_id=project_id,
                name=name,
                description=description,
                url=url,
                sensitivity=sensitivity,
            )
            return Dataset.from_proto(client=client, msg=res)

        res = Dataset._create_from_file(
            client=client,
            project_id=project_id,
            name=name,
            description=description,
            file=file,
            sensitivity=sensitivity,
        )
        return Dataset.from_proto(client=client, msg=res)
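    # Illustrative sketch of Dataset.create (not part of the SDK source). The
    # project id and file path below are placeholders; exactly one of `file` or
    # `url` must be passed, as enforced above:
    #
    #     dataset = Dataset.create(
    #         client=client,
    #         project_id="<your-project-id>",
    #         name="my-dataset",
    #         description="example dataset",
    #         sensitivity=DatasetSensitivityProto.SENSITIVE,
    #         file="/path/to/local/file.csv",
    #     )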
    @staticmethod
    def _create_from_file(
        client: TQ42Client,
        project_id: str,
        name: str,
        description: str,
        file: str,
        sensitivity: DatasetSensitivityProto,
    ) -> StorageProto:
        file_path = Path(file)
        if not file_path.exists():
            raise FileNotFoundError("The specified file does not exist")

        with file_path.open(mode="rb") as f:
            data = f.read()

        file_hash = hashlib.md5(data).digest()
        file_hash_b64 = base64.b64encode(file_hash).decode("utf-8")

        create_dataset_request = CreateStorageFromFileRequest(
            project_id=project_id,
            name=name,
            description=description,
            hash_md5=file_hash_b64,
            file_name=file_path.name,
            sensitivity=sensitivity,
        )
        res: CreateStorageFromFileResponse = (
            client.storage_client.CreateStorageFromFile(
                request=create_dataset_request, metadata=client.metadata
            )
        )

        headers = {
            "Content-Type": "application/octet-stream",
            "Content-MD5": file_hash_b64,
        }
        file_upload_response = requests.put(
            url=res.signed_url,
            headers=headers,
            data=data,
        )

        if not file_upload_response.ok:
            raise HTTPError(
                url=res.signed_url,
                code=file_upload_response.status_code,
                msg=f"Upload of file {file} to storage failed. Please make sure your network is working. "
                "If issues persist please get in touch via https://help.terraquantum.io/en",
                fp=None,
                hdrs=file_upload_response.headers,
            )

        return res.storage

    @staticmethod
    def _create_from_external_bucket(
        client: TQ42Client,
        project_id: str,
        name: str,
        description: str,
        url: str,
        sensitivity: DatasetSensitivityProto,
    ) -> StorageProto:
        create_dataset_request = CreateStorageFromExternalBucketRequest(
            project_id=project_id,
            name=name,
            description=description,
            url=url,
            sensitivity=sensitivity,
        )
        return client.storage_client.CreateStorageFromExternalBucket(
            request=create_dataset_request, metadata=client.metadata
        )
    @handle_generic_sdk_errors
    def delete(self):
        """
        Delete this dataset.
        """
        delete_dataset_request = DeleteStorageRequest(storage_id=self.id)
        self.data = self._client.storage_client.DeleteStorage(
            request=delete_dataset_request, metadata=self._client.metadata
        )
    @handle_generic_sdk_errors
    def export(self, directory_path: str = ".") -> List[str]:
        """
        Export all files within a dataset to a local path.

        :param directory_path: local path where all files should be exported to (must exist and be a directory)
        :returns: a list of exported file paths
        """
        if not os.path.isdir(directory_path):
            raise ValueError(
                f"Provided directory path {directory_path} is not a valid directory"
            )

        export_storage_request = ExportStorageRequest(storage_id=self.id)
        res: ExportStorageResponse = self._client.storage_client.ExportStorage(
            request=export_storage_request, metadata=self._client.metadata
        )

        export_dir = os.path.join(directory_path, self.data.name)
        os.makedirs(export_dir, exist_ok=True)

        exported_file_paths = []
        for signed_url in res.signed_urls:
            file_path = os.path.join(
                export_dir,
                self._get_file_name_from_signed_url(signed_url=signed_url),
            )
            self._download_file_from_url(url=signed_url, file_path=file_path)
            exported_file_paths.append(file_path)

        return exported_file_paths
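    # Illustrative sketch of export (not part of the SDK source). The target
    # directory is a placeholder and must already exist; files are written into
    # a subdirectory named after the dataset:
    #
    #     exported_paths = dataset.export(directory_path="/tmp/exports")
    #     for path in exported_paths:
    #         print(path)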
    @staticmethod
    def _download_file_from_url(url: str, file_path: str):
        response = requests.get(url, stream=True)
        if os.path.exists(file_path):
            raise FileExistsError(file_path)

        print("Downloading file to {}".format(file_path))
        with open(file_path, "wb") as handle:
            for data in tqdm(response.iter_content()):
                handle.write(data)

    @staticmethod
    def _get_file_name_from_signed_url(signed_url: str) -> str:
        if not validators.url(signed_url):
            raise ValueError(f"The signed URL {signed_url} is not a valid URL")

        url_without_parameters = signed_url.split("?")[0]
        return url_without_parameters.split("/")[-1]
@handle_generic_sdk_errors
def list_all(client: TQ42Client, project_id: str) -> List[Dataset]:
    """
    List all datasets in a project.

    :param client: a client instance
    :param project_id: the id of a project
    :returns: a list of datasets
    """
    list_datasets_request = ListStoragesRequest(
        project_id=project_id, type=StorageType.DATASET
    )
    res: ListStoragesResponse = client.storage_client.ListStorages(
        request=list_datasets_request, metadata=client.metadata
    )
    return PrettyList(
        [Dataset.from_proto(client=client, msg=dataset) for dataset in res.storages]
    )
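# Illustrative sketch of list_all (not part of the SDK source), assuming
# `client` is an authenticated TQ42Client and the project id is a placeholder:
#
#     datasets = list_all(client=client, project_id="<your-project-id>")
#     for dataset in datasets:
#         print(dataset.id, dataset.data.name)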