Cloud storage module
Alpha feature
The cloud storage feature is currently in alpha. It is still under active development, and method signatures and behaviors may change until the feature is complete.
Cloud Storage Integration and Connection
A cloud storage integration is a connection between a Kili organization and a cloud storage provider (AWS, GCP, or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Cloud storage integrations can be created from the Kili interface, or from the SDK with the `create_cloud_storage_integration` method documented below. More information about how to create a cloud storage integration can be found here.
A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage into a project. More information about how to use a cloud storage integration in a project can be found here.
Azure
To use the Azure cloud storage integration and connection, it is recommended to install the optional Azure dependencies:
pip install kili[azure]
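For orientation, here is a minimal end-to-end sketch of how the methods below fit together. It assumes the client is instantiated as usual with `from kili.client import Kili`, that an API key is available in the environment, and that all IDs and filter values are placeholders:

from kili.client import Kili

kili = Kili()  # assumes the API key is available, e.g. via the KILI_API_KEY environment variable

# List the connected cloud storage integrations of the organization
integrations = kili.cloud_storage_integrations(status="CONNECTED")

# Connect the first integration to a project (placeholder project ID)
connection = kili.add_cloud_storage_connection(
    project_id="my_project_id",
    cloud_storage_integration_id=integrations[0]["id"],
    prefix="images/",  # only synchronize files under this base path
)

# Preview the differences, then apply the synchronization
kili.synchronize_cloud_storage_connection(connection["id"], dry_run=True)
kili.synchronize_cloud_storage_connection(connection["id"])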
Methods attached to the Kili client, to run actions on cloud storage.
Source code in kili/presentation/client/cloud_storage.py
class CloudStorageClientMethods(BaseClientMethods):
"""Methods attached to the Kili client, to run actions on cloud storage."""
@overload
def cloud_storage_connections(
self,
cloud_storage_connection_id: Optional[str] = None,
cloud_storage_integration_id: Optional[str] = None,
project_id: Optional[str] = None,
fields: ListOrTuple[str] = (
"id",
"lastChecked",
"numberOfAssets",
"selectedFolders",
"projectId",
),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: Literal[True],
) -> Generator[Dict, None, None]:
...
@overload
def cloud_storage_connections(
self,
cloud_storage_connection_id: Optional[str] = None,
cloud_storage_integration_id: Optional[str] = None,
project_id: Optional[str] = None,
fields: ListOrTuple[str] = (
"id",
"lastChecked",
"numberOfAssets",
"selectedFolders",
"projectId",
),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: Literal[False] = False,
) -> List[Dict]:
...
@typechecked
def cloud_storage_connections(
self,
cloud_storage_connection_id: Optional[str] = None,
cloud_storage_integration_id: Optional[str] = None,
project_id: Optional[str] = None,
fields: ListOrTuple[str] = (
"id",
"lastChecked",
"numberOfAssets",
"selectedFolders",
"projectId",
),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: bool = False,
) -> Iterable[Dict]:
# pylint: disable=line-too-long
"""Get a generator or a list of cloud storage connections that match a set of criteria.
Args:
cloud_storage_connection_id: ID of the cloud storage connection.
cloud_storage_integration_id: ID of the cloud storage integration.
project_id: ID of the project.
fields: All the fields to request among the possible fields for the cloud storage connections.
See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
first: Maximum number of cloud storage connections to return.
skip: Number of skipped cloud storage connections.
disable_tqdm: If `True`, the progress bar will be disabled.
as_generator: If `True`, a generator on the cloud storage connections is returned.
Returns:
A list or a generator of the cloud storage connections that match the criteria.
Examples:
>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
"""
if (
cloud_storage_connection_id is None
and cloud_storage_integration_id is None
and project_id is None
):
raise ValueError(
"At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
" project_id must be specified"
)
disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
if cloud_storage_connection_id is None:
data_connections_gen = cloud_storage_use_cases.list_data_connections(
data_connection_filters=DataConnectionFilters(
project_id=ProjectId(project_id) if project_id is not None else None,
integration_id=(
DataIntegrationId(cloud_storage_integration_id)
if cloud_storage_integration_id is not None
else None
),
),
fields=fields,
options=QueryOptions(disable_tqdm, first, skip),
)
else:
data_connections_gen = (
i
for i in [
cloud_storage_use_cases.get_data_connection(
DataConnectionId(cloud_storage_connection_id), fields=fields
)
]
)
if as_generator:
return data_connections_gen
return list(data_connections_gen)
@overload
def cloud_storage_integrations(
self,
cloud_storage_integration_id: Optional[str] = None,
name: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
organization_id: Optional[str] = None,
fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: Literal[True],
) -> Generator[Dict, None, None]:
...
@overload
def cloud_storage_integrations(
self,
cloud_storage_integration_id: Optional[str] = None,
name: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
organization_id: Optional[str] = None,
fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: Literal[False] = False,
) -> List[Dict]:
...
@typechecked
def cloud_storage_integrations(
self,
cloud_storage_integration_id: Optional[str] = None,
name: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
organization_id: Optional[str] = None,
fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: bool = False,
) -> Iterable[Dict]:
# pylint: disable=line-too-long
"""Get a generator or a list of cloud storage integrations that match a set of criteria.
Args:
cloud_storage_integration_id: ID of the cloud storage integration.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
status: Status of the cloud storage integration.
organization_id: ID of the organization.
fields: All the fields to request among the possible fields for the cloud storage integrations.
See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
first: Maximum number of cloud storage integrations to return.
skip: Number of skipped cloud storage integrations.
disable_tqdm: If `True`, the progress bar will be disabled.
as_generator: If `True`, a generator on the cloud storage integrations is returned.
Returns:
A list or a generator of the cloud storage integrations that match the criteria.
Examples:
>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
"""
disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
options = QueryOptions(disable_tqdm, first, skip)
data_integrations_gen = CloudStorageUseCases(self.kili_api_gateway).list_data_integrations(
data_integration_filters=DataIntegrationFilters(
status=status,
id=(
DataIntegrationId(cloud_storage_integration_id)
if cloud_storage_integration_id is not None
else None
),
name=name,
platform=platform,
organization_id=(
OrganizationId(organization_id) if organization_id is not None else None
),
),
fields=fields,
options=options,
)
if as_generator:
return data_integrations_gen
return list(data_integrations_gen)
@typechecked
def count_cloud_storage_integrations(
self,
cloud_storage_integration_id: Optional[str] = None,
name: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
organization_id: Optional[str] = None,
) -> int:
"""Count and return the number of cloud storage integrations that match a set of criteria.
Args:
cloud_storage_integration_id: ID of the cloud storage integration.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
status: Status of the cloud storage integration.
organization_id: ID of the organization.
Returns:
The number of cloud storage integrations that match the criteria.
"""
return CloudStorageUseCases(self.kili_api_gateway).count_data_integrations(
DataIntegrationFilters(
status=status,
id=(
DataIntegrationId(cloud_storage_integration_id)
if cloud_storage_integration_id is not None
else None
),
name=name,
platform=platform,
organization_id=(
OrganizationId(organization_id) if organization_id is not None else None
),
)
)
@typechecked
def add_cloud_storage_connection(
self,
project_id: str,
cloud_storage_integration_id: str,
selected_folders: Optional[List[str]] = None,
prefix: Optional[str] = None,
include: Optional[List[str]] = None,
exclude: Optional[List[str]] = None,
) -> Dict:
"""Connect a cloud storage to a project. More details about parameters
can be found in the [documentation](https://docs.kili-technology.com/docs/filtering-assets-from-cloud-storage).
Args:
project_id: Id of the project.
cloud_storage_integration_id: Id of the cloud storage integration.
selected_folders: List of folders of the data integration to connect to the project.
If not provided, all folders of the data integration will be connected.
This option is deprecated and will be removed in the future.
prefix: Filter files to synchronize based on their base path.
include: List of pattern used to include files based on their path.
exclude: List of pattern used to exclude files based on their path.
Returns:
A dict with the DataConnection Id.
"""
if selected_folders is not None:
logger.warning(
"The selected_folders argument is deprecated and will be removed in the future."
)
data_connection_id = CloudStorageUseCases(self.kili_api_gateway).add_data_connection(
project_id=ProjectId(project_id),
data_integration_id=DataIntegrationId(cloud_storage_integration_id),
selected_folders=selected_folders,
fields=("id",),
prefix=prefix,
include=include,
exclude=exclude,
)["id"]
return {"id": data_connection_id}
@typechecked
def synchronize_cloud_storage_connection(
self,
cloud_storage_connection_id: str,
delete_extraneous_files: bool = False,
dry_run: bool = False,
) -> Dict:
"""Synchronize a cloud storage connection.
This method will compute differences between the cloud storage connection and the project,
and then validate the differences.
If `delete_extraneous_files` is True, it will also delete files that are not in the
cloud storage integration anymore but that are still in the project.
Args:
cloud_storage_connection_id: Id of the cloud storage connection.
delete_extraneous_files: If True, delete extraneous files.
dry_run: If True, will not synchronize the data connection but only print the
differences. This is useful to check the differences before applying them to the
project.
Returns:
A dict with the cloud storage connection Id.
"""
data_connection_id = DataConnectionId(cloud_storage_connection_id)
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
cloud_storage_use_cases.synchronize_data_connection(
data_connection_id=data_connection_id,
delete_extraneous_files=delete_extraneous_files,
dry_run=dry_run,
logger=logger,
)
return cloud_storage_use_cases.get_data_connection(
data_connection_id=data_connection_id, fields=("numberOfAssets", "projectId")
)
@typechecked
def create_cloud_storage_integration(
self,
platform: DataIntegrationPlatform,
name: str,
fields: ListOrTuple[str] = (
"id",
"name",
"status",
"platform",
"allowedPaths",
),
allowed_paths: Optional[List[str]] = None,
allowed_projects: Optional[List[str]] = None,
aws_access_point_arn: Optional[str] = None,
aws_role_arn: Optional[str] = None,
aws_role_external_id: Optional[str] = None,
azure_connection_url: Optional[str] = None,
azure_is_using_service_credentials: Optional[bool] = None,
azure_sas_token: Optional[str] = None,
azure_tenant_id: Optional[str] = None,
gcp_bucket_name: Optional[str] = None,
include_root_files: Optional[str] = None,
internal_processing_authorized: Optional[str] = None,
s3_access_key: Optional[str] = None,
s3_bucket_name: Optional[str] = None,
s3_endpoint: Optional[str] = None,
s3_region: Optional[str] = None,
s3_secret_key: Optional[str] = None,
s3_session_token: Optional[str] = None,
) -> Dict:
# pylint: disable=line-too-long
"""Create a cloud storage integration.
Args:
fields: All the fields to request among the possible fields for the cloud storage integration.
See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
allowed_paths: List of allowed paths.
allowed_projects: List of allowed projects.
aws_access_point_arn: AWS access point ARN.
aws_role_arn: AWS role ARN.
aws_role_external_id: AWS role external ID.
azure_connection_url: Azure connection URL.
azure_is_using_service_credentials: Whether Azure is using service credentials.
azure_sas_token: Azure SAS token.
azure_tenant_id: Azure tenant ID.
gcp_bucket_name: GCP bucket name.
include_root_files: Whether to include root files.
internal_processing_authorized: Whether internal processing is authorized.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
s3_access_key: S3 access key.
s3_bucket_name: S3 bucket name.
s3_endpoint: S3 endpoint.
s3_region: S3 region.
s3_secret_key: S3 secret key.
s3_session_token: S3 session token.
"""
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
return cloud_storage_use_cases.create_data_integration(
platform=platform,
name=name,
fields=fields,
allowed_paths=allowed_paths,
allowed_projects=allowed_projects,
aws_access_point_arn=aws_access_point_arn,
aws_role_arn=aws_role_arn,
aws_role_external_id=aws_role_external_id,
azure_connection_url=azure_connection_url,
azure_is_using_service_credentials=azure_is_using_service_credentials,
azure_sas_token=azure_sas_token,
azure_tenant_id=azure_tenant_id,
gcp_bucket_name=gcp_bucket_name,
include_root_files=include_root_files,
internal_processing_authorized=internal_processing_authorized,
s3_access_key=s3_access_key,
s3_bucket_name=s3_bucket_name,
s3_endpoint=s3_endpoint,
s3_region=s3_region,
s3_secret_key=s3_secret_key,
s3_session_token=s3_session_token,
)
@typechecked
def update_cloud_storage_integration(
self,
cloud_storage_integration_id: str,
allowed_paths: Optional[List[str]] = None,
allowed_projects: Optional[List[str]] = None,
aws_access_point_arn: Optional[str] = None,
aws_role_arn: Optional[str] = None,
aws_role_external_id: Optional[str] = None,
azure_connection_url: Optional[str] = None,
azure_is_using_service_credentials: Optional[bool] = None,
azure_sas_token: Optional[str] = None,
azure_tenant_id: Optional[str] = None,
gcp_bucket_name: Optional[str] = None,
include_root_files: Optional[str] = None,
internal_processing_authorized: Optional[str] = None,
name: Optional[str] = None,
organization_id: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
s3_access_key: Optional[str] = None,
s3_bucket_name: Optional[str] = None,
s3_endpoint: Optional[str] = None,
s3_region: Optional[str] = None,
s3_secret_key: Optional[str] = None,
s3_session_token: Optional[str] = None,
) -> Dict:
"""Update cloud storage data integration.
Args:
allowed_paths: List of allowed paths.
allowed_projects: List of allowed projects.
aws_access_point_arn: AWS access point ARN.
aws_role_arn: AWS role ARN.
aws_role_external_id: AWS role external ID.
azure_connection_url: Azure connection URL.
azure_is_using_service_credentials: Whether Azure is using service credentials.
azure_sas_token: Azure SAS token.
azure_tenant_id: Azure tenant ID.
cloud_storage_integration_id: Data integration ID.
gcp_bucket_name: GCP bucket name.
include_root_files: Whether to include root files.
internal_processing_authorized: Whether internal processing is authorized.
organization_id: Organization ID.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
status: Status of the cloud storage integration.
s3_access_key: S3 access key.
s3_bucket_name: S3 bucket name.
s3_endpoint: S3 endpoint.
s3_region: S3 region.
s3_secret_key: S3 secret key.
s3_session_token: S3 session token.
"""
return CloudStorageUseCases(self.kili_api_gateway).update_data_integration(
data_integration_id=DataIntegrationId(cloud_storage_integration_id),
name=name,
platform=platform,
allowed_paths=allowed_paths,
allowed_projects=allowed_projects,
aws_access_point_arn=aws_access_point_arn,
aws_role_arn=aws_role_arn,
aws_role_external_id=aws_role_external_id,
azure_connection_url=azure_connection_url,
azure_is_using_service_credentials=azure_is_using_service_credentials,
azure_sas_token=azure_sas_token,
azure_tenant_id=azure_tenant_id,
gcp_bucket_name=gcp_bucket_name,
include_root_files=include_root_files,
internal_processing_authorized=internal_processing_authorized,
organization_id=organization_id,
s3_access_key=s3_access_key,
s3_bucket_name=s3_bucket_name,
s3_endpoint=s3_endpoint,
s3_region=s3_region,
s3_secret_key=s3_secret_key,
s3_session_token=s3_session_token,
status=status,
)
@typechecked
def delete_cloud_storage_integration(self, cloud_storage_integration_id: str) -> str:
"""Delete a cloud storage integration.
Args:
cloud_storage_integration_id: Id of the cloud storage integration.
"""
cloud_storage_integration_id = DataIntegrationId(cloud_storage_integration_id)
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
return cloud_storage_use_cases.delete_data_integration(
data_integration_id=cloud_storage_integration_id
)
add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None, prefix=None, include=None, exclude=None)
Connect a cloud storage to a project. More details about parameters can be found in the documentation.
Parameters:

Name | Type | Description | Default
---|---|---|---
project_id | str | Id of the project. | required
cloud_storage_integration_id | str | Id of the cloud storage integration. | required
selected_folders | Optional[List[str]] | List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected. This option is deprecated and will be removed in the future. | None
prefix | Optional[str] | Filter files to synchronize based on their base path. | None
include | Optional[List[str]] | List of patterns used to include files based on their path. | None
exclude | Optional[List[str]] | List of patterns used to exclude files based on their path. | None
Returns:

Type | Description
---|---
Dict | A dict with the DataConnection Id.
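A minimal usage sketch, with placeholder IDs and illustrative filter patterns (the exact pattern syntax is described in the filtering documentation linked above):

>>> kili.add_cloud_storage_connection(
...     project_id="my_project_id",
...     cloud_storage_integration_id="my_integration_id",
...     prefix="raw/2023/",
...     include=["*.jpg", "*.png"],
... )
{'id': 'my_data_connection_id'}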
Source code in kili/presentation/client/cloud_storage.py
def add_cloud_storage_connection(
self,
project_id: str,
cloud_storage_integration_id: str,
selected_folders: Optional[List[str]] = None,
prefix: Optional[str] = None,
include: Optional[List[str]] = None,
exclude: Optional[List[str]] = None,
) -> Dict:
"""Connect a cloud storage to a project. More details about parameters
can be found in the [documentation](https://docs.kili-technology.com/docs/filtering-assets-from-cloud-storage).
Args:
project_id: Id of the project.
cloud_storage_integration_id: Id of the cloud storage integration.
selected_folders: List of folders of the data integration to connect to the project.
If not provided, all folders of the data integration will be connected.
This option is deprecated and will be removed in the future.
prefix: Filter files to synchronize based on their base path.
include: List of pattern used to include files based on their path.
exclude: List of pattern used to exclude files based on their path.
Returns:
A dict with the DataConnection Id.
"""
if selected_folders is not None:
logger.warning(
"The selected_folders argument is deprecated and will be removed in the future."
)
data_connection_id = CloudStorageUseCases(self.kili_api_gateway).add_data_connection(
project_id=ProjectId(project_id),
data_integration_id=DataIntegrationId(cloud_storage_integration_id),
selected_folders=selected_folders,
fields=("id",),
prefix=prefix,
include=include,
exclude=exclude,
)["id"]
return {"id": data_connection_id}
cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)
Get a generator or a list of cloud storage connections that match a set of criteria.
Parameters:

Name | Type | Description | Default
---|---|---|---
cloud_storage_connection_id | Optional[str] | ID of the cloud storage connection. | None
cloud_storage_integration_id | Optional[str] | ID of the cloud storage integration. | None
project_id | Optional[str] | ID of the project. | None
fields | Union[List[str], Tuple[str, ...]] | All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields. | ('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId')
first | Optional[int] | Maximum number of cloud storage connections to return. | None
skip | int | Number of skipped cloud storage connections. | 0
disable_tqdm | Optional[bool] | If `True`, the progress bar will be disabled. | None
as_generator | bool | If `True`, a generator on the cloud storage connections is returned. | False
Returns:

Type | Description
---|---
Iterable[Dict] | A list or a generator of the cloud storage connections that match the criteria.
Examples:
>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
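When iterating over many connections, `as_generator=True` returns a generator instead of building the full list in memory; a short sketch with a placeholder project ID:

>>> for connection in kili.cloud_storage_connections(project_id="my_project_id", as_generator=True):
...     print(connection["id"], connection["numberOfAssets"])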
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_connections(
self,
cloud_storage_connection_id: Optional[str] = None,
cloud_storage_integration_id: Optional[str] = None,
project_id: Optional[str] = None,
fields: ListOrTuple[str] = (
"id",
"lastChecked",
"numberOfAssets",
"selectedFolders",
"projectId",
),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: bool = False,
) -> Iterable[Dict]:
# pylint: disable=line-too-long
"""Get a generator or a list of cloud storage connections that match a set of criteria.
Args:
cloud_storage_connection_id: ID of the cloud storage connection.
cloud_storage_integration_id: ID of the cloud storage integration.
project_id: ID of the project.
fields: All the fields to request among the possible fields for the cloud storage connections.
See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
first: Maximum number of cloud storage connections to return.
skip: Number of skipped cloud storage connections.
disable_tqdm: If `True`, the progress bar will be disabled.
as_generator: If `True`, a generator on the cloud storage connections is returned.
Returns:
A list or a generator of the cloud storage connections that match the criteria.
Examples:
>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
"""
if (
cloud_storage_connection_id is None
and cloud_storage_integration_id is None
and project_id is None
):
raise ValueError(
"At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
" project_id must be specified"
)
disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
if cloud_storage_connection_id is None:
data_connections_gen = cloud_storage_use_cases.list_data_connections(
data_connection_filters=DataConnectionFilters(
project_id=ProjectId(project_id) if project_id is not None else None,
integration_id=(
DataIntegrationId(cloud_storage_integration_id)
if cloud_storage_integration_id is not None
else None
),
),
fields=fields,
options=QueryOptions(disable_tqdm, first, skip),
)
else:
data_connections_gen = (
i
for i in [
cloud_storage_use_cases.get_data_connection(
DataConnectionId(cloud_storage_connection_id), fields=fields
)
]
)
if as_generator:
return data_connections_gen
return list(data_connections_gen)
cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=('name', 'id', 'platform', 'status'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)
Get a generator or a list of cloud storage integrations that match a set of criteria.
Parameters:

Name | Type | Description | Default
---|---|---|---
cloud_storage_integration_id | Optional[str] | ID of the cloud storage integration. | None
name | Optional[str] | Name of the cloud storage integration. | None
platform | Optional[Literal['AWS', 'Azure', 'GCP', 'CustomS3']] | Platform of the cloud storage integration. | None
status | Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] | Status of the cloud storage integration. | None
organization_id | Optional[str] | ID of the organization. | None
fields | Union[List[str], Tuple[str, ...]] | All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields. | ('name', 'id', 'platform', 'status')
first | Optional[int] | Maximum number of cloud storage integrations to return. | None
skip | int | Number of skipped cloud storage integrations. | 0
disable_tqdm | Optional[bool] | If `True`, the progress bar will be disabled. | None
as_generator | bool | If `True`, a generator on the cloud storage integrations is returned. | False
Returns:

Type | Description
---|---
Iterable[Dict] | A list or a generator of the cloud storage integrations that match the criteria.
Examples:
>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
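Integrations can also be filtered, for example by platform and status, and the requested fields can be narrowed (the values below reuse the documented literals and are illustrative):

>>> kili.cloud_storage_integrations(platform="AWS", status="CONNECTED", fields=("id", "name"))
[{'id': '123456789', 'name': 'My bucket'}]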
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_integrations(
self,
cloud_storage_integration_id: Optional[str] = None,
name: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
organization_id: Optional[str] = None,
fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
first: Optional[int] = None,
skip: int = 0,
disable_tqdm: Optional[bool] = None,
*,
as_generator: bool = False,
) -> Iterable[Dict]:
# pylint: disable=line-too-long
"""Get a generator or a list of cloud storage integrations that match a set of criteria.
Args:
cloud_storage_integration_id: ID of the cloud storage integration.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
status: Status of the cloud storage integration.
organization_id: ID of the organization.
fields: All the fields to request among the possible fields for the cloud storage integrations.
See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
first: Maximum number of cloud storage integrations to return.
skip: Number of skipped cloud storage integrations.
disable_tqdm: If `True`, the progress bar will be disabled.
as_generator: If `True`, a generator on the cloud storage integrations is returned.
Returns:
A list or a generator of the cloud storage integrations that match the criteria.
Examples:
>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
"""
disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
options = QueryOptions(disable_tqdm, first, skip)
data_integrations_gen = CloudStorageUseCases(self.kili_api_gateway).list_data_integrations(
data_integration_filters=DataIntegrationFilters(
status=status,
id=(
DataIntegrationId(cloud_storage_integration_id)
if cloud_storage_integration_id is not None
else None
),
name=name,
platform=platform,
organization_id=(
OrganizationId(organization_id) if organization_id is not None else None
),
),
fields=fields,
options=options,
)
if as_generator:
return data_integrations_gen
return list(data_integrations_gen)
count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)
Count and return the number of cloud storage integrations that match a set of criteria.
Parameters:

Name | Type | Description | Default
---|---|---|---
cloud_storage_integration_id | Optional[str] | ID of the cloud storage integration. | None
name | Optional[str] | Name of the cloud storage integration. | None
platform | Optional[Literal['AWS', 'Azure', 'GCP', 'CustomS3']] | Platform of the cloud storage integration. | None
status | Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] | Status of the cloud storage integration. | None
organization_id | Optional[str] | ID of the organization. | None
Returns:

Type | Description
---|---
int | The number of cloud storage integrations that match the criteria.
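For example, counting the connected AWS integrations of an organization (the organization ID is a placeholder and the returned count is illustrative):

>>> kili.count_cloud_storage_integrations(organization_id="my_organization_id", platform="AWS", status="CONNECTED")
2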
Source code in kili/presentation/client/cloud_storage.py
def count_cloud_storage_integrations(
self,
cloud_storage_integration_id: Optional[str] = None,
name: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
organization_id: Optional[str] = None,
) -> int:
"""Count and return the number of cloud storage integrations that match a set of criteria.
Args:
cloud_storage_integration_id: ID of the cloud storage integration.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
status: Status of the cloud storage integration.
organization_id: ID of the organization.
Returns:
The number of cloud storage integrations that match the criteria.
"""
return CloudStorageUseCases(self.kili_api_gateway).count_data_integrations(
DataIntegrationFilters(
status=status,
id=(
DataIntegrationId(cloud_storage_integration_id)
if cloud_storage_integration_id is not None
else None
),
name=name,
platform=platform,
organization_id=(
OrganizationId(organization_id) if organization_id is not None else None
),
)
)
create_cloud_storage_integration(self, platform, name, fields=('id', 'name', 'status', 'platform', 'allowedPaths'), allowed_paths=None, allowed_projects=None, aws_access_point_arn=None, aws_role_arn=None, aws_role_external_id=None, azure_connection_url=None, azure_is_using_service_credentials=None, azure_sas_token=None, azure_tenant_id=None, gcp_bucket_name=None, include_root_files=None, internal_processing_authorized=None, s3_access_key=None, s3_bucket_name=None, s3_endpoint=None, s3_region=None, s3_secret_key=None, s3_session_token=None)
Create a cloud storage integration.
Parameters:

Name | Type | Description | Default
---|---|---|---
fields | Union[List[str], Tuple[str, ...]] | All the fields to request among the possible fields for the cloud storage integration. See the documentation for all possible fields. | ('id', 'name', 'status', 'platform', 'allowedPaths')
allowed_paths | Optional[List[str]] | List of allowed paths. | None
allowed_projects | Optional[List[str]] | List of allowed projects. | None
aws_access_point_arn | Optional[str] | AWS access point ARN. | None
aws_role_arn | Optional[str] | AWS role ARN. | None
aws_role_external_id | Optional[str] | AWS role external ID. | None
azure_connection_url | Optional[str] | Azure connection URL. | None
azure_is_using_service_credentials | Optional[bool] | Whether Azure is using service credentials. | None
azure_sas_token | Optional[str] | Azure SAS token. | None
azure_tenant_id | Optional[str] | Azure tenant ID. | None
gcp_bucket_name | Optional[str] | GCP bucket name. | None
include_root_files | Optional[str] | Whether to include root files. | None
internal_processing_authorized | Optional[str] | Whether internal processing is authorized. | None
name | str | Name of the cloud storage integration. | required
platform | Literal['AWS', 'Azure', 'GCP', 'CustomS3'] | Platform of the cloud storage integration. | required
s3_access_key | Optional[str] | S3 access key. | None
s3_bucket_name | Optional[str] | S3 bucket name. | None
s3_endpoint | Optional[str] | S3 endpoint. | None
s3_region | Optional[str] | S3 region. | None
s3_secret_key | Optional[str] | S3 secret key. | None
s3_session_token | Optional[str] | S3 session token. | None
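A hedged sketch of creating an AWS integration via role delegation. Which credential parameters a given platform requires is not specified in this docstring, so the combination below is an assumption chosen only to illustrate the call shape, with placeholder values:

>>> kili.create_cloud_storage_integration(
...     platform="AWS",
...     name="My production bucket",
...     aws_role_arn="arn:aws:iam::123456789012:role/kili-access",  # placeholder role ARN
...     aws_role_external_id="my-external-id",  # placeholder external ID
... )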
Source code in kili/presentation/client/cloud_storage.py
def create_cloud_storage_integration(
self,
platform: DataIntegrationPlatform,
name: str,
fields: ListOrTuple[str] = (
"id",
"name",
"status",
"platform",
"allowedPaths",
),
allowed_paths: Optional[List[str]] = None,
allowed_projects: Optional[List[str]] = None,
aws_access_point_arn: Optional[str] = None,
aws_role_arn: Optional[str] = None,
aws_role_external_id: Optional[str] = None,
azure_connection_url: Optional[str] = None,
azure_is_using_service_credentials: Optional[bool] = None,
azure_sas_token: Optional[str] = None,
azure_tenant_id: Optional[str] = None,
gcp_bucket_name: Optional[str] = None,
include_root_files: Optional[str] = None,
internal_processing_authorized: Optional[str] = None,
s3_access_key: Optional[str] = None,
s3_bucket_name: Optional[str] = None,
s3_endpoint: Optional[str] = None,
s3_region: Optional[str] = None,
s3_secret_key: Optional[str] = None,
s3_session_token: Optional[str] = None,
) -> Dict:
# pylint: disable=line-too-long
"""Create a cloud storage integration.
Args:
fields: All the fields to request among the possible fields for the cloud storage integration.
See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
allowed_paths: List of allowed paths.
allowed_projects: List of allowed projects.
aws_access_point_arn: AWS access point ARN.
aws_role_arn: AWS role ARN.
aws_role_external_id: AWS role external ID.
azure_connection_url: Azure connection URL.
azure_is_using_service_credentials: Whether Azure is using service credentials.
azure_sas_token: Azure SAS token.
azure_tenant_id: Azure tenant ID.
gcp_bucket_name: GCP bucket name.
include_root_files: Whether to include root files.
internal_processing_authorized: Whether internal processing is authorized.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
s3_access_key: S3 access key.
s3_bucket_name: S3 bucket name.
s3_endpoint: S3 endpoint.
s3_region: S3 region.
s3_secret_key: S3 secret key.
s3_session_token: S3 session token.
"""
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
return cloud_storage_use_cases.create_data_integration(
platform=platform,
name=name,
fields=fields,
allowed_paths=allowed_paths,
allowed_projects=allowed_projects,
aws_access_point_arn=aws_access_point_arn,
aws_role_arn=aws_role_arn,
aws_role_external_id=aws_role_external_id,
azure_connection_url=azure_connection_url,
azure_is_using_service_credentials=azure_is_using_service_credentials,
azure_sas_token=azure_sas_token,
azure_tenant_id=azure_tenant_id,
gcp_bucket_name=gcp_bucket_name,
include_root_files=include_root_files,
internal_processing_authorized=internal_processing_authorized,
s3_access_key=s3_access_key,
s3_bucket_name=s3_bucket_name,
s3_endpoint=s3_endpoint,
s3_region=s3_region,
s3_secret_key=s3_secret_key,
s3_session_token=s3_session_token,
)
delete_cloud_storage_integration(self, cloud_storage_integration_id)
Delete a cloud storage integration.
Parameters:

Name | Type | Description | Default
---|---|---|---
cloud_storage_integration_id | str | Id of the cloud storage integration. | required
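Usage sketch with a placeholder ID:

>>> kili.delete_cloud_storage_integration(cloud_storage_integration_id="my_integration_id")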
Source code in kili/presentation/client/cloud_storage.py
def delete_cloud_storage_integration(self, cloud_storage_integration_id: str) -> str:
"""Delete a cloud storage integration.
Args:
cloud_storage_integration_id: Id of the cloud storage integration.
"""
cloud_storage_integration_id = DataIntegrationId(cloud_storage_integration_id)
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
return cloud_storage_use_cases.delete_data_integration(
data_integration_id=cloud_storage_integration_id
)
synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False, dry_run=False)
Synchronize a cloud storage connection.
This method will compute differences between the cloud storage connection and the project, and then validate the differences.
If `delete_extraneous_files` is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.
Parameters:

Name | Type | Description | Default
---|---|---|---
cloud_storage_connection_id | str | Id of the cloud storage connection. | required
delete_extraneous_files | bool | If True, delete extraneous files. | False
dry_run | bool | If True, will not synchronize the data connection but only print the differences. This is useful to check the differences before applying them to the project. | False
Returns:

Type | Description
---|---
Dict | A dict with the cloud storage connection Id.
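A common pattern is to preview with a dry run before applying the synchronization; the connection ID is a placeholder and the returned values are illustrative:

>>> kili.synchronize_cloud_storage_connection("my_connection_id", dry_run=True)
>>> kili.synchronize_cloud_storage_connection("my_connection_id", delete_extraneous_files=True)
{'numberOfAssets': 42, 'projectId': '789465123'}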
Source code in kili/presentation/client/cloud_storage.py
def synchronize_cloud_storage_connection(
self,
cloud_storage_connection_id: str,
delete_extraneous_files: bool = False,
dry_run: bool = False,
) -> Dict:
"""Synchronize a cloud storage connection.
This method will compute differences between the cloud storage connection and the project,
and then validate the differences.
If `delete_extraneous_files` is True, it will also delete files that are not in the
cloud storage integration anymore but that are still in the project.
Args:
cloud_storage_connection_id: Id of the cloud storage connection.
delete_extraneous_files: If True, delete extraneous files.
dry_run: If True, will not synchronize the data connection but only print the
differences. This is useful to check the differences before applying them to the
project.
Returns:
A dict with the cloud storage connection Id.
"""
data_connection_id = DataConnectionId(cloud_storage_connection_id)
cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)
cloud_storage_use_cases.synchronize_data_connection(
data_connection_id=data_connection_id,
delete_extraneous_files=delete_extraneous_files,
dry_run=dry_run,
logger=logger,
)
return cloud_storage_use_cases.get_data_connection(
data_connection_id=data_connection_id, fields=("numberOfAssets", "projectId")
)
update_cloud_storage_integration(self, cloud_storage_integration_id, allowed_paths=None, allowed_projects=None, aws_access_point_arn=None, aws_role_arn=None, aws_role_external_id=None, azure_connection_url=None, azure_is_using_service_credentials=None, azure_sas_token=None, azure_tenant_id=None, gcp_bucket_name=None, include_root_files=None, internal_processing_authorized=None, name=None, organization_id=None, platform=None, status=None, s3_access_key=None, s3_bucket_name=None, s3_endpoint=None, s3_region=None, s3_secret_key=None, s3_session_token=None)
Update cloud storage data integration.
Parameters:

Name | Type | Description | Default
---|---|---|---
allowed_paths | Optional[List[str]] | List of allowed paths. | None
allowed_projects | Optional[List[str]] | List of allowed projects. | None
aws_access_point_arn | Optional[str] | AWS access point ARN. | None
aws_role_arn | Optional[str] | AWS role ARN. | None
aws_role_external_id | Optional[str] | AWS role external ID. | None
azure_connection_url | Optional[str] | Azure connection URL. | None
azure_is_using_service_credentials | Optional[bool] | Whether Azure is using service credentials. | None
azure_sas_token | Optional[str] | Azure SAS token. | None
azure_tenant_id | Optional[str] | Azure tenant ID. | None
cloud_storage_integration_id | str | Data integration ID. | required
gcp_bucket_name | Optional[str] | GCP bucket name. | None
include_root_files | Optional[str] | Whether to include root files. | None
internal_processing_authorized | Optional[str] | Whether internal processing is authorized. | None
organization_id | Optional[str] | Organization ID. | None
name | Optional[str] | Name of the cloud storage integration. | None
platform | Optional[Literal['AWS', 'Azure', 'GCP', 'CustomS3']] | Platform of the cloud storage integration. | None
status | Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']] | Status of the cloud storage integration. | None
s3_access_key | Optional[str] | S3 access key. | None
s3_bucket_name | Optional[str] | S3 bucket name. | None
s3_endpoint | Optional[str] | S3 endpoint. | None
s3_region | Optional[str] | S3 region. | None
s3_secret_key | Optional[str] | S3 secret key. | None
s3_session_token | Optional[str] | S3 session token. | None
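For instance, renaming an integration and restricting its allowed paths (placeholder values):

>>> kili.update_cloud_storage_integration(
...     cloud_storage_integration_id="my_integration_id",
...     name="My renamed bucket",
...     allowed_paths=["datasets/images/"],
... )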
Source code in kili/presentation/client/cloud_storage.py
def update_cloud_storage_integration(
self,
cloud_storage_integration_id: str,
allowed_paths: Optional[List[str]] = None,
allowed_projects: Optional[List[str]] = None,
aws_access_point_arn: Optional[str] = None,
aws_role_arn: Optional[str] = None,
aws_role_external_id: Optional[str] = None,
azure_connection_url: Optional[str] = None,
azure_is_using_service_credentials: Optional[bool] = None,
azure_sas_token: Optional[str] = None,
azure_tenant_id: Optional[str] = None,
gcp_bucket_name: Optional[str] = None,
include_root_files: Optional[str] = None,
internal_processing_authorized: Optional[str] = None,
name: Optional[str] = None,
organization_id: Optional[str] = None,
platform: Optional[DataIntegrationPlatform] = None,
status: Optional[DataIntegrationStatus] = None,
s3_access_key: Optional[str] = None,
s3_bucket_name: Optional[str] = None,
s3_endpoint: Optional[str] = None,
s3_region: Optional[str] = None,
s3_secret_key: Optional[str] = None,
s3_session_token: Optional[str] = None,
) -> Dict:
"""Update cloud storage data integration.
Args:
allowed_paths: List of allowed paths.
allowed_projects: List of allowed projects.
aws_access_point_arn: AWS access point ARN.
aws_role_arn: AWS role ARN.
aws_role_external_id: AWS role external ID.
azure_connection_url: Azure connection URL.
azure_is_using_service_credentials: Whether Azure is using service credentials.
azure_sas_token: Azure SAS token.
azure_tenant_id: Azure tenant ID.
cloud_storage_integration_id: Data integration ID.
gcp_bucket_name: GCP bucket name.
include_root_files: Whether to include root files.
internal_processing_authorized: Whether internal processing is authorized.
organization_id: Organization ID.
name: Name of the cloud storage integration.
platform: Platform of the cloud storage integration.
status: Status of the cloud storage integration.
s3_access_key: S3 access key.
s3_bucket_name: S3 bucket name.
s3_endpoint: S3 endpoint.
s3_region: S3 region.
s3_secret_key: S3 secret key.
s3_session_token: S3 session token.
"""
return CloudStorageUseCases(self.kili_api_gateway).update_data_integration(
data_integration_id=DataIntegrationId(cloud_storage_integration_id),
name=name,
platform=platform,
allowed_paths=allowed_paths,
allowed_projects=allowed_projects,
aws_access_point_arn=aws_access_point_arn,
aws_role_arn=aws_role_arn,
aws_role_external_id=aws_role_external_id,
azure_connection_url=azure_connection_url,
azure_is_using_service_credentials=azure_is_using_service_credentials,
azure_sas_token=azure_sas_token,
azure_tenant_id=azure_tenant_id,
gcp_bucket_name=gcp_bucket_name,
include_root_files=include_root_files,
internal_processing_authorized=internal_processing_authorized,
organization_id=organization_id,
s3_access_key=s3_access_key,
s3_bucket_name=s3_bucket_name,
s3_endpoint=s3_endpoint,
s3_region=s3_region,
s3_secret_key=s3_secret_key,
s3_session_token=s3_session_token,
status=status,
)