Vika Petrenko
06/06/2021, 11:40 PM
How should read_records be written if the source API does not support query params for fetching only new or modified data, but the stream should still be used like an incremental stream?
def read_records(...) -> Iterable[Mapping[str, Any]]:
filtered = []
items = super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
for item in items:
if self._field_to_datetime(item[self.cursor_field]) > self._field_to_datetime(stream_state[self.cursor_field]):
filtered.append(item)
yield from filtered
s
s
s
def read_records(...) -> Iterable[Mapping[str, Any]]:
filtered = []
items = super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
for item in items:
if self._field_to_datetime(item[self.cursor_field]) > self._field_to_datetime(stream_state[self.cursor_field]):
yield item
Vika Petrenko
06/07/2021, 12:02 AM