Balasubramanian T K
07/15/2024, 12:59 PM
    def _validate_state_messages(state_messages: List[AirbyteMessage], configured_catalog: ConfiguredAirbyteCatalog):
        # Ensure that at least one state message is emitted for each stream
>       assert len(state_messages) >= len(
            configured_catalog.streams
        ), "At least one state message should be emitted for each configured stream."
E AssertionError: At least one state message should be emitted for each configured stream.
E assert 1 >= 6
E + where 1 = len([AirbyteMessage(type=<Type.STATE: 'STATE'>, log=None, spec=None, connectionStatus=None, catalog=None, record=None, sta..._=None, data=None, sourceStats=AirbyteStateStats(recordCount=231.0), destinationStats=None), trace=None, control=None)])
E + and 6 = len([ConfiguredAirbyteStream(stream=AirbyteStream(name='cards', json_schema={'type': 'object', '$schema': '<http://json-sch>...ll_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)])
E + where [ConfiguredAirbyteStream(stream=AirbyteStream(name='cards', json_schema={'type': 'object', '$schema': '<http://json-sch>...ll_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)] = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='cards', json_schema={'type': 'obj...l_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)]).streams
kapa.ai
07/15/2024, 1:00 PM
get_updated_state method of your source connector, where you update the state message for each stream. Unfortunately, I don't have a specific code snippet from the provided knowledge sources to resolve this issue. However, you can refer to the Airbyte documentation or the source code of other connectors for examples of how to emit state messages for each stream. Please note that the exact implementation might vary depending on the specifics of your source connector and the API you're working with.
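For illustration only, here is a minimal sketch of what "at least one state message per configured stream" could look like. It does not use the Airbyte CDK: messages are plain dicts shaped like per-stream STATE messages in the Airbyte protocol, and the names stream_state_message, read_streams, state_messages, and the records_read state key are hypothetical, as are all stream names other than cards. The point is only that every stream yields a STATE message after its records, even when it is a full-refresh stream or returns no records.

```python
from typing import Any, Dict, Iterator, List, Mapping

Message = Dict[str, Any]


def stream_state_message(stream_name: str, stream_state: Dict[str, Any]) -> Message:
    """Build a per-stream STATE message as a plain dict (not a CDK model)."""
    return {
        "type": "STATE",
        "state": {
            "type": "STREAM",
            "stream": {
                "stream_descriptor": {"name": stream_name},
                "stream_state": stream_state,
            },
        },
    }


def read_streams(
    configured_streams: List[str],
    records_by_stream: Mapping[str, List[Dict[str, Any]]],
) -> Iterator[Message]:
    """Yield RECORD messages for each stream, then one STATE message for it."""
    for name in configured_streams:
        count = 0
        for data in records_by_stream.get(name, []):
            count += 1
            yield {"type": "RECORD", "record": {"stream": name, "data": data}}
        # Emit state even if the stream produced no records, so the number of
        # state messages never falls below the number of configured streams.
        # "records_read" is a hypothetical state key chosen for this sketch.
        yield stream_state_message(name, {"records_read": count})


def state_messages(messages: List[Message]) -> List[Message]:
    """Filter a message list down to its STATE messages."""
    return [m for m in messages if m["type"] == "STATE"]


if __name__ == "__main__":
    # Only "cards" appears in the failing test output; the rest are made up.
    streams = ["cards", "stream_2", "stream_3", "stream_4", "stream_5", "stream_6"]
    output = list(read_streams(streams, {"cards": [{"id": 1}, {"id": 2}]}))
    # Same comparison as the failing assertion in _validate_state_messages.
    assert len(state_messages(output)) >= len(streams)
```

In a real connector the equivalent logic would live wherever your streams produce their state, so treat this as a picture of the expected message sequence rather than code to copy.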