# help-connector-development
a
Hi, I am using the Connector Builder UI and I am trying to build an incremental sync for an API whose lowest cursor granularity is a date. I have the incremental sync working. However, while using the cursor it syncs data including the cursor date, meaning that every time it syncs, the data on the cursor date is duplicated. Is there a combination of macros to get around this, or some other setting?
k
A message from kapa.ai
Airbyte provides an at-least-once delivery guarantee for incremental syncs, which means it's acceptable for sources to re-send some data when run incrementally. In cases where the cursor field has a granularity of a day (like your situation), there's no way for the source to know which records with that date were already replicated earlier that day. By convention, sources should prefer resending data if the cursor field is ambiguous [source](https://docs.airbyte.com/understanding-airbyte/connections/incremental-deduped-history#inclusive-cursors). However, you can consider deduplicating the data on the destination side after the sync is completed. This would involve identifying and removing duplicate records based on a unique identifier or a combination of fields that can uniquely identify a record. Unfortunately, the information extracts provided don't contain a specific solution for avoiding duplication during the incremental sync process when the cursor granularity is limited to a date.
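As a rough illustration of the destination-side deduplication described above, here is a minimal Python sketch. It assumes the records can be identified by a composite key of `date` and `campaign` (field names borrowed from the manifest shared later in this thread; whether they are actually unique for this API is an assumption), and that the most recently received copy of a record should win. `dedupe_records` is a hypothetical helper, not part of Airbyte.

```python
from typing import Any, Iterable


def dedupe_records(
    records: Iterable[dict[str, Any]], key_fields: tuple[str, ...]
) -> list[dict[str, Any]]:
    """Keep only the last-seen record for each composite key."""
    latest: dict[tuple, dict[str, Any]] = {}
    for record in records:
        # Build the composite key from the chosen fields (assumed unique).
        key = tuple(record[field] for field in key_fields)
        # A later record with the same key overwrites the earlier one.
        latest[key] = record
    return list(latest.values())


rows = [
    {"date": "2023-01-01", "campaign": "A", "clicks": 10},
    {"date": "2023-01-02", "campaign": "A", "clicks": 4},
    # Same day re-sent by the next incremental sync (inclusive cursor).
    {"date": "2023-01-02", "campaign": "A", "clicks": 7},
]
deduped = dedupe_records(rows, ("date", "campaign"))
```

This only helps when a trustworthy key exists; if no combination of fields identifies a record, the approach does not apply.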
a
Can you not achieve this by only allowing syncs once the day is complete and moving the cursor forward so it cannot sync the same day again? Your way turns a once-daily sync into one where every row synced after the initial sync is duplicated.
a
Hi Aidan, you can configure your connector to only sync up to "yesterday" by setting the end date to "{{ day_delta(-1) }}".
Scratch that. The data will be delayed, but you'll still have duplicate records 😅 Let me see if there's a workaround.
a
Hi @Alexandre Girard (Airbyte), thanks for the suggestion. I am currently doing this, but like you said, it is the cursor/start_date that I would need to add {{duration(P1D)}} to. I've tried this but couldn't get it to work. That could just be my implementation, though. I am hoping there is a workaround. Unfortunately the API I am building for is terrible, and duplicating isn't really an option. Is it possible to use the cursor as a variable? E.g. {{config[start_date] or cursor + duration(P1D)}}. It would then fail if the start_date was greater than the end date, but that's fine. Regards, Aidan
a
This won't work. We'll either need to add a macro or an option to sync last_state + granularity. In the meantime, you can avoid duplicate records if you can set a primary key and your destination supports append_dedup.
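To make the "last_state + granularity" idea concrete, here is a small Python sketch of the date arithmetic only. It is not an existing Airbyte macro or option; `next_start_date` and `has_new_day` are hypothetical names, and the state is assumed to be a `%Y-%m-%d` string as in this thread.

```python
from datetime import date, timedelta


def next_start_date(last_cursor: str, granularity: timedelta = timedelta(days=1)) -> str:
    """Return the first date that has not been synced yet (state + granularity)."""
    return (date.fromisoformat(last_cursor) + granularity).isoformat()


def has_new_day(last_cursor: str, end_date: str) -> bool:
    """True if at least one complete, unsynced day lies at or before end_date."""
    return date.fromisoformat(next_start_date(last_cursor)) <= date.fromisoformat(end_date)


start = next_start_date("2023-01-31")                 # day after the saved cursor
nothing_to_do = has_new_day("2023-01-31", "2023-01-31")  # end date already synced
```

With an exclusive start like this, a sync whose end date equals the saved cursor has nothing to fetch, which is the behaviour Aidan is asking for.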
a
@Alexandre Girard (Airbyte) Thanks. I have this setting enabled currently, but for one or two of the APIs I am dealing with, either I cannot trust the primary keys because they are an amalgamation, or the throughput will eventually become a problem due to size. A macro like this would really open up functionality for APIs whose lowest granularity is a date.
a
Can you share the API doc or your connector manifest? Assuming the date is part of the path, you could use a step size of 1 day (P1D) and use stream_slice['end_time']. You'll also need to set the end time to {{ day_delta(-1) }}.
E.g. /exchangerates_data/{{ stream_slice['end_time'] }}
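The slicing suggested here can be pictured with a short Python sketch. It assumes each slice covers exactly one day (so the slice start and end are the same date) and that the last slice is yesterday, mirroring a P1D step with an end time of `day_delta(-1)`. `daily_slices` is a hypothetical helper for illustration, not Airbyte's implementation.

```python
from datetime import date, timedelta


def daily_slices(start: date, today: date) -> list[dict[str, str]]:
    """Build one slice per complete day from start through yesterday."""
    last_day = today - timedelta(days=1)  # equivalent of day_delta(-1)
    slices = []
    current = start
    while current <= last_day:
        day = current.isoformat()
        # With a one-day step and one-day granularity, start and end coincide.
        slices.append({"start_time": day, "end_time": day})
        current += timedelta(days=1)
    return slices


slices = daily_slices(date(2023, 1, 1), today=date(2023, 1, 4))
paths = [f"/exchangerates_data/{s['end_time']}" for s in slices]
```

Note that this only bounds each request to a single completed day. If the next sync starts from the saved cursor date inclusively, that day is still requested again.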
a
@Alexandre Girard (Airbyte) The parameter is in the body_json, and I am currently using inject_into fields to add the dates. Can I achieve something similar to your suggestion using inject_into? I will share the manifest shortly.
If injected stream slice
version: 0.35.0
type: DeclarativeSource
check:
  type: CheckStream
  stream_names:
    - DCM
streams:
  - type: DeclarativeStream
    name: DCM
    primary_key: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $schema: http://json-schema.org/schema#
        properties:
          activity:
            type: string
          campaign:
            type: string
          clicks:
            type: integer
          date:
            type: string
          dbM_Cost_USD:
            type: number
          impressions:
            type: integer
          platform_Type:
            type: string
          total_Conversions_Cross_Environment:
            type: integer
          video_Completions:
            type: integer
        type: object
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: https://omdigitaldataapi.oceanmediainc.com:444
        path: /api/DCPDCM/Get/
        http_method: POST
        request_parameters: {}
        request_headers: {}
        request_body_json:
          StartDate: '{{stream_slice[''end_time'']}}'
          EndDate: '{{stream_slice[''end_time'']}}'
        authenticator:
          type: ApiKeyAuthenticator
          header: ClientKey
          api_token: '{{ config[''api_key''] }}'
        error_handler:
          type: CompositeErrorHandler
          error_handlers:
            - type: DefaultErrorHandler
              backoff_strategies:
                - type: ConstantBackoffStrategy
                  backoff_time_in_seconds: 5
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path:
            - data
      paginator:
        type: NoPagination
    incremental_sync:
      step: P1D
      type: DatetimeBasedCursor
      cursor_field: date
      end_datetime: '{{ format_datetime(day_delta(-1), ''%Y-%m-%d'') }}'
      start_datetime: '{{ format_datetime(config[''start_date''] , ''%Y-%m-%d'') }}'
      datetime_format: '%Y-%m-%d'
      cursor_granularity: P1D
spec:
  connection_specification:
    $schema: http://json-schema.org/draft-07/schema#
    type: object
    required:
      - start_date
      - api_key
    properties:
      start_date:
        type: string
        title: Start Date
        default: '2023-01-01'
      api_key:
        type: string
        title: Client Key
        airbyte_secret: true
    additionalProperties: true
  documentation_url: https://example.org
type: Spec
@Alexandre Girard (Airbyte) I have made changes to the manifest.yaml, adding stream_slice['end_time']. I feel that this doesn't solve the issue: once the sync becomes incremental after the first day, it still takes the slice for the cursor date and duplicates it, so it has the same issue. I will know tomorrow after it syncs incrementally.