Skip to content

Cloud storage module

Alpha feature

The cloud storage feature is currently in alpha. It is still under active development: methods and behaviors can still evolve until the feature is complete.

Cloud Storage Integration and Connection

A cloud storage integration is a connection between a Kili organization and a cloud storage (AWS, GCP or Azure). Once a cloud storage integration is created, it can be used in any project of the organization. Adding a cloud storage integration from the SDK is currently not supported. More information about how to create a cloud storage integration can be found here.

A cloud storage connection is a cloud storage integration used in a Kili project. It is used to import data from a cloud storage to a project. More information about how to use a cloud storage integration in a project can be found here.

Azure

It is recommended to install the Azure dependencies to use the Azure cloud storage integration and connection.

pip install kili[azure]

Queries

Set of cloud storage integration queries.

Source code in kili/entrypoints/queries/data_integration/__init__.py
class QueriesDataIntegration(BaseOperationEntrypointMixin):
    """Set of cloud storage integration queries."""

    # pylint: disable=too-many-arguments

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]: ...

    @overload
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]: ...

    @typechecked
    def cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
        fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.
            fields: All the fields to request among the possible fields for the cloud storage integrations.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
            first: Maximum number of cloud storage integrations to return.
            skip: Number of skipped cloud storage integrations.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage integrations is returned.

        Returns:
            A list or a generator of the cloud storage integrations that match the criteria.

        Examples:
            >>> kili.cloud_storage_integrations()
            [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
        """
        where = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        options = QueryOptions(disable_tqdm, first, skip)
        data_integrations_gen = DataIntegrationsQuery(self.graphql_client, self.http_client)(
            where, fields, options
        )

        if as_generator:
            return data_integrations_gen
        return list(data_integrations_gen)

    @typechecked
    def count_cloud_storage_integrations(
        self,
        cloud_storage_integration_id: Optional[str] = None,
        name: Optional[str] = None,
        platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
        status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
        organization_id: Optional[str] = None,
    ) -> int:
        """Count and return the number of cloud storage integrations that match a set of criteria.

        Args:
            cloud_storage_integration_id: ID of the cloud storage integration.
            name: Name of the cloud storage integration.
            platform: Platform of the cloud storage integration.
            status: Status of the cloud storage integration.
            organization_id: ID of the organization.

        Returns:
            The number of cloud storage integrations that match the criteria.
        """
        where = DataIntegrationWhere(
            data_integration_id=cloud_storage_integration_id,
            name=name,
            platform=platform,
            status=status,
            organization_id=organization_id,
        )
        return DataIntegrationsQuery(self.graphql_client, self.http_client).count(where)

cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None, fields=('name', 'id', 'platform', 'status'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)

Get a generator or a list of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[Literal['AWS', 'Azure', 'GCP']]

Platform of the cloud storage integration.

None
status Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None
fields Union[List[str], Tuple[str, ...]]

All the fields to request among the possible fields for the cloud storage integrations. See the documentation for all possible fields.

('name', 'id', 'platform', 'status')
first Optional[int]

Maximum number of cloud storage integrations to return.

None
skip int

Number of skipped cloud storage integrations.

0
disable_tqdm Optional[bool]

If True, the progress bar will be disabled.

None
as_generator bool

If True, a generator on the cloud storage integrations is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage integrations that match the criteria.

Examples:

>>> kili.cloud_storage_integrations()
[{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
Source code in kili/entrypoints/queries/data_integration/__init__.py
def cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
    fields: ListOrTuple[str] = ("name", "id", "platform", "status"),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Get a generator or a list of cloud storage integrations that match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.
        fields: All the fields to request among the possible fields for the cloud storage integrations.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataintegration) for all possible fields.
        first: Maximum number of cloud storage integrations to return.
        skip: Number of skipped cloud storage integrations.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage integrations is returned.

    Returns:
        A list or a generator of the cloud storage integrations that match the criteria.

    Examples:
        >>> kili.cloud_storage_integrations()
        [{'name': 'My bucket', 'id': '123456789', 'platform': 'AWS', 'status': 'CONNECTED'}]
    """
    where = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    options = QueryOptions(disable_tqdm, first, skip)
    data_integrations_gen = DataIntegrationsQuery(self.graphql_client, self.http_client)(
        where, fields, options
    )

    if as_generator:
        return data_integrations_gen
    return list(data_integrations_gen)

count_cloud_storage_integrations(self, cloud_storage_integration_id=None, name=None, platform=None, status=None, organization_id=None)

Count and return the number of cloud storage integrations that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
name Optional[str]

Name of the cloud storage integration.

None
platform Optional[Literal['AWS', 'Azure', 'GCP']]

Platform of the cloud storage integration.

None
status Optional[Literal['CONNECTED', 'DISCONNECTED', 'CHECKING']]

Status of the cloud storage integration.

None
organization_id Optional[str]

ID of the organization.

None

Returns:

Type Description
int

The number of cloud storage integrations that match the criteria.

Source code in kili/entrypoints/queries/data_integration/__init__.py
def count_cloud_storage_integrations(
    self,
    cloud_storage_integration_id: Optional[str] = None,
    name: Optional[str] = None,
    platform: Optional[Literal["AWS", "Azure", "GCP"]] = None,
    status: Optional[Literal["CONNECTED", "DISCONNECTED", "CHECKING"]] = None,
    organization_id: Optional[str] = None,
) -> int:
    """Count and return the number of cloud storage integrations that match a set of criteria.

    Args:
        cloud_storage_integration_id: ID of the cloud storage integration.
        name: Name of the cloud storage integration.
        platform: Platform of the cloud storage integration.
        status: Status of the cloud storage integration.
        organization_id: ID of the organization.

    Returns:
        The number of cloud storage integrations that match the criteria.
    """
    where = DataIntegrationWhere(
        data_integration_id=cloud_storage_integration_id,
        name=name,
        platform=platform,
        status=status,
        organization_id=organization_id,
    )
    return DataIntegrationsQuery(self.graphql_client, self.http_client).count(where)

Set of cloud storage connection queries.

Source code in kili/entrypoints/queries/data_connection/__init__.py
class QueriesDataConnection(BaseOperationEntrypointMixin):
    """Set of cloud storage connection queries."""

    # pylint: disable=too-many-arguments

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[True],
    ) -> Generator[Dict, None, None]: ...

    @overload
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: Literal[False] = False,
    ) -> List[Dict]: ...

    @typechecked
    def cloud_storage_connections(
        self,
        cloud_storage_connection_id: Optional[str] = None,
        cloud_storage_integration_id: Optional[str] = None,
        project_id: Optional[str] = None,
        fields: ListOrTuple[str] = (
            "id",
            "lastChecked",
            "numberOfAssets",
            "selectedFolders",
            "projectId",
        ),
        first: Optional[int] = None,
        skip: int = 0,
        disable_tqdm: Optional[bool] = None,
        *,
        as_generator: bool = False,
    ) -> Iterable[Dict]:
        # pylint: disable=line-too-long
        """Get a generator or a list of cloud storage connections that match a set of criteria.

        Args:
            cloud_storage_connection_id: ID of the cloud storage connection.
            cloud_storage_integration_id: ID of the cloud storage integration.
            project_id: ID of the project.
            fields: All the fields to request among the possible fields for the cloud storage connections.
                See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
            first: Maximum number of cloud storage connections to return.
            skip: Number of skipped cloud storage connections.
            disable_tqdm: If `True`, the progress bar will be disabled.
            as_generator: If `True`, a generator on the cloud storage connections is returned.

        Returns:
            A list or a generator of the cloud storage connections that match the criteria.

        Examples:
            >>> kili.cloud_storage_connections(project_id="789465123")
            [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
        """
        if (
            cloud_storage_connection_id is None
            and cloud_storage_integration_id is None
            and project_id is None
        ):
            raise ValueError(
                "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
                " project_id must be specified"
            )

        # call dataConnection resolver
        if cloud_storage_connection_id is not None:
            data_connection = get_data_connection(self, cloud_storage_connection_id, fields)
            data_connection_list = [data_connection]
            if as_generator:
                return iter(data_connection_list)
            return data_connection_list

        # call dataConnections resolver
        where = DataConnectionsWhere(
            project_id=project_id, data_integration_id=cloud_storage_integration_id
        )
        disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
        options = QueryOptions(disable_tqdm, first, skip)
        data_connections_gen = DataConnectionsQuery(self.graphql_client, self.http_client)(
            where, fields, options
        )

        if as_generator:
            return data_connections_gen
        return list(data_connections_gen)

cloud_storage_connections(self, cloud_storage_connection_id=None, cloud_storage_integration_id=None, project_id=None, fields=('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId'), first=None, skip=0, disable_tqdm=None, *, as_generator=False)

Get a generator or a list of cloud storage connections that match a set of criteria.

Parameters:

Name Type Description Default
cloud_storage_connection_id Optional[str]

ID of the cloud storage connection.

None
cloud_storage_integration_id Optional[str]

ID of the cloud storage integration.

None
project_id Optional[str]

ID of the project.

None
fields Union[List[str], Tuple[str, ...]]

All the fields to request among the possible fields for the cloud storage connections. See the documentation for all possible fields.

('id', 'lastChecked', 'numberOfAssets', 'selectedFolders', 'projectId')
first Optional[int]

Maximum number of cloud storage connections to return.

None
skip int

Number of skipped cloud storage connections.

0
disable_tqdm Optional[bool]

If True, the progress bar will be disabled.

None
as_generator bool

If True, a generator on the cloud storage connections is returned.

False

Returns:

Type Description
Iterable[Dict]

A list or a generator of the cloud storage connections that match the criteria.

Examples:

>>> kili.cloud_storage_connections(project_id="789465123")
[{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
Source code in kili/entrypoints/queries/data_connection/__init__.py
def cloud_storage_connections(
    self,
    cloud_storage_connection_id: Optional[str] = None,
    cloud_storage_integration_id: Optional[str] = None,
    project_id: Optional[str] = None,
    fields: ListOrTuple[str] = (
        "id",
        "lastChecked",
        "numberOfAssets",
        "selectedFolders",
        "projectId",
    ),
    first: Optional[int] = None,
    skip: int = 0,
    disable_tqdm: Optional[bool] = None,
    *,
    as_generator: bool = False,
) -> Iterable[Dict]:
    # pylint: disable=line-too-long
    """Get a generator or a list of cloud storage connections that match a set of criteria.

    Args:
        cloud_storage_connection_id: ID of the cloud storage connection.
        cloud_storage_integration_id: ID of the cloud storage integration.
        project_id: ID of the project.
        fields: All the fields to request among the possible fields for the cloud storage connections.
            See [the documentation](https://docs.kili-technology.com/reference/graphql-api#dataconnection) for all possible fields.
        first: Maximum number of cloud storage connections to return.
        skip: Number of skipped cloud storage connections.
        disable_tqdm: If `True`, the progress bar will be disabled.
        as_generator: If `True`, a generator on the cloud storage connections is returned.

    Returns:
        A list or a generator of the cloud storage connections that match the criteria.

    Examples:
        >>> kili.cloud_storage_connections(project_id="789465123")
        [{'id': '123456789', 'lastChecked': '2023-02-21T14:49:35.606Z', 'numberOfAssets': 42, 'selectedFolders': ['folder1', 'folder2'], 'projectId': '789465123'}]
    """
    if (
        cloud_storage_connection_id is None
        and cloud_storage_integration_id is None
        and project_id is None
    ):
        raise ValueError(
            "At least one of cloud_storage_connection_id, cloud_storage_integration_id or"
            " project_id must be specified"
        )

    # call dataConnection resolver
    if cloud_storage_connection_id is not None:
        data_connection = get_data_connection(self, cloud_storage_connection_id, fields)
        data_connection_list = [data_connection]
        if as_generator:
            return iter(data_connection_list)
        return data_connection_list

    # call dataConnections resolver
    where = DataConnectionsWhere(
        project_id=project_id, data_integration_id=cloud_storage_integration_id
    )
    disable_tqdm = disable_tqdm_if_as_generator(as_generator, disable_tqdm)
    options = QueryOptions(disable_tqdm, first, skip)
    data_connections_gen = DataConnectionsQuery(self.graphql_client, self.http_client)(
        where, fields, options
    )

    if as_generator:
        return data_connections_gen
    return list(data_connections_gen)

Mutations

Set of DataConnection mutations.

Source code in kili/entrypoints/mutations/data_connection/__init__.py
class MutationsDataConnection(BaseOperationEntrypointMixin):
    """Set of DataConnection mutations."""

    @typechecked
    def add_cloud_storage_connection(
        self,
        project_id: str,
        cloud_storage_integration_id: str,
        selected_folders: Optional[List[str]] = None,
    ) -> Dict:
        """Connect a cloud storage to a project.

        Args:
            project_id: Id of the project.
            cloud_storage_integration_id: Id of the cloud storage integration.
            selected_folders: List of folders of the data integration to connect to the project.
                If not provided, all folders of the data integration will be connected.

        Returns:
            A dict with the DataConnection Id.
        """
        data_integrations = list(
            DataIntegrationsQuery(self.graphql_client, self.http_client)(
                where=DataIntegrationWhere(data_integration_id=cloud_storage_integration_id),
                fields=["id"],
                options=QueryOptions(disable_tqdm=True, first=1, skip=0),
            )
        )
        if len(data_integrations) == 0:
            raise ValueError(
                f"Cloud storage integration with id {cloud_storage_integration_id} not found."
            )

        variables = {
            "data": {
                "projectId": project_id,
                "integrationId": cloud_storage_integration_id,
                "isChecking": False,
                "lastChecked": datetime.now().isoformat(sep="T", timespec="milliseconds") + "Z",
                "selectedFolders": selected_folders,
            }
        }
        result = self.graphql_client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
        result = self.format_result("data", result)

        # We trigger data difference computation (same behavior as in the frontend)
        data_connection_service.compute_differences(self, result["id"])

        return result

    @typechecked
    def synchronize_cloud_storage_connection(
        self,
        cloud_storage_connection_id: str,
        delete_extraneous_files: bool = False,
        dry_run: bool = False,
    ) -> Dict:
        """Synchronize a cloud storage connection.

        This method will compute differences between the cloud storage connection and the project,
            and then validate the differences.

        If `delete_extraneous_files` is True, it will also delete files that are not in the
            cloud storage integration anymore but that are still in the project.

        Args:
            cloud_storage_connection_id: Id of the cloud storage connection.
            delete_extraneous_files: If True, delete extraneous files.
            dry_run: If True, will not synchronize the data connection but only print the
                differences. This is useful to check the differences before applying them to the
                project.

        Returns:
            A dict with the cloud storage connection Id.
        """
        return data_connection_service.synchronize_data_connection(
            self, cloud_storage_connection_id, delete_extraneous_files, dry_run
        )

add_cloud_storage_connection(self, project_id, cloud_storage_integration_id, selected_folders=None)

Connect a cloud storage to a project.

Parameters:

Name Type Description Default
project_id str

Id of the project.

required
cloud_storage_integration_id str

Id of the cloud storage integration.

required
selected_folders Optional[List[str]]

List of folders of the data integration to connect to the project. If not provided, all folders of the data integration will be connected.

None

Returns:

Type Description
Dict

A dict with the DataConnection Id.

Source code in kili/entrypoints/mutations/data_connection/__init__.py
def add_cloud_storage_connection(
    self,
    project_id: str,
    cloud_storage_integration_id: str,
    selected_folders: Optional[List[str]] = None,
) -> Dict:
    """Connect a cloud storage to a project.

    Args:
        project_id: Id of the project.
        cloud_storage_integration_id: Id of the cloud storage integration.
        selected_folders: List of folders of the data integration to connect to the project.
            If not provided, all folders of the data integration will be connected.

    Returns:
        A dict with the DataConnection Id.
    """
    data_integrations = list(
        DataIntegrationsQuery(self.graphql_client, self.http_client)(
            where=DataIntegrationWhere(data_integration_id=cloud_storage_integration_id),
            fields=["id"],
            options=QueryOptions(disable_tqdm=True, first=1, skip=0),
        )
    )
    if len(data_integrations) == 0:
        raise ValueError(
            f"Cloud storage integration with id {cloud_storage_integration_id} not found."
        )

    variables = {
        "data": {
            "projectId": project_id,
            "integrationId": cloud_storage_integration_id,
            "isChecking": False,
            "lastChecked": datetime.now().isoformat(sep="T", timespec="milliseconds") + "Z",
            "selectedFolders": selected_folders,
        }
    }
    result = self.graphql_client.execute(GQL_ADD_PROJECT_DATA_CONNECTION, variables)
    result = self.format_result("data", result)

    # We trigger data difference computation (same behavior as in the frontend)
    data_connection_service.compute_differences(self, result["id"])

    return result

synchronize_cloud_storage_connection(self, cloud_storage_connection_id, delete_extraneous_files=False, dry_run=False)

Synchronize a cloud storage connection.

This method will compute differences between the cloud storage connection and the project, and then validate the differences.

If delete_extraneous_files is True, it will also delete files that are not in the cloud storage integration anymore but that are still in the project.

Parameters:

Name Type Description Default
cloud_storage_connection_id str

Id of the cloud storage connection.

required
delete_extraneous_files bool

If True, delete extraneous files.

False
dry_run bool

If True, will not synchronize the data connection but only print the differences. This is useful to check the differences before applying them to the project.

False

Returns:

Type Description
Dict

A dict with the cloud storage connection Id.

Source code in kili/entrypoints/mutations/data_connection/__init__.py
def synchronize_cloud_storage_connection(
    self,
    cloud_storage_connection_id: str,
    delete_extraneous_files: bool = False,
    dry_run: bool = False,
) -> Dict:
    """Synchronize a cloud storage connection.

    This method will compute differences between the cloud storage connection and the project,
        and then validate the differences.

    If `delete_extraneous_files` is True, it will also delete files that are not in the
        cloud storage integration anymore but that are still in the project.

    Args:
        cloud_storage_connection_id: Id of the cloud storage connection.
        delete_extraneous_files: If True, delete extraneous files.
        dry_run: If True, will not synchronize the data connection but only print the
            differences. This is useful to check the differences before applying them to the
            project.

    Returns:
        A dict with the cloud storage connection Id.
    """
    return data_connection_service.synchronize_data_connection(
        self, cloud_storage_connection_id, delete_extraneous_files, dry_run
    )