How to resolve this error in low code cdk ``` def ...
# ask-ai
b
How to resolve this error in low code cdk
Copy code
def _validate_state_messages(state_messages: List[AirbyteMessage], configured_catalog: ConfiguredAirbyteCatalog):
        # Ensure that at least one state message is emitted for each stream
>       assert len(state_messages) >= len(
            configured_catalog.streams
        ), "At least one state message should be emitted for each configured stream."
E       AssertionError: At least one state message should be emitted for each configured stream.
E       assert 1 >= 6
E        +  where 1 = len([AirbyteMessage(type=<Type.STATE: 'STATE'>, log=None, spec=None, connectionStatus=None, catalog=None, record=None, sta..._=None, data=None, sourceStats=AirbyteStateStats(recordCount=231.0), destinationStats=None), trace=None, control=None)])
E        +  and   6 = len([ConfiguredAirbyteStream(stream=AirbyteStream(name='cards', json_schema={'type': 'object', '$schema': '<http://json-sch>...ll_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)])
E        +    where [ConfiguredAirbyteStream(stream=AirbyteStream(name='cards', json_schema={'type': 'object', '$schema': '<http://json-sch>...ll_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)] = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='cards', json_schema={'type': 'obj...l_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)]).streams
k
The error message you're seeing is due to an assertion in your code that checks if the number of state messages is greater than or equal to the number of streams in your configured catalog. In your case, it seems like you have 6 streams but only 1 state message, which is causing the assertion to fail. The state messages are used to capture the state of the data sync process. For each stream in your configured catalog, you should have at least one state message. This is important for incremental updates where you want to keep track of the last record you processed. To resolve this issue, you need to ensure that a state message is emitted for each stream in your configured catalog. This could be done in the
get_updated_state
method of your source connector where you update the state message for each stream. Unfortunately, I don't have a specific code snippet from the provided knowledge sources to resolve this issue. However, you can refer to the Airbyte documentation or the source code of other connectors for examples on how to emit state messages for each stream. Please note that the exact implementation might vary depending on the specifics of your source connector and the API you're working with.