# singer-tap-development
  • y

    Yordan Ivanov

    11/07/2024, 8:11 AM
    Hi all, I opened a PR on tap-salesforce last month. What do I need to do to get a review? https://github.com/MeltanoLabs/tap-salesforce/pull/63
    👀 1
  • n

    Nir Diwakar (Nir)

    11/19/2024, 11:05 AM
    I am using the Graph API to get details of MS resources. However, I encounter name resolution errors about 1% of the time:
    requests.exceptions.ConnectionError: HTTPSConnectionPool(host='login.microsoftonline.com', port=443): Max retries exceeded with url: /b5953ddd-dd80-4110-904b-e503716f0caf/oauth2/v2.0/token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7b99afdc5ed0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))
    To handle this I am wrapping the
    prepare_request
    method in a try/except block and raising
    RetriableAPIError
    , but the retries aren't happening. Should I handle this in a different way? From the traceback, the following exceptions occur: • requests.exceptions.ConnectionError • urllib3.exceptions.MaxRetryError • socket.gaierror
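    For reference: the ConnectionError above is raised when the request is actually sent, not while it is being prepared, so a try/except around prepare_request is never reached. One way to retry on these transient network failures is to extend the exception list used by the SDK's backoff-based request_decorator hook — a minimal sketch, assuming that hook and its backoff_wait_generator / backoff_max_tries / backoff_handler helpers; the class name and URL are placeholders:
    Copy code
    import backoff
    import requests

    from singer_sdk.exceptions import RetriableAPIError
    from singer_sdk.streams import RESTStream


    class GraphStream(RESTStream):
        """Placeholder stream for illustration."""

        url_base = "https://graph.microsoft.com/v1.0"

        def request_decorator(self, func):
            # Retry the whole request (including DNS resolution) on transient
            # network errors, not only on RetriableAPIError.
            return backoff.on_exception(
                self.backoff_wait_generator,
                (
                    RetriableAPIError,
                    requests.exceptions.ConnectionError,  # name-resolution failures
                    requests.exceptions.ReadTimeout,
                ),
                max_tries=self.backoff_max_tries,
                on_backoff=self.backoff_handler,
            )(func)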
    👀 1
  • j

    Jens Christian Hillerup

    11/27/2024, 2:23 PM
    I want to extract from a REST API that has an endpoint, say
    GET /documents
    that gives me a list of documents, and then another one like
    GET /document/<doc_id>
    that returns important information that's not part of the former. I've wrapped REST APIs before with the tap cookiecutter in the Meltano SDK, but how would this actually translate into a
    RESTStream
    in the Singer tap? Is it even a supported use case to have an "N+1" stream which requires another roundtrip per record? (I'm aware it's going to be slow, luckily it's not that many rows). Thanks!
  • a

    Andy Carter

    11/29/2024, 10:05 AM
    I have a tap that uses a Google client library for authentication. I create the client in the
    discover_streams
    method (in my
    _custom_initialization
    method), which works fine.
    Copy code
    def discover_streams(self) -> list[streams.GoogleSearchConsoleStream]:
            """Return a list of discovered streams.
    
            Returns:
                A list of discovered streams.
            """
            self._custom_initialization()
            return [
                streams.PerformanceReportPage(self, service=self.service),
                streams.PerformanceReportDate(self, service=self.service),
                streams.PerformanceReportCountry(self, service=self.service),
                streams.PerformanceReportQuery(self, service=self.service),
                streams.PerformanceReportDevice(self, service=self.service),
            ]
    But when I am trying to develop some tests, this obviously causes an issue, as I don't have any test credentials. Is there an alternative place to which I can move this
    _custom_initialization
    in my
    Tap
    class so it runs after
    discover_streams
    ? And are there any suggestions for how to handle 'fake' credentials when developing unit tests?
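    One option — a sketch, since the real stream classes and client builder are simplified away here — is to make the client a lazily evaluated property on the Tap so it is only built when a stream first touches it, and to patch that property in tests:
    Copy code
    from __future__ import annotations

    from functools import cached_property
    from unittest import mock

    from singer_sdk import Tap


    class TapGoogleSearchConsole(Tap):
        """Sketch only; the real stream classes and client builder go here."""

        name = "tap-google-search-console"

        @cached_property
        def service(self):
            # Built on first access instead of inside discover_streams(), so
            # catalog discovery and test collection never need credentials.
            raise NotImplementedError("build the Google client here")

        def discover_streams(self):
            # Pass only the tap; streams read `self._tap.service` (or a
            # passed-in callable) lazily inside get_records().
            return []


    def test_discovery_without_credentials():
        # Swap the lazy property for a fake so no real credentials are required.
        with mock.patch.object(TapGoogleSearchConsole, "service", mock.MagicMock()):
            TapGoogleSearchConsole(config={}).run_discovery()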
  • i

    Izaak

    12/02/2024, 12:49 PM
    Hi everyone, I'm developing a custom tap that extracts jsonlines files. I want to keep the original JSON object of unspecifiable schema as a JSON type so I can load it into Postgres as JSON or JSONB. How can I achieve that? I'm running into issues where a 'singer_sdk.typing.ObjectType' without any further schema specification is not accepted by the typing checks.
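    One approach (hedged — keyword names vary a little between SDK versions) is to declare the column as a free-form object, either through ObjectType with additional properties allowed or by passing a raw JSON schema through CustomType; loaders such as target-postgres can then map it to JSON/JSONB:
    Copy code
    from singer_sdk import typing as th

    schema = th.PropertiesList(
        th.Property("id", th.StringType, required=True),
        # Free-form object: no fixed properties, arbitrary keys allowed.
        th.Property("payload", th.ObjectType(additional_properties=True)),
        # Or bypass the typing helpers entirely with a raw JSON schema:
        th.Property("raw_payload", th.CustomType({"type": ["object", "null"]})),
    ).to_dict()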
  • i

    Izaak

    12/03/2024, 3:01 PM
    Back again with another question! So the custom tap I'm working on scans the local file system for files to sync. All files belong to the same stream, like a track-record of events per day, with a file for each day. Files should only be synced once to avoid duplication. How should I implement that? The docs recommend against writing to the state yourself, and I can't imagine that this is such a unique case, so what am I missing?
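    One way to avoid writing state by hand is to derive a replication key from each file (for example the date embedded in the filename) and let the SDK's incremental bookmarking skip files that were already synced. A rough sketch, assuming a filename layout like events-2024-12-03.jsonl and an input_dir setting:
    Copy code
    from __future__ import annotations

    import json
    from pathlib import Path

    from singer_sdk import typing as th
    from singer_sdk.streams import Stream


    class DailyEventsStream(Stream):
        name = "events"
        replication_key = "file_date"
        schema = th.PropertiesList(
            th.Property("file_date", th.DateType),
            th.Property("record", th.ObjectType(additional_properties=True)),
        ).to_dict()

        def get_records(self, context):
            start = self.get_starting_replication_key_value(context)
            for path in sorted(Path(self.config["input_dir"]).glob("events-*.jsonl")):
                file_date = path.stem.removeprefix("events-")  # assumed filename layout
                if start and file_date <= start:
                    continue  # this day was already synced in a previous run
                for line in path.read_text().splitlines():
                    yield {"file_date": file_date, "record": json.loads(line)}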
    ✅ 1
  • s

    steven_wang

    12/05/2024, 2:47 PM
    Question: if a stream has a pre-defined schema and one of the columns/fields is no longer returned by the source (e.g. the column was deleted in the source database, the REST API no longer returns the field, etc.) and the stream's schema is not updated, what happens?
  • a

    Andy Carter

    12/12/2024, 3:37 PM
    I have a custom tap with a single setup step (a simple REST call) that writes multiple files out to an Azure location before I can return multiple streams, one for each file. Where should I place the code to run this setup step? Which method should I override? I need the
    run_id
    available to me, so I think
    Tap.discover_streams
    is too early. But it's not correct to do that at the stream level (as part of
    Stream.get_records
    ) because it only needs to be carried out once per tap run.
    Tap.sync_all
    would be another option if it weren't a
    @final
    method. Is there anywhere I can jump in, a
    setup
    -ish method in Tap? Or should I implement a singleton for all my streams to make use of in
    get_records
    ?
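    One pattern — a sketch, since the _tap attribute is technically private and may differ between SDK versions — is to put the setup behind a cached property on the Tap and let the first stream that needs it trigger it from get_records; it then runs exactly once per invocation, after discovery:
    Copy code
    from __future__ import annotations

    from functools import cached_property

    from singer_sdk import Tap
    from singer_sdk.streams import Stream


    class TapAdfExtract(Tap):  # placeholder name
        name = "tap-adf-extract"

        @cached_property
        def setup_result(self) -> dict:
            # Runs once, the first time any stream asks for it -- i.e. during
            # sync, when run_id is already available.
            return self._run_setup_call()  # hypothetical REST call

        def _run_setup_call(self) -> dict:
            raise NotImplementedError


    class FileStream(Stream):
        def get_records(self, context):
            setup = self._tap.setup_result  # lazily triggers the one-time setup
            # ... read the file named in `setup` and yield its rows ...
            yield from ()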
  • n

    Nir Diwakar (Nir)

    12/19/2024, 8:18 AM
    I have a tap for Egnyte. Egnyte has a REST API that sends responses in multiple pages, batched in time slices. Unfortunately, there seems to be an error in their paging logic: the last record of one page can have a greater value than the first record of the next page. I sort all records by time in parse_response, but because records are not sorted across pages, I get singer_sdk.exceptions.InvalidStreamSortException. Is it possible to retrieve data from all pages and then perform a sort by date? This is the code I am using:
    Copy code
    class EventsStream(EgnyteStream):
        """Define custom stream."""
    
        name = "events"
        path = "/pubapi/v2/audit/stream"
        replication_key = "eventDate"
        records_jsonpath = "$.events[*]"
        is_sorted = True
        page = 0
    
        def get_url_params(
            self,
            context: Optional[dict],
            next_page_token: Optional[Any],
        ) -> dict[str, Any]:
            params: dict = {}
            if next_page_token:
                params["nextCursor"] = next_page_token
            else:
                params["startDate"] = self.start_time
            return params
        
        def parse_response(self, response: requests.Response | None):
            if not response:
                raise RetriableAPIError("No response from API")
    
            try:
                data = response.json()
            except AttributeError as e:
            logging.info(f"{e} with response {response} and {response.text}")
                return
    
            events = data.get("events", [])
    
            for event in events:
                if not event:
                    continue
    
                for key in ["loginDate", "logoutDate"]:
                    if event.get(key) == 0:
                        event.pop(key, None)
    
                for key in ["loginDate", "logoutDate", "date"]:
                    if key in event:
                        dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + \
                            timedelta(milliseconds=event[key])
                        event["eventDate"] = dt.isoformat().replace("+00:00", "Z")
                        del event[key]
                        break
    
            if events:
                events.sort(key=lambda x: x.get("eventDate", ""))
            self.page = self.page + 1
            logging.info(f'Page: {self.page} {events}')
            yield from events or ([{}] if data.get("moreEvents") else [])
    
        def post_process(
            self,
            row: dict,
            context: Context | None = None,
        ) -> dict | None:
            if not row:
                return None
    
            if 'id' not in row or row['id'] is None:
                row['id'] = str(uuid.uuid4())
    
            return super().post_process(row, context)
    
        def get_new_paginator(self) -> EgnyteEventsPaginator:
            return EgnyteEventsPaginator()
    Error occurs here:
    Copy code
    'e5deb04e-75ba-4894-8709-001c5f4295d5', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:14:18.591000Z'}]
    2024-12-19 07:44:24,675 | INFO     | target-elasticsearch.events | Starting batch {'batch_id': '720b0104-adee-4855-b3dc-420fe0033a2a', 'batch_start_time': datetime.datetime(2024, 12, 19, 7, 44, 24, 675328, tzinfo=datetime.timezone.utc)} for dev_events_raw_egnyte_000
    2024-12-19 07:44:27,219 | INFO     | root                 | Page: 2 [{'sourcePath': '/Shared/Projects/2022/22-0055 Vicksburg National Cemetery Burial Excavation and Stablization/Lab/FORDISC Results/Burial 19_Fordisc Results_updated.docx', 'targetPath': 'N/A', 'user': 'Brittany McClain ( bmcclain@owest.com )', 'userId': '1121', 'action': 'Read', 'access': 'Desktop App', 'ipAddress': '174.26.41.229', 'actionInfo': '', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:09:36Z'},
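    Two hedged options: the simplest is to set is_sorted = False so the SDK stops asserting per-record order (the bookmark is then finalized at the end of the sync instead of raising InvalidStreamSortException); alternatively, buffer every page and sort once before emitting, reusing the existing EgnyteStream base — a sketch:
    Copy code
    class EventsStream(EgnyteStream):
        # ... existing attributes as above ...
        is_sorted = True  # per-record state tracking, once ordering is restored

        def get_records(self, context):
            # Pull every page first, then emit in eventDate order so the
            # replication key increases monotonically across page boundaries.
            records = list(self.request_records(context))
            records.sort(key=lambda r: r.get("eventDate", ""))
            for record in records:
                row = self.post_process(record, context)
                if row:  # drops the empty sentinel dicts yielded for moreEvents
                    yield row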
  • j

    Jun Pei Liang

    01/18/2025, 12:13 AM
    I'm unsure if this is the right place to post my question. I am evaluating an Oracle tap to retrieve data from an Oracle database into Parquet files for change data capture. The Parquet files will be loaded into Databricks via Auto Loader. The data we need in Oracle involves some system-built APIs or functions, so we can't extract it directly from a database table and instead have to use an Oracle view. However, based on my research, it appears that the incremental replication method only works with tables and not views. The issue is that we can't recreate those system APIs or functions in Databricks. How should we proceed with CDC for data from an Oracle view?
  • r

    Reuben (Matatika)

    01/18/2025, 3:46 AM
    Is it somehow possible to subclass
    SQLStream
    to create a "well-known" stream that defines a
    replication_key
    that is respected (as in standard `RestStream`/`Stream` implementations)?
    client.py
    Copy code
    class TestStream(SQLStream):
        connector_class = TestConnector
    
    class AStream(TestStream):
        name = "a"
        replication_key = "updated_at"
    
    class BStream(TestStream):
        name = "b"
        replication_key = "updated_at"
    tap.py
    Copy code
    KNOWN_STREAMS = {
      "a": stream.AStream,
      "b": streams.BStream,
    }
    
    
    ...
    
    
        def discover_streams(self):
            for stream in super().discover_streams():
                if stream_class := KNOWN_STREAMS.get(stream.name):
                    stream = stream_class(
                        tap=self,
                        catalog_entry=stream.catalog_entry,
                        connector=self.tap_connector,
                    )
    
                yield stream
    I've played around with this and a few other variations, but can't seem to get it to work - the sync runs as if no replication key was defined. Is there another (possibly simpler) way to do this?
  • p

    Peter Clemenko

    01/19/2025, 2:22 PM
    question
  • p

    Peter Clemenko

    01/19/2025, 2:23 PM
    Is there a way to spin up a tap or target skeleton from Python programmatically?
  • p

    Peter Clemenko

    01/19/2025, 2:23 PM
    example
  • p

    Peter Clemenko

    01/19/2025, 2:23 PM
    if i wanted to use langchain to automatically generate taps and targets
  • p

    Peter Clemenko

    01/19/2025, 2:23 PM
    using llama
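    The SDK skeletons are ordinary cookiecutter templates, so they can be rendered from Python without prompts — a sketch, where the template directory and the extra_context keys are assumptions to check against the current template:
    Copy code
    from cookiecutter.main import cookiecutter

    cookiecutter(
        "https://github.com/meltano/sdk",
        directory="cookiecutter/tap-template",  # or "cookiecutter/target-template"
        no_input=True,
        output_dir="./generated",
        extra_context={  # keys are assumptions; check the template's prompts
            "source_name": "example",
            "tap_id": "tap-example",
            "stream_type": "REST",
            "auth_method": "API Key",
        },
    )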
  • i

    Ian OLeary

    02/04/2025, 6:52 PM
    Copy code
    2025-02-04T18:44:21.577497Z [info     ] 2025-02-04 13:44:21,576 | INFO     | tap-litmos.lt_UserDetails | Pagination stopped after 0 pages because no records were found in the last response
    Where do I need to alter this to continue paginating even if no records were returned for a particular week? I looked into the BaseAPIPaginator and I didn't find where this message is printed. Here's my current paginator:
    Copy code
    class LitmosPaginator(BaseAPIPaginator):
    
        def __init__(self, *args, **kwargs):
            super().__init__(None, *args, **kwargs)
    
        def has_more(self, response):
            return self.get_next(response) < date.today()
    
        def get_next(self, response):
            params = dict(parse_qsl(urlparse(response.request.url).query))
            return datetime.strptime(params["to"], OUTPUT_DATE_FORMAT).date() + timedelta(seconds=1)
    I'm paginating via a 1-week date range, and even if there were no records I still want to move on to the next range. Edit: would it be in `advance`? def advance(self, response: requests.Response) -> None: ... ... # Stop if new value None, empty string, 0, etc. if not new_value: self._finished = True else: self._value = new_value
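    That log line comes from the SDK's request loop, which stops as soon as parse_response yields nothing for a page, before the paginator is consulted again. One hedged workaround is to keep the loop alive by yielding a sentinel for empty weeks and dropping it in post_process — a sketch with placeholder names:
    Copy code
    from singer_sdk.streams import RESTStream


    class LitmosStream(RESTStream):  # placeholder; the real base class differs
        url_base = "https://api.litmos.example"  # placeholder

        def parse_response(self, response):
            records = list(super().parse_response(response))
            # Yield a sentinel for empty weeks so pagination keeps advancing.
            yield from records or [{}]

        def post_process(self, row, context=None):
            return row or None  # returning None drops the sentinel record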
  • a

    Andy Carter

    02/24/2025, 2:52 PM
    I have a tap that is based on an Azure Data Factory (ADF) pipeline run - it's a long story... The tap class itself triggers a pipeline run, which extracts CSV data and saves it into differently named files (aligning to tables of a database). Each SDK stream (40+ streams) checks for the presence of the new file corresponding to its table in storage (using backoff). Once the file arrives, it is read and then emitted via the stream in
    get_records
    in the normal way. Instead of checking and rechecking for each new file in storage, I've discovered I can check the pipeline logs to see when each table/stream file is complete, then just read the file once I know it's saved to storage. However, I don't want to replace my 'file check' code with 'pipeline log check' code in each stream, as the REST call takes a while. Is there a process I can run asynchronously at the
    tap
    level every 10 seconds or so, and in my
    stream.get_records()
    check the tap's cached version of the logs from ADF, and emit records if appropriate?
    Ideally I don't want to wait for the whole pipeline to finish before I start emitting records - some data is ready in seconds but others take minutes.
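    One hedged way to do this is a daemon thread owned by the Tap that refreshes a cached view of the ADF activity logs every ~10 seconds, with each stream polling that cache (instead of storage) from get_records — a sketch with hypothetical helper names:
    Copy code
    from __future__ import annotations

    import threading
    import time

    from singer_sdk import Tap


    class TapAdf(Tap):  # placeholder name
        name = "tap-adf"

        def start_log_poller(self) -> None:
            # Call once (e.g. lazily from the first stream that needs it).
            self._completed_files: set[str] = set()
            self._logs_lock = threading.Lock()
            threading.Thread(target=self._poll_pipeline_logs, daemon=True).start()

        def _poll_pipeline_logs(self) -> None:
            while True:
                entries = self._fetch_adf_activity_logs()  # hypothetical REST call
                done = {e["file_name"] for e in entries if e["succeeded"]}
                with self._logs_lock:
                    self._completed_files |= done
                time.sleep(10)

        def file_is_ready(self, file_name: str) -> bool:
            with self._logs_lock:
                return file_name in self._completed_files

        def _fetch_adf_activity_logs(self) -> list[dict]:
            raise NotImplementedError
    Each stream's get_records then loops on self._tap.file_is_ready(...) with a short sleep before reading its file, so records start flowing as soon as that stream's file is logged as complete rather than when the whole pipeline finishes.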
    👀 1
  • t

    Tanner Wilcox

    02/27/2025, 11:38 PM
    The API I'm pulling from for my tap doesn't support pagination. I'm having a hard time finding documentation on how to disable that. I can see when I run my tap that it's making a handful of calls to my API. The docs for
    request_records()
    on
    RESTStream
    say: "If pagination is detected, pages will be recursed automatically," but I'm not seeing how it's detecting pagination in this case.
    ✅ 1
  • j

    Jun Pei Liang

    03/07/2025, 12:14 AM
    Does anyone know where the bookmark is saved? I have the Oracle tap configured as follows, and it keeps exporting the entire table.
    Copy code
    - name: tap-oracle
        variant: s7clarke10
        pip_url: git+https://github.com/s7clarke10/pipelinewise-tap-oracle.git
        config:
          default_replication_method: LOG_BASED
          filter_schemas: IFSAPP
          filter_tables:
          - IFSAPP-ABC_CLASS_TAB
          host: xxxx
          port: 1521
          service_name: xxxx
          user: ifsapp
        metadata:
          IFSAPP-ABC_CLASS_TAB:
            replication-method: INCREMENTAL
            replication-key: ROWVERSION
  • h

    hawkar_mahmod

    03/23/2025, 3:06 PM
    I've started developing a tap called
    tap-growthbook
    using the Meltano SDK. When testing without Meltano it runs fine, but when I invoke it with
    meltano run
    via
    uv run
    I get a discovery-related error, and I don't know how to debug it. Here's the error:
    ✅ 1
  • h

    hawkar_mahmod

    03/24/2025, 3:48 PM
    Can someone give me a high-level overview of what happens when parent-child streams run? I'm getting strange behaviour that I don't understand. When I limit the number of records the parent returns (by overriding the
    get_records
    method), the child stream produces the expected number of records but the parent stream just stops producing any data in the destination. I am only overriding the parent's
    get_records
    , not the child stream's.
  • g

    Gordon Klundt

    03/24/2025, 7:57 PM
    I'm looking for guidance on this particular use case. I'm hoping there is a reference architecture in the world using this pattern. I want to write a stream that reads from an asynchronous endpoint. • I need to POST a payload to get an export ID (endpoint 1) • Use the export ID to poll a status (endpoint 2 - polled until FINISHED) • Get the "chunks" listed in an array when the status reads "FINISHED" (endpoint 3)
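    One hedged shape for this is to drive all three endpoints from a single stream's get_records, since a stream doesn't have to be a plain one-request RESTStream — a sketch with placeholder URLs, payloads and field names:
    Copy code
    from __future__ import annotations

    import time

    import requests

    from singer_sdk.streams import Stream


    class ExportStream(Stream):  # names, paths and payloads are placeholders
        name = "export"

        def get_records(self, context):
            session = requests.Session()
            base = self.config["api_url"]

            # 1. POST a payload to kick off the export and get an export ID.
            export_id = session.post(f"{base}/exports", json={"...": "..."}).json()["id"]

            # 2. Poll the status endpoint until it reads FINISHED.
            while True:
                status = session.get(f"{base}/exports/{export_id}").json()
                if status["state"] == "FINISHED":
                    break
                time.sleep(5)

            # 3. Fetch each chunk listed in the finished status payload.
            for chunk_url in status["chunks"]:
                yield from session.get(chunk_url).json()["records"]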
  • s

    Stéphane Burwash

    03/25/2025, 5:51 PM
    Hey everyone 👋 Happy Tuesday! I was wondering if anyone knew how I could select only specific fields of a JSON payload column to sync using tap-postgres. We have a
    payload
    column which contains WAY too much PII, and I was hoping to be able to sync only the datapoints I needed (only grab values from
    payload.xyz
    if
    payload.xyz
    exists) Thanks! cc @visch since you're my tap-postgres guru
    👋 1
  • h

    hawkar_mahmod

    03/28/2025, 5:25 PM
    Hey everyone! I'm finding that my pipelines that involve parent-child streams loading to a DB are incredibly slow (2+ hrs for a couple thousand entities per day), and I believe this is because each entity and its children are being written to the database one by one rather than being batched. Anyone know where I should start to address this? I'm seeing this with both target-redshift and target-duckdb (not as slow as Redshift).
  • r

    Reuben (Matatika)

    04/10/2025, 3:19 AM
    Is there a recommended approach to handling API endpoints that support start and end dates in chunks (i.e. sliding window) and emitting/finalising state for each? The API we are dealing with supports exporting data in this way - and there is quite a lot of it - so dividing up into multiple requests and keeping track of the start date in state would be ideal. I implemented it mostly in a custom paginator class, but was not able to find how/where to apply state operations after each "page" (without overriding
    request_records
    entirely). Thought about setting up a date range parent stream to pass dates through as context, which I think would solve the state update part of my problem, but it felt counter-intuitive to how parent-child streams should work and incremental replication in general (I would end up with a bunch of interim dates stored in state as context, as well as the final date as
    replication_key_value
    )
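    For what it's worth, one sketch of the paginator-only approach (hedged — names and windowing are assumptions): make the window start date the page token and rely on is_sorted with a date-time replication key, so the bookmark advances with the records rather than through explicit per-window state writes; intermediate STATE messages are still only flushed at the SDK's normal interval.
    Copy code
    from __future__ import annotations

    from datetime import datetime, timedelta, timezone

    from singer_sdk.pagination import BaseAPIPaginator
    from singer_sdk.streams import RESTStream

    WINDOW = timedelta(days=7)


    class DateWindowPaginator(BaseAPIPaginator):
        """Each window is one 'page'; the token is the window start date."""

        def __init__(self, start: datetime, end: datetime) -> None:
            super().__init__(start_value=start)
            self._end = end

        def get_next(self, response):
            nxt = self.current_value + WINDOW
            return nxt if nxt < self._end else None  # None finishes pagination


    class ReportStream(RESTStream):  # placeholder names and params
        url_base = "https://api.example.com"
        path = "/export"
        replication_key = "updated_at"
        is_sorted = True  # assumes the API returns each window in order

        def get_new_paginator(self):
            # None context here: fine for unpartitioned streams.
            start = self.get_starting_timestamp(None) or datetime(2024, 1, 1, tzinfo=timezone.utc)
            return DateWindowPaginator(start, datetime.now(timezone.utc))

        def get_url_params(self, context, next_page_token):
            return {
                "start_date": next_page_token.isoformat(),
                "end_date": (next_page_token + WINDOW).isoformat(),
            }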
    👀 1
  • v

    visch

    04/10/2025, 8:21 PM
    Parent and child stream contexts are shared between any child streams that have a shared parent. I noticed an SDK thing today that I don't think used to be the case, but this is a weird one. I would have to put together a tap to show this 🧵
    ✅ 1
  • s

    Stéphane Burwash

    04/15/2025, 2:06 PM
    Hello everyone 👋 happy tuesday! I was wondering if you could give me some guidance on testing taps. I would love to use the integrated
    test
    functions, but I'd like to understand a bit more how they work (source https://sdk.meltano.com/en/latest/testing.html#singer_sdk.testing.get_tap_test_class) What do the tests actually DO? My goal would be that it only tests that the tap CAN run, and not that it tries to run to completion. Most of my taps are full_table, so running their tests would take WAY too long. Thanks 😄
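    Hedged (field names can shift between SDK versions): the generated test class runs connection, discovery, schema and record checks by actually syncing each stream, and it can be capped through SuiteConfig so full-table streams stop after a handful of records — a sketch:
    Copy code
    from singer_sdk.testing import SuiteConfig, get_tap_test_class

    from tap_mytap.tap import TapMyTap  # placeholder import

    TestTapMyTap = get_tap_test_class(
        tap_class=TapMyTap,
        config={"start_date": "2025-01-01"},  # placeholder config
        suite_config=SuiteConfig(
            max_records_limit=20,    # stop each stream test after a few records
            ignore_no_records=True,  # don't fail streams that return nothing
        ),
    )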
    ✅ 1
  • h

    hawkar_mahmod

    05/08/2025, 9:16 AM
    Hey everyone 👋 I’m working on a Customer.io tap using the Meltano Singer SDK and running into some odd parent-child behavior. Here’s the gist: - Parent stream (
    SegmentsStream
    ) correctly fetches all segments and my override of
    get_child_context(record)
    returns
    {"segment_id": record["id"]}
    for the one segment I’m targeting. - Child stream (
    SegmentMembersStream
    ) has
    schema
    including
    "segment_id"
    , no
    replication_key
    , and overrides
    parse_response(response)
    to yield only the member fields:
    Copy code
        def parse_response(self, response):
            for identifier in response.json().get("identifiers", []):
                yield {
                    "member_id": identifier["id"],
                    "cio_id":    identifier.get("cio_id"),
                    "email":     identifier.get("email"),
                }
    - According to the docs, the SDK should automatically merge in the
    segment_id
    from context after
    parse_response
    (and before shipping the record out), as long as it’s in the schema. But in practice I only see
    segment_id
    in the separate
    context
    argument — it never appears in the actual record unless I manually inject it in `post_process`:
    Copy code
        def post_process(self, row, context):
            row["segment_id"] = context["segment_id"]
            return row
    Has anyone else seen this? Should the SDK be automatically adding parent-context fields into the record dict before emit, or is manual injection (in
    post_process
    ) the expected approach here? Any pointers or workaround suggestions are much appreciated! 🙏
  • s

    Siddu Hussain

    05/10/2025, 1:59 AM
    Hi @Edgar Ramírez (Arch.dev), do we have a plan to suppress this message when we have additionalProperties set to true? This method was designed to validate date types, but the error message about the missing key in the schema is being spammed for cases where additionalProperties is set to true. The fix seems quite straightforward: adding a condition to check if "additionalProperties" is true, then suppressing the warning "No schema for record field." Please let me know if this message serves a different purpose that might conflict with skipping it by checking "additionalProperties." https://github.com/meltano/sdk/blob/fdeb393416f0d1935e40c28b91c800c9d1b40822/singer_sdk/sinks/core.py#L588 fix : https://github.com/SidduHussain/sdk/blob/f99ecab90770932829f138aece164ad7d4196115/singer_sdk/sinks/core.py#L587