# advice-data-ingestion
  • r

    Rocky Appiah

    09/16/2022, 3:38 PM
    Can I get some assistance with this? It's blocking a live system: https://airbytehq.slack.com/archives/C038KM9NF7Z/p1663265102848879
  • t

    Tarak dba

    09/16/2022, 3:57 PM
    Hi guys, why am I unable to see the cursor field for incremental replication with append?
  • d

    Dylan Roy

    09/16/2022, 7:27 PM
    Does anyone know of a connector that lets me point at a GCS directory as a source instead of a specific file? The existing source connector doesn't seem to support this.
  • r

    Rocky Appiah

    09/18/2022, 1:12 PM
    Anyone have insight into this bug? Should I switch to pgoutput instead of wal2json to capture JSON (the recommendation is the other way around)?
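
For anyone comparing the two, switching a CDC source from wal2json to pgoutput mostly comes down to recreating the replication slot with the other plugin and adding a publication. A rough sketch with psycopg2 (slot name, publication name, and connection details are illustrative, and assume the role has replication privileges):

    import psycopg2

    # Sketch: recreate a logical replication slot with pgoutput instead of wal2json.
    conn = psycopg2.connect("dbname=mydb user=replicator")
    conn.autocommit = True  # slot management shouldn't run inside an open transaction
    with conn.cursor() as cur:
        # Drop the old wal2json slot (only if nothing else is consuming it).
        cur.execute("SELECT pg_drop_replication_slot('airbyte_slot');")
        # Recreate the slot with the pgoutput plugin.
        cur.execute("SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');")
        # pgoutput also needs a publication covering the replicated tables.
        cur.execute("CREATE PUBLICATION airbyte_publication FOR ALL TABLES;")
    conn.close()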
  • a

    Abdulrahman Abu Tayli

    09/19/2022, 12:01 PM
    Hi there, I'm using Airbyte version 0.40.7 with S3 destination version 0.2.12. When using CSV or JSONL format, the output files are uploaded compressed (.gz). Is there any way I can save them uncompressed?
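
On the compression question above: newer versions of the S3 destination appear to expose a compression option inside the format section of the connector configuration, but whether it exists on 0.2.12 needs to be checked against that version's spec. A sketch of the shape the setting takes (field names should be verified against the connector spec before use):

    import json

    # Sketch only: verify the exact field names against your destination-s3 spec.
    s3_format_config = {
        "format": {
            "format_type": "JSONL",
            "compression": {"compression_type": "No Compression"},  # or "GZIP"
        }
    }
    print(json.dumps(s3_format_config, indent=2))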
  • a

    Abdulellah Alnumay

    09/20/2022, 9:11 AM
    Hey all, where's the appropriate place to discuss adding more support to a connector? I'm using Tempo's connector, and it seems that a few endpoints aren't supported and don't get extracted, mainly teams and team_memebers.
  • t

    tanuj soni

    09/20/2022, 9:40 AM
    Hi, I am using Airbyte version 0.40.0-alpha and I have created a workflow from HubSpot to BigQuery, but the campaign data is not getting synced correctly: I am getting a random value for lastupdatedTime. Could anyone please let me know which API is being used and how it fetches campaign data? And is there an open ticket for the same?
  • d

    David Mattern

    09/20/2022, 10:50 PM
    Hello. I am trying to generate a dynamic schema from a source api. I saw this answer from alafanechere: "Dynamic schema change is not something supported by Airbyte at the moment. Your connector must be able to know the schema of the records it will handle before running a replication. You can implement your own discover method on your Source to infer a dynamic schema from a source, but this schema should not change from sync to sync. You can checkout our Salesforce connector to check a custom implementation of discover." https://discuss.airbyte.io/t/custom-source-connector-dynamic-response/897 Could alafanechere or someone else point out where in the Salesforce connector this is done? I could not find where they implement their own discover method. In the Airbyte UI, whenever you click "Set up connection", I assume that this looks for the schema files? Do you know if the Salesforce connector or any other implementation can query the API for the schema at this point? In my code, I do override the get_json_schema() method. Here is my code if interested: https://github.com/NMWDI/airbyte/blob/ckan-jdm/airbyte-integrations/connectors/source-ckan/source_ckan/source.py#L123 Thanks! David
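
For the dynamic-schema question above, "implement your own discover" means building the AirbyteCatalog at discover time from whatever the source API reports, instead of shipping static JSON schema files. A minimal sketch of just the discover piece (the /schema endpoint and the string-typed field mapping are hypothetical; check and read are unchanged):

    import requests

    from airbyte_cdk.logger import AirbyteLogger
    from airbyte_cdk.models import AirbyteCatalog, AirbyteStream
    from airbyte_cdk.sources import Source


    class SourceDynamicExample(Source):
        def discover(self, logger: AirbyteLogger, config: dict) -> AirbyteCatalog:
            # Hypothetical endpoint that describes each stream's fields at runtime.
            resp = requests.get(f"{config['api_url']}/schema", timeout=30)
            resp.raise_for_status()
            streams = []
            for stream_name, field_names in resp.json().items():
                json_schema = {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    # Loosely type every reported field; refine if the API returns types.
                    "properties": {name: {"type": ["null", "string"]} for name in field_names},
                }
                streams.append(
                    AirbyteStream(
                        name=stream_name,
                        json_schema=json_schema,
                        supported_sync_modes=["full_refresh"],
                    )
                )
            return AirbyteCatalog(streams=streams)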
  • o

    Opeyemi Daniel

    09/21/2022, 11:54 AM
    Hello team, can someone explain how to handle arrays in Airbyte? I'm trying to migrate data from MongoDB.
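
On handling arrays: when an array field from MongoDB doesn't map cleanly onto the destination, the two usual patterns are to serialize the array as a JSON string on the parent row or to unnest it into child rows downstream. A generic sketch of both (not Airbyte-specific; field names are illustrative):

    import json

    # A MongoDB-style document with an array field.
    doc = {"_id": "63a1", "name": "Ada", "phone_numbers": ["+111", "+222"]}

    # Option 1: keep the parent row flat and store the array as a JSON string.
    flat_row = {**doc, "phone_numbers": json.dumps(doc["phone_numbers"])}

    # Option 2: unnest the array into child rows keyed back to the parent.
    child_rows = [
        {"parent_id": doc["_id"], "index": i, "phone_number": value}
        for i, value in enumerate(doc["phone_numbers"])
    ]

    print(flat_row)
    print(child_rows)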
  • b

    Bobby Iliev

    09/21/2022, 3:57 PM
    Hi all, is there any progress with the Kafka integration + Airbyte cloud? Or a rough ETA on when this will be doable?
  • m

    Marielby Soares

    09/22/2022, 7:22 PM
    Hello. Is it possible to remove a state from a connection? Maybe through a manual database update?
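
A heavily hedged sketch of the manual route, assuming a docker-compose deployment where Airbyte's internal Postgres database keeps per-connection state in a table named state keyed by connection_id; verify the schema for your version, disable the connection first, and prefer the UI's reset option where it is enough:

    import psycopg2

    # Default credentials for the docker-compose internal database; adjust as needed.
    conn = psycopg2.connect(host="localhost", port=5432, dbname="airbyte",
                            user="docker", password="docker")
    with conn, conn.cursor() as cur:
        # Assumption: connection state lives in a "state" table keyed by connection_id.
        cur.execute("DELETE FROM state WHERE connection_id = %s;", ("<connection-uuid>",))
    conn.close()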
  • h

    Hong Wu

    09/22/2022, 8:48 PM
    Hello! I just got Airbyte set up in GCP and integrated a Harvest pipeline. We have some sources that do not have connectors yet; which channel is for connector development discussion?
  • r

    Robert Put

    09/22/2022, 11:27 PM
    Hello! Running into this issue when trying to ingest data with the Postgres source: https://airbytehq.slack.com/archives/C021JANJ6TY/p1663851094427639 For some reason incremental is not an option... there are valid primary keys and cursors in the tables, but they don't appear as options.
  • t

    Tiri Georgiou

    09/23/2022, 9:28 AM
    Hi team, we are currently having an issue with the Salesforce connector. We notice that every time we sync the Case Salesforce object there are a significant number of duplicates (i.e. per case_id we could have up to 13 duplicates) in the source table (i.e. _airbyte_raw_case). It looks like the normalization stage takes care of deduplication in the transform stage; however, the duplication of the raw data is causing significant overhead when loading into a DWH. I was thinking of raising this as an issue on GH but want to first make sure: 1. Is this expected behaviour? 2. Could this be a quick fix, and if so, does anybody know where in the codebase this issue might be originating from? Thanks
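
For context on the question above: the _airbyte_raw_* tables are append-only, with deduplication happening later in normalization, so some repetition per case across syncs is expected there. If the raw-table overhead is the real cost, one option is to dedupe on read using the standard raw columns before loading further. A sketch of that query (Snowflake-style JSON access shown; adjust for your warehouse):

    # Sketch: keep only the most recently emitted raw record per Case Id.
    # Assumes the standard raw-table columns _airbyte_data and _airbyte_emitted_at.
    DEDUPE_RAW_CASE = """
    SELECT _airbyte_data, _airbyte_emitted_at
    FROM (
        SELECT
            _airbyte_data,
            _airbyte_emitted_at,
            ROW_NUMBER() OVER (
                PARTITION BY _airbyte_data:"Id"
                ORDER BY _airbyte_emitted_at DESC
            ) AS rn
        FROM _airbyte_raw_case
    ) AS deduped
    WHERE rn = 1;
    """
    print(DEDUPE_RAW_CASE)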
  • t

    Tarek Kekhia

    09/23/2022, 12:58 PM
    Hi team, has anyone built out a connection with Bigquery as the source and Snowflake as the destination? I have a use case where I need to replicate an initial 2 TB of data from Bigquery to Snowflake and then set up continuous ingestion of around 5GB daily. I did an initial test yesterday with syncing over around 9 GB of data but my Airbyte server crashed on an EC2 t2.large instance deployed via Docker. I did some research and believe it is related to this issue (https://github.com/airbytehq/airbyte/issues/6533#issuecomment-1188663632). Before digging in further I wanted to reach out to the community and see if anyone else has encountered this issue. Happy Friday!
  • a

    Aditya Nambiar

    09/23/2022, 4:00 PM
    When I use Airbyte to ingest, say, a BigQuery/Postgres table and define a cursor field on it to fetch incremental records every 5 minutes, does Airbyte scan the entire table to identify new records? Is it on me to define an index on the cursor column so that Airbyte can quickly identify the new records?
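
On the question above: for database sources the incremental read is essentially "rows whose cursor value is past the last saved cursor", so whether that is a full scan is up to the database planner, and an index on the cursor column is what lets it seek instead. A sketch of the idea against Postgres (table and column names are illustrative):

    from datetime import datetime, timezone

    import psycopg2

    conn = psycopg2.connect("dbname=analytics")
    with conn, conn.cursor() as cur:
        # Roughly the shape of an incremental read: everything past the saved cursor.
        cur.execute(
            "SELECT * FROM events WHERE updated_at > %s ORDER BY updated_at",
            (datetime(2022, 9, 23, 16, 0, tzinfo=timezone.utc),),
        )
        new_rows = cur.fetchall()

        # Without an index on the cursor column this is a sequential scan every 5 minutes;
        # with one, the planner can jump straight to the new rows.
        cur.execute("CREATE INDEX IF NOT EXISTS idx_events_updated_at ON events (updated_at);")
    conn.close()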
  • h

    Hong Wu

    09/24/2022, 3:05 PM
    I have the QuickBooks connector set up, but ran into many issues. There were no full refresh / overwrite options, and the incremental modes are not pulling in all the records. Has anyone had similar issues?
  • t

    Tien Nguyen

    09/24/2022, 10:56 PM
    #advice-data-ingestion Hi channel, I am trying to build a custom Python source with incremental sync, but can't really get the image to work. It keeps appending the data even though updated_at is the same. Can anyone please give me some guidance? I will post the code here.
  • t

    Tien Nguyen

    09/24/2022, 10:56 PM
    #
    # Copyright (c) 2021 Airbyte, Inc., all rights reserved.
    #
    
    
    import json
    import os.path
    import datetime
    from typing import Any, Dict, Generator, Mapping, Optional, Tuple, Union
    
    from .kindful_api import *
    from airbyte_cdk.logger import AirbyteLogger
    from airbyte_cdk.models import (
        AirbyteCatalog,
        AirbyteConnectionStatus,
        AirbyteMessage,
        AirbyteRecordMessage,
        AirbyteStateMessage,
        AirbyteStream,
        ConfiguredAirbyteCatalog,
        Status,
        Type,
    )
    from airbyte_cdk.sources import Source
    
    class SourceKindfulApi(Source):
    
        def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
            """
            Tests if the input configuration can be used to successfully connect to the integration
                e.g: if a provided Stripe API token can be used to connect to the Stripe API.
    
            :param logger: Logging object to display debug/info/error to the logs
                (logs will not be accessible via airbyte UI if they are not passed to this logger)
            :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.json file
    
            :return: AirbyteConnectionStatus indicating a Success or Failure
            """
            try:
                # Not Implemented
    
                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
            except Exception as e:
                return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}")
    
        def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
            """
            Returns an AirbyteCatalog representing the available streams and fields in this integration.
            For example, given valid credentials to a Postgres database,
            returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
    
            :param logger: Logging object to display debug/info/error to the logs
                (logs will not be accessible via airbyte UI if they are not passed to this logger)
            :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.json file
    
            :return: AirbyteCatalog is an object describing a list of all available streams in this source.
                A stream is an AirbyteStream object that includes:
                - its stream name (or table name in the case of Postgres)
                - json_schema providing the specifications of expected schema for this stream (a list of columns described
                by their names and types)
            """
    
            streams = []
            dirname= os.path.dirname(os.path.realpath(__file__))
            ##transaction
            transaction_spec_path=os.path.join(dirname, "json_schema/transaction.json")
            transaction_catalog=read_json(transaction_spec_path)
            ### contact
            contact_spec_path=os.path.join(dirname,"json_schema/contact.json")
            contact_catalog=read_json(contact_spec_path)
    
            #### Meta Json
            funds=os.path.join(dirname,"json_schema/funds.json")
            funds_catalog=read_json(funds)
    
            ### Group
            group=os.path.join(dirname,"json_schema/group.json")
            group_catalog=read_json(group)
    
    
            ### not yet Implement Incremental Sync --> Need more direction
            # streams.append(AirbyteStream(name="transaction",json_schema=catalog,supported_sync_modes=["full_refresh","incremental"],source_defined_cursor=True,
            #             default_cursor_field=["created_at"])
            #                )
    
            streams.append(AirbyteStream(name="transaction",json_schema=transaction_catalog,supported_sync_modes=["full_refresh","incremental"], source_defined_cursor=True,default_cursor_field=["updated_at"]))
            streams.append(AirbyteStream(name="contact",json_schema=contact_catalog,supported_sync_modes=["full_refresh","incremental"],source_defined_cursor=True,default_cursor_field=["updated_at"]))
            streams.append(AirbyteStream(name="funds", json_schema=funds_catalog, supported_sync_modes=["full_refresh","incremental"],source_defined_cursor=True,default_cursor_field=["updated_at"]))
            streams.append(AirbyteStream(name="group", json_schema=group_catalog, supported_sync_modes=["full_refresh","incremental"],source_defined_cursor=True,default_cursor_field=["updated_at"]))
            return AirbyteCatalog(streams=streams)
    
        def read(
            self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
        ) -> Generator[AirbyteMessage, None, None]:
            """
            Returns a generator of the AirbyteMessages generated by reading the source with the given configuration,
            catalog, and state.
    
            :param logger: Logging object to display debug/info/error to the logs
                (logs will not be accessible via airbyte UI if they are not passed to this logger)
            :param config: Json object containing the configuration of this source, content of this json is as specified in
                the properties of the spec.json file
            :param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog
                returned by discover(), but
            in addition, it's been configured in the UI! For each particular stream and field, there may have been provided
            with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc
            :param state: When a Airbyte reads data from a source, it might need to keep a checkpoint cursor to resume
                replication in the future from that saved checkpoint.
                This is the object that is provided with state from previous runs and avoid replicating the entire set of
                data everytime.
    
            :return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object.
            """
            token = authorization("<your token>")
            contact = contact_query(token)
            transaction=transaction_query(token)
            meta=meta_api(token)
    
            data_query = {
                "query":
                    [
                        "not_linked"
                    ]
            }
            contact.query = data_query
            transaction.query=data_query
            # updated_cursor = datetime(datetime.now().year, datetime.now().month, datetime.now().day).isoformat()
    
            for stream in catalog.streams:
                if stream.stream.name=="transaction":
                    for data_ in transaction:
                        ## Cursor check if updated time is less than then write
                        date_,check=time_check(data_)
    
                        if check ==False:
                            continue
                        data_["updated_at"] = date_
                        yield generate_state(state, stream, {"cursor": data_["updated_at"]})
                        yield generate_record(stream,data_)
                elif stream.stream.name=="contact":
                    for data_ in contact:
                        date_, check = time_check(data_)
    
                        if check == False:
                            continue
                        data_["updated_at"] = date_
                        yield generate_state(state,stream,{"cursor":data_["updated_at"]})
                        yield generate_record(stream,data_)
                elif stream.stream.name=="funds":
                    for data_ in meta.funds:
                        date_, check = time_check(data_)
                        if check == False:
                            continue
                        data_["updated_at"] = date_
                        yield generate_state(state, stream, {"cursor": data_["updated_at"]})
                        yield generate_record(stream,data_)
                elif stream.stream.name=="group":
                    for data_ in meta.groups:
                        date_, check = time_check(data_)
                        if check == False:
                            continue
                        yield generate_state(state, stream, {"cursor": data_["updated_at"]})
                        data_["updated_at"] = date_
                        yield generate_record(stream,data_)
                else:
                    raise TypeError("Unknown stream")
    
    def get_stream_cursor(state: Dict[str, Any], stream: str) -> Optional[str]:
        cursor = (state[stream]["cursor"] or None) if stream in state else None
        return cursor
    
    def read_json(file_path):
        with open(file_path,"r") as f:
            return json.loads(f.read())
    def generate_record(stream: Any, data: Any) -> AirbyteMessage:
        record = data.copy()

        # timestamps need to be emitted in ISO format
        for key in record:
            if isinstance(record[key], datetime.datetime):
                record[key] = record[key].isoformat()

        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                stream=stream.stream.name,
                data=record,
                emitted_at=int(datetime.datetime.now().timestamp()) * 1000,
            ),
        )
    def generate_state(state: Dict[str, any], stream: any, data: any):
    
        state[
            stream.stream.name
        ] = data
        return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state))
    def time_check(data: Any) -> Tuple[Optional[datetime.date], bool]:
        """
        Try to parse the record's updated_at field into a date.
        Expected ISO 8601 input, e.g. 2011-07-17T00:57:48.000-06:00,
        with a fallback for '%Y-%m-%d %H:%M:%S UTC' strings.
        Returns (date, True) on success and (None, False) otherwise.
        """
        time_st = data["updated_at"]
        try:
            time_value = datetime.datetime.fromisoformat(time_st)
            return time_value.date(), True
        except ValueError:
            try:
                time_value = datetime.datetime.strptime(time_st, "%Y-%m-%d %H:%M:%S UTC")
                return time_value.date(), True
            except ValueError:
                # Unsupported timestamp format: skip the record for now.
                return None, False
  • t

    Tomi

    09/26/2022, 4:15 PM
    Hi team, I'm trying to set up the HubSpot source, but they no longer allow creating API keys. Has anybody had luck setting up the HubSpot source since they migrated to private apps?
  • r

    Rocky Appiah

    09/27/2022, 3:14 PM
    Any insight into this would be greatly appreciated. Running a test at the moment using pgoutput instead of the wal2json plugin.
  • a

    Amit Jain

    09/27/2022, 5:31 PM
    Hello folks, I'm using Snowflake as a source and PostgreSQL as destination. Source: Snowflake 0.1.24, Destination: PostgreSQL 0.3.24. After the latest update to the Snowflake driver I'm getting the following error:
    2022-09-27 17:19:14 - Additional Failure Information: io.airbyte.integrations.source.relationaldb.InvalidCursorException: The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='APPDATA_VIEWS.WV_ADDRESSES', cursorColumnName='ROW_LOADED_AT', cursorSqlType=TIMESTAMP_WITH_TIMEZONE},{tableName='APPDATA_VIEWS.WV_PRACTITIONERS', cursorColumnName='ROW_LOADED_AT', cursorSqlType=TIMESTAMP_WITH_TIMEZONE},{tableName='APPDATA_VIEWS.WV_RECOMMENDATIONS', cursorColumnName='ROW_LOADED_AT', cursorSqlType=TIMESTAMP_WITH_TIMEZONE},{tableName='APPDATA_VIEWS.WV_TREATMENT_PLAN_TEMPLATE_ITEMS', cursorColumnName='ROW_LOADED_AT', cursorSqlType=TIMESTAMP_WITH_TIMEZONE}
    I think I was on Snowflake 0.1.17 when it was able to run with the ROW_LOADED_AT column as cursor. ROW_LOADED_AT is a timestamp datatype in Snowflake. Can you confirm whether I can still make it work with the same column?
  • m

    Manitra Ranaivoharison

    09/28/2022, 6:38 AM
    Hello, we are resynchronizing our Aramis Group data in Airbyte by making a POST to /api/v1/connections/sync with the connection_id, but sometimes only one stream crashes rather than all the streams from the same connection. Would it be possible to launch a sync for just a specific stream name?
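
For reference, the call described above looks like the sketch below. The sync endpoint only takes a connection id, so a per-stream run is not expressible there; the usual workaround is to toggle which streams are enabled in the connection's catalog (in the UI, or via connections/update) before triggering the sync:

    import requests

    AIRBYTE_API = "http://localhost:8000/api/v1"  # adjust to your deployment

    # Trigger a sync for the whole connection; there is no stream-level parameter here.
    resp = requests.post(
        f"{AIRBYTE_API}/connections/sync",
        json={"connectionId": "<connection-uuid>"},
        timeout=30,
    )
    resp.raise_for_status()
    print(resp.json())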
  • l

    lucien

    09/28/2022, 8:04 AM
    Hi, every time I upgrade the Postgres connector the ingestion time increases a lot. Now it's 3h for 3GB, which is really not workable. Do you plan to do something to speed up ingestion (or at least get back the performance of the previous version of the Postgres connector)? I really like Airbyte as an ingestion tool, but I will have to consider other options given such a slow ingestion time.
  • t

    Tarak dba

    09/28/2022, 2:46 PM
    Hello folks, I am getting the error below while creating a destination connection; the destination DB is MariaDB. Can somebody help me with this error? Could not connect with provided configuration. HikariPool-1 - Connection is not available, request timed out after 60002ms. NOTE: With the same credentials I am able to create a source connection, so why am I not able to create a destination connection when the same credentials work fine for the source?
  • l

    Lucas Gonzalez

    09/28/2022, 6:11 PM
    Hi everyone! I am trying to use Airbyte to replicate data from Stripe and I would like to use the incremental mode as much as possible (I am embedding Airbyte into a SaaS application I am building, so I would like to avoid full refreshes as much as possible to reduce costs). Now my problem is that invoices on Stripe can have different statuses, and the status can be updated after creation (from “draft” to “open” and then “paid”). In incremental mode, the cursor used for increments is the created date of the invoice, therefore I am missing all the status updates, and this is very problematic in my case. My guess is that I have three options: 1) use full refresh at each sync; 2) sync with the ‘events’ stream from Stripe, which contains data about invoice updates, and then use this data to manually apply the same changes to my ‘invoices’ table; 3) implement another solution myself that would listen to Stripe webhooks and update my database accordingly. Am I missing something? I’d appreciate any input! Thank you
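
For anyone weighing option 3 above, a minimal sketch of a webhook listener that applies invoice status changes directly, using Flask and the stripe library (the endpoint secret and the upsert step are illustrative):

    import os

    import stripe
    from flask import Flask, request

    app = Flask(__name__)
    WEBHOOK_SECRET = os.environ["STRIPE_WEBHOOK_SECRET"]


    @app.route("/stripe/webhook", methods=["POST"])
    def stripe_webhook():
        # Verify the signature before trusting the payload.
        event = stripe.Webhook.construct_event(
            request.data, request.headers["Stripe-Signature"], WEBHOOK_SECRET
        )
        if event["type"] == "invoice.updated":
            invoice = event["data"]["object"]
            # Illustrative: upsert the new status into your own invoices table here.
            print(invoice["id"], invoice["status"])
        return "", 200


    if __name__ == "__main__":
        app.run(port=8080)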
  • i

    Ismael Goulani

    09/28/2022, 6:50 PM
    Hi everyone, I'm trying to create an Airbyte connection between Stripe and BigQuery through the API. The connection creation succeeds, but once I try to sync data it fails. The error message seems to indicate that there's some issue with the pod that runs the sync. Has anyone done this before? Do you have some resources I can look at? Thanks in advance
  • r

    Ramesh Shanmugam

    09/29/2022, 6:08 AM
    Hi, I am trying Postgres to S3 Parquet. int8 (bigint) fields in Postgres are converted to integer in Parquet, and because of that the data is getting corrupted. Is there any way to make this a big integer in Parquet?
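
A quick way to confirm what the destination actually wrote is to inspect the Parquet schema and spot-check the affected column with pyarrow before deciding whether the values were truncated or just mistyped (file and column names are illustrative):

    import pyarrow.parquet as pq

    # Inspect the physical type the destination wrote for the bigint column.
    table = pq.read_table("s3_export/part-0001.parquet")
    print(table.schema)  # shows e.g. int32 vs int64 for the affected field

    column = table.column("account_id")
    print(column.type, column.to_pylist()[:5])  # spot-check values for overflow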
  • r

    Rocky Appiah

    09/29/2022, 2:41 PM
    Any help with this would be greatly appreciated -> https://discuss.airbyte.io/t/json-not-being-copied-on-incremental-sync/2727
  • a

    Alejandro A

    09/29/2022, 3:08 PM
    Hello, I am checking out Airbyte among some other tools to capture events from people navigating our website, and I am not sure if Airbyte is the best tool for that. Some solutions already come with APIs integrated, but I guess a standard HTTP API, where we send events and store them in an RDBMS, would have to be developed and maintained?