Cloud storage module
Alpha feature
The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.
Cloud Storage Integration and Connection
A cloud storage integration is a connection between a Kili organization and a cloud storage (AWS, GCP or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Adding a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.
A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage to a project. More information about how to use a cloud storage integration in a project can be found here.
Queries
Set of cloud storage integration queries.
Source code in kili/entrypoints/queries/data_integration/__init__.py
class QueriesDataIntegration:
    """Queries that list and count cloud storage integrations."""

    # pylint: disable=too-many-arguments,dangerous-default-value

    def __init__(self, auth: KiliAuth):
        """Keep a reference to the authentication object used by the queries.

        Args:
            auth: KiliAuth object
        """
        self.auth = auth

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Fetch the cloud storage integrations matching the given filters.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.
            fields: All the fields to request among the possible fields for the cloud storage integrations.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of skipped cloud storage integrations.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage integrations is returned.

        Returns:
            A list or a generator of the cloud storage integrations that match the criteria.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        filters = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        tqdm_disabled = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        query_options = QueryOptions(tqdm_disabled, first, skip)
        run_query = DataIntegrationsQuery(self.auth.client)
        integrations = run_query(filters, fields, query_options)

        # The query result is only materialized into a list when the caller did not ask for a generator.
        return integrations if as_generator else list(integrations)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Return how many cloud storage integrations match the given filters.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.

        Returns:
            The number of cloud storage integrations that match the criteria.
        """
        filters = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        integrations_query = DataIntegrationsQuery(self.auth.client)
        return integrations_query.count(filters)
cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=['name', 'id', 'platform', 'status'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)
Get a generator or a list of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[typing_extensions.Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[typing_extensions.Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
fields |
List[str] |
All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields. |
['name', 'id', 'platform', 'status'] |
first |
Optional[int] |
Maximum number of cloud storage integrations to return. |
None |
skip |
int |
Number of skipped cloud storage integrations. |
0 |
disable_tqdm |
bool |
If `True`, the progress bar will be disabled. |
False |
as_generator |
bool |
If `True`, a generator on the cloud storage integrations is returned. |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage integrations that match the criteria. |
Examples:
>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
    fields: List[str] = ["name", "id", "platform", "status"],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Fetch the cloud storage integrations matching the given filters.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.
        fields: All the fields to request among the possible fields for the cloud storage integrations.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of skipped cloud storage integrations.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage integrations is returned.

    Returns:
        A list or a generator of the cloud storage integrations that match the criteria.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    filters = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    tqdm_disabled = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    query_options = QueryOptions(tqdm_disabled, first, skip)
    run_query = DataIntegrationsQuery(self.auth.client)
    integrations = run_query(filters, fields, query_options)

    # The query result is only materialized into a list when the caller did not ask for a generator.
    return integrations if as_generator else list(integrations)
count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)
Count and return the number of cloud storage integrations that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
name |
Optional[str] |
Name of the cloud storage integration. |
None |
platform |
Optional[typing_extensions.Literal['AWS', 'Azure', 'GCP']] |
Platform of the cloud storage integration. |
None |
status |
Optional[typing_extensions.Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] |
Status of the cloud storage integration. |
None |
organization_id |
Optional[str] |
ID of the organization. |
None |
Returns:
Type | Description |
---|---|
int |
The number of cloud storage integrations that match the criteria. |
Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Return how many cloud storage integrations match the given filters.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.

    Returns:
        The number of cloud storage integrations that match the criteria.
    """
    filters = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    integrations_query = DataIntegrationsQuery(self.auth.client)
    return integrations_query.count(filters)
Set of cloud storage connection queries.
Source code in kili/entrypoints/queries/data_connection/__init__.py
class QueriesDataConnection:
    """Queries that list cloud storage connections."""

    # pylint: disable=too-many-arguments,dangerous-default-value

    def __init__(self, auth: KiliAuth):
        """Keep a reference to the authentication object used by the queries.

        Args:
            auth: KiliAuth object
        """
        self.auth = auth

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Fetch the cloud storage connections matching the given filters.

        When `cloud_storage_connection_id` is given, that single connection is
        fetched directly and the other filters, `first`, `skip` and
        `disable_tqdm` are not used.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: All the fields to request among the possible fields for the cloud storage connections.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of skipped cloud storage connections.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage connections is returned.

        Returns:
            A list or a generator of the cloud storage connections that match the criteria.

        Raises:
            ValueError: If none of `cloud_storage_connection_id`,
                `cloud_storage_integration_id` and `project_id` is provided.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        identifiers = (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
        if all(identifier is None for identifier in identifiers):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        # A connection ID targets the single-item dataConnection resolver.
        if cloud_storage_connection_id is not None:
            single_connection = [
                services.get_data_connection(self.auth, cloud_storage_connection_id, fields)
            ]
            return iter(single_connection) if as_generator else single_connection

        # Otherwise filter through the dataConnections resolver.
        filters = DataConnectionsWhere(
            project_id=project_id, data_integration_id=cloud_storage_integration_id
        )
        tqdm_disabled = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        query_options = QueryOptions(tqdm_disabled, first, skip)
        run_query = DataConnectionsQuery(self.auth.client)
        connections = run_query(filters, fields, query_options)
        return connections if as_generator else list(connections)
cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)
Get a generator or a list of cloud storage connections that match a set of criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
Optional[str] |
ID of the cloud storage connection. |
None |
cloud_storage_integration_id |
Optional[str] |
ID of the cloud storage integration. |
None |
project_id |
Optional[str] |
ID of the project. |
None |
fields |
List[str] |
All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields. |
['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'] |
first |
Optional[int] |
Maximum number of cloud storage connections to return. |
None |
skip |
int |
Number of skipped cloud storage connections. |
0 |
disable_tqdm |
bool |
If `True`, the progress bar will be disabled. |
False |
as_generator |
bool |
If `True`, a generator on the cloud storage connections is returned. |
False |
Returns:
Type | Description |
---|---|
Iterable[Dict] |
A list or a generator of the cloud storage connections that match the criteria. |
Examples:
>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/entrypoints/queries/data_connection/__init__.py
@typechecked
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: List[str] = [
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Fetch the cloud storage connections matching the given filters.

    When `cloud_storage_connection_id` is given, that single connection is
    fetched directly and the other filters, `first`, `skip` and
    `disable_tqdm` are not used.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: All the fields to request among the possible fields for the cloud storage connections.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of skipped cloud storage connections.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage connections is returned.

    Returns:
        A list or a generator of the cloud storage connections that match the criteria.

    Raises:
        ValueError: If none of `cloud_storage_connection_id`,
            `cloud_storage_integration_id` and `project_id` is provided.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    identifiers = (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
    if all(identifier is None for identifier in identifiers):
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    # A connection ID targets the single-item dataConnection resolver.
    if cloud_storage_connection_id is not None:
        single_connection = [
            services.get_data_connection(self.auth, cloud_storage_connection_id, fields)
        ]
        return iter(single_connection) if as_generator else single_connection

    # Otherwise filter through the dataConnections resolver.
    filters = DataConnectionsWhere(
        project_id=project_id, data_integration_id=cloud_storage_integration_id
    )
    tqdm_disabled = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    query_options = QueryOptions(tqdm_disabled, first, skip)
    run_query = DataConnectionsQuery(self.auth.client)
    connections = run_query(filters, fields, query_options)
    return connections if as_generator else list(connections)
Mutations
Set of DataConnection mutations.
Source code in kili/entrypoints/mutations/data_connection/__init__.py
class MutationsDataConnection:
    """Set of DataConnection mutations."""

    def __init__(self, auth: KiliAuth):
        """Initializes the subclass.

        Args:
            auth: KiliAuth object
        """
        self.auth = auth

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage to a project.

        After the connection is created, the computation of the differences
        for the new connection is triggered.

        Args:
            project_id: ID of the project.
            cloud_storage_integration_id: ID of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.

        Returns:
            A dict with the DataConnection ID.

        Raises:
            AddDataConnectionError: If `selected_folders` is not provided and
                listing the folders of the data integration fails with a GraphQL error.
        """
        # Local import: only the `datetime` class is known to be in scope in this module.
        from datetime import timezone  # pylint: disable=import-outside-toplevel

        if selected_folders is None:
            folder_variables = {"dataIntegrationId": cloud_storage_integration_id}
            try:
                folders_result = self.auth.client.execute(
                    GQL_GET_DATA_INTEGRATION_FOLDER_AND_SUBFOLDERS, variables=folder_variables
                )
            except GraphQLError as err:
                raise AddDataConnectionError(
                    f"The data integration with id {cloud_storage_integration_id} is not supported"
                    " in the SDK yet. Use the Kili app to create a data connection instead."
                ) from err
            folders = format_result("data", folders_result)
            selected_folders = [folder["key"] for folder in folders]

        # The trailing "Z" marks the timestamp as UTC, so the clock must be read in UTC.
        # A naive `datetime.now()` is local time and would be off by the host's UTC offset.
        last_checked = (
            datetime.now(timezone.utc)
            .isoformat(sep="T", timespec="milliseconds")
            .replace("+00:00", "Z")
        )

        variables = {
            "data": {
                "projectId": project_id,
                "integrationId": cloud_storage_integration_id,
                "isChecking": False,
                "lastChecked": last_checked,
                "selectedFolders": selected_folders,
            }
        }
        result = self.auth.client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
        result = format_result("data", result)

        # We trigger data difference computation (same behavior as in the frontend)
        services.compute_differences(self.auth, result["id"])
        return result

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
    ) -> Dict:
        # pylint: disable=line-too-long
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the project,
        and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.

        Returns:
            A dict with the DataConnection ID.

        !!! warning "Azure Blob Storage"
            This method currently does not work for Azure cloud storage connection using credentials mode.
            Use service account mode instead or use the Kili app to synchronize the data connection.
            More information in the [documentation](https://docs.kili-technology.com/docs/creating-an-azure-blob-storage-integration#service-creds).
        """
        return services.synchronize_data_connection(
            self.auth, cloud_storage_connection_id, delete_extraneous_files
        )
add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None)
Connect a cloud storage to a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
ID of the project. |
required |
cloud_storage_integration_id |
str |
ID of the cloud storage integration. |
required |
selected_folders |
Optional[List[str]] |
List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected. |
None |
Returns:
Type | Description |
---|---|
Dict |
A dict with the DataConnection ID. |
Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
) -> Dict:
    """Connect a cloud storage to a project.

    After the connection is created, the computation of the differences
    for the new connection is triggered.

    Args:
        project_id: ID of the project.
        cloud_storage_integration_id: ID of the cloud storage integration.
        selected_folders: List of folders of the data integration to connect to the project.
            If not provided, all folders of the data integration will be connected.

    Returns:
        A dict with the DataConnection ID.

    Raises:
        AddDataConnectionError: If `selected_folders` is not provided and
            listing the folders of the data integration fails with a GraphQL error.
    """
    # Local import: only the `datetime` class is known to be in scope in this module.
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    if selected_folders is None:
        folder_variables = {"dataIntegrationId": cloud_storage_integration_id}
        try:
            folders_result = self.auth.client.execute(
                GQL_GET_DATA_INTEGRATION_FOLDER_AND_SUBFOLDERS, variables=folder_variables
            )
        except GraphQLError as err:
            raise AddDataConnectionError(
                f"The data integration with id {cloud_storage_integration_id} is not supported"
                " in the SDK yet. Use the Kili app to create a data connection instead."
            ) from err
        folders = format_result("data", folders_result)
        selected_folders = [folder["key"] for folder in folders]

    # The trailing "Z" marks the timestamp as UTC, so the clock must be read in UTC.
    # A naive `datetime.now()` is local time and would be off by the host's UTC offset.
    last_checked = (
        datetime.now(timezone.utc)
        .isoformat(sep="T", timespec="milliseconds")
        .replace("+00:00", "Z")
    )

    variables = {
        "data": {
            "projectId": project_id,
            "integrationId": cloud_storage_integration_id,
            "isChecking": False,
            "lastChecked": last_checked,
            "selectedFolders": selected_folders,
        }
    }
    result = self.auth.client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
    result = format_result("data", result)

    # We trigger data difference computation (same behavior as in the frontend)
    services.compute_differences(self.auth, result["id"])
    return result
synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False)
Synchronize a cloud storage connection.
This method will compute differences between the cloud storage connection and the project, and then validate the differences.
If `delete_extraneous_files` is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_storage_connection_id |
str |
ID of the cloud storage connection. |
required |
delete_extraneous_files |
bool |
If True, delete extraneous files. |
False |
Returns:
Type | Description |
---|---|
Dict |
A dict with the DataConnection ID. |
Azure Blob Storage
This method currently does not work for Azure cloud storage connection using credentials mode. Use service account mode instead or use the Kili app to synchronize the data connection. More information in the documentation.
Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
) -> Dict:
    # pylint: disable=line-too-long
    """Bring a project in sync with its cloud storage connection.

    The differences between the cloud storage connection and the project are computed,
    and then validated.

    When `delete_extraneous_files` is True, files that are still in the project but are
    not in the cloud storage integration anymore are deleted as well.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.

    Returns:
        A dict with the DataConnection ID.

    !!! warning "Azure Blob Storage"
        This method currently does not work for Azure cloud storage connection using credentials mode.
        Use service account mode instead or use the Kili app to synchronize the data connection.
        More information in the [documentation](https://docs.kili-technology.com/docs/creating-an-azure-blob-storage-integration#service-creds).
    """
    # The whole workflow is delegated to the services layer.
    synchronized_connection = services.synchronize_data_connection(
        self.auth,
        cloud_storage_connection_id,
        delete_extraneous_files,
    )
    return synchronized_connection