# help-connector-development
  • laila ribke

    12/22/2022, 10:03 PM
    Hi all! I need to create a custom source for getting the transactions of my client's bank account. The API is https://nordigen.com/en/docs. From what I've read, you get a secret key and secret id, then you have to issue an access token and a refresh token. The API calls to the endpoint use a bearer access token. Has anyone encountered this kind of authentication? How do you automatically request a new access token before sending the GET request to the endpoint? AAAHHHH
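    (A minimal sketch of one way to handle this from Python: cache the access token and refresh it shortly before it expires, then attach it as a Bearer header. The base URL and the access/access_expires field names follow the Nordigen docs, but treat them as assumptions and double-check against the API reference.)
    import time
    import requests

    NORDIGEN_BASE = "https://ob.nordigen.com/api/v2"  # assumed base URL; check the docs

    class NordigenTokenManager:
        """Caches the access token and refreshes it shortly before it expires."""

        def __init__(self, secret_id: str, secret_key: str):
            self.secret_id = secret_id
            self.secret_key = secret_key
            self._access_token = None
            self._expires_at = 0.0

        def _fetch_new_token(self) -> None:
            resp = requests.post(
                f"{NORDIGEN_BASE}/token/new/",
                json={"secret_id": self.secret_id, "secret_key": self.secret_key},
            )
            resp.raise_for_status()
            payload = resp.json()
            self._access_token = payload["access"]
            # refresh 60 seconds early so a token never expires mid-request
            self._expires_at = time.time() + payload["access_expires"] - 60

        def get_token(self) -> str:
            if self._access_token is None or time.time() >= self._expires_at:
                self._fetch_new_token()
            return self._access_token

    # usage: ask the manager for a (possibly refreshed) token before every call
    manager = NordigenTokenManager("my-secret-id", "my-secret-key")
    response = requests.get(
        f"{NORDIGEN_BASE}/accounts/<account-id>/transactions/",
        headers={"Authorization": f"Bearer {manager.get_token()}"},
    )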
  • laila ribke

    12/27/2022, 6:06 AM
    Hi all, has anyone written a custom connector with OAuth authentication who can share the YAML file? I'm struggling to adapt it to the specific API's needs: in the refresh token endpoint I need to pass secret_id and secret_key instead of client_id and client_secret.
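    (If the YAML route stays stuck, one hedged option in the Python CDK is to subclass its Oauth2Authenticator and override the refresh request body so it sends secret_id/secret_key; the import path and method names below should be verified against your airbyte-cdk version.)
    from typing import Any, Mapping

    from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator


    class SecretKeyOauthAuthenticator(Oauth2Authenticator):
        """Sends secret_id/secret_key instead of client_id/client_secret when refreshing."""

        def build_refresh_request_body(self) -> Mapping[str, Any]:
            # assumption: the token endpoint expects these exact field names
            return {
                "grant_type": "refresh_token",
                "secret_id": self.get_client_id(),        # secret_id passed in as client_id
                "secret_key": self.get_client_secret(),   # secret_key passed in as client_secret
                "refresh_token": self.get_refresh_token(),
            }
    It can then be constructed like the stock authenticator, passing secret_id as client_id and secret_key as client_secret.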
  • Jamie Turner

    01/01/2023, 12:44 AM
    Hey all. We (convex.dev) recently collaborated on getting a source connector Convex -> Airbyte, and it was a great experience! Two questions in thread...
  • Till Blesik

    01/03/2023, 8:44 PM
    Hi everyone, I guess I posted this in the wrong channel before. I have created a new connector using the low-code / configuration method. I successfully implemented a few streams and am now running the integration and standard tests on it. It errors on the missing spec.yaml file. If I understand the tutorial correctly (Step 3: Connecting to the API | Airbyte Documentation), the spec is part of the source_connector-name/connector-name.yaml file. Am I supposed to manually copy the spec section from that file into its own file, or is there a step I am missing?
  • Mariam Thiam

    01/09/2023, 2:13 PM
    Hello Airbyte team, I have opened this PR to address Amplitude connector issue #21010, enabling custom time-interval selection for the events stream to avoid a MemoryError on this stream. Can you please take a look at it 🙏. Thanks
  • Akash Ghadge

    01/06/2023, 12:01 PM
    Hi team, I am working on creating a custom connector for an HTTP API. I am able to deploy Airbyte locally and create a virtual environment in Python, but when I try to install the dependencies for the connector template I get the error below. I am not sure what is causing this, please help! Thank you
  • Ben Greene

    01/10/2023, 2:24 PM
    Hi All, apologies if this is not the right channel, or if I am missing a detail in the docs. I am looking into updating a version of a source from v2 to v3. I don't want to destroy v2, and was simply going to add a v3 implementation. Is there a best practice or recommended approach for updating or adding a source to cover new versions of already supported APIs? Happy Tuesday!
  • Ben Greene

    01/10/2023, 7:45 PM
    For context on the above message: it's the Amazon Ads API; v2 is being deprecated in March, and we have been asked to move to v3 ASAP. Is this already being worked on?
  • Till Blesik

    01/10/2023, 9:17 PM
    Hi everyone, I am building a connector using the low-code approach. I am wondering how I can define selection options for a stream. For example, the Salesforce connector shows you the available tables so you can select which ones you want to sync. I currently have a configuration that requires a module_id, view_id and so on. Instead, I would like to specify that the /modules/{module_id}/data stream offers a selection of modules provided by /modules. Is that even supported by the low-code CDK, or would I need to switch to the Python CDK?
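    (Dynamic stream discovery like this is usually easier in the Python CDK: the source's streams() method can call /modules first and build one stream per module. A rough sketch with hypothetical names and endpoints:)
    from typing import Any, Iterable, List, Mapping, Optional, Tuple

    import requests
    from airbyte_cdk.sources import AbstractSource
    from airbyte_cdk.sources.streams import Stream
    from airbyte_cdk.sources.streams.http import HttpStream

    API_BASE = "https://api.example.com/"  # placeholder


    class ModuleDataStream(HttpStream):
        """One stream instance per module discovered at runtime."""

        url_base = API_BASE
        primary_key = None

        def __init__(self, module_id: str, **kwargs):
            super().__init__(**kwargs)
            self.module_id = module_id

        @property
        def name(self) -> str:
            return f"module_{self.module_id}_data"

        def path(self, **kwargs) -> str:
            return f"modules/{self.module_id}/data"

        def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
            return None

        def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
            # get_json_schema() would also need to return a per-module schema
            yield from response.json()


    class SourceExample(AbstractSource):
        def check_connection(self, logger, config) -> Tuple[bool, Any]:
            return True, None

        def streams(self, config: Mapping[str, Any]) -> List[Stream]:
            # discover the available modules first, then expose one stream per module
            modules = requests.get(f"{API_BASE}modules").json()
            return [ModuleDataStream(module_id=m["id"]) for m in modules]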
  • Sushant

    01/12/2023, 3:57 PM
    Hi Team, I am trying to automate the Airbyte CDC process using the HTTP API. My Airbyte is installed on an AWS EC2 instance. How can I connect to the API and create a source/connection/destination at runtime using Python or any other suggested approach? Please advise.
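    (A hedged sketch of driving the Airbyte OSS API from Python with requests. The workspaces/list, sources/create and connections/create endpoints exist in the OSS Config API, but take the exact request bodies from the API reference; hosts, IDs and credentials below are placeholders.)
    import requests

    AIRBYTE_API = "http://<ec2-host>:8000/api/v1"   # placeholder host
    AUTH = ("airbyte", "password")                  # default proxy basic auth; adjust if changed

    def api(path: str, body: dict) -> dict:
        resp = requests.post(f"{AIRBYTE_API}/{path}", json=body, auth=AUTH)
        resp.raise_for_status()
        return resp.json()

    # 1. look up the workspace
    workspace_id = api("workspaces/list", {})["workspaces"][0]["workspaceId"]

    # 2. create a source (connectionConfiguration depends on the connector)
    source = api("sources/create", {
        "workspaceId": workspace_id,
        "sourceDefinitionId": "<source-definition-id>",   # placeholder
        "name": "my-cdc-source",
        "connectionConfiguration": {"host": "...", "port": 5432},
    })

    # 3. wire the source to an existing destination
    connection = api("connections/create", {
        "sourceId": source["sourceId"],
        "destinationId": "<destination-id>",              # placeholder
        "status": "active",
        "syncCatalog": {"streams": []},                   # fill from sources/discover_schema
    })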
  • Matheus Pinheiro

    01/12/2023, 6:54 PM
    Hi everyone, I've been trying to find alternatives to this problem for the last couple of days but without much success. Can anyone give me a rough estimate of how complex it would be to write a custom SQL Server connector using the CDK to fetch CDC records of tables with custom data types?
  • Antony Ede

    01/16/2023, 3:35 AM
    Hi, I'm using the low-code approach to build a connector for Decipher (a survey app). I'm having trouble with the schema for one of the endpoints: the schema of the response is dynamic and varies by survey. Different surveys have different questions, and each question name/id is returned as a key in the response of the GetSurveyData endpoint. As per the documentation, the (simplified) response body looks like this:
    [
    	{
    		"status": 1,
    		"uuid": "jwy3kwppp4yuqg86",
    		...
    		"Q1r1": "8",
    		"Q2a": "I thought it was great.",
    		"Q2b": "I thought it was not so great."
    	}
    ]
    Note that the status and uuid keys are returned in every response, while the Q* keys are dynamic and vary depending on the survey. Ideally I think I'd restructure the response to something like the following; it would then be easy to represent in the schema and should normalise how I want:
    [
    	{
    		"status": 1,
    		"uuid": "jwy3kwppp4yuqg86",
    		...
    		"answers": [
    			{ "question": "Q1r1", "answer": "8" },
    			{ "question": "Q2a", "answer": "I thought it was great." },
    			{ "question": "Q2b", "answer": "I thought it was not so great." }
    		]
    	}
    ]
    How should this be handled? Can/should I restructure the JSON Schema as above? Do I need to move to the Python CDK to do this? Or can I get away with just ingesting the full raw responses and doing the normalisation afterwards?
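    (If you do move to the Python CDK, the restructuring itself is only a few lines in parse_response: keep the fixed keys and fold everything else into an answers array. A sketch assuming status and uuid are the only fixed keys:)
    from typing import Any, Iterable, Mapping

    FIXED_KEYS = {"status", "uuid"}  # assumption: keys present in every record

    def restructure(record: Mapping[str, Any]) -> Mapping[str, Any]:
        """Fold the dynamic Q* keys into a stable 'answers' list."""
        fixed = {k: v for k, v in record.items() if k in FIXED_KEYS}
        fixed["answers"] = [
            {"question": key, "answer": value}
            for key, value in record.items()
            if key not in FIXED_KEYS
        ]
        return fixed

    def parse_records(response_json: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
        # call this from parse_response() in an HttpStream subclass
        for record in response_json:
            yield restructure(record)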
  • Iván Alberquilla

    01/17/2023, 12:23 PM
    Hi, I'm creating a source that calls an API endpoint, but I need to send a query since it is a GraphQL endpoint, so I need to do a POST call like this:
    response = requests.post(endpoint, json={'query': query})
    Is that possible using the HTTP source, or are only GET requests allowed? How can I add the json parameter to the body?
    ✅ 1
  • Iván Alberquilla

    01/17/2023, 1:28 PM
    OK, looking at the docs, it seems this should be done by overriding http_method?
  • Iván Alberquilla

    01/17/2023, 1:58 PM
    OK, found the way by overriding the functions.
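    (For anyone landing here later, the overrides referred to above look roughly like this on an HttpStream: return POST from http_method and put the GraphQL query in request_body_json. The endpoint, query and response shape are illustrative.)
    from typing import Any, Iterable, Mapping, Optional

    import requests
    from airbyte_cdk.sources.streams.http import HttpStream


    class GraphqlStream(HttpStream):
        url_base = "https://api.example.com/"  # placeholder
        primary_key = None

        @property
        def http_method(self) -> str:
            return "POST"  # GraphQL queries go in the body of a POST

        def path(self, **kwargs) -> str:
            return "graphql"

        def request_body_json(self, **kwargs) -> Optional[Mapping[str, Any]]:
            # equivalent of requests.post(endpoint, json={"query": query})
            return {"query": "{ items { id name } }"}

        def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
            return None

        def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
            yield from response.json().get("data", {}).get("items", [])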
  • Rachel RIZK

    01/20/2023, 11:46 AM
    Hello everyone 🙂 I am currently running some acceptance tests on a new source, Apple Search Ads, with the low-code CDK. However, the connector needs a 30-day lookback window to reflect how performance is actually computed in the Apple Search Ads report. The problem is that the acceptance tests take far too long with such a lookback window (no lookback window = 30 min, 2-day lookback window = 1h30)... at some point it times out, and I know I can change the timeout value, but I guess it would take days to run. Is there any workaround for this?
    🙏 1
  • VITOR OLIVEIRA DOS SANTOS

    01/20/2023, 4:58 PM
    Hello everyone 🙂 I'm trying to build a Python source connector. The problem is that we have many cities and the same endpoints for all of them, so what I'm trying to do is make just one incremental stream for all the cities in my list. However, I can't save multiple states for a single stream. Could you help me? Here are my code and my sample_state.json:
    {
        "historical_ocupation":{
          "city1_historical_occupation": {
            "date": "2023-01-20T00:00:00Z"
          },
          "city2_historical_occupation": {
            "date": "2023-01-18T00:00:00Z"
          },
          "city3_historical_occupation": {
            "date": "2023-01-20T00:00:00Z"
          }
        },
        "historical_daily_public":{
          "city1_historical_daily_public": {
            "date": "2023-01-20T00:00:00Z"
          },
          "city2_historical_daily_public": {
            "date": "2023-01-20T00:00:00Z"
          },
          "city3_historical_daily_public": {
            "date": "2023-01-20T00:00:00Z"
          }
        }
      }
    
    
    # imports added for completeness (paths may differ between airbyte-cdk versions)
    import requests
    from datetime import datetime
    from typing import Any, Iterable, List, Mapping, Optional

    from airbyte_cdk.sources.streams import IncrementalMixin
    from airbyte_cdk.sources.streams.http import HttpStream
    from airbyte_cdk.sources.utils import casing


    class KeyAccessStream(HttpStream):
    
        def __init__(self, url, sites: List[str], **kwargs):
            super().__init__(**kwargs)
            self.url = url
            self.sites = sites
            self.localizacao = self.sites[0]
    
        @property
        def url_base(self) -> str:
            return f'{self.url}/dashboards/'
        def next_page_token(self,
                            response: requests.Response) -> Optional[Mapping[str,
                                                                             Any]]:
            """
            :param response: the most recent response from the API
            :return If there is another page in the result, a mapping (e.g: dict)
                    containing information needed to query the next page
                    in the response.
                    If there are no more pages in the result, return None.
            """
            return None
    
        def parse_response(self,
                           response: requests.Response,
                           **kwargs) -> Iterable[Mapping]:
            """
            :return an iterable containing each record in the response
            """
            if isinstance(response.json(), list):
                for item in response.json():
                    if isinstance(item, dict):
                        item.update({'site':self.localizacao})
                        yield item
            if isinstance(response.json(), dict):
                for key, item in dict(response.json()).items():
                    if isinstance(item, list):
                        for value in item:
                            if isinstance(value, dict):
                                value.update({'site':self.localizacao})
                                yield value
                        return
        
        def stream_slices(self, *, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
            return self.sites
    
    class IncrementalKeyAccessStream(KeyAccessStream, IncrementalMixin):
    
        # Fill in to checkpoint stream reads after N records.
        # This prevents re-reading of data if the stream fails for any reason.
        state_checkpoint_interval = None
        _cursor_value = None
    
        def __init__(self, url, sites, start_date: datetime, **kwargs):
            super().__init__(url, sites, **kwargs)
    
            self.start_date = start_date
            self._cursor_value = None
    
    
        @property
        def cursor_field(self) -> str:
            """
            :return str: The name of the cursor field.
            """
            return "date"
    
        @property
        def state(self) -> Mapping[str, Any]:
            """
            :return: A dictionary representing the current state of the stream.
            """
            state = {}
            if self._cursor_value:
                state[casing.camel_to_snake(self.localizacao+self.__class__.__name__)] = {self.cursor_field: self._cursor_value}
            else:
                state[casing.camel_to_snake(self.localizacao+self.__class__.__name__)] = { self.cursor_field: self.start_date.strftime("%Y-%m-%dT%H:%M:%SZ")}
            return state
    
        @state.setter
        def state(self, value: Mapping[str, Any]):
            """
            :param value: A dictionary representing the new state of the stream.
            """
            self._cursor_value = value[casing.camel_to_snake(self.__class__.__name__)][casing.camel_to_snake(self.localizacao+self.__class__.__name__)][self.cursor_field]
    
    
        def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
            if self._cursor_value:
                print(self._cursor_value)
            for record in super().read_records(*args, **kwargs):
                latest_record_date = datetime.strptime(record[self.cursor_field],
                                                       "%Y-%m-%dT%H:%M:%SZ")
                if self._cursor_value:
                    if self._cursor_value < str(latest_record_date):
                        self._cursor_value = str(latest_record_date)
                        yield record
                else:
                    self._cursor_value = str(latest_record_date)
                    yield record 
    
    
    class HistoricalOccupation(IncrementalKeyAccessStream):
    
        primary_key = None
    
        @property
        def http_method(self) -> str:
            return "POST"
    
        def request_body_json(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
        ) -> Optional[Mapping[str, Any]]:
            """
            Override when creating POST/PUT/PATCH requests
            to populate the body of the request with a non-JSON payload.
            If returns a ready text that it will be sent as is.
            If returns a dict that it will be converted to a urlencoded form.
            E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
            At the same time only one of the
            'request_body_data' and 'request_body_json'
            functions can be overridden.
            """
    
            if stream_state:
                print("Aqui")
                start_at = datetime.strptime(
                                             stream_state[casing.camel_to_snake(self.localizacao+self.__class__.__name__)][self.cursor_field],
                                             "%Y-%m-%dT%H:%M:%SZ")
                start_at = start_at.strftime("%Y-%m-%dT%H:%M:%SZ")
                print("Recuperei o state:",start_at)
            else:
                start_at = self.start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
                print(start_at, self.localizacao)
                print(casing.camel_to_snake(self.localizacao+self.__class__.__name__))
    
            return {"companies": None,
                    "category": None,
                    "scale": "HOURS",
                    "endAt": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
                    "startAt": start_at}
    
        def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
            self.localizacao = stream_slice
            return stream_slice + "/historical-occupation"
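    (One hedged way to keep per-city cursors in a single incremental stream is to store a dict keyed by site in the state instead of one _cursor_value, and to advance only the cursor of the slice a record came from. A sketch, not a drop-in fix for the code above:)
    from typing import Any, Mapping


    class PerCityCursors:
        """Sketch: one cursor per city kept inside a single stream's state."""

        cursor_field = "date"

        def __init__(self, sites, start_date: str):
            # every city starts from the configured start date
            self._cursors = {site: start_date for site in sites}

        @property
        def state(self) -> Mapping[str, Any]:
            return {site: {self.cursor_field: cursor} for site, cursor in self._cursors.items()}

        @state.setter
        def state(self, value: Mapping[str, Any]):
            self._cursors = {site: entry[self.cursor_field] for site, entry in value.items()}

        def advance(self, site: str, record: Mapping[str, Any]) -> None:
            # ISO-style timestamps compare correctly as strings, so a plain > is enough here
            if record[self.cursor_field] > self._cursors[site]:
                self._cursors[site] = record[self.cursor_field]
    Inside read_records, calling advance(stream_slice, record) for every record (and exposing state through such a mapping) keeps each city's cursor independent.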
  • Elliot Trabac

    01/23/2023, 10:45 AM
    Hi team! I'm using the low-code CDK and one of the streams is defined as follows:
    questions_stream:
        $ref: "*ref(definitions.base_stream)"
        $options:
          name: "questions"
          primary_key: "id"
          path: "/question/{{ stream_slice.parent_id }}"
        retriever:
          $ref: "*ref(definitions.retriever)"
          record_selector:
            extractor:
              field_pointer: []
          stream_slicer:
            type: SubstreamSlicer
            parent_stream_configs:
              - stream: "*ref(definitions.reviews_stream)"
                parent_key: "question_id"
                stream_slice_field: "parent_id"
    The problem is that in the parent stream, most of the reviews have the same question_id, so in the questions stream I'm getting many, many duplicates and the extract takes a lot of time. What would you recommend for fetching only the unique question_id values in this stream?
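    (If the SubstreamSlicer has no built-in dedup for this, one workaround is to deduplicate the parent key yourself before slicing, for example from a Python CDK stream_slices(); question_id/parent_id below mirror the config above.)
    from typing import Any, Iterable, Mapping

    def unique_question_slices(parent_records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
        """Yield one slice per distinct question_id seen in the parent (reviews) stream."""
        seen = set()
        for record in parent_records:
            question_id = record.get("question_id")
            if question_id is not None and question_id not in seen:
                seen.add(question_id)
                yield {"parent_id": question_id}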
  • Theodor Sjöstedt

    01/23/2023, 7:09 PM
    Hi! We have a supplier with a bulk HTTP API that has the following lifecycle: 1. Authenticate against an /auth endpoint. 2. POST a CSV file as payload with the list of IDs that should be exported. 3. Poll a status endpoint and wait for the result file to be ready. 4. Download the file through another endpoint. We want to fetch data from this API at least nightly. Is this workflow worth implementing a custom connector for, or should I just hack something together in Airflow? :)
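    (For scoping the effort, the custom-connector version of that lifecycle is roughly the following in plain Python; endpoint paths, field names and the ready flag are assumptions about the supplier's API.)
    import time

    import requests

    BASE = "https://api.example-supplier.com"  # placeholder

    def fetch_export(ids_csv_path: str, username: str, password: str) -> bytes:
        session = requests.Session()

        # 1. authenticate and keep the token for subsequent calls
        token = session.post(f"{BASE}/auth", json={"user": username, "password": password}).json()["token"]
        session.headers["Authorization"] = f"Bearer {token}"

        # 2. submit the CSV of ids to export
        with open(ids_csv_path, "rb") as csv_file:
            job = session.post(f"{BASE}/exports", files={"file": csv_file}).json()

        # 3. poll the status endpoint until the result file is ready
        while session.get(f"{BASE}/exports/{job['id']}/status").json().get("state") != "ready":
            time.sleep(30)

        # 4. download the finished file
        return session.get(f"{BASE}/exports/{job['id']}/download").content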
  • Aman Satya

    01/26/2023, 10:57 AM
    Hello everyone, when I build any existing Java connector using the gradlew command, I get an error: Task :airbyte-integrations:bases:base:airbyteDocker failed. Following the stack trace of the build, the error is caused by an IOException: CreateProcess error=193, %1 is not a valid Win32 application. It is unable to run the build_image.sh program.
  • Aman Satya

    01/26/2023, 10:58 AM
    If I build any Python connector using the docker build command, it does not fail.
  • Aman Satya

    01/26/2023, 10:58 AM
    Only Java connectors are throwing this kind of error. Please help me in this regard
  • Thomas Pedot

    01/27/2023, 5:02 PM
    Hello, I'm having a hard time integrating my low-code connector https://airbytehq.slack.com/archives/C021JANJ6TY/p1674483110076999 Now everything is working fine locally (with the python main.py read ... command). I also tested with my Dockerfile just in case, but I cannot connect it in the UI. I think it is the discovery part that is failing.
  • Akash Ghadge

    01/30/2023, 3:57 PM
    Hi Team, I am working on creating a custom connector for a public API. I am able to complete the connection successfully and to discover and read the data from the API. But when I try to read the data using the command
    python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
    I get data on the command line, but at the end of the response there is an error ending in
    "failure_type": "system_error"}}
    I am not aware of the reason for this error. I have attached screenshots of the response below. Please, can anyone help me understand the reason behind this error? Thank you
  • Ryan (Airbyte)

    01/30/2023, 5:29 PM
    Do we have best-practice guidance for optimizing how many records to load per page? I assume it is highly API-dependent, but without guidance from the API source, would it be better to lean towards the minimum number of items per page or the maximum?
  • Ryan (Airbyte)

    01/31/2023, 8:06 PM
    I am running tests on an output from the UI and following the guidance of the low-code tutorial, but when I run my acceptance tests they seem to expect a file called spec.yaml, whereas it looks like the latest version uses a manifest.yaml. See below for my error:
    E       FileNotFoundError: [Errno 2] No such file or directory: '/Users/rbernstein/airbyte/airbyte-integrations/connectors/source-shelter-luv/source_shelter_luv/spec.yaml'
  • Kacper Adler

    02/01/2023, 11:10 AM
    Is it possible to make a retriever optional? I found a source where one of the streams is optional, but when I try to add the source it forces a synchronization of the optional stream, and that fails because in my account this optional stream is disabled.
  • Kacper Adler

    02/01/2023, 11:11 AM
    features_stream:
        $ref: "*ref(definitions.base_stream)"
        retriever:
          $ref: "*ref(definitions.retriever)"
          record_selector:
            $ref: "*ref(definitions.selector_features)"
    I want this to accept a 403 response as INFO/WARNING: in aha.io it's not required to have features with products. It's possible to have only one, but the source forces both.
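    (In the Python CDK, one way to treat that 403 as non-fatal is to disable raise_on_http_errors for the stream and skip forbidden responses in parse_response. raise_on_http_errors is an HttpStream property; the rest of the snippet is a sketch, not the actual aha.io connector code.)
    from typing import Any, Iterable, Mapping, Optional

    import requests
    from airbyte_cdk.sources.streams.http import HttpStream


    class FeaturesStream(HttpStream):
        url_base = "https://example.aha.io/api/v1/"  # placeholder
        primary_key = "id"
        raise_on_http_errors = False  # don't fail the whole sync on 4xx for this stream

        def path(self, **kwargs) -> str:
            return "features"

        def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
            return None

        def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
            if response.status_code == 403:
                # the account has this module disabled: warn and emit no records
                self.logger.warning("Features endpoint returned 403; skipping stream.")
                return
            yield from response.json().get("features", [])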
  • Ben Greene

    02/01/2023, 9:41 PM
    Hi All, I'm trying to set up a connector that would take an arbitrary and inconsistent set of values as an input. To facilitate this, I'm trying to set the type of the parameter input to an empty dict or object, and I'm setting it like so in the spec.yaml file: properties: test_dict: type: object description: test. Is there another way to do this? Is there a better way? Whenever I build the connector, I don't see any options for input when I go to build the source.
  • Jimmy McBroom

    02/03/2023, 11:40 PM
    So, I use the Smartsheet connector a lot -- it connects to one sheet, but I would often prefer to ingest many sheets, often organized by "workspace". Is it best in this case to create a new connector that operates at the workspace level, or to try to extend the current connector with more functionality?