Ashish Narang
11/09/2022, 11:30 AM
`state.json` is not getting updated. The log messages show that new data is being fetched, but `state.json` still points to the first date, which I provided in the `config.json` file.
Here is the code for the stream:
class ExchangeRates(HttpStream, IncrementalMixin):
    """Incremental stream of daily exchange rates, one HTTP request per calendar day.

    Each stream slice is a single date, and the request path is that date
    (``url_base + "YYYY-MM-DD"``). The record's ``date`` field is both the
    primary key and the incremental cursor.

    NOTE(review): this class only *exposes* its progress through the ``state``
    property, which the framework reads when it emits STATE messages. Nothing
    here writes a state file. If a local ``state.json`` is expected to change
    after ``main.py read``, that must be done by whatever consumes the emitted
    STATE messages. Confirm against the Airbyte protocol documentation.
    """

    # Alternative endpoint: https://api.apilayer.com/exchangerates_data/
    url_base = "https://api.exchangerate.host/"
    cursor_field = "date"
    primary_key = "date"

    # Single source of truth for the date format used in state, slices and paths.
    _date_format = "%Y-%m-%d"

    def __init__(self, config: Mapping[str, Any], start_date: datetime, **kwargs):
        """Store connector config and the initial sync date.

        :param config: connector config; must contain ``base`` and ``access_key``.
        :param start_date: first date to sync when no state is available.
        """
        super().__init__()
        self.base = config["base"]
        self.access_key = config["access_key"]
        self.start_date = start_date
        # Latest record date seen so far; None until state is set or a record is read.
        self._cursor_value: Optional[datetime] = None

    @property
    def state(self) -> Mapping[str, Any]:
        """Return the current cursor as ``{"date": "YYYY-MM-DD"}``.

        Falls back to ``start_date`` when no cursor has been established yet.
        """
        current = self._cursor_value if self._cursor_value else self.start_date
        return {self.cursor_field: current.strftime(self._date_format)}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        """Restore the cursor from a previously emitted state mapping."""
        self._cursor_value = datetime.strptime(value[self.cursor_field], self._date_format)

    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield records from the parent reader while advancing the cursor.

        The cursor is seeded from the first record when no state was provided.
        Previously it was only advanced when already set, so a fresh sync never
        moved past ``start_date``.
        """
        for record in super().read_records(*args, **kwargs):
            record_date = datetime.strptime(record[self.cursor_field], self._date_format)
            if self._cursor_value is None:
                self._cursor_value = record_date
            else:
                self._cursor_value = max(self._cursor_value, record_date)
            yield record

    def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, Any]]:
        """Return one ``{"date": "YYYY-MM-DD"}`` dict per day from ``start_date`` up to now.

        Days are generated while the running datetime is strictly before
        ``datetime.now()``.
        """
        dates = []
        current = start_date
        while current < datetime.now():
            dates.append({self.cursor_field: current.strftime(self._date_format)})
            current += timedelta(days=1)
        return dates

    def stream_slices(
        self,
        sync_mode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Slice the sync into single days, resuming from the state date if present.

        The state date itself is included, so the last synced day is re-fetched.
        """
        if stream_state and self.cursor_field in stream_state:
            start_date = datetime.strptime(stream_state[self.cursor_field], self._date_format)
        else:
            start_date = self.start_date
        return self._chunk_date_range(start_date)

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Use the slice's date string as the request path (appended to ``url_base``)."""
        return stream_slice[self.cursor_field]

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Return query parameters for every request.

        The API key is sent as the ``apikey`` query parameter.
        """
        return {"apikey": self.access_key}

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        """Return the whole JSON body as a single record.

        The response JSON is assumed to match the stream schema directly and to
        carry a ``date`` field in ``YYYY-MM-DD`` form (``read_records`` relies on it).
        """
        return [response.json()]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return None: the API does not paginate, so there is never a next page."""
        return None
Below are the command and the messages I'm getting in the terminal:
(airbyte) ashish@ashish-ThinkPad-E14-Gen-2:~/repos/airbyte/airbyte-integrations/connectors/source-python-http-rates-api$ python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourcePythonHttpRatesApi"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: exchange_rates "}}
Inside state setter
-- 2022-11-07 00:00:00
{"type": "LOG", "log": {"level": "INFO", "message": "Setting state of exchange_rates stream to {'date': '2022-11-07'}"}}
=================================================================================
{"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "exchange_rates"}, "stream_state": {"date": "2022-11-08"}}, "data": {"exchange_rates": {"date": "2022-11-08"}}}}
===================================================
{"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "exchange_rates"}, "stream_state": {"date": "2022-11-09"}}, "data": {"exchange_rates": {"date": "2022-11-09"}}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Read 3 records from exchange_rates stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing exchange_rates"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourcePythonHttpRatesApi runtimes:\nSyncing stream exchange_rates 0:00:00.384562"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourcePythonHttpRatesApi"}}
The logs above clearly show that data is being fetched for the dates 2022-11-07, 2022-11-08, and 2022-11-09, starting from 2022-11-07.
However, my `state.json` always contains this fixed value:
{
"exchange_rates": {
"date": "2022-11-07"
}
}
Can someone help me figure out what I'm doing wrong?
user
12/06/2022, 8:24 PM