Skip to content

Cloud storage module

Alpha feature

The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.

Cloud Storage Integration and Connection

A cloud storage integration is a connection between a Kili organization and a cloud storage provider (AWS, GCP or Azure). Once a cloud storage integration has been created, it can be used in any project of the organization. Creating a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.

A cloud storage connection is a cloud storage integration that is used in a Kili project. It is used to import data from the cloud storage into the project. More information about how to use a cloud storage integration in a project can be found here.

Azure

It is recommended to install the Azure dependencies to use the Azure cloud storage integration and connection.

pip install kili[azure]

Methods attached to the Kili client, to run actions on cloud storage.

Source code in kili/presentation/client/cloud_storage.py
class CloudStorageClientMethods(BaseClientMethods):
    """Cloud storage methods exposed on the Kili client.

    Covers listing and counting cloud storage integrations, and listing, creating and
    synchronizing cloud storage connections.
    """

    # Typing overload: with `as_generator=True`, a generator of connections is returned.
    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    # Typing overload: with `as_generator=False` (the default), a list of connections is returned.
    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """List the cloud storage connections that match a set of criteria.

        At least one of `cloud_storage_connection_id`, `cloud_storage_integration_id` and
        `project_id` must be given. When `cloud_storage_connection_id` is given, that single
        connection is fetched by its ID: the other two criteria and the pagination options
        (`first`, `skip`) are not applied.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: All the fields to request among the possible fields for the cloud storage connections.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of skipped cloud storage connections.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage connections is returned.

        Returns:
            A list or a generator of the cloud storage connections that match the criteria.

        Raises:
            ValueError: If `cloud_storage_connection_id`, `cloud_storage_integration_id` and
                `project_id` are all `None`.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        criteria = (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
        if all(criterion is None for criterion in criteria):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        use_cases = CloudStorageUseCases(self.kili_api_gateway)

        if cloud_storage_connection_id is not None:
            # Direct lookup by ID: wrap the single result in a generator so that both
            # branches hand back the same kind of object.
            single_connection = use_cases.get_data_connection(
                DataConnectionId(cloud_storage_connection_id), fields=fields
            )
            connections = (connection for connection in (single_connection,))
        else:
            project_filter = None if project_id is None else ProjectId(project_id)
            integration_filter = (
                None
                if cloud_storage_integration_id is None
                else DataIntegrationId(cloud_storage_integration_id)
            )
            connections = use_cases.list_data_connections(
                data_connection_filters=DataConnectionFilters(
                    project_id=project_filter,
                    integration_id=integration_filter,
                ),
                fields=fields,
                options=QueryOptions(disable_tqdm, first, skip),
            )

        return connections if as_generator else list(connections)

    # Typing overload: with `as_generator=True`, a generator of integrations is returned.
    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    # Typing overload: with `as_generator=False` (the default), a list of integrations is returned.
    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """List the cloud storage integrations that match a set of criteria.

        Every criterion is optional; a criterion left to `None` is passed as `None` in the filters.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.
            fields: All the fields to request among the possible fields for the cloud storage integrations.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of skipped cloud storage integrations.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage integrations is returned.

        Returns:
            A list or a generator of the cloud storage integrations that match the criteria.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        query_options = QueryOptions(disable_tqdm, first, skip)
        use_cases = CloudStorageUseCases(self.kili_api_gateway)

        # Raw string IDs are wrapped in their domain types only when they are provided.
        integration_filter = (
            None
            if cloud_storage_integration_id is None
            else DataIntegrationId(cloud_storage_integration_id)
        )
        organization_filter = None if organization_id is None else OrganizationId(organization_id)

        integrations = use_cases.list_data_integrations(
            data_integration_filters=DataIntegrationFilters(
                status=status,
                id=integration_filter,
                name=name,
                platform=platform,
                organization_id=organization_filter,
            ),
            fields=fields,
            options=query_options,
        )

        return integrations if as_generator else list(integrations)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[DataIntegrationPlatform] = None,
        status: Optional[DataIntegrationStatus] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Count the cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.

        Returns:
            The number of cloud storage integrations that match the criteria.
        """
        use_cases = CloudStorageUseCases(self.kili_api_gateway)

        # Raw string IDs are wrapped in their domain types only when they are provided.
        integration_filter = (
            None
            if cloud_storage_integration_id is None
            else DataIntegrationId(cloud_storage_integration_id)
        )
        organization_filter = None if organization_id is None else OrganizationId(organization_id)

        filters = DataIntegrationFilters(
            status=status,
            id=integration_filter,
            name=name,
            platform=platform,
            organization_id=organization_filter,
        )
        return use_cases.count_data_integrations(filters)

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage integration to a project.

        Args:
            project_id: Id of the project.
            cloud_storage_integration_id: Id of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.

        Returns:
            A dict whose only key, `id`, holds the Id of the new cloud storage connection.
        """
        use_cases = CloudStorageUseCases(self.kili_api_gateway)
        created_connection = use_cases.add_data_connection(
            project_id=ProjectId(project_id),
            data_integration_id=DataIntegrationId(cloud_storage_integration_id),
            selected_folders=selected_folders,
            fields=("id",),
        )

        # Only the Id of the created connection is handed back to the caller.
        return {"id": created_connection["id"]}

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
        dry_run: bool = False,
    ) -> Dict:
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the
        project, and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: Id of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.
            dry_run: If True, will not synchronize the data connection but only print the
                differences. This is useful to check the differences before applying them to the
                project.

        Returns:
            A dict describing the cloud storage connection, fetched after the synchronization
            with the `numberOfAssets` and `projectId` fields.
        """
        connection_id = DataConnectionId(cloud_storage_connection_id)
        use_cases = CloudStorageUseCases(self.kili_api_gateway)

        use_cases.synchronize_data_connection(
            data_connection_id=connection_id,
            delete_extraneous_files=delete_extraneous_files,
            dry_run=dry_run,
            logger=logger,
        )

        # The connection is fetched again once the synchronization call has returned.
        refreshed_connection = use_cases.get_data_connection(
            data_connection_id=connection_id, fields=("numberOfAssets", "projectId")
        )
        return refreshed_connection

add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None)

Connect a cloud storage to a project.

Parameters:

Name Type Description Default
project_id str

Id of the project.

required
cloud_storage_integration_id str

Id of the cloud storage integration.

required
selected_folders Optional[List[str]]

List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected.

None

Returns:

Type Description
Dict

A dict with the DataConnection Id.

Source code in kili/presentation/client/cloud_storage.py
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
) -> Dict:
    """Connect a cloud storage integration to a project.

    Args:
        project_id: Id of the project.
        cloud_storage_integration_id: Id of the cloud storage integration.
        selected_folders: List of folders of the data integration to connect to the project.
            If not provided, all folders of the data integration will be connected.

    Returns:
        A dict whose only key, `id`, holds the Id of the new cloud storage connection.
    """
    use_cases = CloudStorageUseCases(self.kili_api_gateway)
    created_connection = use_cases.add_data_connection(
        project_id=ProjectId(project_id),
        data_integration_id=DataIntegrationId(cloud_storage_integration_id),
        selected_folders=selected_folders,
        fields=("id",),
    )

    # Only the Id of the created connection is handed back to the caller.
    return {"id": created_connection["id"]}

cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)

Get a generator or a list of cloud storage connections that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_connection_id Optional[str]

ID of the cloud storage connection.

None
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
project_id Optional[str]

ID of the project.

None
fields Union[List[str], Tuple[str, ...]]

All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields.

('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId')
first Optional[int]

Maximum number of cloud storage connections to return.

None
skip int

Number of skipped cloud storage connections.

0
disable_tqdm Optional[bool]

If True, the progress bar will be disabled.

None
as_generator bool

If True, a generator on the cloud storage connections is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage connections that match the criteria.

Examples:

>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: ListOrTuple[str] = (
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """List the cloud storage connections that match a set of criteria.

    At least one of `cloud_storage_connection_id`, `cloud_storage_integration_id` and
    `project_id` must be given. When `cloud_storage_connection_id` is given, that single
    connection is fetched by its ID: the other two criteria and the pagination options
    (`first`, `skip`) are not applied.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: All the fields to request among the possible fields for the cloud storage connections.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of skipped cloud storage connections.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage connections is returned.

    Returns:
        A list or a generator of the cloud storage connections that match the criteria.

    Raises:
        ValueError: If `cloud_storage_connection_id`, `cloud_storage_integration_id` and
            `project_id` are all `None`.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    criteria = (cloud_storage_connection_id, cloud_storage_integration_id, project_id)
    if all(criterion is None for criterion in criteria):
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    if cloud_storage_connection_id is not None:
        # Direct lookup by ID: wrap the single result in a generator so that both
        # branches hand back the same kind of object.
        single_connection = use_cases.get_data_connection(
            DataConnectionId(cloud_storage_connection_id), fields=fields
        )
        connections = (connection for connection in (single_connection,))
    else:
        project_filter = None if project_id is None else ProjectId(project_id)
        integration_filter = (
            None
            if cloud_storage_integration_id is None
            else DataIntegrationId(cloud_storage_integration_id)
        )
        connections = use_cases.list_data_connections(
            data_connection_filters=DataConnectionFilters(
                project_id=project_filter,
                integration_id=integration_filter,
            ),
            fields=fields,
            options=QueryOptions(disable_tqdm, first, skip),
        )

    return connections if as_generator else list(connections)

cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=('name', 'id', 'platform', 'status'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)

Get a generator or a list of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[Literal['AWS', 'Azure', 'GCP']]

Platform of the cloud storage integration.

None
status Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None
fields Union[List[str], Tuple[str, ...]]

All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields.

('name', 'id', 'platform', 'status')
first Optional[int]

Maximum number of cloud storage integrations to return.

None
skip int

Number of skipped cloud storage integrations.

0
disable_tqdm Optional[bool]

If True, the progress bar will be disabled.

None
as_generator bool

If True, a generator on the cloud storage integrations is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage integrations that match the criteria.

Examples:

>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/presentation/client/cloud_storage.py
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[DataIntegrationPlatform] = None,
    status: Optional[DataIntegrationStatus] = None,
    organization_id: Optional[str] = None,
    fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """List the cloud storage integrations that match a set of criteria.

    Every criterion is optional; a criterion left to `None` is passed as `None` in the filters.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.
        fields: All the fields to request among the possible fields for the cloud storage integrations.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of skipped cloud storage integrations.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage integrations is returned.

    Returns:
        A list or a generator of the cloud storage integrations that match the criteria.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    query_options = QueryOptions(disable_tqdm, first, skip)
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    # Raw string IDs are wrapped in their domain types only when they are provided.
    integration_filter = (
        None
        if cloud_storage_integration_id is None
        else DataIntegrationId(cloud_storage_integration_id)
    )
    organization_filter = None if organization_id is None else OrganizationId(organization_id)

    integrations = use_cases.list_data_integrations(
        data_integration_filters=DataIntegrationFilters(
            status=status,
            id=integration_filter,
            name=name,
            platform=platform,
            organization_id=organization_filter,
        ),
        fields=fields,
        options=query_options,
    )

    return integrations if as_generator else list(integrations)

count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)

Count and return the number of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[Literal['AWS', 'Azure', 'GCP']]

Platform of the cloud storage integration.

None
status Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None

Returns:

Type Description
int

The number of cloud storage integrations that match the criteria.

Source code in kili/presentation/client/cloud_storage.py
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[DataIntegrationPlatform] = None,
    status: Optional[DataIntegrationStatus] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Count the cloud storage integrations that match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.

    Returns:
        The number of cloud storage integrations that match the criteria.
    """
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    # Raw string IDs are wrapped in their domain types only when they are provided.
    integration_filter = (
        None
        if cloud_storage_integration_id is None
        else DataIntegrationId(cloud_storage_integration_id)
    )
    organization_filter = None if organization_id is None else OrganizationId(organization_id)

    filters = DataIntegrationFilters(
        status=status,
        id=integration_filter,
        name=name,
        platform=platform,
        organization_id=organization_filter,
    )
    return use_cases.count_data_integrations(filters)

synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False, dry_run=False)

Synchronize a cloud storage connection.

This method will compute differences between the cloud storage connection and the project, and then validate the differences.

If delete_extraneous_files is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.

Parameters:

Name Type Description Default
cloud_storage_connection_id str

Id of the cloud storage connection.

required
delete_extraneous_files bool

If True, delete extraneous files.

False
dry_run bool

If True, will not synchronize the data connection but only print the differences. This is useful to check the differences before applying them to the project.

False

Returns:

Type Description
Dict

A dict describing the cloud storage connection, fetched after the synchronization with the numberOfAssets and projectId fields.

Source code in kili/presentation/client/cloud_storage.py
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
    dry_run: bool = False,
) -> Dict:
    """Synchronize a cloud storage connection.

    This method will compute differences between the cloud storage connection and the
    project, and then validate the differences.

    If `delete_extraneous_files` is True, it will also delete files that are not in the
    cloud storage integration anymore but that are still in the project.

    Args:
        cloud_storage_connection_id: Id of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.
        dry_run: If True, will not synchronize the data connection but only print the
            differences. This is useful to check the differences before applying them to the
            project.

    Returns:
        A dict describing the cloud storage connection, fetched after the synchronization
        with the `numberOfAssets` and `projectId` fields.
    """
    connection_id = DataConnectionId(cloud_storage_connection_id)
    use_cases = CloudStorageUseCases(self.kili_api_gateway)

    use_cases.synchronize_data_connection(
        data_connection_id=connection_id,
        delete_extraneous_files=delete_extraneous_files,
        dry_run=dry_run,
        logger=logger,
    )

    # The connection is fetched again once the synchronization call has returned.
    refreshed_connection = use_cases.get_data_connection(
        data_connection_id=connection_id, fields=("numberOfAssets", "projectId")
    )
    return refreshed_connection