Cloud storage module
Alpha feature
The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.
Cloud Storage Integration and Connection
A cloud storage integration is a connection between a Kili organization and a cloud storage (AWS, GCP or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Adding a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.
A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage to a project. More information about how to use a cloud storage integration in a project can be found here.
Queries
Set of cloud storage integration queries.
Source code in kili/entrypoints/queries/data_integration/__init__.py
class QueriesDataIntegration:
    """Set of cloud storage integration queries."""

    # pylint: disable=too-many-arguments,dangerous-default-value

    def __init__(self, auth: KiliAuth):
        """Keep a reference to the authentication object.

        Args:
            auth: KiliAuth object whose `client` is used to run the queries.
        """
        self.auth = auth

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.
            fields: All the fields to request among the possible fields for the cloud storage integrations.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of skipped cloud storage integrations.
            disable_tqdm: If `True`, the progress bar will be disabled.
                The value is combined with `as_generator` by
                `disable_tqdm_if_as_generator` before the query is run.
            as_generator: If `True`, a generator on the cloud storage integrations is returned.

        Returns:
            A list or a generator of the cloud storage integrations that match the criteria.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        filters = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        query_options = QueryOptions(
            disable_tqdm_if_as_generator(as_generator, disable_tqdm), first, skip
        )
        integrations = DataIntegrationsQuery(self.auth.client)(filters, fields, query_options)

        # The query result is lazy; only materialize it when a list was requested.
        return integrations if as_generator else list(integrations)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Count and return the number of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.

        Returns:
            The number of cloud storage integrations that match the criteria.
        """
        filters = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        integrations_query = DataIntegrationsQuery(self.auth.client)
        return integrations_query.count(filters)
cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=['name', 'id', 'platform', 'status'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)
Get a generator or a list of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[typing_extensions.Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[typing_extensions.Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
fields |
List[str] |
All the fields to request among the possible fields for the cloud storage integrations. See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields. |
['name', 'id', 'platform', 'status'] |
first |
Optional[int] |
Maximum number of cloud storage integrations to return. |
None |
skip |
int |
Number of skipped cloud storage integrations. |
0 |
disable_tqdm |
bool |
If `True`, the progress bar will be disabled. |
False |
as_generator |
bool |
If `True`, a generator on the cloud storage integrations is returned. |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage integrations that match the criteria. |
Examples:
>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
    fields: List[str] = ["name", "id", "platform", "status"],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Get a generator or a list of cloud storage integrations that match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.
        fields: All the fields to request among the possible fields for the cloud storage integrations.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of skipped cloud storage integrations.
        disable_tqdm: If `True`, the progress bar will be disabled.
            The value is combined with `as_generator` by
            `disable_tqdm_if_as_generator` before the query is run.
        as_generator: If `True`, a generator on the cloud storage integrations is returned.

    Returns:
        A list or a generator of the cloud storage integrations that match the criteria.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    filters = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    query_options = QueryOptions(
        disable_tqdm_if_as_generator(as_generator, disable_tqdm), first, skip
    )
    integrations = DataIntegrationsQuery(self.auth.client)(filters, fields, query_options)

    # The query result is lazy; only materialize it when a list was requested.
    return integrations if as_generator else list(integrations)
count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)
Count and return the number of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[typing_extensions.Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[typing_extensions.Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
Returns:
Type | Description |
---|---|
int |
The number of cloud storage integrations that match the criteria. |
Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Count and return the number of cloud storage integrations that match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.

    Returns:
        The number of cloud storage integrations that match the criteria.
    """
    filters = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    integrations_query = DataIntegrationsQuery(self.auth.client)
    return integrations_query.count(filters)
Set of cloud storage connection queries.
Source code in kili/entrypoints/queries/data_connection/__init__.py
class QueriesDataConnection:
    """Set of cloud storage connection queries."""

    # pylint: disable=too-many-arguments,dangerous-default-value

    def __init__(self, auth: KiliAuth):
        """Keep a reference to the authentication object.

        Args:
            auth: KiliAuth object passed to the services and whose `client`
                is used to run the queries.
        """
        self.auth = auth

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage connections that match a set of criteria.

        When `cloud_storage_connection_id` is given, that single connection is
        fetched directly; `cloud_storage_integration_id`, `project_id`, `first`,
        `skip` and `disable_tqdm` are not used on that path.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: All the fields to request among the possible fields for the cloud storage connections.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of skipped cloud storage connections.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage connections is returned.

        Returns:
            A list or a generator of the cloud storage connections that match the criteria.

        Raises:
            ValueError: If none of `cloud_storage_connection_id`,
                `cloud_storage_integration_id` and `project_id` is provided.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        criteria = (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
        if all(criterion is None for criterion in criteria):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        # A connection ID targets one connection: call the dataConnection resolver
        if cloud_storage_connection_id is not None:
            single_connection = [
                services.get_data_connection(self.auth, cloud_storage_connection_id, fields)
            ]
            return iter(single_connection) if as_generator else single_connection

        # Otherwise filter with the dataConnections resolver
        filters = DataConnectionsWhere(
            project_id=project_id, data_integration_id=cloud_storage_integration_id
        )
        query_options = QueryOptions(
            disable_tqdm_if_as_generator(as_generator, disable_tqdm), first, skip
        )
        connections = DataConnectionsQuery(self.auth.client)(filters, fields, query_options)
        return connections if as_generator else list(connections)
cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)
Get a generator or a list of cloud storage connections that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
Optional[str] |
ID of the cloud storage connection. |
None |
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
project_id |
Optional[str] |
ID of the project. |
None |
fields |
List[str] |
All the fields to request among the possible fields for the cloud storage connections. See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields. |
['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'] |
first |
Optional[int] |
Maximum number of cloud storage connections to return. |
None |
skip |
int |
Number of skipped cloud storage connections. |
0 |
disable_tqdm |
bool |
If `True`, the progress bar will be disabled. |
False |
as_generator |
bool |
If `True`, a generator on the cloud storage connections is returned. |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage connections that match the criteria. |
Examples:
>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/entrypoints/queries/data_connection/__init__.py
@typechecked
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: List[str] = [
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Get a generator or a list of cloud storage connections that match a set of criteria.

    When `cloud_storage_connection_id` is given, that single connection is
    fetched directly; `cloud_storage_integration_id`, `project_id`, `first`,
    `skip` and `disable_tqdm` are not used on that path.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: All the fields to request among the possible fields for the cloud storage connections.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of skipped cloud storage connections.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage connections is returned.

    Returns:
        A list or a generator of the cloud storage connections that match the criteria.

    Raises:
        ValueError: If none of `cloud_storage_connection_id`,
            `cloud_storage_integration_id` and `project_id` is provided.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    criteria = (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
    if all(criterion is None for criterion in criteria):
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    # A connection ID targets one connection: call the dataConnection resolver
    if cloud_storage_connection_id is not None:
        single_connection = [
            services.get_data_connection(self.auth, cloud_storage_connection_id, fields)
        ]
        return iter(single_connection) if as_generator else single_connection

    # Otherwise filter with the dataConnections resolver
    filters = DataConnectionsWhere(
        project_id=project_id, data_integration_id=cloud_storage_integration_id
    )
    query_options = QueryOptions(
        disable_tqdm_if_as_generator(as_generator, disable_tqdm), first, skip
    )
    connections = DataConnectionsQuery(self.auth.client)(filters, fields, query_options)
    return connections if as_generator else list(connections)
Mutations
Set of DataConnection mutations.
Source code in kili/entrypoints/mutations/data_connection/__init__.py
class MutationsDataConnection:
    """Set of DataConnection mutations."""

    def __init__(self, auth: KiliAuth):
        """Initializes the subclass.

        Args:
            auth: KiliAuth object passed to the services and whose `client`
                is used to execute the GraphQL operations.
        """
        self.auth = auth

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage to a project.

        Args:
            project_id: ID of the project.
            cloud_storage_integration_id: ID of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.

        Returns:
            A dict with the DataConnection ID.

        Raises:
            AddDataConnectionError: If `selected_folders` is not provided and
                listing the folders of the data integration fails with a
                `GraphQLError`.
        """
        # pylint: disable=import-outside-toplevel
        from datetime import timezone

        if selected_folders is None:
            # No explicit selection: fetch every folder of the integration
            variables = {"dataIntegrationId": cloud_storage_integration_id}
            try:
                result = self.auth.client.execute(
                    GQL_GET_DATA_INTEGRATION_FOLDER_AND_SUBFOLDERS, variables=variables
                )
            except GraphQLError as err:
                raise AddDataConnectionError(
                    f"The data integration with id {cloud_storage_integration_id} is not supported"
                    " in the SDK yet. Use the Kili app to create a data connection instead."
                ) from err
            result = format_result("data", result)
            selected_folders = [folder["key"] for folder in result]

        # The trailing "Z" designates UTC, so the timestamp must be taken in UTC
        # rather than in the local timezone of the machine running the SDK.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        last_checked = utc_now.isoformat(sep="T", timespec="milliseconds") + "Z"

        variables = {
            "data": {
                "projectId": project_id,
                "integrationId": cloud_storage_integration_id,
                "isChecking": False,
                "lastChecked": last_checked,
                "selectedFolders": selected_folders,
            }
        }
        result = self.auth.client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
        result = format_result("data", result)

        # We trigger data difference computation (same behavior as in the frontend)
        services.compute_differences(self.auth, result["id"])

        return result

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
    ) -> Dict:
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the project
        and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.

        Returns:
            A dict with the DataConnection ID.
        """
        return services.synchronize_data_connection(
            self.auth, cloud_storage_connection_id, delete_extraneous_files
        )
add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None)
Connect a cloud storage to a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
ID of the project. |
required |
cloud_storage_integration_id |
str |
ID of the cloud storage integration. |
required |
selected_folders |
Optional[List[str]] |
List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected. |
None |
Returns:
Type | Description |
---|---|
Dict |
A dict with the DataConnection ID. |
Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
) -> Dict:
    """Connect a cloud storage to a project.

    Args:
        project_id: ID of the project.
        cloud_storage_integration_id: ID of the cloud storage integration.
        selected_folders: List of folders of the data integration to connect to the project.
            If not provided, all folders of the data integration will be connected.

    Returns:
        A dict with the DataConnection ID.

    Raises:
        AddDataConnectionError: If `selected_folders` is not provided and
            listing the folders of the data integration fails with a
            `GraphQLError`.
    """
    # pylint: disable=import-outside-toplevel
    from datetime import timezone

    if selected_folders is None:
        # No explicit selection: fetch every folder of the integration
        variables = {"dataIntegrationId": cloud_storage_integration_id}
        try:
            result = self.auth.client.execute(
                GQL_GET_DATA_INTEGRATION_FOLDER_AND_SUBFOLDERS, variables=variables
            )
        except GraphQLError as err:
            raise AddDataConnectionError(
                f"The data integration with id {cloud_storage_integration_id} is not supported"
                " in the SDK yet. Use the Kili app to create a data connection instead."
            ) from err
        result = format_result("data", result)
        selected_folders = [folder["key"] for folder in result]

    # The trailing "Z" designates UTC, so the timestamp must be taken in UTC
    # rather than in the local timezone of the machine running the SDK.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    last_checked = utc_now.isoformat(sep="T", timespec="milliseconds") + "Z"

    variables = {
        "data": {
            "projectId": project_id,
            "integrationId": cloud_storage_integration_id,
            "isChecking": False,
            "lastChecked": last_checked,
            "selectedFolders": selected_folders,
        }
    }
    result = self.auth.client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
    result = format_result("data", result)

    # We trigger data difference computation (same behavior as in the frontend)
    services.compute_differences(self.auth, result["id"])

    return result
synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False)
Synchronize a cloud storage connection.
This method will compute differences between the cloud storage connection and the project and then validate the differences.
If `delete_extraneous_files` is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
str |
ID of the cloud storage connection. |
required |
delete_extraneous_files |
bool |
If True, delete extraneous files. |
False |
Returns:
Type | Description |
---|---|
Dict |
A dict with the DataConnection ID. |
Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
) -> Dict:
    """Synchronize a cloud storage connection.

    The work is delegated to `services.synchronize_data_connection`: differences
    between the cloud storage connection and the project are computed and then
    validated.

    If `delete_extraneous_files` is True, files that are no longer in the cloud
    storage integration but are still in the project are deleted as well.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.

    Returns:
        A dict with the DataConnection ID.
    """
    synchronized_connection = services.synchronize_data_connection(
        self.auth, cloud_storage_connection_id, delete_extraneous_files
    )
    return synchronized_connection