Cloud storage module
Alpha feature
The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.
Cloud Storage Integration and Connection
A cloud storage integration is a connection between a Kili organization and a cloud storage (AWS, GCP or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Adding a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.
A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage to a project. More information about how to use a cloud storage integration in a project can be found here.
Azure
It is recommended to install the Azure dependencies to use the Azure cloud storage integration and connection.
pip install kili[azure]
Queries
Set of cloud storage integration queries.
Source code in kili/entrypoints/queries/data_integration/__init__.py
class QueriesDataIntegration:
    """Queries for listing and counting cloud storage integrations."""

    graphql_client: GraphQLClient

    # pylint: disable=too-many-arguments,dangerous-default-value

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Fetch the cloud storage integrations that match the given filters.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration to match.
            name: Name of the cloud storage integration to match.
            platform: Platform of the cloud storage integration to match.
            status: Status of the cloud storage integration to match.
            organization_id: ID of the organization to match.
            fields: Fields to retrieve for each cloud storage integration.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of cloud storage integrations to skip.
            disable_tqdm: If `True`, the progress bar is disabled.
            as_generator: If `True`, a generator is returned instead of a list.

        Returns:
            A generator on the matching cloud storage integrations when
            `as_generator` is `True`, otherwise a list of them.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        criteria = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        # The helper decides the effective progress-bar setting for generator mode.
        show_no_progress = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        pagination = QueryOptions(show_no_progress, first, skip)
        integrations = DataIntegrationsQuery(self.graphql_client)(criteria, fields, pagination)

        # Only materialize the results when the caller asked for a list.
        return integrations if as_generator else list(integrations)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Count the cloud storage integrations that match the given filters.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration to match.
            name: Name of the cloud storage integration to match.
            platform: Platform of the cloud storage integration to match.
            status: Status of the cloud storage integration to match.
            organization_id: ID of the organization to match.

        Returns:
            The number of matching cloud storage integrations.
        """
        criteria = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        integrations_query = DataIntegrationsQuery(self.graphql_client)
        return integrations_query.count(criteria)
cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=['name', 'id', 'platform', 'status'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)
Get a generator or a list of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
fields |
List[str] |
All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields. |
['name', 'id', 'platform', 'status'] |
first |
Optional[int] |
Maximum number of cloud storage integrations to return. |
None |
skip |
int |
Number of skipped cloud storage integrations. |
0 |
disable_tqdm |
bool |
If |
False |
as_generator |
bool |
If |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage integrations that match the criteria. |
Examples:
>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
    fields: List[str] = ["name", "id", "platform", "status"],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Fetch the cloud storage integrations that match the given filters.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration to match.
        name: Name of the cloud storage integration to match.
        platform: Platform of the cloud storage integration to match.
        status: Status of the cloud storage integration to match.
        organization_id: ID of the organization to match.
        fields: Fields to retrieve for each cloud storage integration.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of cloud storage integrations to skip.
        disable_tqdm: If `True`, the progress bar is disabled.
        as_generator: If `True`, a generator is returned instead of a list.

    Returns:
        A generator on the matching cloud storage integrations when
        `as_generator` is `True`, otherwise a list of them.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    criteria = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    # The helper decides the effective progress-bar setting for generator mode.
    show_no_progress = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    pagination = QueryOptions(show_no_progress, first, skip)
    integrations = DataIntegrationsQuery(self.graphql_client)(criteria, fields, pagination)

    # Only materialize the results when the caller asked for a list.
    return integrations if as_generator else list(integrations)
count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)
Count and return the number of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
Returns:
Type | Description |
---|---|
int |
The number of cloud storage integrations that match the criteria. |
Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Count the cloud storage integrations that match the given filters.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration to match.
        name: Name of the cloud storage integration to match.
        platform: Platform of the cloud storage integration to match.
        status: Status of the cloud storage integration to match.
        organization_id: ID of the organization to match.

    Returns:
        The number of matching cloud storage integrations.
    """
    criteria = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    integrations_query = DataIntegrationsQuery(self.graphql_client)
    return integrations_query.count(criteria)
Set of cloud storage connection queries.
Source code in kili/entrypoints/queries/data_connection/__init__.py
class QueriesDataConnection:
    """Queries for listing cloud storage connections."""

    graphql_client: GraphQLClient

    # pylint: disable=too-many-arguments,dangerous-default-value

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Fetch the cloud storage connections that match the given filters.

        At least one of `cloud_storage_connection_id`, `cloud_storage_integration_id`
        and `project_id` is required. When `cloud_storage_connection_id` is given,
        that single connection is fetched directly: the other two filters and the
        `first`, `skip` and `disable_tqdm` arguments are not used.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: Fields to retrieve for each cloud storage connection.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of cloud storage connections to skip.
            disable_tqdm: If `True`, the progress bar is disabled.
            as_generator: If `True`, an iterator is returned instead of a list.

        Returns:
            An iterator on the matching cloud storage connections when
            `as_generator` is `True`, otherwise a list of them.

        Raises:
            ValueError: If none of `cloud_storage_connection_id`,
                `cloud_storage_integration_id` and `project_id` is provided.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        identifying_filters = (
            cloud_storage_connection_id,
            cloud_storage_integration_id,
            project_id,
        )
        if all(value is None for value in identifying_filters):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        # A connection ID targets the single-item `dataConnection` resolver.
        if cloud_storage_connection_id is not None:
            single_connection = [
                services.get_data_connection(self, cloud_storage_connection_id, fields)
            ]
            return iter(single_connection) if as_generator else single_connection

        # Otherwise go through the paginated `dataConnections` resolver.
        criteria = DataConnectionsWhere(
            project_id=project_id, data_integration_id=cloud_storage_integration_id
        )
        show_no_progress = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        pagination = QueryOptions(show_no_progress, first, skip)
        connections = DataConnectionsQuery(self.graphql_client)(criteria, fields, pagination)

        return connections if as_generator else list(connections)
cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)
Get a generator or a list of cloud storage connections that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
Optional[str] |
ID of the cloud storage connection. |
None |
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
project_id |
Optional[str] |
ID of the project. |
None |
fields |
List[str] |
All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields. |
['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'] |
first |
Optional[int] |
Maximum number of cloud storage connections to return. |
None |
skip |
int |
Number of skipped cloud storage connections. |
0 |
disable_tqdm |
bool |
If |
False |
as_generator |
bool |
If |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage connections that match the criteria. |
Examples:
>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/entrypoints/queries/data_connection/__init__.py
@typechecked
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: List[str] = [
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Fetch the cloud storage connections that match the given filters.

    At least one of `cloud_storage_connection_id`, `cloud_storage_integration_id`
    and `project_id` is required. When `cloud_storage_connection_id` is given,
    that single connection is fetched directly: the other two filters and the
    `first`, `skip` and `disable_tqdm` arguments are not used.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: Fields to retrieve for each cloud storage connection.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of cloud storage connections to skip.
        disable_tqdm: If `True`, the progress bar is disabled.
        as_generator: If `True`, an iterator is returned instead of a list.

    Returns:
        An iterator on the matching cloud storage connections when
        `as_generator` is `True`, otherwise a list of them.

    Raises:
        ValueError: If none of `cloud_storage_connection_id`,
            `cloud_storage_integration_id` and `project_id` is provided.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    identifying_filters = (
        cloud_storage_connection_id,
        cloud_storage_integration_id,
        project_id,
    )
    if all(value is None for value in identifying_filters):
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    # A connection ID targets the single-item `dataConnection` resolver.
    if cloud_storage_connection_id is not None:
        single_connection = [
            services.get_data_connection(self, cloud_storage_connection_id, fields)
        ]
        return iter(single_connection) if as_generator else single_connection

    # Otherwise go through the paginated `dataConnections` resolver.
    criteria = DataConnectionsWhere(
        project_id=project_id, data_integration_id=cloud_storage_integration_id
    )
    show_no_progress = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    pagination = QueryOptions(show_no_progress, first, skip)
    connections = DataConnectionsQuery(self.graphql_client)(criteria, fields, pagination)

    return connections if as_generator else list(connections)
Mutations
Set of DataConnection mutations.
Source code in kili/entrypoints/mutations/data_connection/__init__.py
class MutationsDataConnection:
    """Set of cloud storage connection (DataConnection) mutations."""

    graphql_client: GraphQLClient

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage integration to a project.

        The integration is first looked up to make sure it exists, then the
        connection is created and the computation of the differences between
        the cloud storage and the project is triggered.

        Args:
            project_id: Id of the project.
            cloud_storage_integration_id: Id of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.

        Returns:
            A dict with the DataConnection Id.

        Raises:
            ValueError: If no cloud storage integration matches
                `cloud_storage_integration_id`.
        """
        # Local import: only this method needs `timezone`.
        from datetime import timezone  # pylint: disable=import-outside-toplevel

        data_integrations = list(
            DataIntegrationsQuery(self.graphql_client)(
                where=DataIntegrationWhere(data_integration_id=cloud_storage_integration_id),
                fields=["id"],
                options=QueryOptions(disable_tqdm=True, first=1, skip=0),
            )
        )
        if not data_integrations:
            raise ValueError(
                f"Cloud storage integration with id {cloud_storage_integration_id} not found."
            )

        # The trailing "Z" declares the timestamp as UTC, so the clock must be read
        # in UTC and not in the machine's local time zone. The tzinfo is dropped
        # before formatting so isoformat() does not emit "+00:00" ahead of the "Z".
        last_checked = (
            datetime.now(timezone.utc)
            .replace(tzinfo=None)
            .isoformat(sep="T", timespec="milliseconds")
            + "Z"
        )

        variables = {
            "data": {
                "projectId": project_id,
                "integrationId": cloud_storage_integration_id,
                "isChecking": False,
                "lastChecked": last_checked,
                "selectedFolders": selected_folders,
            }
        }
        result = self.graphql_client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
        result = format_result("data", result)

        # We trigger data difference computation (same behavior as in the frontend)
        services.compute_differences(self, result["id"])

        return result

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
        dry_run: bool = False,
    ) -> Dict:
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the project,
        and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: Id of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.
            dry_run: If True, will not synchronize the data connection but only print the
                differences. This is useful to check the differences before applying them to the
                project.

        Returns:
            A dict with the cloud storage connection Id.
        """
        return services.synchronize_data_connection(
            self, cloud_storage_connection_id, delete_extraneous_files, dry_run
        )
add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None)
Connect a cloud storage to a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Id of the project. |
required |
cloud_storage_integration_id |
str |
Id of the cloud storage integration. |
required |
selected_folders |
Optional[List[str]] |
List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected. |
None |
Returns:
Type | Description |
---|---|
Dict |
A dict with the DataConnection Id. |
Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
) -> Dict:
    """Connect a cloud storage integration to a project.

    The integration is first looked up to make sure it exists, then the
    connection is created and the computation of the differences between
    the cloud storage and the project is triggered.

    Args:
        project_id: Id of the project.
        cloud_storage_integration_id: Id of the cloud storage integration.
        selected_folders: List of folders of the data integration to connect to the project.
            If not provided, all folders of the data integration will be connected.

    Returns:
        A dict with the DataConnection Id.

    Raises:
        ValueError: If no cloud storage integration matches
            `cloud_storage_integration_id`.
    """
    # Local import: only this method needs `timezone`.
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    data_integrations = list(
        DataIntegrationsQuery(self.graphql_client)(
            where=DataIntegrationWhere(data_integration_id=cloud_storage_integration_id),
            fields=["id"],
            options=QueryOptions(disable_tqdm=True, first=1, skip=0),
        )
    )
    if not data_integrations:
        raise ValueError(
            f"Cloud storage integration with id {cloud_storage_integration_id} not found."
        )

    # The trailing "Z" declares the timestamp as UTC, so the clock must be read
    # in UTC and not in the machine's local time zone. The tzinfo is dropped
    # before formatting so isoformat() does not emit "+00:00" ahead of the "Z".
    last_checked = (
        datetime.now(timezone.utc)
        .replace(tzinfo=None)
        .isoformat(sep="T", timespec="milliseconds")
        + "Z"
    )

    variables = {
        "data": {
            "projectId": project_id,
            "integrationId": cloud_storage_integration_id,
            "isChecking": False,
            "lastChecked": last_checked,
            "selectedFolders": selected_folders,
        }
    }
    result = self.graphql_client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
    result = format_result("data", result)

    # We trigger data difference computation (same behavior as in the frontend)
    services.compute_differences(self, result["id"])

    return result
synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False, dry_run=False)
Synchronize a cloud storage connection.
This method will compute differences between the cloud storage connection and the project, and then validate the differences.
If delete_extraneous_files
is True, it will also delete files that are not in the
cloud storage integration anymore but that are still in the project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
str |
Id of the cloud storage connection. |
required |
delete_extraneous_files |
bool |
If True, delete extraneous files. |
False |
dry_run |
bool |
If True, will not synchronize the data connection but only print the differences. This is useful to check the differences before applying them to the project. |
False |
Returns:
Type | Description |
---|---|
Dict |
A dict with the cloud storage connection Id. |
Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
    dry_run: bool = False,
) -> Dict:
    """Bring a project in sync with its cloud storage connection.

    The differences between the cloud storage connection and the project are
    computed, then validated.

    With `delete_extraneous_files` set to True, files that are still in the
    project but no longer in the cloud storage integration are deleted as well.

    Args:
        cloud_storage_connection_id: Id of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.
        dry_run: If True, the data connection is not synchronized and the
            differences are only printed, so that they can be reviewed before
            being applied to the project.

    Returns:
        A dict with the cloud storage connection Id.
    """
    # All the work is delegated to the service layer; arguments stay positional.
    sync_result = services.synchronize_data_connection(
        self,
        cloud_storage_connection_id,
        delete_extraneous_files,
        dry_run,
    )
    return sync_result