From cb9beef998ca6ac3cf5fced23239667b3aec4575 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 4 Nov 2025 17:48:01 -0800 Subject: [PATCH 1/2] Remove cursor and RFR from the simple retriever and streamline classes --- .../parsers/model_to_component_factory.py | 86 +------ .../declarative/retrievers/retriever.py | 22 +- .../retrievers/simple_retriever.py | 162 +------------ .../test_manifest_declarative_source.py | 17 +- .../test_model_to_component_factory.py | 100 -------- .../retrievers/test_simple_retriever.py | 229 ++---------------- .../test_concurrent_declarative_source.py | 20 +- 7 files changed, 62 insertions(+), 574 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index fdaf26bba..aecaf73dc 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -34,10 +34,6 @@ from airbyte_cdk.connector_builder.models import ( LogMessage as ConnectorBuilderLogMessage, ) -from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream -from airbyte_cdk.legacy.sources.declarative.incremental import ( - DatetimeBasedCursor, -) from airbyte_cdk.models import ( AirbyteStateBlob, AirbyteStateMessage, @@ -735,7 +731,6 @@ def _init_mappings(self) -> None: CustomTransformationModel: self.create_custom_component, CustomValidationStrategyModel: self.create_custom_component, CustomConfigTransformationModel: self.create_custom_component, - DatetimeBasedCursorModel: self.create_datetime_based_cursor, DeclarativeStreamModel: self.create_default_stream, DefaultErrorHandlerModel: self.create_default_error_handler, DefaultPaginatorModel: self.create_default_paginator, @@ -758,7 +753,6 @@ def _init_mappings(self) -> None: FlattenFieldsModel: self.create_flatten_fields, DpathFlattenFieldsModel: self.create_dpath_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, - IncrementingCountCursorModel: self.create_incrementing_count_cursor, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, @@ -1926,64 +1920,6 @@ def _create_nested_component( def _is_component(model_value: Any) -> bool: return isinstance(model_value, dict) and model_value.get("type") is not None - def create_datetime_based_cursor( - self, model: DatetimeBasedCursorModel, config: Config, **kwargs: Any - ) -> DatetimeBasedCursor: - start_datetime: Union[str, MinMaxDatetime] = ( - model.start_datetime - if isinstance(model.start_datetime, str) - else self.create_min_max_datetime(model.start_datetime, config) - ) - end_datetime: Union[str, MinMaxDatetime, None] = None - if model.is_data_feed and model.end_datetime: - raise ValueError("Data feed does not support end_datetime") - if model.is_data_feed and model.is_client_side_incremental: - raise ValueError( - "`Client side incremental` cannot be applied with `data feed`. Choose only 1 from them." 
- ) - if model.end_datetime: - end_datetime = ( - model.end_datetime - if isinstance(model.end_datetime, str) - else self.create_min_max_datetime(model.end_datetime, config) - ) - - end_time_option = ( - self._create_component_from_model( - model.end_time_option, config, parameters=model.parameters or {} - ) - if model.end_time_option - else None - ) - start_time_option = ( - self._create_component_from_model( - model.start_time_option, config, parameters=model.parameters or {} - ) - if model.start_time_option - else None - ) - - return DatetimeBasedCursor( - cursor_field=model.cursor_field, - cursor_datetime_formats=model.cursor_datetime_formats - if model.cursor_datetime_formats - else [], - cursor_granularity=model.cursor_granularity, - datetime_format=model.datetime_format, - end_datetime=end_datetime, - start_datetime=start_datetime, - step=model.step, - end_time_option=end_time_option, - lookback_window=model.lookback_window, - start_time_option=start_time_option, - partition_field_end=model.partition_field_end, - partition_field_start=model.partition_field_start, - message_repository=self._message_repository, - is_compare_strictly=model.is_compare_strictly, - config=config, - parameters=model.parameters or {}, - ) - def create_default_stream( self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any ) -> AbstractStream: @@ -2647,24 +2583,6 @@ def create_gzip_decoder( fallback_parser=gzip_parser.inner_parser, ) - # todo: This method should be removed once we deprecate the SimpleRetriever.cursor field and the various - # state methods - @staticmethod - def create_incrementing_count_cursor( - model: IncrementingCountCursorModel, config: Config, **kwargs: Any - ) -> DatetimeBasedCursor: - # This should not actually get used anywhere at runtime, but needed to add this to pass checks since - # we still parse models into components. The issue is that there's no runtime implementation of a - # IncrementingCountCursor. - # A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor. 
- return DatetimeBasedCursor(
- cursor_field=model.cursor_field,
- datetime_format="%Y-%m-%d",
- start_datetime="2024-12-12",
- config=config,
- parameters={},
- )
-
 @staticmethod
 def create_iterable_decoder(
 model: IterableDecoderModel, config: Config, **kwargs: Any
@@ -3392,7 +3310,6 @@ def _get_url(req: Requester) -> str:
 record_selector=record_selector,
 stream_slicer=_NO_STREAM_SLICING,
 request_option_provider=request_options_provider,
- cursor=None,
 config=config,
 ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
 parameters=model.parameters or {},
@@ -3413,7 +3330,6 @@ def _get_url(req: Requester) -> str:
 record_selector=record_selector,
 stream_slicer=_NO_STREAM_SLICING,
 request_option_provider=request_options_provider,
- cursor=None,
 config=config,
 ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
 additional_query_properties=query_properties,
@@ -3506,7 +3422,7 @@ def create_state_delegating_stream(
 config: Config,
 has_parent_state: Optional[bool] = None,
 **kwargs: Any,
- ) -> DeclarativeStream:
+ ) -> DefaultStream:
 if (
 model.full_refresh_stream.name != model.name
 or model.name != model.incremental_stream.name
diff --git a/airbyte_cdk/sources/declarative/retrievers/retriever.py b/airbyte_cdk/sources/declarative/retrievers/retriever.py
index a4cce901c..a5641596b 100644
--- a/airbyte_cdk/sources/declarative/retrievers/retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/retriever.py
@@ -36,23 +36,19 @@ def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
 """Returns the stream slices"""
 @property
- @abstractmethod
 @deprecated("State management is being moved to the stream level.")
 def state(self) -> StreamState:
- """State getter, should return state in form that can serialized to a string and send to the output
- as a STATE AirbyteMessage.
-
- A good example of a state is a cursor_value:
- {
- self.cursor_field: "cursor_value"
- }
-
- State should try to be as small as possible but at the same time descriptive enough to restore
- syncing process from the point where it stopped.
+ """
+ Returns an empty state as this method is deprecated, so underlying Retriever
+ implementations do not need to implement this.
+ """
+ return {}
 @state.setter
- @abstractmethod
 @deprecated("State management is being moved to the stream level.")
 def state(self, value: StreamState) -> None:
- """State setter, accept state serialized by state getter."""
+ """
+ Does nothing as this method is deprecated, so underlying Retriever implementations
+ do not need to implement this.
+ """ + pass diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index a30574107..8b15db404 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -23,9 +23,6 @@ import requests from typing_extensions import deprecated -from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor -from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( @@ -43,12 +40,11 @@ from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http.pagination_reset_exception import ( PaginationResetRequiredException, ) -from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState +from airbyte_cdk.sources.types import Config, Record, StreamSlice from airbyte_cdk.utils.mapping_helpers import combine_mappings FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete" @@ -75,7 +71,6 @@ class SimpleRetriever(Retriever): record_selector (HttpSelector): The record selector paginator (Optional[Paginator]): The paginator stream_slicer (Optional[StreamSlicer]): The stream slicer - cursor (Optional[cursor]): The cursor parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation """ @@ -94,7 +89,6 @@ class SimpleRetriever(Retriever): request_option_provider: RequestOptionsProvider = field( default_factory=lambda: DefaultRequestOptionsProvider(parameters={}) ) - cursor: Optional[DeclarativeCursor] = None ignore_stream_slicer_parameters_on_paginated_requests: bool = False additional_query_properties: Optional[QueryProperties] = None log_formatter: Optional[Callable[[requests.Response], Any]] = None @@ -103,9 +97,6 @@ class SimpleRetriever(Retriever): ) def __post_init__(self, parameters: Mapping[str, Any]) -> None: - # while changing `ModelToComponentFactory.create_simple_retriever` to accept a cursor, the sources implementing - # a CustomRetriever inheriting for SimpleRetriever needed to have the following validation added. 
- self.cursor = None if isinstance(self.cursor, Cursor) else self.cursor self._paginator = self.paginator or NoPagination(parameters=parameters) self._parameters = parameters self._name = ( @@ -144,7 +135,6 @@ def _get_mapping( def _get_request_options( self, - stream_state: Optional[StreamData], stream_slice: Optional[StreamSlice], next_page_token: Optional[Mapping[str, Any]], paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]], @@ -155,8 +145,6 @@ def _get_request_options( Raise a ValueError if there's a key collision Returned merged mapping otherwise """ - # FIXME we should eventually remove the usage of stream_state as part of the interpolation - is_body_json = paginator_method.__name__ == "get_request_body_json" mappings = [ @@ -176,7 +164,6 @@ def _get_request_options( def _request_headers( self, - stream_state: Optional[StreamData] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: @@ -185,7 +172,6 @@ def _request_headers( Authentication headers will overwrite any overlapping headers returned from this method. """ headers = self._get_request_options( - stream_state, stream_slice, next_page_token, self._paginator.get_request_headers, @@ -197,7 +183,6 @@ def _request_headers( def _request_params( self, - stream_state: Optional[StreamData] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: @@ -207,7 +192,6 @@ def _request_params( E.g: you might want to define query parameters for paging if next_page_token is not None. """ params = self._get_request_options( - stream_state, stream_slice, next_page_token, self._paginator.get_request_params, @@ -219,7 +203,6 @@ def _request_params( def _request_body_data( self, - stream_state: Optional[StreamData] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Union[Mapping[str, Any], str]: @@ -233,7 +216,6 @@ def _request_body_data( At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. """ return self._get_request_options( - stream_state, stream_slice, next_page_token, self._paginator.get_request_body_data, @@ -242,7 +224,6 @@ def _request_body_data( def _request_body_json( self, - stream_state: Optional[StreamData] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: @@ -252,7 +233,6 @@ def _request_body_json( At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 
""" body_json = self._get_request_options( - stream_state, stream_slice, next_page_token, self._paginator.get_request_body_json, @@ -265,7 +245,6 @@ def _request_body_json( def _paginator_path( self, next_page_token: Optional[Mapping[str, Any]] = None, - stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[StreamSlice] = None, ) -> Optional[str]: """ @@ -275,14 +254,13 @@ def _paginator_path( """ return self._paginator.path( next_page_token=next_page_token, - stream_state=stream_state, + stream_state={}, # stream_state as an interpolation context is deprecated stream_slice=stream_slice, ) def _parse_response( self, response: Optional[requests.Response], - stream_state: StreamState, records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, @@ -292,7 +270,7 @@ def _parse_response( else: yield from self.record_selector.select_records( response=response, - stream_state=stream_state, + stream_state={}, # stream_state as an interpolation context is deprecated records_schema=records_schema, stream_slice=stream_slice, next_page_token=next_page_token, @@ -331,36 +309,30 @@ def _next_page_token( def _fetch_next_page( self, - stream_state: Mapping[str, Any], stream_slice: StreamSlice, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[requests.Response]: return self.requester.send_request( path=self._paginator_path( next_page_token=next_page_token, - stream_state=stream_state, stream_slice=stream_slice, ), - stream_state=stream_state, + stream_state={}, # stream_state as an interpolation context is deprecated stream_slice=stream_slice, next_page_token=next_page_token, request_headers=self._request_headers( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ), request_params=self._request_params( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ), request_body_data=self._request_body_data( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ), request_body_json=self._request_body_json( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ), @@ -371,7 +343,6 @@ def _fetch_next_page( def _read_pages( self, records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], - stream_state: Mapping[str, Any], stream_slice: StreamSlice, ) -> Iterable[Record]: pagination_tracker = self.pagination_tracker_factory() @@ -393,9 +364,7 @@ def _read_pages( cursor_slice=stream_slice.cursor_slice or {}, extra_fields={"query_properties": properties}, ) - response = self._fetch_next_page( - stream_state, stream_slice, next_page_token - ) + response = self._fetch_next_page(stream_slice, next_page_token) for current_record in records_generator_fn(response): if self.additional_query_properties.property_chunking: @@ -425,7 +394,7 @@ def _read_pages( last_record = record yield record else: - response = self._fetch_next_page(stream_state, stream_slice, next_page_token) + response = self._fetch_next_page(stream_slice, next_page_token) for current_record in records_generator_fn(response): pagination_tracker.observe(current_record) last_page_size += 1 @@ -466,49 +435,6 @@ def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]: next_page_token = {"next_page_token": initial_token} if initial_token is not None else None return next_page_token - def _read_single_page( - self, - records_generator_fn: Callable[[Optional[requests.Response]], 
Iterable[Record]], - stream_state: Mapping[str, Any], - stream_slice: StreamSlice, - ) -> Iterable[StreamData]: - initial_token = stream_state.get("next_page_token") - if initial_token is None: - initial_token = self._paginator.get_initial_token() - next_page_token: Optional[Mapping[str, Any]] = ( - {"next_page_token": initial_token} if initial_token else None - ) - - response = self._fetch_next_page(stream_state, stream_slice, next_page_token) - - last_page_size = 0 - last_record: Optional[Record] = None - for record in records_generator_fn(response): - last_page_size += 1 - last_record = record - yield record - - if not response: - next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True} - else: - last_page_token_value = ( - next_page_token.get("next_page_token") if next_page_token else None - ) - next_page_token = self._next_page_token( - response=response, - last_page_size=last_page_size, - last_record=last_record, - last_page_token_value=last_page_token_value, - ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True} - - if self.cursor: - self.cursor.close_slice( - StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition) - ) - - # Always return an empty generator just in case no records were ever yielded - yield from [] - def read_records( self, records_schema: Mapping[str, Any], @@ -526,95 +452,36 @@ def read_records( record_generator = partial( self._parse_records, stream_slice=stream_slice, - stream_state=self.state or {}, records_schema=records_schema, ) + yield from self._read_pages(record_generator, _slice) - if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): - stream_state = self.state - - # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to - # fetch more records. The platform deletes stream state for full refresh streams before starting a - # new job, so we don't need to worry about this value existing for the initial attempt - if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): - return - - yield from self._read_single_page(record_generator, stream_state, _slice) - else: - for stream_data in self._read_pages(record_generator, self.state, _slice): - current_record = self._extract_record(stream_data, _slice) - if self.cursor and current_record: - self.cursor.observe(_slice, current_record) - - yield stream_data - - if self.cursor: - self.cursor.close_slice(_slice) - return - - # FIXME based on the comment above in SimpleRetriever.read_records, it seems like we can tackle https://github.com/airbytehq/airbyte-internal-issues/issues/6955 and remove this - - def _extract_record( - self, stream_data: StreamData, stream_slice: StreamSlice - ) -> Optional[Record]: - """ - As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize - to data to streamline the rest of the process. 
- """ - if isinstance(stream_data, Record): - # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData` - return stream_data - elif isinstance(stream_data, (dict, Mapping)): - return Record( - data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name - ) - elif isinstance(stream_data, AirbyteMessage) and stream_data.record: - return Record( - data=stream_data.record.data, # type:ignore # AirbyteMessage always has record.data - associated_slice=stream_slice, - stream_name=self.name, - ) - return None - - # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore """ Specifies the slices for this stream. See the stream slicing section of the docs for more information. :param sync_mode: :param cursor_field: - :param stream_state: :return: """ return self.stream_slicer.stream_slices() # todo: There are a number of things that can be cleaned up when we remove self.cursor and all the related # SimpleRetriever state management that is handled by the concurrent CDK Framework: - # - ModelToComponentFactory.create_datetime_based_cursor() should be removed since it does need to be instantiated - # - ModelToComponentFactory.create_incrementing_count_cursor() should be removed since it's a placeholder - # - test_simple_retriever.py: Remove all imports and usages of legacy cursor components - # - test_model_to_component_factory.py:test_datetime_based_cursor() test can be removed - @property - def state(self) -> Mapping[str, Any]: - return self.cursor.get_stream_state() if self.cursor else {} - - @state.setter - def state(self, value: StreamState) -> None: - """State setter, accept state serialized by state getter.""" - if self.cursor: - self.cursor.set_initial_state(value) + # - DONE ModelToComponentFactory.create_datetime_based_cursor() should be removed since it does need to be instantiated + # - DONE ModelToComponentFactory.create_incrementing_count_cursor() should be removed since it's a placeholder + # - DONE test_simple_retriever.py: Remove all imports and usages of legacy cursor components + # - DONE test_model_to_component_factory.py:test_datetime_based_cursor() test can be removed def _parse_records( self, response: Optional[requests.Response], - stream_state: Mapping[str, Any], records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice], ) -> Iterable[Record]: yield from self._parse_response( response, stream_slice=stream_slice, - stream_state=stream_state, records_schema=records_schema, ) @@ -660,7 +527,6 @@ class LazySimpleRetriever(SimpleRetriever): def _read_pages( self, records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], - stream_state: Mapping[str, Any], stream_slice: StreamSlice, ) -> Iterable[Record]: response = stream_slice.extra_fields["child_response"] @@ -676,26 +542,24 @@ def _read_pages( yield from self._paginate( next_page_token, records_generator_fn, - stream_state, stream_slice, ) yield from [] else: - yield from self._read_pages(records_generator_fn, stream_state, stream_slice) + yield from self._read_pages(records_generator_fn, stream_slice) def _paginate( self, next_page_token: Any, records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], - stream_state: Mapping[str, Any], stream_slice: StreamSlice, ) -> Iterable[Record]: """Handle 
pagination by fetching subsequent pages.""" pagination_complete = False while not pagination_complete: - response = self._fetch_next_page(stream_state, stream_slice, next_page_token) + response = self._fetch_next_page(stream_slice, next_page_token) last_page_size, last_record = 0, None for record in records_generator_fn(response): # type: ignore[call-arg] # only _parse_records expected as a func diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index 87e8574bb..3dc271bc7 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -1564,7 +1564,7 @@ def _create_page(response_body): ) * 10, [{"ABC": 0}, {"AED": 1}], - [call({}, {}, None)], + [call({}, None)], ), ( "test_read_manifest_with_added_fields", @@ -1651,7 +1651,7 @@ def _create_page(response_body): {"ABC": 0, "added_field_key": "added_field_value"}, {"AED": 1, "added_field_key": "added_field_value"}, ], - [call({}, {}, None)], + [call({}, None)], ), ( "test_read_manifest_with_flatten_fields", @@ -1735,7 +1735,7 @@ def _create_page(response_body): {"ABC": 0, "id": 1}, {"AED": 1, "id": 2}, ], - [call({}, {}, None)], + [call({}, None)], ), ( "test_read_with_pagination_no_partitions", @@ -1822,9 +1822,8 @@ def _create_page(response_body): * 10, [{"ABC": 0}, {"AED": 1}, {"USD": 2}], [ - call({}, {}, None), + call({}, None), call( - {}, {}, {"next_page_token": "next"}, ), @@ -1913,9 +1912,8 @@ def _create_page(response_body): ), [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}, {"ABC": 2, "partition": 1}], [ - call({}, {"partition": "0"}, None), + call({"partition": "0"}, None), call( - {}, {"partition": "1"}, None, ), @@ -2021,10 +2019,9 @@ def _create_page(response_body): {"ABC": 2, "partition": 1}, ], [ - call({}, {"partition": "0"}, None), - call({}, {"partition": "0"}, {"next_page_token": "next"}), + call({"partition": "0"}, None), + call({"partition": "0"}, {"next_page_token": "next"}), call( - {}, {"partition": "1"}, None, ), diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 591d47ae6..4ae6b4039 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -775,76 +775,6 @@ def test_create_substream_partition_router(): assert partition_router.parent_stream_configs[1].request_option is None -# todo: delete this class once we deprecate SimpleRetriever.cursor and SimpleRetriever.state methods -def test_datetime_based_cursor(): - content = """ - incremental: - type: DatetimeBasedCursor - $parameters: - datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" - start_datetime: - type: MinMaxDatetime - datetime: "{{ config['start_time'] }}" - min_datetime: "{{ config['start_time'] + day_delta(2) }}" - end_datetime: "{{ config['end_time'] }}" - step: "P10D" - cursor_field: "created" - cursor_granularity: "PT0.000001S" - lookback_window: "P5D" - start_time_option: - type: RequestOption - inject_into: request_parameter - field_name: "since_{{ config['cursor_field'] }}" - end_time_option: - type: RequestOption - inject_into: body_json - field_path: ["before_{{ parameters['cursor_field'] }}"] - partition_field_start: star - partition_field_end: en - """ - parsed_manifest = 
YamlDeclarativeSource._parse(content) - resolved_manifest = resolver.preprocess_manifest(parsed_manifest) - slicer_manifest = transformer.propagate_types_and_parameters( - "", resolved_manifest["incremental"], {"cursor_field": "created_at"} - ) - - stream_slicer = factory.create_component( - model_type=DatetimeBasedCursorModel, - component_definition=slicer_manifest, - config=input_config, - ) - - assert isinstance(stream_slicer, DatetimeBasedCursor) - assert stream_slicer._step == timedelta(days=10) - assert stream_slicer.cursor_field.string == "created" - assert stream_slicer.cursor_granularity == "PT0.000001S" - assert stream_slicer._lookback_window.string == "P5D" - assert stream_slicer.start_time_option.inject_into == RequestOptionType.request_parameter - assert ( - stream_slicer.start_time_option.field_name.eval( - config=input_config | {"cursor_field": "updated_at"} - ) - == "since_updated_at" - ) - assert stream_slicer.end_time_option.inject_into == RequestOptionType.body_json - assert [field.eval({}) for field in stream_slicer.end_time_option.field_path] == [ - "before_created_at" - ] - assert stream_slicer._partition_field_start.eval({}) == "star" - assert stream_slicer._partition_field_end.eval({}) == "en" - - assert isinstance(stream_slicer._start_datetime, MinMaxDatetime) - assert stream_slicer.start_datetime._datetime_format == "%Y-%m-%dT%H:%M:%S.%f%z" - assert stream_slicer.start_datetime.datetime.string == "{{ config['start_time'] }}" - assert ( - stream_slicer.start_datetime.min_datetime.string - == "{{ config['start_time'] + day_delta(2) }}" - ) - - assert isinstance(stream_slicer._end_datetime, MinMaxDatetime) - assert stream_slicer._end_datetime.datetime.string == "{{ config['end_time'] }}" - - def test_stream_with_incremental_and_retriever_with_partition_router(): content = """ decoder: @@ -1636,36 +1566,6 @@ def test_client_side_incremental_with_partition_router(): ) -def test_given_data_feed_and_client_side_incremental_then_raise_error(): - content = """ -incremental_sync: - type: DatetimeBasedCursor - $parameters: - datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" - start_datetime: "{{ config['start_time'] }}" - cursor_field: "created" - is_data_feed: true - is_client_side_incremental: true - """ - - parsed_incremental_sync = YamlDeclarativeSource._parse(content) - resolved_incremental_sync = resolver.preprocess_manifest(parsed_incremental_sync) - datetime_based_cursor_definition = transformer.propagate_types_and_parameters( - "", resolved_incremental_sync["incremental_sync"], {} - ) - - with pytest.raises(ValueError) as e: - factory.create_component( - model_type=DatetimeBasedCursorModel, - component_definition=datetime_based_cursor_definition, - config=input_config, - ) - assert ( - e.value.args[0] - == "`Client side incremental` cannot be applied with `data feed`. Choose only 1 from them." 
- ) - - @pytest.mark.parametrize( "test_name, record_selector, expected_runtime_selector", [ diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 5caec9a34..df03f9830 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -10,11 +10,6 @@ import pytest import requests -from airbyte_cdk.legacy.sources.declarative.incremental import ( - DatetimeBasedCursor, - DeclarativeCursor, - ResumableFullRefreshCursor, -) from airbyte_cdk.models import ( AirbyteLogMessage, AirbyteMessage, @@ -26,6 +21,10 @@ from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter +from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + ParentStreamConfig, + SubstreamPartitionRouter, +) from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator from airbyte_cdk.sources.declarative.requesters.paginators.strategies import ( CursorPaginationStrategy, @@ -91,10 +90,6 @@ def test_simple_retriever_full(mock_http_stream): record_selector = MagicMock() record_selector.select_records.return_value = records - cursor = MagicMock(spec=DeclarativeCursor) - stream_slices = [{"date": "2022-01-01"}, {"date": "2022-01-02"}] - cursor.stream_slices.return_value = stream_slices - response = requests.Response() response.status_code = 200 @@ -103,7 +98,6 @@ def test_simple_retriever_full(mock_http_stream): last_page_token_value = 0 underlying_state = {"date": "2021-01-01"} - cursor.get_stream_state.return_value = underlying_state requester.get_authenticator.return_value = NoAuth({}) url_base = "https://airbyte.io" @@ -130,20 +124,17 @@ def test_simple_retriever_full(mock_http_stream): requester=requester, paginator=paginator, record_selector=record_selector, - stream_slicer=cursor, - cursor=cursor, + stream_slicer=SinglePartitionRouter(parameters={}), parameters={}, config={}, ) assert retriever.primary_key == primary_key - assert retriever.state == underlying_state assert ( retriever._next_page_token(response, last_page_size, last_record, last_page_token_value) == next_page_token ) assert retriever._request_params(None, None) == {} - assert retriever.stream_slices() == stream_slices @patch.object(SimpleRetriever, "_read_pages", return_value=iter([*request_response_logs, *records])) @@ -151,16 +142,6 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream): requester = MagicMock() paginator = MagicMock() record_selector = MagicMock() - stream_slicer = DatetimeBasedCursor( - start_datetime="", - end_datetime="", - step="P1D", - cursor_field="id", - datetime_format="", - cursor_granularity="P1D", - config={}, - parameters={}, - ) retriever = SimpleRetriever( name="stream_name", @@ -168,7 +149,7 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream): requester=requester, paginator=paginator, record_selector=record_selector, - stream_slicer=stream_slicer, + stream_slicer=SinglePartitionRouter(parameters={}), parameters={}, config={}, ) @@ -181,153 +162,6 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream): assert actual_messages[3] == records[1] -@pytest.mark.parametrize( - "initial_state, expected_reset_value, expected_next_page", 
- [ - pytest.param(None, None, 1, id="test_initial_sync_no_state"), - pytest.param({"next_page_token": 10}, 10, 11, id="test_reset_with_next_page_token"), - ], -) -def test_simple_retriever_resumable_full_refresh_cursor_page_increment( - initial_state, expected_reset_value, expected_next_page -): - stream_name = "stream_name" - expected_records = [ - Record(data={"id": "abc"}, associated_slice=None, stream_name=stream_name), - Record(data={"id": "def"}, associated_slice=None, stream_name=stream_name), - Record(data={"id": "ghi"}, associated_slice=None, stream_name=stream_name), - Record(data={"id": "jkl"}, associated_slice=None, stream_name=stream_name), - Record(data={"id": "mno"}, associated_slice=None, stream_name=stream_name), - Record(data={"id": "123"}, associated_slice=None, stream_name=stream_name), - Record(data={"id": "456"}, associated_slice=None, stream_name=stream_name), - Record(data={"id": "789"}, associated_slice=None, stream_name=stream_name), - ] - - response = requests.Response() - response.status_code = 200 - response._content = json.dumps( - {"data": [record.data for record in expected_records[:5]]} - ).encode("utf-8") - - requester = MagicMock() - requester.send_request.side_effect = [ - response, - response, - ] - - record_selector = MagicMock() - record_selector.select_records.side_effect = [ - [ - expected_records[0], - expected_records[1], - expected_records[2], - expected_records[3], - expected_records[4], - ], - [ - expected_records[5], - expected_records[6], - expected_records[7], - ], - ] - - page_increment_strategy = PageIncrement(config={}, page_size=5, parameters={}) - paginator = DefaultPaginator( - config={}, - pagination_strategy=page_increment_strategy, - url_base="https://airbyte.io", - parameters={}, - ) - - stream_slicer = ResumableFullRefreshCursor(parameters={}) - if initial_state: - stream_slicer.set_initial_state(initial_state) - - retriever = SimpleRetriever( - name=stream_name, - primary_key=primary_key, - requester=requester, - paginator=paginator, - record_selector=record_selector, - stream_slicer=stream_slicer, - cursor=stream_slicer, - parameters={}, - config={}, - ) - - stream_slice = list(stream_slicer.stream_slices())[0] - actual_records = [ - r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) - ] - - assert len(actual_records) == 5 - assert actual_records == expected_records[:5] - assert retriever.state == {"next_page_token": expected_next_page} - - actual_records = [ - r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) - ] - assert len(actual_records) == 3 - assert actual_records == expected_records[5:] - assert retriever.state == {"__ab_full_refresh_sync_complete": True} - - -def test_simple_retriever_resumable_full_refresh_cursor_reset_skip_completed_stream(): - expected_records = [ - Record(data={"id": "abc"}, associated_slice=None, stream_name="test_stream"), - Record(data={"id": "def"}, associated_slice=None, stream_name="test_stream"), - ] - - response = requests.Response() - response.status_code = 200 - response._content = json.dumps({}).encode("utf-8") - - requester = MagicMock() - requester.send_request.side_effect = [ - response, - ] - - record_selector = MagicMock() - record_selector.select_records.return_value = [ - expected_records[0], - expected_records[1], - ] - - page_increment_strategy = PageIncrement(config={}, page_size=5, parameters={}) - paginator = DefaultPaginator( - config={}, - pagination_strategy=page_increment_strategy, - url_base="https://airbyte.io", 
- parameters={}, - ) - paginator.get_initial_token = Mock(wraps=paginator.get_initial_token) - - stream_slicer = ResumableFullRefreshCursor(parameters={}) - stream_slicer.set_initial_state({"__ab_full_refresh_sync_complete": True}) - - retriever = SimpleRetriever( - name="stream_name", - primary_key=primary_key, - requester=requester, - paginator=paginator, - record_selector=record_selector, - stream_slicer=stream_slicer, - cursor=stream_slicer, - parameters={}, - config={}, - ) - - stream_slice = list(stream_slicer.stream_slices())[0] - actual_records = [ - r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) - ] - - assert len(actual_records) == 0 - assert retriever.state == {"__ab_full_refresh_sync_complete": True} - - paginator.get_initial_token.assert_not_called() - - @pytest.mark.parametrize( "test_name, paginator_mapping, request_options_provider_mapping, expected_mapping", [ @@ -384,11 +218,11 @@ def test_get_request_options_from_pagination( for _, method in request_option_type_to_method.items(): if expected_mapping is not None: - actual_mapping = method(None, None, None) + actual_mapping = method(None, None) assert actual_mapping == expected_mapping else: try: - method(None, None, None) + method(None, None) assert False except ValueError: pass @@ -431,11 +265,11 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping): for _, method in request_option_type_to_method.items(): if expected_mapping: - actual_mapping = method(None, None, None) + actual_mapping = method(None, None) assert actual_mapping == expected_mapping else: try: - method(None, None, None) + method(None, None) assert False except ValueError: pass @@ -509,7 +343,7 @@ def test_ignore_request_option_provider_parameters_on_paginated_requests( } for _, method in request_option_type_to_method.items(): - actual_mapping = method(None, None, next_page_token={"next_page_token": "1000"}) + actual_mapping = method(None, next_page_token={"next_page_token": "1000"}) assert actual_mapping == expected_mapping @@ -552,11 +386,11 @@ def test_request_body_data( ) if expected_body_data: - actual_body_data = retriever._request_body_data(None, None, None) + actual_body_data = retriever._request_body_data(None, None) assert actual_body_data == expected_body_data else: try: - retriever._request_body_data(None, None, None) + retriever._request_body_data(None, None) assert False except ValueError: pass @@ -623,7 +457,6 @@ def test_given_stream_data_is_not_record_when_read_records_then_update_slice_wit ] record_selector = MagicMock() record_selector.select_records.return_value = [] - cursor = MagicMock(spec=DeclarativeCursor) retriever = SimpleRetriever( name="stream_name", @@ -631,16 +464,15 @@ def test_given_stream_data_is_not_record_when_read_records_then_update_slice_wit requester=MagicMock(), paginator=Mock(), record_selector=record_selector, - stream_slicer=cursor, - cursor=cursor, + stream_slicer=SinglePartitionRouter(parameters={}), parameters={}, config={}, ) stream_slice = StreamSlice(cursor_slice={}, partition={"repository": "airbyte"}) - def retriever_read_pages(_, __, ___): + def retriever_read_pages(_, __): return retriever._parse_records( - response=MagicMock(), stream_state={}, stream_slice=stream_slice, records_schema={} + response=MagicMock(), stream_slice=stream_slice, records_schema={} ) with patch.object( @@ -650,14 +482,11 @@ def retriever_read_pages(_, __, ___): side_effect=retriever_read_pages, ): list(retriever.read_records(stream_slice=stream_slice, records_schema={})) 
- cursor.observe.assert_not_called() - cursor.close_slice.assert_called_once_with(stream_slice) def test_given_initial_token_is_zero_when_read_records_then_pass_initial_token(): record_selector = MagicMock() record_selector.select_records.return_value = [] - cursor = MagicMock(spec=DeclarativeCursor) paginator = MagicMock() paginator.get_initial_token.return_value = 0 paginator.next_page_token.return_value = None @@ -668,8 +497,7 @@ def test_given_initial_token_is_zero_when_read_records_then_pass_initial_token() requester=MagicMock(), paginator=paginator, record_selector=record_selector, - stream_slicer=cursor, - cursor=cursor, + stream_slicer=SinglePartitionRouter(parameters={}), parameters={}, config={}, ) @@ -685,9 +513,7 @@ def test_given_initial_token_is_zero_when_read_records_then_pass_initial_token() return_value=response, ) as fetch_next_page_mock: list(retriever.read_records(stream_slice=stream_slice, records_schema={})) - fetch_next_page_mock.assert_called_once_with( - cursor.get_stream_state(), stream_slice, {"next_page_token": 0} - ) + fetch_next_page_mock.assert_called_once_with(stream_slice, {"next_page_token": 0}) def _generate_slices(number_of_slices): @@ -699,9 +525,6 @@ def test_given_state_selector_when_read_records_use_stream_state(http_stream_rea requester = MagicMock() paginator = MagicMock() record_selector = MagicMock() - cursor = MagicMock(spec=DeclarativeCursor) - cursor.select_state = MagicMock(return_value=A_SLICE_STATE) - cursor.get_stream_state = MagicMock(return_value=A_STREAM_STATE) retriever = SimpleRetriever( name="stream_name", @@ -709,15 +532,14 @@ def test_given_state_selector_when_read_records_use_stream_state(http_stream_rea requester=requester, paginator=paginator, record_selector=record_selector, - stream_slicer=cursor, - cursor=cursor, + stream_slicer=SinglePartitionRouter(parameters={}), parameters={}, config={}, ) list(retriever.read_records(stream_slice=A_STREAM_SLICE, records_schema={})) - http_stream_read_pages.assert_called_once_with(mocker.ANY, A_STREAM_STATE, A_STREAM_SLICE) + http_stream_read_pages.assert_called_once_with(mocker.ANY, A_STREAM_SLICE) def test_retriever_last_page_size_for_page_increment(): @@ -755,7 +577,6 @@ def mock_parse_records(response: Optional[requests.Response]) -> Iterable[Record actual_records = list( retriever._read_pages( records_generator_fn=mock_parse_records, - stream_state={}, stream_slice=StreamSlice(cursor_slice={}, partition={}), ) ) @@ -805,7 +626,6 @@ def mock_parse_records(response: Optional[requests.Response]) -> Iterable[Record actual_records = list( retriever._read_pages( records_generator_fn=mock_parse_records, - stream_state={}, stream_slice=StreamSlice(cursor_slice={}, partition={}), ) ) @@ -896,7 +716,6 @@ def mock_send_request( record_generator = partial( retriever._parse_records, - stream_state=retriever.state or {}, stream_slice=_slice, records_schema={}, ) @@ -904,9 +723,7 @@ def mock_send_request( # We call _read_pages() because the existing read_records() used to modify and reset state whereas # _read_pages() did not invoke any methods to reset state actual_records = list( - retriever._read_pages( - records_generator_fn=record_generator, stream_state={}, stream_slice=_slice - ) + retriever._read_pages(records_generator_fn=record_generator, stream_slice=_slice) ) assert len(actual_records) == 8 assert actual_records[0] == Record( @@ -917,9 +734,7 @@ def mock_send_request( ) actual_records = list( - retriever._read_pages( - records_generator_fn=record_generator, stream_state={}, 
stream_slice=_slice - ) + retriever._read_pages(records_generator_fn=record_generator, stream_slice=_slice) ) assert len(actual_records) == 8 assert actual_records[2] == Record( diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index ce2e17ce5..8df16b463 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -3290,7 +3290,7 @@ def _create_page(response_body): ) * 10, [{"ABC": 0}, {"AED": 1}], - [call({}, {}, None)], + [call({}, None)], ), ( "test_read_manifest_with_added_fields", @@ -3377,7 +3377,7 @@ def _create_page(response_body): {"ABC": 0, "added_field_key": "added_field_value"}, {"AED": 1, "added_field_key": "added_field_value"}, ], - [call({}, {}, None)], + [call({}, None)], ), ( "test_read_manifest_with_flatten_fields", @@ -3461,7 +3461,7 @@ def _create_page(response_body): {"ABC": 0, "id": 1}, {"AED": 1, "id": 2}, ], - [call({}, {}, None)], + [call({}, None)], ), ( "test_read_with_pagination_no_partitions", @@ -3548,8 +3548,8 @@ def _create_page(response_body): * 10, [{"ABC": 0}, {"AED": 1}, {"USD": 2}], [ - call({}, {}, None), - call({}, {}, {"next_page_token": "next"}), + call({}, None), + call({}, {"next_page_token": "next"}), ], ), ( @@ -3635,8 +3635,8 @@ def _create_page(response_body): ), [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}, {"ABC": 2, "partition": 1}], [ - call({}, {"partition": "0"}, None), - call({}, {"partition": "1"}, None), + call({"partition": "0"}, None), + call({"partition": "1"}, None), ], ), ( @@ -3739,9 +3739,9 @@ def _create_page(response_body): {"ABC": 2, "partition": 1}, ], [ - call({}, {"partition": "0"}, None), - call({}, {"partition": "0"}, {"next_page_token": "next"}), - call({}, {"partition": "1"}, None), + call({"partition": "0"}, None), + call({"partition": "0"}, {"next_page_token": "next"}), + call({"partition": "1"}, None), ], ), ], From ef0795934c95e8db1ba3d04dac94ab258b994d23 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 4 Nov 2025 18:03:37 -0800 Subject: [PATCH 2/2] remove stream_slices() from SimpleRetriever and AsyncRetriever --- .../sources/declarative/declarative_stream.py | 6 ++-- .../declarative/retrievers/async_retriever.py | 32 ++----------------- .../declarative/retrievers/retriever.py | 5 ++- .../retrievers/simple_retriever.py | 17 ---------- .../retrievers/test_simple_retriever.py | 29 ----------------- 5 files changed, 9 insertions(+), 80 deletions(-) diff --git a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py index 89935fda8..593a6e504 100644 --- a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py +++ b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py @@ -5,6 +5,8 @@ from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from typing_extensions import deprecated + from airbyte_cdk.legacy.sources.declarative.incremental import ( GlobalSubstreamCursor, PerPartitionCursor, @@ -13,7 +15,6 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration -from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever from airbyte_cdk.sources.declarative.retrievers.async_retriever import 
AsyncRetriever from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader @@ -28,6 +29,7 @@ from airbyte_cdk.sources.types import Config, StreamSlice +@deprecated("DeclarativeStream has been deprecated in favor of the concurrent DefaultStream") @dataclass class DeclarativeStream(Stream): """ @@ -198,8 +200,6 @@ def state_checkpoint_interval(self) -> Optional[int]: return None def get_cursor(self) -> Optional[Cursor]: - if self.retriever and isinstance(self.retriever, SimpleRetriever): - return self.retriever.cursor return None def _get_checkpoint_reader( diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 33a288c43..986a11d39 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -11,7 +11,7 @@ ) from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.streams.core import StreamData -from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +from airbyte_cdk.sources.types import Config, StreamSlice from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger @@ -59,30 +59,6 @@ def exit_on_rate_limit(self, value: bool) -> None: if job_orchestrator is not None: job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment] - @property - def state(self) -> StreamState: - """ - As a first iteration for sendgrid, there is no state to be managed - """ - return {} - - @state.setter - def state(self, value: StreamState) -> None: - """ - As a first iteration for sendgrid, there is no state to be managed - """ - pass - - def _get_stream_state(self) -> StreamState: - """ - Gets the current state of the stream. 
-
- Returns:
- StreamState: Mapping[str, Any]
- """
-
- return self.state
-
 def _validate_and_get_stream_slice_jobs(
 self, stream_slice: Optional[StreamSlice] = None
 ) -> Iterable[AsyncJob]:
@@ -101,9 +77,6 @@ def _validate_and_get_stream_slice_jobs(
 """
 return stream_slice.extra_fields.get("jobs", []) if stream_slice else []
- def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
- yield from self.stream_slicer.stream_slices()
-
 def read_records(
 self,
 records_schema: Mapping[str, Any],
@@ -112,13 +85,12 @@ def read_records(
 # emit the slice_descriptor log message, for connector builder TestRead
 yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore
- stream_state: StreamState = self._get_stream_state()
 jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
 records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)
 yield from self.record_selector.filter_and_transform(
 all_data=records,
- stream_state=stream_state,
+ stream_state={}, # stream_state as an interpolation context is deprecated
 records_schema=records_schema,
 stream_slice=stream_slice,
 )
diff --git a/airbyte_cdk/sources/declarative/retrievers/retriever.py b/airbyte_cdk/sources/declarative/retrievers/retriever.py
index a5641596b..e9a1cfc25 100644
--- a/airbyte_cdk/sources/declarative/retrievers/retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/retriever.py
@@ -33,7 +33,9 @@ def read_records(
- @abstractmethod
 @deprecated("Stream slicing is being moved to the stream level.")
 def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
- """Returns the stream slices"""
+ """Does nothing as this method is deprecated, so underlying Retriever implementations
+ do not need to implement this.
+ """
+ yield from []
 @property
 @deprecated("State management is being moved to the stream level.")
 def state(self) -> StreamState:
diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index 8b15db404..fc6f0d78d 100644
--- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -456,23 +456,6 @@ def read_records(
 )
 yield from self._read_pages(record_generator, _slice)
- def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
- """
- Specifies the slices for this stream. See the stream slicing section of the docs for more information.
- - :param sync_mode: - :param cursor_field: - :return: - """ - return self.stream_slicer.stream_slices() - - # todo: There are a number of things that can be cleaned up when we remove self.cursor and all the related - # SimpleRetriever state management that is handled by the concurrent CDK Framework: - # - DONE ModelToComponentFactory.create_datetime_based_cursor() should be removed since it does need to be instantiated - # - DONE ModelToComponentFactory.create_incrementing_count_cursor() should be removed since it's a placeholder - # - DONE test_simple_retriever.py: Remove all imports and usages of legacy cursor components - # - DONE test_model_to_component_factory.py:test_datetime_based_cursor() test can be removed - def _parse_records( self, response: Optional[requests.Response], diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index df03f9830..d9585dbd4 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -21,10 +21,6 @@ from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter -from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( - ParentStreamConfig, - SubstreamPartitionRouter, -) from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator from airbyte_cdk.sources.declarative.requesters.paginators.strategies import ( CursorPaginationStrategy, @@ -42,7 +38,6 @@ from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever -from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator from airbyte_cdk.sources.streams.http.pagination_reset_exception import ( PaginationResetRequiredException, ) @@ -425,30 +420,6 @@ def test_path(test_name, requester_path, paginator_path, expected_path): assert actual_path == expected_path -def test_limit_stream_slices(): - maximum_number_of_slices = 4 - stream_slicer = MagicMock() - stream_slicer.stream_slices.return_value = _generate_slices(maximum_number_of_slices * 2) - stream_slicer_wrapped = StreamSlicerTestReadDecorator( - wrapped_slicer=stream_slicer, - maximum_number_of_slices=maximum_number_of_slices, - ) - retriever = SimpleRetriever( - name="stream_name", - primary_key=primary_key, - requester=MagicMock(), - paginator=MagicMock(), - record_selector=MagicMock(), - stream_slicer=stream_slicer_wrapped, - parameters={}, - config={}, - ) - - truncated_slices = list(retriever.stream_slices()) - - assert truncated_slices == _generate_slices(maximum_number_of_slices) - - def test_given_stream_data_is_not_record_when_read_records_then_update_slice_with_optional_record(): stream_data = [ AirbyteMessage(
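
Taken together, the two commits leave SimpleRetriever without a cursor argument, with slicing supplied by the stream slicer and with the deprecated state accessors on the base Retriever reduced to no-ops. A minimal sketch of the resulting contract, mirroring the construction used in the updated unit tests above; the MagicMock stand-ins and the "id" primary key are assumptions for illustration, not values taken from the patch:

    # Minimal sketch of the post-change contract, assuming MagicMock stand-ins
    # for the requester, paginator, and record selector.
    from unittest.mock import MagicMock

    from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
    from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever

    retriever = SimpleRetriever(
        name="stream_name",
        primary_key="id",             # hypothetical primary key for illustration
        requester=MagicMock(),        # stand-in for a concrete Requester
        paginator=MagicMock(),        # stand-in for a concrete Paginator
        record_selector=MagicMock(),  # stand-in for a concrete HttpSelector
        stream_slicer=SinglePartitionRouter(parameters={}),  # no cursor= kwarg anymore
        parameters={},
        config={},
    )

    # State now lives at the stream level; the deprecated accessors are no-ops.
    retriever.state = {"date": "2021-01-01"}  # silently ignored
    assert retriever.state == {}

Moving cursor and state handling up to the concurrent stream leaves the retriever as a pure request-and-parse component, which is the direction both commits point in.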