Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airbyte_cdk/legacy/sources/declarative/declarative_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.legacy.sources.declarative.incremental import (
GlobalSubstreamCursor,
PerPartitionCursor,
Expand All @@ -13,7 +15,6 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
Expand All @@ -28,6 +29,7 @@
from airbyte_cdk.sources.types import Config, StreamSlice


@deprecated("DeclarativeStream has been deprecated in favor of the concurrent DefaultStream")
@dataclass
class DeclarativeStream(Stream):
"""
Expand Down Expand Up @@ -198,8 +200,6 @@ def state_checkpoint_interval(self) -> Optional[int]:
return None

def get_cursor(self) -> Optional[Cursor]:
    """Return the retriever's cursor when the retriever is a SimpleRetriever.

    Returns:
        The ``cursor`` attribute of ``self.retriever`` if the retriever is truthy and is a
        ``SimpleRetriever`` instance; ``None`` in every other case.
    """
    retriever = self.retriever
    # Guard clause: a missing retriever or any non-SimpleRetriever has no cursor to expose.
    if not retriever or not isinstance(retriever, SimpleRetriever):
        return None
    return retriever.cursor

def _get_checkpoint_reader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
from airbyte_cdk.connector_builder.models import (
LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.legacy.sources.declarative.incremental import (
DatetimeBasedCursor,
)
from airbyte_cdk.models import (
AirbyteStateBlob,
AirbyteStateMessage,
Expand Down Expand Up @@ -735,7 +731,6 @@ def _init_mappings(self) -> None:
CustomTransformationModel: self.create_custom_component,
CustomValidationStrategyModel: self.create_custom_component,
CustomConfigTransformationModel: self.create_custom_component,
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
DeclarativeStreamModel: self.create_default_stream,
DefaultErrorHandlerModel: self.create_default_error_handler,
DefaultPaginatorModel: self.create_default_paginator,
Expand All @@ -758,7 +753,6 @@ def _init_mappings(self) -> None:
FlattenFieldsModel: self.create_flatten_fields,
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
IncrementingCountCursorModel: self.create_incrementing_count_cursor,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
Expand Down Expand Up @@ -1926,64 +1920,6 @@ def _create_nested_component(
def _is_component(model_value: Any) -> bool:
return isinstance(model_value, dict) and model_value.get("type") is not None

def create_datetime_based_cursor(
    self, model: DatetimeBasedCursorModel, config: Config, **kwargs: Any
) -> DatetimeBasedCursor:
    """Build a DatetimeBasedCursor from its declarative model.

    Args:
        model: The parsed cursor model whose fields are copied onto the cursor.
        config: Connector configuration forwarded to every sub-component and to the cursor.
        **kwargs: Accepted for signature compatibility; not used by this method.

    Returns:
        A DatetimeBasedCursor configured from ``model``.

    Raises:
        ValueError: If ``model.is_data_feed`` is set together with ``model.end_datetime``,
            or together with ``model.is_client_side_incremental``.
    """

    def _resolve_datetime(raw: Any) -> Union[str, MinMaxDatetime]:
        # Plain strings are passed through; anything else is treated as a MinMaxDatetime model.
        if isinstance(raw, str):
            return raw
        return self.create_min_max_datetime(raw, config)

    def _build_time_option(option_model: Any) -> Any:
        # Absent request options stay None; present ones become components.
        if not option_model:
            return None
        return self._create_component_from_model(
            option_model, config, parameters=model.parameters or {}
        )

    # The start datetime is resolved before validation, matching the original ordering.
    start_datetime: Union[str, MinMaxDatetime] = _resolve_datetime(model.start_datetime)

    if model.is_data_feed:
        if model.end_datetime:
            raise ValueError("Data feed does not support end_datetime")
        if model.is_client_side_incremental:
            raise ValueError(
                "`Client side incremental` cannot be applied with `data feed`. Choose only 1 from them."
            )

    end_datetime: Union[str, MinMaxDatetime, None] = (
        _resolve_datetime(model.end_datetime) if model.end_datetime else None
    )

    end_time_option = _build_time_option(model.end_time_option)
    start_time_option = _build_time_option(model.start_time_option)

    return DatetimeBasedCursor(
        cursor_field=model.cursor_field,
        cursor_datetime_formats=model.cursor_datetime_formats or [],
        cursor_granularity=model.cursor_granularity,
        datetime_format=model.datetime_format,
        end_datetime=end_datetime,
        start_datetime=start_datetime,
        step=model.step,
        end_time_option=end_time_option,
        lookback_window=model.lookback_window,
        start_time_option=start_time_option,
        partition_field_end=model.partition_field_end,
        partition_field_start=model.partition_field_start,
        message_repository=self._message_repository,
        is_compare_strictly=model.is_compare_strictly,
        config=config,
        parameters=model.parameters or {},
    )

def create_default_stream(
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
) -> AbstractStream:
Expand Down Expand Up @@ -2647,24 +2583,6 @@ def create_gzip_decoder(
fallback_parser=gzip_parser.inner_parser,
)

# todo: Remove this method once the SimpleRetriever.cursor field and the various state methods
#  are deprecated.
@staticmethod
def create_incrementing_count_cursor(
    model: IncrementingCountCursorModel, config: Config, **kwargs: Any
) -> DatetimeBasedCursor:
    """Build a placeholder cursor for an IncrementingCountCursor model.

    Args:
        model: The parsed model; only its ``cursor_field`` is used.
        config: Connector configuration forwarded to the placeholder cursor.
        **kwargs: Accepted for signature compatibility; not used by this method.

    Returns:
        A DatetimeBasedCursor with a fixed datetime format and start datetime.
    """
    # This is not expected to be used at runtime. It exists only to pass checks, because models
    # are still parsed into components and there is no runtime implementation of an
    # IncrementingCountCursor.
    # Known and expected issue with this stub: running a check with a declared
    # IncrementingCountCursor, because the check runs without a ConcurrentCursor.
    placeholder_cursor = DatetimeBasedCursor(
        cursor_field=model.cursor_field,
        datetime_format="%Y-%m-%d",
        start_datetime="2024-12-12",
        config=config,
        parameters={},
    )
    return placeholder_cursor

@staticmethod
def create_iterable_decoder(
model: IterableDecoderModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -3392,7 +3310,6 @@ def _get_url(req: Requester) -> str:
record_selector=record_selector,
stream_slicer=_NO_STREAM_SLICING,
request_option_provider=request_options_provider,
cursor=None,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
Expand All @@ -3413,7 +3330,6 @@ def _get_url(req: Requester) -> str:
record_selector=record_selector,
stream_slicer=_NO_STREAM_SLICING,
request_option_provider=request_options_provider,
cursor=None,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
additional_query_properties=query_properties,
Expand Down Expand Up @@ -3506,7 +3422,7 @@ def create_state_delegating_stream(
config: Config,
has_parent_state: Optional[bool] = None,
**kwargs: Any,
) -> DeclarativeStream:
) -> DefaultStream:
if (
model.full_refresh_stream.name != model.name
or model.name != model.incremental_stream.name
Expand Down
32 changes: 2 additions & 30 deletions airbyte_cdk/sources/declarative/retrievers/async_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.types import Config, StreamSlice
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger


Expand Down Expand Up @@ -59,30 +59,6 @@ def exit_on_rate_limit(self, value: bool) -> None:
if job_orchestrator is not None:
job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment]

@property
def state(self) -> StreamState:
    """Return the retriever state, which is always an empty mapping.

    As a first iteration for sendgrid, there is no state to be managed.
    """
    empty_state: StreamState = {}
    return empty_state

@state.setter
def state(self, value: StreamState) -> None:
    """Accept ``value`` and discard it.

    As a first iteration for sendgrid, there is no state to be managed.
    """
    return None

def _get_stream_state(self) -> StreamState:
    """Return the current state of the stream.

    Returns:
        StreamState: the value of ``self.state`` (a ``Mapping[str, Any]``).
    """
    current_state = self.state
    return current_state

def _validate_and_get_stream_slice_jobs(
self, stream_slice: Optional[StreamSlice] = None
) -> Iterable[AsyncJob]:
Expand All @@ -101,9 +77,6 @@ def _validate_and_get_stream_slice_jobs(
"""
return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    """Yield every slice produced by ``self.stream_slicer.stream_slices()``."""
    slicer = self.stream_slicer
    yield from slicer.stream_slices()

def read_records(
self,
records_schema: Mapping[str, Any],
Expand All @@ -112,13 +85,12 @@ def read_records(
# emit the slice_descriptor log message, for connector builder TestRead
yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore

stream_state: StreamState = self._get_stream_state()
jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

yield from self.record_selector.filter_and_transform(
all_data=records,
stream_state=stream_state,
stream_state={}, # stream_state as an interpolation context is deprecated
records_schema=records_schema,
stream_slice=stream_slice,
)
27 changes: 13 additions & 14 deletions airbyte_cdk/sources/declarative/retrievers/retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,25 @@ def read_records(
@deprecated("Stream slicing is being moved to the stream level.")
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    """Yield no stream slices; this method is a deprecated no-op.

    Stream slicing is being moved to the stream level, so underlying Retriever
    implementations do not need to implement this method. It is intentionally not marked
    abstract: a default implementation that subclasses may ignore should not signal that
    an override is required.

    Returns:
        An empty iterable of stream slices.
    """
    # `yield from []` keeps this a generator function while producing nothing.
    yield from []
Comment on lines 33 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Potential inconsistency between @abstractmethod decorator and docstring.

The docstring says "underlying Retriever implementations do not need to implement this", but the @abstractmethod decorator (line 33) signals that concrete subclasses must override this method — and, if Retriever uses ABCMeta, they cannot be instantiated without doing so. This seems contradictory.

Since the state property had its @abstractmethod removed (lines 41-48, 50-57), should stream_slices also have the decorator removed to match the deprecation pattern? Or if we want to keep it abstract, maybe update the docstring to clarify that subclasses should provide a minimal override that calls the parent implementation?

Something like:

@deprecated("Stream slicing is being moved to the stream level.")
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    """Deprecated no-op implementation. Subclasses should provide a minimal override 
    that yields from the parent or returns an empty iterable.
    """
    yield from []

wdyt?



---

🏁 Script executed:

```shell
#!/bin/bash
# Inspect the retriever.py file to see the full context of both stream_slices and state
fd -t f "retriever.py" --path "*sources/declarative/retrievers*" | head -5
```

Length of output: 233


🏁 Script executed:

#!/bin/bash
# Get lines around stream_slices and state property to compare their decorators
rg -n -B5 -A10 "def stream_slices|@property" "airbyte_cdk/sources/declarative/retrievers/retriever.py" | head -80

Length of output: 839


🏁 Script executed:

#!/bin/bash
# Check if there are any subclasses of Retriever that override stream_slices
rg -n "class.*Retriever" "airbyte_cdk/sources/declarative/retrievers/" --type py | head -20

Length of output: 384


🏁 Script executed:

#!/bin/bash
# Search for implementations of stream_slices in subclasses
rg -n -B3 -A8 "def stream_slices" "airbyte_cdk/sources/declarative/retrievers/" --type py

Length of output: 1152



---

🏁 Script executed:

```shell
#!/bin/bash
# Check if Retriever uses ABC and class definition
rg -n -B2 -A2 "^class Retriever|from abc|import ABC" "airbyte_cdk/sources/declarative/retrievers/retriever.py"
```

Length of output: 214


🏁 Script executed:

#!/bin/bash
# Check if AsyncRetriever and SimpleRetriever can instantiate (look for __init__ or direct instantiation patterns)
rg -n -B5 -A15 "^class AsyncRetriever|^class SimpleRetriever" "airbyte_cdk/sources/declarative/retrievers/async_retriever.py" "airbyte_cdk/sources/declarative/retrievers/simple_retriever.py" | head -60

Length of output: 4285


🏁 Script executed:

#!/bin/bash
# Search for any attempt to override stream_slices in the entire codebase (not just retrievers dir)
rg -n "def stream_slices" --type py | grep -v "retriever.py:35"

Length of output: 3258


Remove the @abstractmethod decorator to match the deprecation pattern used for state.

You've spotted a real inconsistency! The Retriever class doesn't inherit from ABC (it's a plain class at line 14), which means the @abstractmethod decorator on stream_slices is essentially decorative—it doesn't enforce anything. This makes it contradictory alongside the docstring saying implementations "do not need to implement this."

The state property already demonstrates the correct pattern for deprecation: it uses @property and @deprecated without @abstractmethod, and neither AsyncRetriever nor SimpleRetriever overrides it. Stream slicing should follow the same approach.

Removing @abstractmethod from stream_slices would:

  • Eliminate the misleading decorator from a non-ABC class
  • Align with how state is handled
  • Keep the no-op implementation clear without false enforcement signals

Wdyt—does removing the decorator make sense here?

🤖 Prompt for AI Agents
airbyte_cdk/sources/declarative/retrievers/retriever.py around lines 33 to 39:
the stream_slices method is decorated with @abstractmethod even though Retriever
is not an ABC and the method is intended to be a deprecated no-op; remove the
@abstractmethod decorator so the method mirrors the deprecated state property
pattern, leaving @deprecated and the generator implementation (yield from [])
intact to signal no enforcement is required.


@property
@deprecated("State management is being moved to the stream level.")
def state(self) -> StreamState:
    """Return an empty state; this property is a deprecated no-op.

    State management is being moved to the stream level, so underlying Retriever
    implementations do not need to implement this property.

    Returns:
        An empty mapping.
    """
    return {}

@state.setter
@deprecated("State management is being moved to the stream level.")
def state(self, value: StreamState) -> None:
    """Ignore ``value``; this setter is a deprecated no-op.

    State management is being moved to the stream level, so underlying Retriever
    implementations do not need to implement this setter.

    Args:
        value: The state to set. It is not stored.
    """
    pass
Loading
Loading