# ask-community-for-troubleshooting
Hello all, I'm trying to use Airbyte incremental sync with an exchange rate API. I've followed the read-data documentation (https://docs.airbyte.com/connector-development/tutorials/cdk-tutorial-python-http/read-data). Everything seems to be working except that my `state.json` is not getting updated. From the checkpoint log messages I can see that new data is being fetched, but `state.json` still points to the first date I provided in the `config.json` file. Sharing the code for the Stream below:
```python
from datetime import datetime, timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream


class ExchangeRates(HttpStream, IncrementalMixin):
    # url_base = "https://api.apilayer.com/exchangerates_data/"
    url_base = "https://api.exchangerate.host/"

    cursor_field = "date"
    primary_key = "date"

    def __init__(self, config: Mapping[str, Any], start_date: datetime, **kwargs):
        super().__init__()
        self.base = config['base']
        self.access_key = config['access_key']
        self.start_date = start_date
        self._cursor_value = None

    @property
    def state(self) -> Mapping[str, Any]:
        if self._cursor_value:
            return {self.cursor_field: self._cursor_value.strftime('%Y-%m-%d')}
        else:
            return {self.cursor_field: self.start_date.strftime('%Y-%m-%d')}
    
    @state.setter
    def state(self, value: Mapping[str, Any]):
       self._cursor_value = datetime.strptime(value[self.cursor_field], '%Y-%m-%d')

    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(*args, **kwargs):
            if self._cursor_value:
                latest_record_date = datetime.strptime(record[self.cursor_field], '%Y-%m-%d')
                self._cursor_value = max(self._cursor_value, latest_record_date)
            yield record

    def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, Any]]:
        """
        Returns a list of each day between the start date and now.
        The return value is a list of dicts {'date': date_string}.
        """
        dates = []
        while start_date < datetime.now():
            dates.append({self.cursor_field: start_date.strftime('%Y-%m-%d')})
            start_date += timedelta(days=1)
        return dates

    def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        start_date = datetime.strptime(stream_state[self.cursor_field], '%Y-%m-%d') if stream_state and self.cursor_field in stream_state else self.start_date
        return self._chunk_date_range(start_date)

    def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> str:
        return stream_slice['date']


    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        # The api requires that we include access_key as a query param so we do that in this method
        return {'apikey': self.access_key}

    def parse_response(
            self,
            response: requests.Response,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        # The response is a simple JSON whose schema matches our stream's schema exactly, 
        # so we just return a list containing the response
        return [response.json()]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # The API does not offer pagination, 
        # so we return None to indicate there are no more pages in the response
        return None
```
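For context, here is how I understand the pieces above to fit together during one sync. This is a minimal standalone sketch with stand-in values, not the connector itself:

```python
from datetime import datetime

start_date = datetime.strptime("2022-11-07", "%Y-%m-%d")
cursor_value = None  # plays the role of self._cursor_value

def state():
    # Mirrors the `state` property: falls back to start_date until the cursor is set.
    return {"date": (cursor_value or start_date).strftime("%Y-%m-%d")}

# When --state is passed, the state setter runs first and seeds the cursor from state.json.
cursor_value = datetime.strptime("2022-11-07", "%Y-%m-%d")

# read_records then advances the cursor as each daily slice is fetched and yielded.
for record in [{"date": "2022-11-07"}, {"date": "2022-11-08"}, {"date": "2022-11-09"}]:
    latest = datetime.strptime(record["date"], "%Y-%m-%d")
    cursor_value = max(cursor_value, latest)

print(state())  # {'date': '2022-11-09'}
```

So the in-memory state does advance to the latest fetched date, which matches the STATE messages in the logs below; it's only the file on disk that stays at the start date.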
Below is the command and the messages I'm getting in the terminal:
```
(airbyte) ashish@ashish-ThinkPad-E14-Gen-2:~/repos/airbyte/airbyte-integrations/connectors/source-python-http-rates-api$ python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourcePythonHttpRatesApi"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: exchange_rates "}}
Inside state setter
-- 2022-11-07 00:00:00
{"type": "LOG", "log": {"level": "INFO", "message": "Setting state of exchange_rates stream to {'date': '2022-11-07'}"}}

=================================================================================

{"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "exchange_rates"}, "stream_state": {"date": "2022-11-08"}}, "data": {"exchange_rates": {"date": "2022-11-08"}}}}

===================================================


{"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "exchange_rates"}, "stream_state": {"date": "2022-11-09"}}, "data": {"exchange_rates": {"date": "2022-11-09"}}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Read 3 records from exchange_rates stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing exchange_rates"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourcePythonHttpRatesApi runtimes:\nSyncing stream exchange_rates 0:00:00.384562"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourcePythonHttpRatesApi"}}
```
In the above logs it's clear that data is being fetched for 2022-11-07, 2022-11-08, and 2022-11-09, i.e. the dates starting from 2022-11-07. But my `state.json` always keeps this fixed value:
```json
{
  "exchange_rates": {
    "date": "2022-11-07"
  }
}
```
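As far as I can tell, the read command only reads the file passed via --state and emits STATE messages on stdout, so nothing writes the file back when the connector is run standalone. For completeness, here is a rough sketch of a helper I could pipe the output through to persist the last STATE message myself (save_state.py is my own hypothetical script, not part of the CDK):

```python
# save_state.py -- hypothetical helper, not part of the Airbyte CDK.
# Usage (assumption: run from the connector directory):
#   python main.py read --config secrets/config.json \
#       --catalog sample_files/configured_catalog.json \
#       --state sample_files/state.json | python save_state.py
import json
import sys

last_state = None
for line in sys.stdin:
    line = line.strip()
    print(line)  # forward every line so the normal output is still visible
    if not line.startswith("{"):
        continue  # skip debug prints and separators
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        continue
    if message.get("type") == "STATE":
        # The "data" field carries the legacy-style state dict,
        # e.g. {"exchange_rates": {"date": "2022-11-09"}}
        last_state = message.get("state", {}).get("data")

if last_state is not None:
    with open("sample_files/state.json", "w") as f:
        json.dump(last_state, f, indent=2)
```

That at least confirms the STATE messages carry the advancing date; I'm still not sure whether I'm supposed to update the file myself when testing locally or whether the connector should do it.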
Can someone help me figure out what I'm doing wrong?
Hello Ashish Narang, it's been a while without an update from us. Are you still having problems or did you find a solution?