Nir Diwakar (Nir)
12/19/2024, 8:18 AM
class EventsStream(EgnyteStream):
    """Define custom stream."""

    name = "events"
    path = "/pubapi/v2/audit/stream"
    replication_key = "eventDate"
    records_jsonpath = "$.events[*]"
    is_sorted = True
    page = 0

    def get_url_params(
        self,
        context: Optional[dict],
        next_page_token: Optional[Any],
    ) -> dict[str, Any]:
        params: dict = {}
        if next_page_token:
            params["nextCursor"] = next_page_token
        else:
            params["startDate"] = self.start_time
        return params

    def parse_response(self, response: requests.Response | None):
        if not response:
            raise RetriableAPIError("No response from API")
        try:
            data = response.json()
        except AttributeError as e:
            logging.info(f"{e} with response {response} and {response.text}")
            return
        events = data.get("events", [])
        for event in events:
            if not event:
                continue
            for key in ["loginDate", "logoutDate"]:
                if event.get(key) == 0:
                    event.pop(key, None)
            for key in ["loginDate", "logoutDate", "date"]:
                if key in event:
                    dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + \
                        timedelta(milliseconds=event[key])
                    event["eventDate"] = dt.isoformat().replace("+00:00", "Z")
                    del event[key]
                    break
        if events:
            events.sort(key=lambda x: x.get("eventDate", ""))
        self.page = self.page + 1
        logging.info(f'Page: {self.page} {events}')
        yield from events or ([{}] if data.get("moreEvents") else [])

    def post_process(
        self,
        row: dict,
        context: Context | None = None,
    ) -> dict | None:
        if not row:
            return None
        if 'id' not in row or row['id'] is None:
            row['id'] = str(uuid.uuid4())
        return super().post_process(row, context)

    def get_new_paginator(self) -> EgnyteEventsPaginator:
        return EgnyteEventsPaginator()
Error occurs here:
'e5deb04e-75ba-4894-8709-001c5f4295d5', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:14:18.591000Z'}]
2024-12-19 07:44:24,675 | INFO | target-elasticsearch.events | Starting batch {'batch_id': '720b0104-adee-4855-b3dc-420fe0033a2a', 'batch_start_time': datetime.datetime(2024, 12, 19, 7, 44, 24, 675328, tzinfo=datetime.timezone.utc)} for dev_events_raw_egnyte_000
2024-12-19 07:44:27,219 | INFO | root | Page: 2 [{'sourcePath': '/Shared/Projects/2022/22-0055 Vicksburg National Cemetery Burial Excavation and Stablization/Lab/FORDISC Results/Burial 19_Fordisc Results_updated.docx', 'targetPath': 'N/A', 'user': 'Brittany McClain ( bmcclain@owest.com )', 'userId': '1121', 'action': 'Read', 'access': 'Desktop App', 'ipAddress': '174.26.41.229', 'actionInfo': '', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:09:36Z'},
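For reference, EgnyteEventsPaginator isn't shown in the snippet above. A minimal sketch of what such a cursor paginator might look like, assuming the API returns "nextCursor" and "moreEvents" fields (those names appear in the stream code; the real class may differ):
from singer_sdk.pagination import BaseAPIPaginator


class EgnyteEventsPaginator(BaseAPIPaginator):
    """Hypothetical cursor paginator for the audit/stream endpoint."""

    def __init__(self):
        # Start with no cursor so the first request uses startDate instead.
        super().__init__(None)

    def get_next(self, response):
        # Value fed back into get_url_params() as params["nextCursor"].
        return response.json().get("nextCursor")

    def has_more(self, response):
        # Keep requesting pages while the API reports more events.
        return bool(response.json().get("moreEvents"))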
Jun Pei Liang
01/18/2025, 12:13 AM
Reuben (Matatika)
01/18/2025, 3:46 AM
Is it possible to subclass SQLStream to create a "well-known" stream that defines a replication_key that is respected (as in standard `RestStream`/`Stream` implementations)?
client.py
class TestStream(SQLStream):
    connector_class = TestConnector


class AStream(TestStream):
    name = "a"
    replication_key = "updated_at"


class BStream(TestStream):
    name = "b"
    replication_key = "updated_at"
tap.py
KNOWN_STREAMS = {
    "a": streams.AStream,
    "b": streams.BStream,
}

...

def discover_streams(self):
    for stream in super().discover_streams():
        if stream_class := KNOWN_STREAMS.get(stream.name):
            stream = stream_class(
                tap=self,
                catalog_entry=stream.catalog_entry,
                connector=self.tap_connector,
            )
        yield stream
I've played around with this and a few other variations, but can't seem to get it to work - the sync runs as if no replication key was defined. Is there another (possibly simpler) way to do this?
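One hedged thing to try (not verified against the SDK, and catalog metadata may still win during sync): instead of re-instantiating the stream classes, mutate the instances that super().discover_streams() already built, so their discovered catalog entries are preserved:
KNOWN_REPLICATION_KEYS = {
    "a": "updated_at",
    "b": "updated_at",
}


def discover_streams(self):
    for stream in super().discover_streams():
        replication_key = KNOWN_REPLICATION_KEYS.get(stream.name)
        if replication_key:
            # Assumes the replication_key setter on Stream takes effect here;
            # if catalog metadata overrides it later, this won't help.
            stream.replication_key = replication_key
        yield stream
If that doesn't take, the catalog metadata produced by discovery is probably overriding it, in which case setting replication-key via metadata in meltano.yml (as in the tap-oracle example later in this thread) is the other route.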
Peter Clemenko
01/19/2025, 2:22 PM
Peter Clemenko
01/19/2025, 2:23 PM
Ian OLeary
02/04/2025, 6:52 PM
2025-02-04T18:44:21.577497Z [info ] 2025-02-04 13:44:21,576 | INFO | tap-litmos.lt_UserDetails | Pagination stopped after 0 pages because no records were found in the last response
Where do I need to alter this to continue paginating even if no records were returned for a particular week? I looked into BaseAPIPaginator and I didn't find where this message is printed.
Here's my current paginator:
class LitmosPaginator(BaseAPIPaginator):
    def __init__(self, *args, **kwargs):
        super().__init__(None, *args, **kwargs)

    def has_more(self, response):
        return self.get_next(response) < date.today()

    def get_next(self, response):
        params = dict(parse_qsl(urlparse(response.request.url).query))
        return datetime.strptime(params["to"], OUTPUT_DATE_FORMAT).date() + timedelta(seconds=1)
I'm paginating via a 1 week date range and even if there were no records I still want to move on to the next range.
edit: would it be in "advance"?
def advance(self, response: requests.Response) -> None:
    ...
    ...
    # Stop if new value None, empty string, 0, etc.
    if not new_value:
        self._finished = True
    else:
        self._value = new_value
Andy Carter
02/24/2025, 2:52 PM
get_records in the normal way.
Instead of checking and rechecking for each new file in storage, I've discovered I can check the pipeline logs to see when each table / stream file is complete, then just read the file once I know it's saved to storage. However, I don't want to replace my 'file check' code with 'pipeline log check' code in each stream, as the REST call takes a while.
Is there a process I can run asynchronously at the tap
level every 10 seconds or so, and in my stream.get_records()
check the tap's cached version of the logs from ADF, and emit records if appropriate?
Ideally I don't want to wait for the whole pipeline to finish before I start emitting records - some data is ready in seconds but others take minutes.
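A hedged sketch of the "cache at the tap level" idea (all names here are illustrative, not from the actual tap): a daemon thread refreshes the ADF log cache every 10 seconds, and each stream's get_records polls the cache through whatever reference it keeps to its tap:
import threading
import time


class PipelineLogCache:
    """Illustrative shared cache refreshed in the background."""

    def __init__(self, fetch_logs, interval=10):
        self._fetch_logs = fetch_logs  # the slow REST call to ADF
        self._interval = interval
        self._logs = []
        self._lock = threading.Lock()
        thread = threading.Thread(target=self._refresh_loop, daemon=True)
        thread.start()

    def _refresh_loop(self):
        while True:
            logs = self._fetch_logs()
            with self._lock:
                self._logs = logs
            time.sleep(self._interval)

    @property
    def logs(self):
        with self._lock:
            return list(self._logs)
The tap would build one PipelineLogCache in its __init__, and each stream's get_records would poll cache.logs until its table shows as complete, then read the file and start yielding.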
Tanner Wilcox
02/27/2025, 11:38 PM
The docs for request_records() on RESTStream say: "If pagination is detected, pages will be recursed automatically." but I'm not seeing how it's detecting pagination in this case
Jun Pei Liang
03/07/2025, 12:14 AM
- name: tap-oracle
  variant: s7clarke10
  pip_url: git+https://github.com/s7clarke10/pipelinewise-tap-oracle.git
  config:
    default_replication_method: LOG_BASED
    filter_schemas: IFSAPP
    filter_tables:
      - IFSAPP-ABC_CLASS_TAB
    host: xxxx
    port: 1521
    service_name: xxxx
    user: ifsapp
  metadata:
    IFSAPP-ABC_CLASS_TAB:
      replication-method: INCREMENTAL
      replication-key: ROWVERSION
hawkar_mahmod
03/23/2025, 3:06 PM
I'm building tap-growthbook using the Meltano SDK. When testing without meltano it runs fine, but when I invoke it with meltano run via uv run I get a discovery-related error, and I don't know how to debug it. Here's the error:
hawkar_mahmod
03/24/2025, 3:48 PM
When I override the get_records method, the child class produces the expected number of records but the parent class just stops producing any data in the destination. I am only overriding get_records on the parent, not the child stream.
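Not a diagnosis, just a hedged shape to compare against: as far as I understand, a parent stream's overridden get_records still has to yield every record itself, since both the emitted data and the child contexts are derived from what it yields:
def get_records(self, context):
    # Parent stream override: filter/enrich inline, but keep yielding records
    # so the stream still emits data and child contexts are still generated.
    for record in super().get_records(context):
        # ... custom logic here ...
        yield record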
Gordon Klundt
03/24/2025, 7:57 PM
Stéphane Burwash
03/25/2025, 5:51 PM
I have a payload column which contains WAY too much PII, and I was hoping to be able to sync only the datapoints I needed (only grab values from payload.xyz if payload.xyz exists).
Thanks!
cc @visch since you're my tap-postgres guru
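In case it helps as a starting point (a hedged sketch, not a tap-postgres feature): one way to keep only payload.xyz is a post_process override on the stream, assuming payload arrives as a dict:
def post_process(self, row, context=None):
    # Drop the full PII-heavy payload and keep only the field we need.
    payload = row.pop("payload", None) or {}
    if isinstance(payload, dict) and "xyz" in payload:
        row["payload_xyz"] = payload["xyz"]
    return row
The stream's schema would also need a payload_xyz property (name is illustrative) for the new field to survive schema conformance.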
hawkar_mahmod
03/28/2025, 5:25 PM
Reuben (Matatika)
04/10/2025, 3:19 AM
request_records entirely). Thought about setting up a date range parent stream to pass dates through as context, which I think would solve the state update part of my problem, but it felt counter-intuitive to how parent-child streams should work and incremental replication in general (I would end up with a bunch of interim dates stored in state as context, as well as the final date as replication_key_value)
visch
04/10/2025, 8:21 PM
Stéphane Burwash
04/15/2025, 2:06 PM
I'm using the SDK's test functions, but I'd like to understand a bit more how they work (source: https://sdk.meltano.com/en/latest/testing.html#singer_sdk.testing.get_tap_test_class).
What do the tests actually DO? My goal would be that they only test that the tap CAN run, and not that it tries to run to completion. Most of my taps are full_table, so running their tests would take WAY too long.
Thanks!
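A hedged sketch of limiting how much the built-in suite actually syncs (the SuiteConfig option below exists in recent SDK versions as far as I know; double-check against yours, and the tap import is illustrative):
from singer_sdk.testing import SuiteConfig, get_tap_test_class

from tap_mysource.tap import TapMySource  # hypothetical tap import

TestTapMySource = get_tap_test_class(
    tap_class=TapMySource,
    config={},  # minimal test config
    suite_config=SuiteConfig(
        max_records_limit=20,  # stop each stream after a handful of records
    ),
)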
hawkar_mahmod
05/08/2025, 9:16 AM
- Parent stream (SegmentsStream) correctly fetches all segments and my override of get_child_context(record) returns {"segment_id": record["id"]} for the one segment I'm targeting.
- Child stream (SegmentMembersStream) has a schema including "segment_id", no replication_key, and overrides parse_response(response) to yield only the member fields:
python
def parse_response(self, response):
    for identifier in response.json().get("identifiers", []):
        yield {
            "member_id": identifier["id"],
            "cio_id": identifier.get("cio_id"),
            "email": identifier.get("email"),
        }
- According to the docs, the SDK should automatically merge in the segment_id from context after parse_response (and before shipping the record out), as long as it's in the schema. But in practice I only see segment_id in the separate context argument; it never appears in the actual record unless I manually inject it in `post_process`:
python
def post_process(self, row, context):
    row["segment_id"] = context["segment_id"]
    return row
Has anyone else seen this? Should the SDK be automatically adding parent-context fields into the record dict before emit, or is manual injection (in post_process) the expected approach here? Any pointers or workaround suggestions are much appreciated!
Siddu Hussain
05/10/2025, 1:59 AM
Nico Deunk
05/23/2025, 7:05 AM
Rafał
05/27/2025, 8:35 AM
mark_estey
06/02/2025, 9:06 PM
Andy Carter
06/04/2025, 10:34 AM
I have a Buildings stream and a Tenants stream, and a ServiceRequests stream that requires a building_id AND `tenant_id` in the body of a POST request (there is no GET / list ServiceRequests endpoint).
So I would iterate Buildings, then Tenants, then iterate ServiceRequests with a full outer join of the two parents. Is that possible?
https://www.linen.dev/s/meltano/t/16381950/hi-all-i-have-built-a-custom-tap-to-extract-data-from-an-api looks like a possible approach: basically I make my Tenants stream an artificial child of Buildings and then make service requests have Tenants as parent.
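A hedged sketch of that "artificial child" chain (class and field names are illustrative; this nests Tenants under Buildings rather than doing a true full outer join, which matches the linked approach):
from singer_sdk.streams import RESTStream


class BuildingsStream(RESTStream):
    name = "buildings"
    # ... path, schema, primary_keys ...

    def get_child_context(self, record, context):
        return {"building_id": record["id"]}


class TenantsStream(RESTStream):
    name = "tenants"
    parent_stream_type = BuildingsStream
    # ... path, schema, primary_keys ...

    def get_child_context(self, record, context):
        # Pass both parent ids down to ServiceRequests.
        return {"building_id": context["building_id"], "tenant_id": record["id"]}


class ServiceRequestsStream(RESTStream):
    name = "service_requests"
    parent_stream_type = TenantsStream
    rest_method = "POST"  # the API only exposes a POST body, no GET list
    # ... path, schema ...

    def prepare_request_payload(self, context, next_page_token):
        return {
            "building_id": context["building_id"],
            "tenant_id": context["tenant_id"],
        }
Caveat: this iterates Tenants per Building, so it only behaves as intended if the Tenants call can be scoped by building (or you de-duplicate downstream).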
Rafał
06/08/2025, 8:28 AM
Tanner Wilcox
06/10/2025, 9:04 PM
I added replication_method = "FULL_TABLE" to my stream class but that doesn't seem right
Mindaugas Nižauskas
06/30/2025, 5:06 AM
Florian Bergmann
07/07/2025, 8:57 AM
We are adapting tap-oracle, based on s7clarke10 / pipelinewise, to cover some special cases of our source db. During tests I noticed that the replication method log_based uses continuous mine, which is deprecated since Oracle 12c and desupported since Oracle 19c, so six years ago.
- Does anyone know an alternative I could use for logmining functionality?
- Trigger-based instead of log-based is currently not an option for us.