Skip to content

Cloud storage module

Alpha feature

The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.

Cloud Storage Integration and Connection

A cloud storage integration is a connection between a Kili organization and a cloud storage (AWS, GCP or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Adding a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.

A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage to a project. More information about how to use a cloud storage integration in a project can be found here.

Queries

Set of cloud storage integration queries.

Source code in kili/entrypoints/queries/data_integration/__init__.py
class QueriesDataIntegration:
    """Set of cloud storage integration queries."""

    # pylint: disable=too-many-arguments,dangerous-default-value

    def __init__(self, auth: KiliAuth):
        """Initialize the subclass.

        Args:
            auth: KiliAuth object
        """
        self.auth = auth

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: List[str] = ["name", "id", "platform", "status"],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.
            fields: All the fields to request among the possible fields for the cloud storage integrations.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of skipped cloud storage integrations.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage integrations is returned.

        Returns:
            A list or a generator of the cloud storage integrations that match the criteria.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        where = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        options = QueryOptions(disable_tqdm, first, skip)
        data_integrations_gen = DataIntegrationsQuery(self.auth.client)(where, fields, options)

        if as_generator:
            return data_integrations_gen
        return list(data_integrations_gen)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Count and return the number of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.

        Returns:
            The number of cloud storage integrations that match the criteria.
        """
        where = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        return DataIntegrationsQuery(self.auth.client).count(where)

cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=['name', 'id', 'platform', 'status'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)

Get a generator or a list of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[typing_extensions.Literal['AWS', 'Azure', 'GCP']]

Platform of the cloud storage integration.

None
status Optional[typing_extensions.Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None
fields List[str]

All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields.

['name', 'id', 'platform', 'status']
first Optional[int]

Maximum number of cloud storage integrations to return.

None
skip int

Number of skipped cloud storage integrations.

0
disable_tqdm bool

If True, the progress bar will be disabled.

False
as_generator bool

If True, a generator on the cloud storage integrations is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage integrations that match the criteria.

Examples:

>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
    fields: List[str] = ["name", "id", "platform", "status"],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Get a generator or a list of cloud storage integrations that match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.
        fields: All the fields to request among the possible fields for the cloud storage integrations.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of skipped cloud storage integrations.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage integrations is returned.

    Returns:
        A list or a generator of the cloud storage integrations that match the criteria.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    where = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    options = QueryOptions(disable_tqdm, first, skip)
    data_integrations_gen = DataIntegrationsQuery(self.auth.client)(where, fields, options)

    if as_generator:
        return data_integrations_gen
    return list(data_integrations_gen)

count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)

Count and return the number of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[typing_extensions.Literal['AWS', 'Azure', 'GCP']]

Platform of the cloud storage integration.

None
status Optional[typing_extensions.Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None

Returns:

Type Description
int

The number of cloud storage integrations that match the criteria.

Source code in kili/entrypoints/queries/data_integration/__init__.py
@typechecked
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Count and return the number of cloud storage integrations that match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.

    Returns:
        The number of cloud storage integrations that match the criteria.
    """
    where = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    return DataIntegrationsQuery(self.auth.client).count(where)

Set of cloud storage connection queries.

Source code in kili/entrypoints/queries/data_connection/__init__.py
class QueriesDataConnection:
    """Set of cloud storage connection queries."""

    # pylint: disable=too-many-arguments,dangerous-default-value

    def __init__(self, auth: KiliAuth):
        """Initialize the subclass.

        Args:
            auth: KiliAuth object
        """
        self.auth = auth

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]:
        ...

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]:
        ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: List[str] = [
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ],
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: bool = False,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage connections that match a set of criteria.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: All the fields to request among the possible fields for the cloud storage connections.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of skipped cloud storage connections.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage connections is returned.

        Returns:
            A list or a generator of the cloud storage connections that match the criteria.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        if (
            cloud_storage_connection_id is None
            and cloud_storage_integration_id is None
            and project_id is None
        ):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        # call dataConnection resolver
        if cloud_storage_connection_id is not None:
            data_connection = services.get_data_connection(
                self.auth, cloud_storage_connection_id, fields
            )
            data_connection_list = [data_connection]
            if as_generator:
                return iter(data_connection_list)
            return data_connection_list

        # call dataConnections resolver
        where = DataConnectionsWhere(
            project_id=project_id, data_integration_id=cloud_storage_integration_id
        )
        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        options = QueryOptions(disable_tqdm, first, skip)
        data_connections_gen = DataConnectionsQuery(self.auth.client)(where, fields, options)

        if as_generator:
            return data_connections_gen
        return list(data_connections_gen)

cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'], first=None, skip=0, disable_tqdm=False, *, as_generator=False)

Get a generator or a list of cloud storage connections that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_connection_id Optional[str]

ID of the cloud storage connection.

None
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
project_id Optional[str]

ID of the project.

None
fields List[str]

All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields.

['id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId']
first Optional[int]

Maximum number of cloud storage connections to return.

None
skip int

Number of skipped cloud storage connections.

0
disable_tqdm bool

If True, the progress bar will be disabled.

False
as_generator bool

If True, a generator on the cloud storage connections is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage connections that match the criteria.

Examples:

>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/entrypoints/queries/data_connection/__init__.py
@typechecked
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: List[str] = [
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ],
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: bool = False,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Get a generator or a list of cloud storage connections that match a set of criteria.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: All the fields to request among the possible fields for the cloud storage connections.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of skipped cloud storage connections.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage connections is returned.

    Returns:
        A list or a generator of the cloud storage connections that match the criteria.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    if (
        cloud_storage_connection_id is None
        and cloud_storage_integration_id is None
        and project_id is None
    ):
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    # call dataConnection resolver
    if cloud_storage_connection_id is not None:
        data_connection = services.get_data_connection(
            self.auth, cloud_storage_connection_id, fields
        )
        data_connection_list = [data_connection]
        if as_generator:
            return iter(data_connection_list)
        return data_connection_list

    # call dataConnections resolver
    where = DataConnectionsWhere(
        project_id=project_id, data_integration_id=cloud_storage_integration_id
    )
    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    options = QueryOptions(disable_tqdm, first, skip)
    data_connections_gen = DataConnectionsQuery(self.auth.client)(where, fields, options)

    if as_generator:
        return data_connections_gen
    return list(data_connections_gen)

Mutations

Set of DataConnection mutations.

Source code in kili/entrypoints/mutations/data_connection/__init__.py
class MutationsDataConnection:
    """Set of DataConnection mutations."""

    def __init__(self, auth: KiliAuth):
        """Initializes the subclass.

        Args:
            auth: KiliAuth object
        """
        self.auth = auth

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage to a project.

        Args:
            project_id: ID of the project.
            cloud_storage_integration_id: ID of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.

        Returns:
            A dict with the DataConnection ID.
        """
        if selected_folders is None:
            variables = {"dataIntegrationId": cloud_storage_integration_id}
            try:
                result = self.auth.client.execute(
                    GQL_GET_DATA_INTEGRATION_FOLDER_AND_SUBFOLDERS, variables=variables
                )
            except GraphQLError as err:
                raise AddDataConnectionError(
                    f"The data integration with id {cloud_storage_integration_id} is not supported"
                    " in the SDK yet. Use the Kili app to create a data connection instead."
                ) from err
            result = format_result("data", result)
            selected_folders = [folder["key"] for folder in result]

        variables = {
            "data": {
                "projectId": project_id,
                "integrationId": cloud_storage_integration_id,
                "isChecking": False,
                "lastChecked": datetime.now().isoformat(sep="T", timespec="milliseconds") + "Z",
                "selectedFolders": selected_folders,
            }
        }
        result = self.auth.client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
        result = format_result("data", result)

        # We trigger data difference computation (same behavior as in the frontend)
        services.compute_differences(self.auth, result["id"])

        return result

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
    ) -> Dict:
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the project
            and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
            cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.

        Returns:
            A dict with the DataConnection ID.
        """
        return services.synchronize_data_connection(
            self.auth, cloud_storage_connection_id, delete_extraneous_files
        )

add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None)

Connect a cloud storage to a project.

Parameters:

Name Type Description Default
project_id str

ID of the project.

required
cloud_storage_integration_id str

ID of the cloud storage integration.

required
selected_folders Optional[List[str]]

List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected.

None

Returns:

Type Description
Dict

A dict with the DataConnection ID.

Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
) -> Dict:
    """Connect a cloud storage to a project.

    Args:
        project_id: ID of the project.
        cloud_storage_integration_id: ID of the cloud storage integration.
        selected_folders: List of folders of the data integration to connect to the project.
            If not provided, all folders of the data integration will be connected.

    Returns:
        A dict with the DataConnection ID.
    """
    if selected_folders is None:
        variables = {"dataIntegrationId": cloud_storage_integration_id}
        try:
            result = self.auth.client.execute(
                GQL_GET_DATA_INTEGRATION_FOLDER_AND_SUBFOLDERS, variables=variables
            )
        except GraphQLError as err:
            raise AddDataConnectionError(
                f"The data integration with id {cloud_storage_integration_id} is not supported"
                " in the SDK yet. Use the Kili app to create a data connection instead."
            ) from err
        result = format_result("data", result)
        selected_folders = [folder["key"] for folder in result]

    variables = {
        "data": {
            "projectId": project_id,
            "integrationId": cloud_storage_integration_id,
            "isChecking": False,
            "lastChecked": datetime.now().isoformat(sep="T", timespec="milliseconds") + "Z",
            "selectedFolders": selected_folders,
        }
    }
    result = self.auth.client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
    result = format_result("data", result)

    # We trigger data difference computation (same behavior as in the frontend)
    services.compute_differences(self.auth, result["id"])

    return result

synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False)

Synchronize a cloud storage connection.

This method will compute differences between the cloud storage connection and the project and then validate the differences.

If delete_extraneous_files is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.

Parameters:

Name Type Description Default
cloud_storage_connection_id str

ID of the cloud storage connection.

required
delete_extraneous_files bool

If True, delete extraneous files.

False

Returns:

Type Description
Dict

A dict with the DataConnection ID.

Source code in kili/entrypoints/mutations/data_connection/__init__.py
@typechecked
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
) -> Dict:
    """Synchronize a cloud storage connection.

    This method will compute differences between the cloud storage connection and the project
        and then validate the differences.

    If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.

    Returns:
        A dict with the DataConnection ID.
    """
    return services.synchronize_data_connection(
        self.auth, cloud_storage_connection_id, delete_extraneous_files
    )