Skip to content

Cloud storage module

Alpha feature

The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.

Cloud Storage Integration and Connection

A cloud storage integration is a connection between a Kili organization and a cloud storage (AWS, GCP or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Adding a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.

A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage to a project. More information about how to use a cloud storage integration in a project can be found here.

Azure

It is recommended to install the Azure dependencies to use the Azure cloud storage integration and connection.

pip install kili[azure]

Methods attached to the Kili client, to run actions on cloud storage.

Source code in kili/presentation/client/cloud_storage.py
class CloudStorageClientMethods(BaseClientMethods):
    """Methods attached to the Kili client, to run actions on cloud storage.

    Every public method converts its plain-Python arguments into the domain
    types expected by ``CloudStorageUseCases`` and delegates the work to a
    use-case object built on ``self.kili_api_gateway``.
    """

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage connections that match a set of criteria.

        At least one of `cloud_storage_connection_id`, `cloud_storage_integration_id`
        or `project_id` must be provided.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
                When provided, that single connection is fetched directly and
                `cloud_storage_integration_id`, `project_id`, `first` and `skip` are not used.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: All the fields to request among the possible fields for the cloud storage connections.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of skipped cloud storage connections.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage connections is returned.

        Returns:
            A list or a generator of the cloud storage connections that match the criteria.

        Raises:
            ValueError: If none of `cloud_storage_connection_id`,
                `cloud_storage_integration_id` and `project_id` is specified.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        if (
            cloud_storage_connection_id is None
            and cloud_storage_integration_id is None
            and project_id is None
        ):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)

        cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)

        if cloud_storage_connection_id is None:
            data_connections_gen = cloud_storage_use_cases.list_data_connections(
                data_connection_filters=DataConnectionFilters(
                    project_id=ProjectId(project_id) if project_id is not None else None,
                    integration_id=(
                        DataIntegrationId(cloud_storage_integration_id)
                        if cloud_storage_integration_id is not None
                        else None
                    ),
                ),
                fields=fields,
                options=QueryOptions(disable_tqdm, first, skip),
            )
        else:
            # The single connection is fetched right away (the list literal is
            # evaluated when the generator expression is created); it is only
            # wrapped in a generator so both branches yield the same kind of object.
            data_connections_gen = (
                i
                for i in [
                    cloud_storage_use_cases.get_data_connection(
                        DataConnectionId(cloud_storage_connection_id), fields=fields
                    )
                ]
            )

        if as_generator:
            return data_connections_gen
        return list(data_connections_gen)

    @staticmethod
    def _build_data_integration_filters(
        cloud_storage_integration_id: Optional[str],
        name: Optional[str],
        platform: Optional[DataIntegrationPlatform],
        status: Optional[DataIntegrationStatus],
        organization_id: Optional[str],
    ) -> DataIntegrationFilters:
        """Build the filters shared by the listing and the counting of cloud storage integrations.

        The raw string identifiers are wrapped in their domain types; `None`
        values are forwarded unchanged.
        """
        return DataIntegrationFilters(
            status=status,
            id=(
                DataIntegrationId(cloud_storage_integration_id)
                if cloud_storage_integration_id is not None
                else None
            ),
            name=name,
            platform=platform,
            organization_id=(
                OrganizationId(organization_id) if organization_id is not None else None
            ),
        )

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.
            fields: All the fields to request among the possible fields for the cloud storage integrations.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of skipped cloud storage integrations.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage integrations is returned.

        Returns:
            A list or a generator of the cloud storage integrations that match the criteria.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        options = QueryOptions(disable_tqdm, first, skip)
        data_integrations_gen = CloudStorageUseCases(self.kili_api_gateway).list_data_integrations(
            data_integration_filters=self._build_data_integration_filters(
                cloud_storage_integration_id=cloud_storage_integration_id,
                name=name,
                platform=platform,
                status=status,
                organization_id=organization_id,
            ),
            fields=fields,
            options=options,
        )

        if as_generator:
            return data_integrations_gen
        return list(data_integrations_gen)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Count and return the number of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.

        Returns:
            The number of cloud storage integrations that match the criteria.
        """
        return CloudStorageUseCases(self.kili_api_gateway).count_data_integrations(
            self._build_data_integration_filters(
                cloud_storage_integration_id=cloud_storage_integration_id,
                name=name,
                platform=platform,
                status=status,
                organization_id=organization_id,
            )
        )

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
        prefix: Optional[str] = None,
        include: Optional[List[str]] = None,
        exclude: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage to a project. More details about parameters
        can be found in the [documentation](https://docs.kili-technology.com/docs/filtering-assets-from-cloud-storage).

        Args:
            project_id: Id of the project.
            cloud_storage_integration_id: Id of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.
                This option is deprecated and will be removed in the future.
            prefix: Filter files to synchronize based on their base path.
            include: List of patterns used to include files based on their path.
            exclude: List of patterns used to exclude files based on their path.

        Returns:
            A dict with the DataConnection Id.
        """
        if selected_folders is not None:
            logger.warning(
                "The selected_folders argument is deprecated and will be removed in the future."
            )

        data_connection_id = CloudStorageUseCases(self.kili_api_gateway).add_data_connection(
            project_id=ProjectId(project_id),
            data_integration_id=DataIntegrationId(cloud_storage_integration_id),
            selected_folders=selected_folders,
            fields=("id",),
            prefix=prefix,
            include=include,
            exclude=exclude,
        )["id"]

        # Only the identifier is exposed to the caller, whatever else the use case returns.
        return {"id": data_connection_id}

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
        dry_run: bool = False,
    ) -> Dict:
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the
        project, and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: Id of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.
            dry_run: If True, will not synchronize the data connection but only print the
                differences. This is useful to check the differences before applying them to the
                project.

        Returns:
            A dict describing the cloud storage connection, fetched after the synchronization
            with the `numberOfAssets` and `projectId` fields.
        """
        data_connection_id = DataConnectionId(cloud_storage_connection_id)

        cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)

        cloud_storage_use_cases.synchronize_data_connection(
            data_connection_id=data_connection_id,
            delete_extraneous_files=delete_extraneous_files,
            dry_run=dry_run,
            logger=logger,
        )

        # Re-read the connection so the caller gets its state after the synchronization call.
        return cloud_storage_use_cases.get_data_connection(
            data_connection_id=data_connection_id, fields=("numberOfAssets", "projectId")
        )

    @typechecked
    def create_cloud_storage_integration(
        self,
        platform: DataIntegrationPlatform,
        name: str,
        fields: ListOrTuple[str] = (
            "id",
            "name",
            "status",
            "platform",
            "allowedPaths",
        ),
        allowed_paths: Optional[List[str]] = None,
        allowed_projects: Optional[List[str]] = None,
        aws_access_point_arn: Optional[str] = None,
        aws_role_arn: Optional[str] = None,
        aws_role_external_id: Optional[str] = None,
        azure_connection_url: Optional[str] = None,
        azure_is_using_service_credentials: Optional[bool] = None,
        azure_sas_token: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        gcp_bucket_name: Optional[str] = None,
        include_root_files: Optional[str] = None,
        internal_processing_authorized: Optional[str] = None,
        s3_access_key: Optional[str] = None,
        s3_bucket_name: Optional[str] = None,
        s3_endpoint: Optional[str] = None,
        s3_region: Optional[str] = None,
        s3_secret_key: Optional[str] = None,
        s3_session_token: Optional[str] = None,
    ) -> Dict:
        # pylint: disable=line-too-long
        """Create a cloud storage integration.

        All arguments are forwarded unchanged to the use case.

        Args:
            platform: Platform of the cloud storage integration.
            name: Name of the cloud storage integration.
            fields: All the fields to request among the possible fields for the cloud storage integration.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            allowed_paths: List of allowed paths.
            allowed_projects: List of allowed projects.
            aws_access_point_arn: AWS access point ARN.
            aws_role_arn: AWS role ARN.
            aws_role_external_id: AWS role external ID.
            azure_connection_url: Azure connection URL.
            azure_is_using_service_credentials: Whether Azure is using service credentials.
            azure_sas_token: Azure SAS token.
            azure_tenant_id: Azure tenant ID.
            gcp_bucket_name: GCP bucket name.
            include_root_files: Whether to include root files (typed as a string in this signature).
            internal_processing_authorized: Whether internal processing is authorized
                (typed as a string in this signature).
            s3_access_key: S3 access key.
            s3_bucket_name: S3 bucket name.
            s3_endpoint: S3 endpoint.
            s3_region: S3 region.
            s3_secret_key: S3 secret key.
            s3_session_token: S3 session token.

        Returns:
            The dict returned by the use case for the created cloud storage integration,
            queried with the requested `fields`.
        """
        cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)

        return cloud_storage_use_cases.create_data_integration(
            platform=platform,
            name=name,
            fields=fields,
            allowed_paths=allowed_paths,
            allowed_projects=allowed_projects,
            aws_access_point_arn=aws_access_point_arn,
            aws_role_arn=aws_role_arn,
            aws_role_external_id=aws_role_external_id,
            azure_connection_url=azure_connection_url,
            azure_is_using_service_credentials=azure_is_using_service_credentials,
            azure_sas_token=azure_sas_token,
            azure_tenant_id=azure_tenant_id,
            gcp_bucket_name=gcp_bucket_name,
            include_root_files=include_root_files,
            internal_processing_authorized=internal_processing_authorized,
            s3_access_key=s3_access_key,
            s3_bucket_name=s3_bucket_name,
            s3_endpoint=s3_endpoint,
            s3_region=s3_region,
            s3_secret_key=s3_secret_key,
            s3_session_token=s3_session_token,
        )

    @typechecked
    def update_cloud_storage_integration(
        self,
        cloud_storage_integration_id: str,
        allowed_paths: Optional[List[str]] = None,
        allowed_projects: Optional[List[str]] = None,
        aws_access_point_arn: Optional[str] = None,
        aws_role_arn: Optional[str] = None,
        aws_role_external_id: Optional[str] = None,
        azure_connection_url: Optional[str] = None,
        azure_is_using_service_credentials: Optional[bool] = None,
        azure_sas_token: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        gcp_bucket_name: Optional[str] = None,
        include_root_files: Optional[str] = None,
        internal_processing_authorized: Optional[str] = None,
        name: Optional[str] = None,
        organization_id: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        s3_access_key: Optional[str] = None,
        s3_bucket_name: Optional[str] = None,
        s3_endpoint: Optional[str] = None,
        s3_region: Optional[str] = None,
        s3_secret_key: Optional[str] = None,
        s3_session_token: Optional[str] = None,
    ) -> Dict:
        """Update a cloud storage integration.

        Apart from `cloud_storage_integration_id`, which is wrapped in its domain type,
        all arguments are forwarded unchanged to the use case.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration to update.
            allowed_paths: List of allowed paths.
            allowed_projects: List of allowed projects.
            aws_access_point_arn: AWS access point ARN.
            aws_role_arn: AWS role ARN.
            aws_role_external_id: AWS role external ID.
            azure_connection_url: Azure connection URL.
            azure_is_using_service_credentials: Whether Azure is using service credentials.
            azure_sas_token: Azure SAS token.
            azure_tenant_id: Azure tenant ID.
            gcp_bucket_name: GCP bucket name.
            include_root_files: Whether to include root files (typed as a string in this signature).
            internal_processing_authorized: Whether internal processing is authorized
                (typed as a string in this signature).
            name: Name of the cloud storage integration.
            organization_id: Organization ID.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            s3_access_key: S3 access key.
            s3_bucket_name: S3 bucket name.
            s3_endpoint: S3 endpoint.
            s3_region: S3 region.
            s3_secret_key: S3 secret key.
            s3_session_token: S3 session token.

        Returns:
            The dict returned by the use case for the updated cloud storage integration.
        """
        return CloudStorageUseCases(self.kili_api_gateway).update_data_integration(
            data_integration_id=DataIntegrationId(cloud_storage_integration_id),
            name=name,
            platform=platform,
            allowed_paths=allowed_paths,
            allowed_projects=allowed_projects,
            aws_access_point_arn=aws_access_point_arn,
            aws_role_arn=aws_role_arn,
            aws_role_external_id=aws_role_external_id,
            azure_connection_url=azure_connection_url,
            azure_is_using_service_credentials=azure_is_using_service_credentials,
            azure_sas_token=azure_sas_token,
            azure_tenant_id=azure_tenant_id,
            gcp_bucket_name=gcp_bucket_name,
            include_root_files=include_root_files,
            internal_processing_authorized=internal_processing_authorized,
            organization_id=organization_id,
            s3_access_key=s3_access_key,
            s3_bucket_name=s3_bucket_name,
            s3_endpoint=s3_endpoint,
            s3_region=s3_region,
            s3_secret_key=s3_secret_key,
            s3_session_token=s3_session_token,
            status=status,
        )

    @typechecked
    def delete_cloud_storage_integration(self, cloud_storage_integration_id: str) -> str:
        """Delete a cloud storage integration.

        Args:
            cloud_storage_integration_id: Id of the cloud storage integration.

        Returns:
            The string returned by the delete use case
            (presumably the Id of the deleted integration; to be confirmed).
        """
        # Use a separate local instead of rebinding the `str` parameter to a
        # different type, so the name keeps matching its annotation.
        data_integration_id = DataIntegrationId(cloud_storage_integration_id)

        cloud_storage_use_cases = CloudStorageUseCases(self.kili_api_gateway)

        return cloud_storage_use_cases.delete_data_integration(
            data_integration_id=data_integration_id
        )

add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None, prefix=None, include=None, exclude=None)

Connect a cloud storage to a project. More details about parameters can be found in the documentation.

Parameters:

Name Type Description Default
project_id str

Id of the project.

required
cloud_storage_integration_id str

Id of the cloud storage integration.

required
selected_folders Optional[List[str]]

List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected. This option is deprecated and will be removed in the future.

None
prefix Optional[str]

Filter files to synchronize based on their base path.

None
include Optional[List[str]]

List of patterns used to include files based on their path.

None
exclude Optional[List[str]]

List of patterns used to exclude files based on their path.

None

Returns:

Type Description
Dict

A dict with the DataConnection Id.

Source code in kili/presentation/client/cloud_storage.py
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
    prefix: Optional[str] = None,
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
) -> Dict:
    """Attach a cloud storage integration to a project as a new connection.

    The filtering parameters are described in the
    [documentation](https://docs.kili-technology.com/docs/filtering-assets-from-cloud-storage).

    Args:
        project_id: Id of the project.
        cloud_storage_integration_id: Id of the cloud storage integration.
        selected_folders: Folders of the data integration to connect to the project.
            When omitted, all folders of the data integration are connected.
            Deprecated: a warning is logged whenever a value is passed.
        prefix: Filter files to synchronize based on their base path.
        include: Patterns used to include files based on their path.
        exclude: Patterns used to exclude files based on their path.

    Returns:
        A dict holding the Id of the created DataConnection under the `"id"` key.
    """
    uses_deprecated_folders = selected_folders is not None
    if uses_deprecated_folders:
        logger.warning(
            "The selected_folders argument is deprecated and will be removed in the future."
        )

    use_cases = CloudStorageUseCases(self.kili_api_gateway)
    created_connection = use_cases.add_data_connection(
        project_id=ProjectId(project_id),
        data_integration_id=DataIntegrationId(cloud_storage_integration_id),
        selected_folders=selected_folders,
        fields=("id",),
        prefix=prefix,
        include=include,
        exclude=exclude,
    )

    # Expose only the identifier of the new connection.
    return {"id": created_connection["id"]}

cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)

Get a generator or a list of cloud storage connections that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_connection_id Optional[str]

ID of the cloud storage connection.

None
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
project_id Optional[str]

ID of the project.

None
fields Union[List[str], Tuple[str, ...]]

All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields.

('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId')
first Optional[int]

Maximum number of cloud storage connections to return.

None
skip int

Number of skipped cloud storage connections.

0
disable_tqdm Optional[bool]

If True, the progress bar will be disabled.

None
as_generator bool

If True, a generator on the cloud storage connections is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage connections that match the criteria.

Examples:

>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: ListOrTuple[str] = (
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """List the cloud storage connections matching the given criteria.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
            When given, that connection is fetched directly by its ID.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: Fields to request among the possible fields for the cloud storage connections.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of skipped cloud storage connections.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage connections is returned.

    Returns:
        A generator when `as_generator` is `True`, otherwise a list, of the
        cloud storage connections that match the criteria.

    Raises:
        ValueError: If none of the three ID arguments is specified.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    # Guard clause: refuse a query that has no criterion at all.
    no_criterion_given = all(
        criterion is None
        for criterion in (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
    )
    if no_criterion_given:
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    if cloud_storage_connection_id is not None:
        # Direct lookup: the connection is fetched now and wrapped in a
        # one-item generator so both branches produce the same kind of object.
        single_connection = use_cases.get_data_connection(
            DataConnectionId(cloud_storage_connection_id), fields=fields
        )
        connections = (connection for connection in [single_connection])
    else:
        project_filter = None if project_id is None else ProjectId(project_id)
        integration_filter = (
            None
            if cloud_storage_integration_id is None
            else DataIntegrationId(cloud_storage_integration_id)
        )
        connections = use_cases.list_data_connections(
            data_connection_filters=DataConnectionFilters(
                project_id=project_filter,
                integration_id=integration_filter,
            ),
            fields=fields,
            options=QueryOptions(disable_tqdm, first, skip),
        )

    return connections if as_generator else list(connections)

cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=('name', 'id', 'platform', 'status'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)

Get a generator or a list of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[Literal['AWS', 'Azure', 'GCP', 'CustomS3']]

Platform of the cloud storage integration.

None
status Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None
fields Union[List[str], Tuple[str, ...]]

All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields.

('name', 'id', 'platform', 'status')
first Optional[int]

Maximum number of cloud storage integrations to return.

None
skip int

Number of skipped cloud storage integrations.

0
disable_tqdm Optional[bool]

If True, the progress bar will be disabled.

None
as_generator bool

If True, a generator on the cloud storage integrations is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage integrations that match the criteria.

Examples:

>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[DataIntegrationPlatform] = None,
    status: Optional[DataIntegrationStatus] = None,
    organization_id: Optional[str] = None,
    fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """List the cloud storage integrations matching the given criteria.

    The result is either a list or a generator, depending on `as_generator`.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.
        fields: All the fields to request among the possible fields for the cloud storage integrations.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of skipped cloud storage integrations.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage integrations is returned.

    Returns:
        A list or a generator of the cloud storage integrations that match the criteria.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    # The progress-bar setting is resolved together with `as_generator` by the helper.
    query_options = QueryOptions(
        disable_tqdm_if_as_generator(as_generator, disable_tqdm), first, skip
    )
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    # Identifiers are only wrapped in their domain types when actually provided.
    integration_id = None
    if cloud_storage_integration_id is not None:
        integration_id = DataIntegrationId(cloud_storage_integration_id)

    org_id = None
    if organization_id is not None:
        org_id = OrganizationId(organization_id)

    filters = DataIntegrationFilters(
        status=status,
        id=integration_id,
        name=name,
        platform=platform,
        organization_id=org_id,
    )

    integrations = use_cases.list_data_integrations(
        data_integration_filters=filters,
        fields=fields,
        options=query_options,
    )

    return integrations if as_generator else list(integrations)

count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)

Count and return the number of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[Literal['AWS', 'Azure', 'GCP', 'CustomS3']]

Platform of the cloud storage integration.

None
status Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None

Returns:

Type Description
int

The number of cloud storage integrations that match the criteria.

Source code in kili/presentation/client/cloud_storage.py
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[DataIntegrationPlatform] = None,
    status: Optional[DataIntegrationStatus] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Count the cloud storage integrations matching the given criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.

    Returns:
        The number of cloud storage integrations that match the criteria.
    """
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    # Identifiers are only wrapped in their domain types when actually provided.
    integration_id = None
    if cloud_storage_integration_id is not None:
        integration_id = DataIntegrationId(cloud_storage_integration_id)

    org_id = None
    if organization_id is not None:
        org_id = OrganizationId(organization_id)

    filters = DataIntegrationFilters(
        status=status,
        id=integration_id,
        name=name,
        platform=platform,
        organization_id=org_id,
    )
    return use_cases.count_data_integrations(filters)

create_cloud_storage_integration(self, platform, name, fields=('id', 'name', 'status', 'platform', 'allowedPaths'), allowed_paths=None, allowed_projects=None, aws_access_point_arn=None, aws_role_arn=None, aws_role_external_id=None, azure_connection_url=None, azure_is_using_service_credentials=None, azure_sas_token=None, azure_tenant_id=None, gcp_bucket_name=None, include_root_files=None, internal_processing_authorized=None, s3_access_key=None, s3_bucket_name=None, s3_endpoint=None, s3_region=None, s3_secret_key=None, s3_session_token=None)

Create a cloud storage integration.

Parameters:

Name Type Description Default
fields Union[List[str], Tuple[str, ...]]

All the fields to request among the possible fields for the cloud storage integration. See the documentation for all possible fields.

('id', 'name', 'status', 'platform', 'allowedPaths')
allowed_paths Optional[List[str]]

List of allowed paths.

None
allowed_projects Optional[List[str]]

List of allowed projects.

None
aws_access_point_arn Optional[str]

AWS access point ARN.

None
aws_role_arn Optional[str]

AWS role ARN.

None
aws_role_external_id Optional[str]

AWS role external ID.

None
azure_connection_url Optional[str]

Azure connection URL.

None
azure_is_using_service_credentials Optional[bool]

Whether Azure is using service credentials.

None
azure_sas_token Optional[str]

Azure SAS token.

None
azure_tenant_id Optional[str]

Azure tenant ID.

None
gcp_bucket_name Optional[str]

GCP bucket name.

None
include_root_files Optional[str]

Whether to include root files.

None
internal_processing_authorized Optional[str]

Whether internal processing is authorized.

None
name str

Name of the cloud storage integration.

required
platform Literal['AWS', 'Azure', 'GCP', 'CustomS3']

Platform of the cloud storage integration.

required
s3_access_key Optional[str]

S3 access key.

None
s3_bucket_name Optional[str]

S3 bucket name.

None
s3_endpoint Optional[str]

S3 endpoint.

None
s3_region Optional[str]

S3 region.

None
s3_secret_key Optional[str]

S3 secret key.

None
s3_session_token Optional[str]

S3 session token.

None
Source code in kili/presentation/client/cloud_storage.py
def create_cloud_storage_integration(
    self,
    platform: DataIntegrationPlatform,
    name: str,
    fields: ListOrTuple[str] = (
        "id",
        "name",
        "status",
        "platform",
        "allowedPaths",
    ),
    allowed_paths: Optional[List[str]] = None,
    allowed_projects: Optional[List[str]] = None,
    aws_access_point_arn: Optional[str] = None,
    aws_role_arn: Optional[str] = None,
    aws_role_external_id: Optional[str] = None,
    azure_connection_url: Optional[str] = None,
    azure_is_using_service_credentials: Optional[bool] = None,
    azure_sas_token: Optional[str] = None,
    azure_tenant_id: Optional[str] = None,
    gcp_bucket_name: Optional[str] = None,
    include_root_files: Optional[str] = None,
    internal_processing_authorized: Optional[str] = None,
    s3_access_key: Optional[str] = None,
    s3_bucket_name: Optional[str] = None,
    s3_endpoint: Optional[str] = None,
    s3_region: Optional[str] = None,
    s3_secret_key: Optional[str] = None,
    s3_session_token: Optional[str] = None,
) -> Dict:
    # pylint: disable=line-too-long
    """Create a cloud storage integration.

    Every argument is forwarded unchanged to the cloud storage use case.

    Args:
        platform: Platform of the cloud storage integration.
        name: Name of the cloud storage integration.
        fields: All the fields to request among the possible fields for the cloud storage integration.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        allowed_paths: List of allowed paths.
        allowed_projects: List of allowed projects.
        aws_access_point_arn: AWS access point ARN.
        aws_role_arn: AWS role ARN.
        aws_role_external_id: AWS role external ID.
        azure_connection_url: Azure connection URL.
        azure_is_using_service_credentials: Whether Azure is using service credentials.
        azure_sas_token: Azure SAS token.
        azure_tenant_id: Azure tenant ID.
        gcp_bucket_name: GCP bucket name.
        include_root_files: Whether to include root files.
        internal_processing_authorized: Whether internal processing is authorized.
        s3_access_key: S3 access key.
        s3_bucket_name: S3 bucket name.
        s3_endpoint: S3 endpoint.
        s3_region: S3 region.
        s3_secret_key: S3 secret key.
        s3_session_token: S3 session token.

    Returns:
        The dict produced by the underlying create operation.
    """
    # NOTE(review): `include_root_files` and `internal_processing_authorized` are
    # annotated as strings although they are described as flags - confirm the
    # values the backend expects before tightening the annotations.
    return CloudStorageUseCases(self.kili_api_gateway).create_data_integration(
        platform=platform,
        name=name,
        fields=fields,
        # Access restrictions.
        allowed_paths=allowed_paths,
        allowed_projects=allowed_projects,
        include_root_files=include_root_files,
        internal_processing_authorized=internal_processing_authorized,
        # AWS settings.
        aws_access_point_arn=aws_access_point_arn,
        aws_role_arn=aws_role_arn,
        aws_role_external_id=aws_role_external_id,
        # Azure settings.
        azure_connection_url=azure_connection_url,
        azure_is_using_service_credentials=azure_is_using_service_credentials,
        azure_sas_token=azure_sas_token,
        azure_tenant_id=azure_tenant_id,
        # GCP settings.
        gcp_bucket_name=gcp_bucket_name,
        # S3 settings.
        s3_access_key=s3_access_key,
        s3_bucket_name=s3_bucket_name,
        s3_endpoint=s3_endpoint,
        s3_region=s3_region,
        s3_secret_key=s3_secret_key,
        s3_session_token=s3_session_token,
    )

delete_cloud_storage_integration(self, cloud_storage_integration_id)

Delete a cloud storage integration.

Parameters:

Name Type Description Default
cloud_storage_integration_id str

ID of the cloud storage integration.

required
Source code in kili/presentation/client/cloud_storage.py
def delete_cloud_storage_integration(self, cloud_storage_integration_id: str) -> str:
    """Delete a cloud storage integration.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration to delete.

    Returns:
        The string produced by the underlying delete operation.
    """
    # NOTE(review): the returned string is presumably the ID of the deleted
    # integration - confirm against the use case.
    integration_id = DataIntegrationId(cloud_storage_integration_id)
    return CloudStorageUseCases(self.kili_api_gateway).delete_data_integration(
        data_integration_id=integration_id
    )

synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False, dry_run=False)

Synchronize a cloud storage connection.

This method will compute differences between the cloud storage connection and the project, and then validate the differences.

If delete_extraneous_files is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.

Parameters:

Name Type Description Default
cloud_storage_connection_id str

Id of the cloud storage connection.

required
delete_extraneous_files bool

If True, delete extraneous files.

False
dry_run bool

If True, will not synchronize the data connection but only print the differences. This is useful to check the differences before applying them to the project.

False

Returns:

Type Description
Dict

A dict describing the cloud storage connection, fetched after the synchronization with the numberOfAssets and projectId fields.

Source code in kili/presentation/client/cloud_storage.py
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
    dry_run: bool = False,
) -> Dict:
    """Synchronize a cloud storage connection.

    This method will compute differences between the cloud storage connection and the
        project, and then validate the differences.

    If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.
        dry_run: If True, will not synchronize the data connection but only print the
            differences. This is useful to check the differences before applying them
            to the project.

    Returns:
        A dict describing the cloud storage connection, fetched once the synchronization
            call has returned, with the `numberOfAssets` and `projectId` fields.
    """
    connection_id = DataConnectionId(cloud_storage_connection_id)
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    use_cases.synchronize_data_connection(
        data_connection_id=connection_id,
        delete_extraneous_files=delete_extraneous_files,
        dry_run=dry_run,
        logger=logger,
    )

    # The connection is fetched again in every case, including dry runs.
    returned_fields = ("numberOfAssets", "projectId")
    return use_cases.get_data_connection(
        data_connection_id=connection_id, fields=returned_fields
    )

update_cloud_storage_integration(self, cloud_storage_integration_id, allowed_paths=None, allowed_projects=None, aws_access_point_arn=None, aws_role_arn=None, aws_role_external_id=None, azure_connection_url=None, azure_is_using_service_credentials=None, azure_sas_token=None, azure_tenant_id=None, gcp_bucket_name=None, include_root_files=None, internal_processing_authorized=None, name=None, organization_id=None, platform=None, status=None, s3_access_key=None, s3_bucket_name=None, s3_endpoint=None, s3_region=None, s3_secret_key=None, s3_session_token=None)

Update a cloud storage integration.

Parameters:

Name Type Description Default
allowed_paths Optional[List[str]]

List of allowed paths.

None
allowed_projects Optional[List[str]]

List of allowed projects.

None
aws_access_point_arn Optional[str]

AWS access point ARN.

None
aws_role_arn Optional[str]

AWS role ARN.

None
aws_role_external_id Optional[str]

AWS role external ID.

None
azure_connection_url Optional[str]

Azure connection URL.

None
azure_is_using_service_credentials Optional[bool]

Whether Azure is using service credentials.

None
azure_sas_token Optional[str]

Azure SAS token.

None
azure_tenant_id Optional[str]

Azure tenant ID.

None
cloud_storage_integration_id str

Data integration ID.

required
gcp_bucket_name Optional[str]

GCP bucket name.

None
include_root_files Optional[str]

Whether to include root files.

None
internal_processing_authorized Optional[str]

Whether internal processing is authorized.

None
organization_id Optional[str]

Organization ID.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[Literal['AWS', 'Azure', 'GCP', 'CustomS3']]

Platform of the cloud storage integration.

None
status Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
s3_access_key Optional[str]

S3 access key.

None
s3_bucket_name Optional[str]

S3 bucket name.

None
s3_endpoint Optional[str]

S3 endpoint.

None
s3_region Optional[str]

S3 region.

None
s3_secret_key Optional[str]

S3 secret key.

None
s3_session_token Optional[str]

S3 session token.

None
Source code in kili/presentation/client/cloud_storage.py
def update_cloud_storage_integration(
    self,
    cloud_storage_integration_id: str,
    allowed_paths: Optional[List[str]] = None,
    allowed_projects: Optional[List[str]] = None,
    aws_access_point_arn: Optional[str] = None,
    aws_role_arn: Optional[str] = None,
    aws_role_external_id: Optional[str] = None,
    azure_connection_url: Optional[str] = None,
    azure_is_using_service_credentials: Optional[bool] = None,
    azure_sas_token: Optional[str] = None,
    azure_tenant_id: Optional[str] = None,
    gcp_bucket_name: Optional[str] = None,
    include_root_files: Optional[str] = None,
    internal_processing_authorized: Optional[str] = None,
    name: Optional[str] = None,
    organization_id: Optional[str] = None,
    platform: Optional[DataIntegrationPlatform] = None,
    status: Optional[DataIntegrationStatus] = None,
    s3_access_key: Optional[str] = None,
    s3_bucket_name: Optional[str] = None,
    s3_endpoint: Optional[str] = None,
    s3_region: Optional[str] = None,
    s3_secret_key: Optional[str] = None,
    s3_session_token: Optional[str] = None,
) -> Dict:
    """Update a cloud storage integration.

    The integration ID is wrapped in its domain type; every other argument is
    forwarded unchanged to the cloud storage use case.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration to update.
        allowed_paths: List of allowed paths.
        allowed_projects: List of allowed projects.
        aws_access_point_arn: AWS access point ARN.
        aws_role_arn: AWS role ARN.
        aws_role_external_id: AWS role external ID.
        azure_connection_url: Azure connection URL.
        azure_is_using_service_credentials: Whether Azure is using service credentials.
        azure_sas_token: Azure SAS token.
        azure_tenant_id: Azure tenant ID.
        gcp_bucket_name: GCP bucket name.
        include_root_files: Whether to include root files.
        internal_processing_authorized: Whether internal processing is authorized.
        name: Name of the cloud storage integration.
        organization_id: Organization ID.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        s3_access_key: S3 access key.
        s3_bucket_name: S3 bucket name.
        s3_endpoint: S3 endpoint.
        s3_region: S3 region.
        s3_secret_key: S3 secret key.
        s3_session_token: S3 session token.

    Returns:
        The dict produced by the underlying update operation.
    """
    use_cases = CloudStorageUseCases(self.kili_api_gateway)
    integration_id = DataIntegrationId(cloud_storage_integration_id)

    return use_cases.update_data_integration(
        data_integration_id=integration_id,
        # General settings.
        name=name,
        organization_id=organization_id,
        platform=platform,
        status=status,
        # Access restrictions.
        allowed_paths=allowed_paths,
        allowed_projects=allowed_projects,
        include_root_files=include_root_files,
        internal_processing_authorized=internal_processing_authorized,
        # AWS settings.
        aws_access_point_arn=aws_access_point_arn,
        aws_role_arn=aws_role_arn,
        aws_role_external_id=aws_role_external_id,
        # Azure settings.
        azure_connection_url=azure_connection_url,
        azure_is_using_service_credentials=azure_is_using_service_credentials,
        azure_sas_token=azure_sas_token,
        azure_tenant_id=azure_tenant_id,
        # GCP settings.
        gcp_bucket_name=gcp_bucket_name,
        # S3 settings.
        s3_access_key=s3_access_key,
        s3_bucket_name=s3_bucket_name,
        s3_endpoint=s3_endpoint,
        s3_region=s3_region,
        s3_secret_key=s3_secret_key,
        s3_session_token=s3_session_token,
    )