Cloud storage module
Alpha feature
The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.
Cloud Storage Integration and Connection
A cloud storage integration is a connection between a Kili organization and a cloud storage (AWS, GCP or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Adding a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.
A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage to a project. More information about how to use a cloud storage integration in a project can be found here.
Azure
It is recommended to install the Azure dependencies to use the Azure cloud storage integration and connection.
pip install kili[azure]
Methods attached to the Kili client, to run actions on cloud storage.
Source code in kili/presentation/client/cloud_storage.py
class CloudStorageClientMethods(BaseClientMethods):
    """Methods attached to the Kili client, to run actions on cloud storage."""

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage connections that match a set of criteria.

        When `cloud_storage_connection_id` is given, that single connection is fetched
        directly: the other filters, `first` and `skip` are not used.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: All the fields to request among the possible fields for the cloud storage connections.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of skipped cloud storage connections.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage connections is returned.

        Returns:
            A list or a generator of the cloud storage connections that match the criteria.

        Raises:
            ValueError: If none of `cloud_storage_connection_id`,
                `cloud_storage_integration_id` and `project_id` is specified.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        if (
            cloud_storage_connection_id is None
            and cloud_storage_integration_id is None
            and project_id is None
        ):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)

        if cloud_storage_connection_id is not None:
            # The connection is fetched right away, then wrapped in a generator so
            # that both branches hand back the same kind of iterable.
            connection = cloud_storage_use_cases.get_data_connection(
                DataConnectionId(cloud_storage_connection_id), fields=fields
            )
            data_connections_gen = (item for item in (connection,))
        else:
            project_filter = ProjectId(project_id) if project_id is not None else None
            integration_filter = (
                DataIntegrationId(cloud_storage_integration_id)
                if cloud_storage_integration_id is not None
                else None
            )
            data_connections_gen = cloud_storage_use_cases.list_data_connections(
                data_connection_filters=DataConnectionFilters(
                    project_id=project_filter,
                    integration_id=integration_filter,
                ),
                fields=fields,
                options=QueryOptions(disable_tqdm, first, skip),
            )

        if as_generator:
            return data_connections_gen
        return list(data_connections_gen)

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.
            fields: All the fields to request among the possible fields for the cloud storage integrations.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of skipped cloud storage integrations.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage integrations is returned.

        Returns:
            A list or a generator of the cloud storage integrations that match the criteria.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        options = QueryOptions(disable_tqdm, first, skip)

        # Each optional string filter is wrapped in its domain type only when provided.
        data_integration_filters = DataIntegrationFilters(
            status=status,
            id=(
                DataIntegrationId(cloud_storage_integration_id)
                if cloud_storage_integration_id is not None
                else None
            ),
            name=name,
            platform=platform,
            organization_id=(
                OrganizationId(organization_id) if organization_id is not None else None
            ),
        )
        data_integrations_gen = CloudStorageUseCases(self.kili_api_gateway).list_data_integrations(
            data_integration_filters=data_integration_filters,
            fields=fields,
            options=options,
        )

        if as_generator:
            return data_integrations_gen
        return list(data_integrations_gen)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Count and return the number of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.

        Returns:
            The number of cloud storage integrations that match the criteria.
        """
        data_integration_filters = DataIntegrationFilters(
            status=status,
            id=(
                DataIntegrationId(cloud_storage_integration_id)
                if cloud_storage_integration_id is not None
                else None
            ),
            name=name,
            platform=platform,
            organization_id=(
                OrganizationId(organization_id) if organization_id is not None else None
            ),
        )
        return CloudStorageUseCases(self.kili_api_gateway).count_data_integrations(
            data_integration_filters
        )

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage to a project.

        Args:
            project_id: Id of the project.
            cloud_storage_integration_id: Id of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.

        Returns:
            A dict with the DataConnection Id.
        """
        created_connection = CloudStorageUseCases(self.kili_api_gateway).add_data_connection(
            project_id=ProjectId(project_id),
            data_integration_id=DataIntegrationId(cloud_storage_integration_id),
            selected_folders=selected_folders,
            fields=("id",),
        )
        # Only the identifier is exposed to the caller.
        return {"id": created_connection["id"]}

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
        dry_run: bool = False,
    ) -> Dict:
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the project,
        and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: Id of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.
            dry_run: If True, will not synchronize the data connection but only print the
                differences. This is useful to check the differences before applying them to the
                project.

        Returns:
            A dict with the cloud storage connection Id (`id`), its number of assets
            (`numberOfAssets`) and the Id of its project (`projectId`).
        """
        data_connection_id = DataConnectionId(cloud_storage_connection_id)
        cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
        cloud_storage_use_cases.synchronize_data_connection(
            data_connection_id=data_connection_id,
            delete_extraneous_files=delete_extraneous_files,
            dry_run=dry_run,
            logger=logger,
        )
        # "id" is requested along with the other fields so that the returned dict
        # really carries the connection Id promised by the documentation.
        return cloud_storage_use_cases.get_data_connection(
            data_connection_id=data_connection_id,
            fields=("id", "numberOfAssets", "projectId"),
        )
add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None)
Connect a cloud storage to a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Id of the project. |
required |
cloud_storage_integration_id |
str |
Id of the cloud storage integration. |
required |
selected_folders |
Optional[List[str]] |
List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected. |
None |
Returns:
Type | Description |
---|---|
Dict |
A dict with the DataConnection Id. |
Source code in kili/presentation/client/cloud_storage.py
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
) -> Dict:
    """Attach a cloud storage integration to a project.

    Args:
        project_id: Id of the project.
        cloud_storage_integration_id: Id of the cloud storage integration.
        selected_folders: Folders of the data integration to connect to the project.
            If not provided, all folders of the data integration will be connected.

    Returns:
        A dict with the DataConnection Id.
    """
    use_cases = CloudStorageUseCases(self.kili_api_gateway)
    created_connection = use_cases.add_data_connection(
        project_id=ProjectId(project_id),
        data_integration_id=DataIntegrationId(cloud_storage_integration_id),
        selected_folders=selected_folders,
        fields=("id",),
    )
    # Only the identifier is exposed to the caller.
    return {"id": created_connection["id"]}
cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)
Get a generator or a list of cloud storage connections that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
Optional[str] |
ID of the cloud storage connection. |
None |
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
project_id |
Optional[str] |
ID of the project. |
None |
fields |
Union[List[str], Tuple[str, ...]] |
All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields. |
('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId') |
first |
Optional[int] |
Maximum number of cloud storage connections to return. |
None |
skip |
int |
Number of skipped cloud storage connections. |
0 |
disable_tqdm |
Optional[bool] |
If `True`, the progress bar will be disabled. |
None |
as_generator |
bool |
If `True`, a generator on the cloud storage connections is returned. |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage connections that match the criteria. |
Examples:
>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: ListOrTuple[str] = (
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Return the cloud storage connections matching the given criteria, as a list or a generator.

    When `cloud_storage_connection_id` is given, that single connection is fetched
    directly: the other filters, `first` and `skip` are not used.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: All the fields to request among the possible fields for the cloud storage connections.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of skipped cloud storage connections.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage connections is returned.

    Returns:
        A list or a generator of the cloud storage connections that match the criteria.

    Raises:
        ValueError: If none of `cloud_storage_connection_id`,
            `cloud_storage_integration_id` and `project_id` is specified.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    criteria = (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
    if all(criterion is None for criterion in criteria):
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    if cloud_storage_connection_id is not None:
        # The connection is fetched right away, then wrapped in a generator so
        # that both branches hand back the same kind of iterable.
        connection = use_cases.get_data_connection(
            DataConnectionId(cloud_storage_connection_id), fields=fields
        )
        connections = (item for item in (connection,))
    else:
        project_filter = ProjectId(project_id) if project_id is not None else None
        integration_filter = (
            DataIntegrationId(cloud_storage_integration_id)
            if cloud_storage_integration_id is not None
            else None
        )
        connections = use_cases.list_data_connections(
            data_connection_filters=DataConnectionFilters(
                project_id=project_filter,
                integration_id=integration_filter,
            ),
            fields=fields,
            options=QueryOptions(disable_tqdm, first, skip),
        )

    return connections if as_generator else list(connections)
cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=('name', 'id', 'platform', 'status'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)
Get a generator or a list of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
fields |
Union[List[str], Tuple[str, ...]] |
All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields. |
('name', 'id', 'platform', 'status') |
first |
Optional[int] |
Maximum number of cloud storage integrations to return. |
None |
skip |
int |
Number of skipped cloud storage integrations. |
0 |
disable_tqdm |
Optional[bool] |
If `True`, the progress bar will be disabled. |
None |
as_generator |
bool |
If `True`, a generator on the cloud storage integrations is returned. |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage integrations that match the criteria. |
Examples:
>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[DataIntegrationPlatform] = None,
    status: Optional[DataIntegrationStatus] = None,
    organization_id: Optional[str] = None,
    fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Return the cloud storage integrations matching the given criteria, as a list or a generator.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.
        fields: All the fields to request among the possible fields for the cloud storage integrations.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of skipped cloud storage integrations.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage integrations is returned.

    Returns:
        A list or a generator of the cloud storage integrations that match the criteria.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    options = QueryOptions(disable_tqdm, first, skip)
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    # Each optional string filter is wrapped in its domain type only when provided.
    integration_filters = DataIntegrationFilters(
        status=status,
        id=(
            None
            if cloud_storage_integration_id is None
            else DataIntegrationId(cloud_storage_integration_id)
        ),
        name=name,
        platform=platform,
        organization_id=(None if organization_id is None else OrganizationId(organization_id)),
    )
    integrations = use_cases.list_data_integrations(
        data_integration_filters=integration_filters,
        fields=fields,
        options=options,
    )

    return integrations if as_generator else list(integrations)
count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)
Count and return the number of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
Returns:
Type | Description |
---|---|
int |
The number of cloud storage integrations that match the criteria. |
Source code in kili/presentation/client/cloud_storage.py
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[DataIntegrationPlatform] = None,
    status: Optional[DataIntegrationStatus] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Return how many cloud storage integrations match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.

    Returns:
        The number of cloud storage integrations that match the criteria.
    """
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    # Each optional string filter is wrapped in its domain type only when provided.
    integration_filters = DataIntegrationFilters(
        status=status,
        id=(
            None
            if cloud_storage_integration_id is None
            else DataIntegrationId(cloud_storage_integration_id)
        ),
        name=name,
        platform=platform,
        organization_id=(None if organization_id is None else OrganizationId(organization_id)),
    )
    return use_cases.count_data_integrations(integration_filters)
synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False, dry_run=False)
Synchronize a cloud storage connection.
This method will compute differences between the cloud storage connection and the project, and then validate the differences.
If `delete_extraneous_files` is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
str |
Id of the cloud storage connection. |
required |
delete_extraneous_files |
bool |
If True, delete extraneous files. |
False |
dry_run |
bool |
If True, will not synchronize the data connection but only print the differences. This is useful to check the differences before applying them to the project. |
False |
Returns:
Type | Description |
---|---|
Dict |
A dict with the cloud storage connection Id. |
Source code in kili/presentation/client/cloud_storage.py
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
    dry_run: bool = False,
) -> Dict:
    """Synchronize a cloud storage connection.

    This method will compute differences between the cloud storage connection and the project,
    and then validate the differences.

    If `delete_extraneous_files` is True, it will also delete files that are not in the
    cloud storage integration anymore but that are still in the project.

    Args:
        cloud_storage_connection_id: Id of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.
        dry_run: If True, will not synchronize the data connection but only print the
            differences. This is useful to check the differences before applying them to the
            project.

    Returns:
        A dict with the cloud storage connection Id (`id`), its number of assets
        (`numberOfAssets`) and the Id of its project (`projectId`).
    """
    data_connection_id = DataConnectionId(cloud_storage_connection_id)
    cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
    cloud_storage_use_cases.synchronize_data_connection(
        data_connection_id=data_connection_id,
        delete_extraneous_files=delete_extraneous_files,
        dry_run=dry_run,
        logger=logger,
    )
    # "id" is requested along with the other fields so that the returned dict
    # really carries the connection Id promised by the documentation.
    return cloud_storage_use_cases.get_data_connection(
        data_connection_id=data_connection_id,
        fields=("id", "numberOfAssets", "projectId"),
    )