<@U04SE163WC8> This is my custom pagination `class...
# ask-ai
a
@kapa.ai This is my custom pagination
class LastEvaluatedKeyPaginationStrategy(PageIncrement):
    def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Tuple[Optional[int], Optional[int]]]:
        if (len(last_records) < self._page_size) or len(last_records) == 0:
            return None
        last_evaluated_key = response.json().get('LastEvaluatedKey')
        if last_evaluated_key:
            return json.dumps(last_evaluated_key)
        else:
            return None
And it is showing this error:
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py", line 107, in next_page_token
self._token = self.pagination_strategy.next_page_token(response, last_page_size, last_record)
TypeError: next_page_token() takes 3 positional arguments but 4 were given
k
The error indicates that the `next_page_token` method in your `LastEvaluatedKeyPaginationStrategy` class is being called with four positional arguments (counting `self`), but your method definition accepts only three. The traceback shows the call being made: `self.pagination_strategy.next_page_token(response, last_page_size, last_record)`. To resolve this, adjust your method definition so it also accepts `last_page_size`. Here is how you can modify your method:
```python
import json
from typing import Any, Mapping, Optional


class LastEvaluatedKeyPaginationStrategy(PageIncrement):
    def next_page_token(self, response, last_page_size: int, last_record: Optional[Mapping[str, Any]]) -> Optional[str]:
        # The traceback's caller passes the page size and a single last record,
        # so the size check uses last_page_size instead of len() on a list.
        if last_page_size == 0 or last_page_size < self._page_size:
            return None
        # Serialize the key so it can be sent back as the next page token.
        last_evaluated_key = response.json().get('LastEvaluatedKey')
        return json.dumps(last_evaluated_key) if last_evaluated_key else None
```
In this updated method, `last_page_size` is added as the second parameter to match the expected number of arguments. If you need further assistance, you can refer to the [Airbyte CDK documentation](https://docs.airbyte.com/connector-development/cdk-python/) for more details on implementing custom pagination strategies.
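The argument counting in the traceback can be reproduced without the CDK. The sketch below uses hypothetical stand-in classes (`OldStyleStrategy`, `NewStyleStrategy`, and `call_like_paginator` are illustrative names, not Airbyte APIs) to show why a two-parameter method fails when the caller passes three values: Python counts `self` as one of the positional arguments.

```python
from typing import Any, List, Mapping, Optional


class OldStyleStrategy:
    """Hypothetical stand-in with the two-parameter signature from the question."""

    def next_page_token(self, response: Any, last_records: List[Mapping[str, Any]]) -> Optional[str]:
        return None


class NewStyleStrategy:
    """Hypothetical stand-in with the three-parameter signature from the answer."""

    def next_page_token(
        self, response: Any, last_page_size: int, last_record: Optional[Mapping[str, Any]]
    ) -> Optional[str]:
        return None


def call_like_paginator(strategy: Any) -> str:
    """Call the strategy with three values, as the caller in the traceback does."""
    try:
        strategy.next_page_token({"fake": "response"}, 2, {"id": 2})
        return "ok"
    except TypeError as exc:
        return f"TypeError: {exc}"


old_result = call_like_paginator(OldStyleStrategy())
new_result = call_like_paginator(NewStyleStrategy())
print(old_result)  # a TypeError message about 3 vs 4 positional arguments
print(new_result)
```

The exact wording of the `TypeError` varies slightly between Python versions, but the "takes 3 positional arguments but 4 were given" part is the same as in the traceback.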
a
@kapa.ai "error": {"message": "During the sync, the following streams did not sync successfully: Subscriptions: AirbyteTracedException(\"LastEvaluatedKeyPaginationStrategy.next_page_token() missing 1 required positional argument: 'last_records'\"
k
A message from kapa.ai
The error message says that the `next_page_token` method in your `LastEvaluatedKeyPaginationStrategy` class was called without its required positional argument `last_records`. This is the opposite of the first error: the caller now passes one argument fewer than your three-parameter signature (`response`, `last_page_size`, `last_records`) requires. That suggests the sync is running against a different `airbyte_cdk` version than the one in the first traceback, one whose paginator calls `next_page_token(response, last_records)`. The method signature has to match the call made by the CDK version that actually runs the sync. If that version passes only `response` and `last_records`, the two-parameter form is the one to use:

```python
class LastEvaluatedKeyPaginationStrategy(PageIncrement):
    def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[str]:
        # Stop when the last page was empty or shorter than a full page.
        if len(last_records) == 0 or len(last_records) < self._page_size:
            return None
        # Serialize the key so it can be sent back as the next page token.
        last_evaluated_key = response.json().get('LastEvaluatedKey')
        return json.dumps(last_evaluated_key) if last_evaluated_key else None
```

If the runtime instead makes the three-argument call shown in the first traceback, keep `last_page_size` as the second parameter. Check which `airbyte_cdk` version is installed where the sync runs, and refer to the [Airbyte CDK documentation](https://docs.airbyte.com/connector-development/cdk-python/) for more details on implementing custom pagination strategies.
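If the connector has to run under CDK versions that call the strategy differently, one option is to accept both call shapes. The following is only a sketch under stated assumptions: `FakeResponse` and `TolerantLastEvaluatedKeyStrategy` are hypothetical stand-ins so the example runs without Airbyte installed, and the two call shapes are inferred from the two errors in this thread, not taken from the CDK documentation.

```python
import json
from typing import Any, Dict, Optional


class FakeResponse:
    """Hypothetical stand-in for an HTTP response; only .json() is used."""

    def __init__(self, payload: Dict[str, Any]) -> None:
        self._payload = payload

    def json(self) -> Dict[str, Any]:
        return self._payload


class TolerantLastEvaluatedKeyStrategy:
    """Stand-in for the custom strategy; a real one would subclass PageIncrement."""

    def __init__(self, page_size: int) -> None:
        self._page_size = page_size

    def next_page_token(self, response: Any, *args: Any) -> Optional[str]:
        # Assumed call shapes (inferred from the two errors in this thread):
        #   (response, last_records)                -> one extra argument, a list
        #   (response, last_page_size, last_record) -> two extra arguments
        if not args:
            raise TypeError("expected last_records or last_page_size")
        last_page_size = len(args[0]) if len(args) == 1 else args[0]
        # Stop when the last page was empty or shorter than a full page.
        if last_page_size == 0 or last_page_size < self._page_size:
            return None
        key = response.json().get("LastEvaluatedKey")
        return json.dumps(key) if key else None


strategy = TolerantLastEvaluatedKeyStrategy(page_size=2)
page = FakeResponse({"LastEvaluatedKey": {"id": "abc"}})

token_two_args = strategy.next_page_token(page, [{"id": 1}, {"id": 2}])
token_three_args = strategy.next_page_token(page, 2, {"id": 2})
short_page = strategy.next_page_token(page, 1, {"id": 1})
print(token_two_args, token_three_args, short_page)
```

Pinning the same `airbyte_cdk` version locally and where the sync runs is usually cleaner than accepting both shapes; the variadic signature is a stopgap while the versions differ.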