import logging
import time
from typing import (
Any,
Dict,
List,
Optional,
TYPE_CHECKING,
Union,
)
from bioblend import (
CHUNK_SIZE,
TimeoutException,
)
from bioblend.galaxy.client import Client
from bioblend.galaxy.datasets import TERMINAL_STATES
if TYPE_CHECKING:
from bioblend.galaxy import GalaxyInstance
log = logging.getLogger(__name__)
class HasElements:
def __init__(
self,
name: str,
type: str = "list",
elements: Optional[Union[List[Union["CollectionElement", "SimpleElement"]], Dict[str, Any]]] = None,
) -> None:
self.name = name
self.type = type
if isinstance(elements, dict):
self.elements: List[Union["CollectionElement", "SimpleElement"]] = [
HistoryDatasetElement(name=key, id=value) for key, value in elements.values()
]
elif elements:
self.elements = elements
def add(self, element: Union["CollectionElement", "SimpleElement"]) -> "HasElements":
self.elements.append(element)
return self
[docs]class CollectionDescription(HasElements):
[docs] def to_dict(self) -> Dict[str, Union[str, List]]:
return dict(name=self.name, collection_type=self.type, element_identifiers=[e.to_dict() for e in self.elements])
[docs]class CollectionElement(HasElements):
[docs] def to_dict(self) -> Dict[str, Union[str, List]]:
return dict(
src="new_collection",
name=self.name,
collection_type=self.type,
element_identifiers=[e.to_dict() for e in self.elements],
)
class SimpleElement:
def __init__(self, value: Dict[str, str]) -> None:
self.value = value
def to_dict(self) -> Dict[str, str]:
return self.value
[docs]class HistoryDatasetElement(SimpleElement):
def __init__(self, name: str, id: str) -> None:
super().__init__(
dict(
name=name,
src="hda",
id=id,
)
)
[docs]class HistoryDatasetCollectionElement(SimpleElement):
def __init__(self, name: str, id: str) -> None:
super().__init__(
dict(
name=name,
src="hdca",
id=id,
)
)
[docs]class LibraryDatasetElement(SimpleElement):
def __init__(self, name: str, id: str) -> None:
super().__init__(
dict(
name=name,
src="ldda",
id=id,
)
)
[docs]class DatasetCollectionClient(Client):
gi: "GalaxyInstance"
module = "dataset_collections"
def __init__(self, galaxy_instance: "GalaxyInstance") -> None:
super().__init__(galaxy_instance)
[docs] def show_dataset_collection(self, dataset_collection_id: str, instance_type: str = "history") -> Dict[str, Any]:
"""
Get details of a given dataset collection of the current user
:type dataset_collection_id: str
:param dataset_collection_id: dataset collection ID
:type instance_type: str
:param instance_type: instance type of the collection - 'history' or 'library'
:rtype: dict
:return: element view of the dataset collection
"""
params = {
"instance_type": instance_type,
}
url = self._make_url(module_id=dataset_collection_id)
return self._get(id=dataset_collection_id, url=url, params=params)
[docs] def download_dataset_collection(self, dataset_collection_id: str, file_path: str) -> Dict[str, Any]:
"""
Download a history dataset collection as an archive.
:type dataset_collection_id: str
:param dataset_collection_id: Encoded dataset collection ID
:type file_path: str
:param file_path: The path to which the archive will be downloaded
:rtype: dict
:return: Information about the downloaded archive.
.. note::
This method downloads a ``zip`` archive for Galaxy 21.01 and later.
For earlier versions of Galaxy this method downloads a ``tgz`` archive.
"""
url = self._make_url(module_id=dataset_collection_id) + "/download"
r = self.gi.make_get_request(url, stream=True)
r.raise_for_status()
archive_type = "zip" if self.gi.config.get_version()["version_major"] >= "21.01" else "tgz"
with open(file_path, "wb") as fp:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if chunk:
fp.write(chunk)
return {"file_path": file_path, "archive_type": archive_type}
[docs] def wait_for_dataset_collection(
self,
dataset_collection_id: str,
maxwait: float = 12000,
interval: float = 3,
proportion_complete: float = 1.0,
check: bool = True,
) -> Dict[str, Any]:
"""
Wait until all or a specified proportion of elements of a dataset
collection are in a terminal state.
:type dataset_collection_id: str
:param dataset_collection_id: dataset collection ID
:type maxwait: float
:param maxwait: Total time (in seconds) to wait for the dataset
states in the dataset collection to become terminal. If not
all datasets are in a terminal state within this time, a
``DatasetCollectionTimeoutException`` will be raised.
:type interval: float
:param interval: Time (in seconds) to wait between two consecutive checks.
:type proportion_complete: float
:param proportion_complete: Proportion of elements in this collection
that have to be in a terminal state for this method to return. Must
be a number between 0 and 1. For example: if the dataset collection
contains 2 elements, and proportion_complete=0.5 is specified, then
wait_for_dataset_collection will return as soon as 1 of the 2
datasets is in a terminal state. Default is 1, i.e. all elements must
complete.
:type check: bool
:param check: Whether to check if all the terminal states of datasets
in the dataset collection are 'ok'. This will raise an Exception if
a dataset is in a terminal state other than 'ok'.
:rtype: dict
:return: Details of the given dataset collection.
"""
assert maxwait >= 0
assert interval > 0
assert 0 <= proportion_complete <= 1
time_left = maxwait
while True:
dataset_collection = self.show_dataset_collection(dataset_collection_id)
states = [elem["object"]["state"] for elem in dataset_collection["elements"]]
terminal_states = [state for state in states if state in TERMINAL_STATES]
if set(terminal_states) not in [{"ok"}, set()]:
raise Exception(
f"Dataset collection {dataset_collection_id} contains elements in the "
f"following non-ok terminal states: {', '.join(set(terminal_states) - {'ok'})}"
)
proportion = len(terminal_states) / len(states)
if proportion >= proportion_complete:
return dataset_collection
if time_left > 0:
log.info(
f"The dataset collection {dataset_collection_id} has {len(terminal_states)} out of {len(states)} datasets in a terminal state. Will wait {time_left} more s"
)
time.sleep(min(time_left, interval))
time_left -= interval
else:
raise DatasetCollectionTimeoutException(
f"Less than {proportion_complete * 100}% of datasets in the dataset collection is in a terminal state after {maxwait} s"
)
class DatasetCollectionTimeoutException(TimeoutException):
pass
__all__ = (
"CollectionDescription",
"CollectionElement",
"DatasetCollectionClient",
"HistoryDatasetElement",
"HistoryDatasetCollectionElement",
"LibraryDatasetElement",
)