# ask-ai
m
I am working on a Python automation and I wanted to understand the ordering of the different functions under the `Stream` class. When does `read_records` get called?
u
Based on the provided knowledge sources, I can explain the ordering of functions in the `Stream` class and when `read_records` is called in Airbyte's Python CDK.

The `read_records` function is a key part of the data reading process in Airbyte. It's typically called within the `_read_full_refresh` or `_read_incremental` methods of the `AbstractSource` class. Here's a general flow of how it works:

1. The process starts with the `read` method in the `AbstractSource` class.
2. Depending on the sync mode (full refresh or incremental), it calls either `_read_full_refresh` or `_read_incremental`.
3. These methods then iterate over the stream slices and call `read_records` for each slice.

Here's a simplified version of the relevant loop from the `_read_full_refresh` method [Source: Github Issue](https://github.com/airbytehq/airbyte/issues/15547):
```python
def _read_full_refresh(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
    # Iterate over the stream's slices and delegate reading each slice to read_records.
    for _slice in self.stream_slices(
        sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state
    ):
        yield from self.read_records(
            sync_mode=sync_mode,
            cursor_field=cursor_field,
            stream_slice=_slice,
            stream_state=stream_state,
        )
```
The `read_records` method is typically implemented in the `HttpStream` class for HTTP-based connectors. It uses other methods like `_read_pages` and `parse_response` to handle pagination and data parsing. Here's a simplified version of the `read_records` method [Source: Github Discussion](https://github.com/airbytehq/airbyte/discussions/33814):
```python
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
    yield from self._read_pages(
        lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state),
        stream_slice,
        stream_state,
    )
```
This method is responsible for reading records from the API, handling pagination, and yielding the parsed records. In summary, `read_records` is called as part of the main reading process, after the source object is created and the read operation is initiated. It's a crucial method that handles the actual data retrieval and parsing for each stream.
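If you want to observe this ordering from a plain Python script, here is a minimal, hedged sketch; `MyStream` is a hypothetical stand-in for a real stream, not part of the CDK:

```python
from typing import Any, Iterable, List, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream


class MyStream(Stream):  # hypothetical stream used only for illustration
    primary_key = "id"

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # In a real connector this is where the underlying source is queried.
        yield {"id": 1, "slice": stream_slice}


stream = MyStream()
# Mirror what _read_full_refresh does: one read_records call per stream slice.
for _slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
    for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice):
        print(record)
```

Running something like this makes the ordering visible: `stream_slices` is consulted first, then `read_records` is called once per slice.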
m
@kapa.ai can you explain on a higher level how the workflow works with regard to `read_records`
u
Certainly! I can explain the high-level workflow of `read_records` in Airbyte based on the provided knowledge sources. The `read_records` function is a crucial part of Airbyte's data synchronization process. Here's a high-level overview of how it works:

1. Initialization: The process starts when the `read` command is initiated in the Airbyte CDK (Connector Development Kit) [How the Airbyte CDK read command works Part 2](https://github.com/airbytehq/airbyte/discussions/33814).
2. Stream Slicing: Before `read_records` is called, the system generates stream slices. These slices can be used to break down the data into manageable chunks, which is especially useful for large datasets or APIs with pagination [How the Airbyte CDK read command works > Full Refresh Part 1](https://github.com/airbytehq/airbyte/discussions/33814).
3. Iteration: The system then iterates over each stream slice.
4. Reading Records: For each slice, the `read_records` function is called. This function is typically implemented in the `HttpStream` class for most connectors [How the Airbyte CDK read command works > Full Refresh Part 4](https://github.com/airbytehq/airbyte/discussions/33814).
5. API Interaction: Inside `read_records`, an HTTP request is created using parameters from the `request_params` and `next_page_token` functions (see the sketch after this list).
6. Parsing Response: The `parse_response` function is called to process the JSON object returned from the API. This function yields individual records [How the Airbyte CDK read command works > Full Refresh Part 4](https://github.com/airbytehq/airbyte/discussions/33814).
7. Record Generation: The parsed records are then yielded one by one.
8. Output: These records are written to the connector's standard output (STDOUT), where they are read by the Airbyte Worker and subsequently sent to the destination.

This process continues until all slices have been processed or until a specified limit is reached. It's worth noting that this workflow can vary slightly depending on whether the sync is incremental or full refresh, and some connectors may implement custom logic within this general framework.
m
@kapa.ai can you explain the workflow of the Stream class?
u
Sure, I can explain the workflow of the `Stream` class based on the provided knowledge sources. The `Stream` class in Airbyte is a fundamental concept that models the logical data groups that make up the larger Source. If the Source is an RDBMS, each Stream is a table. In a REST API setting, each Stream corresponds to one resource within the API [Basic Concepts](https://docs.airbyte.com/connector-development/cdk-python/basic-concepts). Here's a high-level overview of how the `Stream` class works:

1. Defining the Stream's Schema: The first step in implementing a stream is to define its schema, which describes the structure of the data that the stream will read from the source [Full Refresh Streams](https://docs.airbyte.com/connector-development/cdk-python/full-refresh-stream).
2. Implementing the Logic for Reading Records: The next step is to implement the logic for reading records from the underlying data source. This is where the `read_records` function comes into play; it is responsible for reading data from the source and yielding individual records [Full Refresh Streams](https://docs.airbyte.com/connector-development/cdk-python/full-refresh-stream).
3. Stream Representation in YAML: In the low-code (config-based) CDK, the stream object is represented in the YAML file. This includes the type of the stream, the retriever, the schema loader, the stream cursor field, and the transformations [Understanding the YAML file > Stream Part 1](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/yaml-overview#stream).
4. Data Synchronization: During the data synchronization process, the `Stream` class is responsible for reading data from the source so it can be sent to the destination. This can be done in either a full refresh or incremental manner [Basic Concepts](https://docs.airbyte.com/connector-development/cdk-python/basic-concepts).
5. Subclassing for Specific Sources: For specific sources, such as the Webflow API or a database, the `Stream` class can be subclassed to create a new stream for each unique API endpoint or database table [Understanding the Webflow API > WebflowStream class](https://airbyte.com/tutorials/extract-data-from-the-webflow-api#step-2) (see the sketch after this list).

Please note that the exact workflow can vary depending on the specific implementation of the `Stream` class for a given connector.
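As a rough illustration of points 1, 2, and 5, here is a hedged sketch of a base stream with one subclass per endpoint; the API, paths, fields, and schema are invented for the example and not taken from any real connector:

```python
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class ExampleApiStream(HttpStream):  # hypothetical base class shared by all streams of one source
    url_base = "https://api.example.com/v1/"  # assumed API
    primary_key = "id"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None  # pagination omitted in this sketch

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        # Point 2: the record-reading logic shared by every endpoint of this source.
        yield from response.json().get("items", [])

    def get_json_schema(self) -> Mapping[str, Any]:
        # Point 1: the stream's schema (usually loaded from a JSON file in schemas/ instead).
        return {"type": "object", "properties": {"id": {"type": "integer"}}}


class Projects(ExampleApiStream):  # Point 5: one subclass per API endpoint
    def path(self, **kwargs) -> str:
        return "projects"


class Users(ExampleApiStream):
    def path(self, **kwargs) -> str:
        return "users"
```

Each subclass inherits `read_records` from `HttpStream`, so reading `Projects` and `Users` differs only in the endpoint path.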