How do we implement nested substream partitions wh...
# help-connector-development
a
How do we implement nested substream partitions whose parent itself uses a substream partition to call its parent? I am trying to apply this to the Twilio API for its dependent phone number stream: https://github.com/airbytehq/airbyte/pull/25705/commits
a
@kapa.ai This is my manifest.yaml file, which has a substream partition `accounts` and another substream partition `available_phone_number_countries`. The stream `available_phone_numbers_local` needs the data from both partitions. Is the file correct?
```yaml
  accounts_stream:
    $ref: "#/definitions/base_stream_with_pagination"
    $parameters:
      name: "accounts"
      path: "/Accounts.json"
      primary_key: "sid"

  accounts_partition_router:
    type: SubstreamPartitionRouter
    parent_stream_configs:
      - stream: "#/definitions/accounts_stream"
        parent_key: sid
        partition_field: id
  available_phone_number_countries_stream:
    $ref: "#/definitions/base_stream_with_pagination"
    $parameters:
      name: "available_phone_number_countries"
      path: "/Accounts/{{stream_slice.id}}/AvailablePhoneNumbers.json"
      primary_key: "country_code"
    retriever:
      $ref: "#/definitions/retriever"
      requester:
        $ref: "#/definitions/requester"
      paginator:
        $ref: "#/definitions/default_paginator"
      partition_router:
        $ref: "#/definitions/accounts_partition_router"

  available_phone_numbers_countries_partition_router:
    type: SubstreamPartitionRouter
    parent_stream_configs:
      - stream: "#/definitions/available_phone_number_countries_stream"
        parent_key: ""
        partition_field: subresource_uris

  available_phone_numbers_local_stream:
    $ref: "#/definitions/base_stream_with_pagination"
    $parameters:
      name: "available_phone_numbers_local"
      path: "/Accounts/{{stream_slice.account_id}}/AvailablePhoneNumbers/{{stream_slice.phone_number}}/Local.json"
      primary_key: null
    retriever:
      $ref: "#/definitions/retriever"
      requester:
        $ref: "#/definitions/requester"
      paginator:
        $ref: "#/definitions/default_paginator"
      partition_router:
        $ref: "#/definitions/available_phone_number_countries_partition_router"
```
k
A message from kapa.ai
Based on the information extracts provided, I cannot confirm whether your manifest file is correct. However, I can provide some guidance on how to use the `SubstreamPartitionRouter` based on the extracts. To use the `SubstreamPartitionRouter`, you need to configure the following fields:
1. Parent stream: defines the stream whose records should be used as partitions.
2. Parent key: the property on the parent stream record that should become the partition value (usually some form of id).
3. Current partition value identifier: the identifier of the variable holding the current partition value. It can be used in the path of the stream using the `{{ stream_partition.<identifier> }}` interpolation placeholder.

Here's an example of how to configure a `SubstreamPartitionRouter`:
```yaml
partition_router:
  type: SubstreamPartitionRouter
  parent_stream_configs:
    - stream: "#/repositories_stream"
      parent_key: "id"
      partition_field: "repository"
```
Please review your manifest file and compare it with the provided guidance to ensure it is configured correctly. If you need more information, you can refer to the [Airbyte documentation](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/partition-router).
a
@kapa.ai But what if my partition calls a parent which is itself a partition of a grandparent stream?
k
A message from kapa.ai
If you have a stream that depends on a parent stream, which itself is a partition of a grandparent stream, you can still use the `SubstreamPartitionRouter`. You'll need to configure the partition routers for both the parent and the child streams. Here's an example based on the information extracts:
1. Configure the partition router for the parent stream (which is a partition of the grandparent stream):
```yaml
parent_partition_router:
  type: SubstreamPartitionRouter
  parent_stream_configs:
    - stream: "#/grandparent_stream"
      parent_key: "grandparent_id"
      partition_field: "parent_id"
```
2. Configure the partition router for the child stream (which is a partition of the parent stream):
```yaml
child_partition_router:
  type: SubstreamPartitionRouter
  parent_stream_configs:
    - stream: "#/parent_stream"
      parent_key: "parent_id"
      partition_field: "child_id"
```
3. In the child stream configuration, use the `{{ stream_partition.<identifier> }}` interpolation placeholder to refer to the parent stream's partition value:
```yaml
child_stream:
  $parameters:
    path: "/grandparent/{{ stream_partition.parent_id }}/parent/{{ stream_partition.child_id }}"
  partition_router:
    $ref: "#/definitions/child_partition_router"
```
Make sure to adjust the example according to your specific use case and the structure of your API. For more information, you can refer to the [Airbyte documentation](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/partition-router).
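
Applied to the Twilio streams in this thread, the nested setup might look like the sketch below. It rests on assumptions that should be checked against the installed CDK version: that `SubstreamPartitionRouter` nests the parent's own slice under a `parent_slice` key, so the account `sid` (stored as `id` by `accounts_partition_router`) is still reachable from the grandchild stream, and that `country_code` is a suitable `parent_key` and `partition_field`. Neither is confirmed anywhere in this thread.

```yaml
# Fragment intended to live under `definitions:`. A sketch, not a verified manifest.

# Router over the parent stream, which is itself partitioned by accounts.
available_phone_number_countries_partition_router:
  type: SubstreamPartitionRouter
  parent_stream_configs:
    - stream: "#/definitions/available_phone_number_countries_stream"
      parent_key: country_code       # assumption: field read from each parent record
      partition_field: country_code  # assumption: key it is stored under in the slice

available_phone_numbers_local_stream:
  $ref: "#/definitions/base_stream_with_pagination"
  $parameters:
    name: "available_phone_numbers_local"
    # Assumption: the grandparent partition value is exposed under `parent_slice`.
    path: "/Accounts/{{ stream_slice['parent_slice']['id'] }}/AvailablePhoneNumbers/{{ stream_slice['country_code'] }}/Local.json"
  retriever:
    $ref: "#/definitions/retriever"
    partition_router:
      $ref: "#/definitions/available_phone_number_countries_partition_router"
```

Note that the router name referenced from the stream has to match the name it is defined under exactly; a mismatch between the two would leave the `$ref` unresolved.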
a
@kapa.ai i get this error
```
File "/home/alcadeus/airbyte/airbyte-integrations/connectors/source-twilio/.venv/lib/python3.10/site-packages/jsonschema/validators.py", line 934, in validate
    raise error
jsonschema.exceptions.ValidationError: 'DatetimeBasedCursor' is not one of ['CustomIncrementalSync']

Failed validating 'enum' in schema[0]['properties']['type']:
    {'enum': ['CustomIncrementalSync'], 'type': 'string'}

On instance['type']:
    'DatetimeBasedCursor'
```
k
A message from kapa.ai
It seems like you are encountering a JSON schema validation error. The error message indicates that `'DatetimeBasedCursor'` is not one of the allowed values, which are `['CustomIncrementalSync']`. To fix this issue, you should update your configuration to use one of the allowed values. In this case, you should replace `'DatetimeBasedCursor'` with `'CustomIncrementalSync'`. If you need more information or guidance, please refer to the provided information extracts or the [Airbyte documentation](https://docs.airbyte.com/connector-development/connector-specification-reference).
a
@kapa.ai this is the stream in my manifest.yaml file which is causing the error
```yaml
  conferences_stream:
    $ref: "#/definitions/base_stream_with_pagination"
    $parameters:
      name: "conferences"
      path: "/Accounts/{{stream_slice.id}}/Conferences.json"
      primary_key: "sid"
      incremental_sync:
        type: DatetimeBasedCursor
        cursor_field: "date_created"
        datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
        cursor_granularity: "PT0.000001S"
        step: "P1D"
    retriever:
      $ref: "#/definitions/retriever"
      requester:
        $ref: "#/definitions/requester"
      paginator:
        $ref: "#/definitions/default_paginator"
      partition_router:
        $ref: "#/definitions/accounts_partition_router"
```
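
One arrangement that may be worth trying is sketched below. It is a guess at the cause, not a confirmed diagnosis: it assumes `incremental_sync` belongs on the stream itself instead of inside `$parameters`, and that `DatetimeBasedCursor` also expects a `start_datetime`. The `start_date` config key is hypothetical and would need to match the connector's spec and the chosen `datetime_format`.

```yaml
# Fragment intended to live under `definitions:`. A sketch under the assumptions above.
conferences_stream:
  $ref: "#/definitions/base_stream_with_pagination"
  $parameters:
    name: "conferences"
    path: "/Accounts/{{stream_slice.id}}/Conferences.json"
    primary_key: "sid"
  # Assumption: incremental_sync is a property of the stream, a sibling of `retriever`.
  incremental_sync:
    type: DatetimeBasedCursor
    cursor_field: "date_created"
    datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
    cursor_granularity: "PT0.000001S"
    step: "P1D"
    # Assumption: start_datetime is required; `start_date` is a hypothetical config key.
    start_datetime: "{{ config['start_date'] }}"
  retriever:
    $ref: "#/definitions/retriever"
    requester:
      $ref: "#/definitions/requester"
    paginator:
      $ref: "#/definitions/default_paginator"
    partition_router:
      $ref: "#/definitions/accounts_partition_router"
```

If validation still fails with this layout, the schema error is likely pointing at a different field, since validators often report only the first alternative that did not match.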
a
@kapa.ai How do I update the `record_selector` of a specific stream? I want to put a record filter in the stream in my manifest.yaml file.
k
A message from kapa.ai
To update the `record_selector` of a specific stream and add a record filter in your manifest.yaml file, you can follow these steps:
1. Locate the stream configuration in your manifest.yaml file.
2. Add or update the `retriever` field in the stream configuration.
3. Add or update the `record_selector` field within the `retriever` configuration.
4. Add the `record_filter` field with the desired filter condition.

Here's an example of how to add a record filter to a specific stream:
```yaml
your_stream:
  $ref: "#/definitions/base_stream"
  $parameters:
    name: "your_stream_name"
    path: "/your_stream_path"
    primary_key: "your_primary_key"
  retriever:
    $ref: "#/definitions/retriever"
    record_selector:
      extractor:
        field_path: ["your_field_path"]
      record_filter:
        condition: "{{ record['created_at'] < stream_slice['start_time'] }}"
```
In this example, replace `your_stream`, `your_stream_name`, `your_stream_path`, `your_primary_key`, and `your_field_path` with the appropriate values for your specific stream. The `condition` field in the `record_filter` should be updated with the desired filter expression. For more information on configuring record selectors and filters, you can refer to the [Airbyte documentation](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
a
@kapa.ai How would I set a record filter if I only want to display the records in which the selected field is not empty?
k
A message from kapa.ai
@Aazam Thakur That's a great question, but unfortunately I am only capable of responding to a maximum of five questions per thread. Don't hesitate to create a new thread for any further questions you have though. I'm here to help!
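
For reference, a non-empty check could plausibly be written as a Jinja boolean expression in the `condition`, following the same `record_filter` layout shown earlier in the thread. This is an untested sketch: `your_stream`, `your_field_path`, and `your_field` are placeholders, and it assumes the CDK's Jinja environment treats a missing key as an undefined (falsy) value instead of raising an error.

```yaml
your_stream:
  retriever:
    $ref: "#/definitions/retriever"
    record_selector:
      extractor:
        field_path: ["your_field_path"]
      record_filter:
        # Keep a record only when `your_field` is present and not null/empty.
        # Caveat: falsy values such as 0 or false are also treated as empty here.
        condition: "{{ (record['your_field'] or '') != '' }}"
```

If numeric zeros or `false` are legitimate values for the field, a narrower test such as comparing against `none` and the empty string separately would be the safer choice.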