Yordan Ivanov
11/07/2024, 8:11 AM

Nir Diwakar (Nir)
11/19/2024, 11:05 AM
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='login.microsoftonline.com', port=443): Max retries exceeded with url: /b5953ddd-dd80-4110-904b-e503716f0caf/oauth2/v2.0/token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7b99afdc5ed0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))
To handle this I am wrapping the prepare_request method in a try/except block and raising RetriableAPIError, but the retries aren't happening. Should I handle this in a different way?
From the traceback, the following exceptions occur:
• requests.exceptions.ConnectionError
• urllib3.exceptions.MaxRetryError
• socket.gaierror
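If I'm reading the SDK flow right, the backoff decorator that request_decorator returns only wraps the HTTP call itself, not prepare_request, which would explain why raising RetriableAPIError from there never triggers a retry. A minimal sketch of one workaround, applying backoff directly to prepare_request; the retry settings and the stream name are illustrative, not from this thread:

import backoff
import requests
from singer_sdk.exceptions import RetriableAPIError
from singer_sdk.streams import RESTStream


class MSGraphStream(RESTStream):  # hypothetical stream name
    @backoff.on_exception(
        backoff.expo,
        (requests.exceptions.ConnectionError, RetriableAPIError),
        max_tries=5,
        factor=2,
    )
    def prepare_request(self, context, next_page_token):
        # The OAuth token refresh happens while headers are built here, so the
        # DNS/connection failure surfaces in this call rather than inside the
        # already-retried request itself.
        return super().prepare_request(context, next_page_token)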
Jens Christian Hillerup
11/27/2024, 2:23 PM
I have an endpoint like GET /documents that gives me a list of documents, and then another one like GET /document/<doc_id> that returns important information that's not part of the former. I've wrapped REST APIs before with the tap cookiecutter in the Meltano SDK, but how would this actually translate into a RESTStream in the Singer tap? Is it even a supported use case to have an "N+1" stream which requires another roundtrip per record? (I'm aware it's going to be slow; luckily it's not that many rows.) Thanks!
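For reference, the SDK's parent/child stream pattern covers this "N+1" shape; a rough sketch with assumed paths and field names (doc_id, /document/{doc_id}), and with url_base and schemas omitted:

from singer_sdk.streams import RESTStream


class DocumentsStream(RESTStream):
    """Parent: one request for the list of documents."""

    name = "documents"
    path = "/documents"
    primary_keys = ["id"]
    # url_base, schema, etc. omitted for brevity.

    def get_child_context(self, record, context):
        # One child partition per document; the values are available to the child's path.
        return {"doc_id": record["id"]}


class DocumentDetailStream(RESTStream):
    """Child: one extra round trip per parent record."""

    name = "document_detail"
    parent_stream_type = DocumentsStream
    path = "/document/{doc_id}"  # doc_id is filled in from the parent context
    primary_keys = ["id"]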
Andy Carter
11/29/2024, 10:05 AM
I do some custom setup in my discover_streams method (in my _custom_initialization method), which works fine.
def discover_streams(self) -> list[streams.GoogleSearchConsoleStream]:
    """Return a list of discovered streams.

    Returns:
        A list of discovered streams.
    """
    self._custom_initialization()
    return [
        streams.PerformanceReportPage(self, service=self.service),
        streams.PerformanceReportDate(self, service=self.service),
        streams.PerformanceReportCountry(self, service=self.service),
        streams.PerformanceReportQuery(self, service=self.service),
        streams.PerformanceReportDevice(self, service=self.service),
    ]
But when I try to develop some tests, this causes an issue, as I don't have any test credentials.
Is there an alternative place in my Tap class I can move this _custom_initialization to, so it runs after discover_streams?
And perhaps any suggestions for how to handle 'fake' credentials when developing unit tests?
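One way to get tests running without real credentials, sketched under the assumption that the tap class is called TapGoogleSearchConsole; the module path and config keys are placeholders, and the patched _custom_initialization just installs a mock service:

from unittest import mock

from tap_google_search_console.tap import TapGoogleSearchConsole  # assumed module path

SAMPLE_CONFIG = {"client_id": "fake", "client_secret": "fake"}  # placeholder config


def fake_init(self):
    # Stand-in for the real client setup: no network calls, no credentials.
    self.service = mock.MagicMock()


@mock.patch.object(TapGoogleSearchConsole, "_custom_initialization", fake_init)
def test_discovery_with_fake_credentials():
    tap = TapGoogleSearchConsole(config=SAMPLE_CONFIG, validate_config=False)
    assert tap.discover_streams()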
Izaak
12/02/2024, 12:49 PM

Izaak
12/03/2024, 3:01 PM

steven_wang
12/05/2024, 2:47 PM

Andy Carter
12/12/2024, 3:37 PM
At that point I don't have the run_id available to me, so I think Tap.discover_streams is too early. But it's not correct to do it at the stream level (as part of Stream.get_records) because it only needs to be carried out once per tap run. Tap.sync_all would be another option if it weren't a @final method.
Is there anywhere I can jump in, a setup-ish method in Tap? Or should I implement a singleton for all my streams to make use of in get_records?
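A sketch of the singleton idea, using a module-level cached function that every stream calls from get_records; the names, the uuid-based run_id, and the placeholder record are all illustrative:

import uuid
from functools import lru_cache

from singer_sdk.streams import Stream


@lru_cache(maxsize=None)
def once_per_run_setup() -> str:
    """Hypothetical once-per-run work; the cache means only the first caller runs it."""
    run_id = str(uuid.uuid4())  # or however the run_id is actually obtained
    # ... expensive per-run work using run_id goes here ...
    return run_id


class MyStream(Stream):
    """Illustrative stream: every stream shares the cached result in get_records."""

    name = "my_stream"
    schema = {"properties": {"run_id": {"type": ["string", "null"]}}}

    def get_records(self, context):
        run_id = once_per_run_setup()  # no-op after the first stream triggers it
        yield {"run_id": run_id}       # placeholder record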
Nir Diwakar (Nir)
12/19/2024, 8:18 AM
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

import requests
from singer_sdk.exceptions import RetriableAPIError
from singer_sdk.helpers.types import Context

# EgnyteStream and EgnyteEventsPaginator are defined elsewhere in the tap.


class EventsStream(EgnyteStream):
    """Define custom stream."""

    name = "events"
    path = "/pubapi/v2/audit/stream"
    replication_key = "eventDate"
    records_jsonpath = "$.events[*]"
    is_sorted = True
    page = 0

    def get_url_params(
        self,
        context: Optional[dict],
        next_page_token: Optional[Any],
    ) -> dict[str, Any]:
        params: dict = {}
        if next_page_token:
            params["nextCursor"] = next_page_token
        else:
            params["startDate"] = self.start_time
        return params

    def parse_response(self, response: requests.Response | None):
        if not response:
            raise RetriableAPIError("No response from API")
        try:
            data = response.json()
        except AttributeError as e:
            logging.info(f"{e} with response {response} and {response.text}")
            return
        events = data.get("events", [])
        for event in events:
            if not event:
                continue
            for key in ["loginDate", "logoutDate"]:
                if event.get(key) == 0:
                    event.pop(key, None)
            for key in ["loginDate", "logoutDate", "date"]:
                if key in event:
                    dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + \
                        timedelta(milliseconds=event[key])
                    event["eventDate"] = dt.isoformat().replace("+00:00", "Z")
                    del event[key]
                    break
        if events:
            events.sort(key=lambda x: x.get("eventDate", ""))
        self.page = self.page + 1
        logging.info(f'Page: {self.page} {events}')
        yield from events or ([{}] if data.get("moreEvents") else [])

    def post_process(
        self,
        row: dict,
        context: Context | None = None,
    ) -> dict | None:
        if not row:
            return None
        if 'id' not in row or row['id'] is None:
            row['id'] = str(uuid.uuid4())
        return super().post_process(row, context)

    def get_new_paginator(self) -> EgnyteEventsPaginator:
        return EgnyteEventsPaginator()
Error occurs here:
'e5deb04e-75ba-4894-8709-001c5f4295d5', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:14:18.591000Z'}]
2024-12-19 07:44:24,675 | INFO | target-elasticsearch.events | Starting batch {'batch_id': '720b0104-adee-4855-b3dc-420fe0033a2a', 'batch_start_time': datetime.datetime(2024, 12, 19, 7, 44, 24, 675328, tzinfo=datetime.timezone.utc)} for dev_events_raw_egnyte_000
2024-12-19 07:44:27,219 | INFO | root | Page: 2 [{'sourcePath': '/Shared/Projects/2022/22-0055 Vicksburg National Cemetery Burial Excavation and Stablization/Lab/FORDISC Results/Burial 19_Fordisc Results_updated.docx', 'targetPath': 'N/A', 'user': 'Brittany McClain ( bmcclain@owest.com )', 'userId': '1121', 'action': 'Read', 'access': 'Desktop App', 'ipAddress': '174.26.41.229', 'actionInfo': '', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:09:36Z'},
Jun Pei Liang
01/18/2025, 12:13 AM

Reuben (Matatika)
01/18/2025, 3:46 AM
Is it possible to subclass SQLStream to create a "well-known" stream that defines a replication_key that is respected (as in standard `RestStream`/`Stream` implementations)?
client.py
class TestStream(SQLStream):
    connector_class = TestConnector


class AStream(TestStream):
    name = "a"
    replication_key = "updated_at"


class BStream(TestStream):
    name = "b"
    replication_key = "updated_at"

tap.py
KNOWN_STREAMS = {
    "a": streams.AStream,
    "b": streams.BStream,
}

...

def discover_streams(self):
    for stream in super().discover_streams():
        if stream_class := KNOWN_STREAMS.get(stream.name):
            stream = stream_class(
                tap=self,
                catalog_entry=stream.catalog_entry,
                connector=self.tap_connector,
            )
        yield stream
I've played around with this and a few other variations, but can't seem to get it to work - the sync runs as if no replication key was defined. Is there another (possibly simpler) way to do this?

Peter Clemenko
01/19/2025, 2:22 PM
Peter Clemenko
01/19/2025, 2:23 PM

Ian OLeary
02/04/2025, 6:52 PM
2025-02-04T18:44:21.577497Z [info ] 2025-02-04 13:44:21,576 | INFO | tap-litmos.lt_UserDetails | Pagination stopped after 0 pages because no records were found in the last response
Where do I need to alter this to continue paginating even if no records were returned for a particular week? I looked into the BaseAPIPaginator and I didn't find where this message is printed.
Here's my current paginator:
from datetime import date, datetime, timedelta
from urllib.parse import parse_qsl, urlparse

from singer_sdk.pagination import BaseAPIPaginator


class LitmosPaginator(BaseAPIPaginator):
    def __init__(self, *args, **kwargs):
        super().__init__(None, *args, **kwargs)

    def has_more(self, response):
        return self.get_next(response) < date.today()

    def get_next(self, response):
        params = dict(parse_qsl(urlparse(response.request.url).query))
        return datetime.strptime(params["to"], OUTPUT_DATE_FORMAT).date() + timedelta(seconds=1)
I'm paginating via a 1 week date range and even if there were no records I still want to move on to the next range.
edit: would it be in "advance"?
def advance(self, response: requests.Response) -> None:
    ...
    ...
    # Stop if new value None, empty string, 0, etc.
    if not new_value:
        self._finished = True
    else:
        self._value = new_value
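As far as I can tell, that log line comes from RESTStream.request_records rather than from the paginator - the loop breaks when parse_response yields nothing for a page. One workaround sketch, not verified against your SDK version: yield a sentinel for empty pages and drop it in post_process, since returning None there skips the record:

from singer_sdk.streams import RESTStream


class LitmosUserDetailsStream(RESTStream):  # illustrative stream class
    def parse_response(self, response):
        records = response.json() or []
        if not records:
            # Give request_records a non-empty page so pagination keeps going.
            yield {"__empty_page__": True}
            return
        yield from records

    def post_process(self, row, context=None):
        # Drop the sentinel before it is emitted downstream.
        if row.get("__empty_page__"):
            return None
        return row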
Andy Carter
02/24/2025, 2:52 PM
My streams currently read their files from storage and yield rows from get_records in the normal way.
Instead of checking and rechecking for each new file in storage, I've discovered I can check the pipeline logs to see when each table / stream file is complete, then just read the file once I know it's saved to storage. However, I don't want to replace my 'file check' code with 'pipeline log check' code in each stream, as the REST call takes a while.
Is there a process I can run asynchronously at the tap level every 10 seconds or so, so that in my stream.get_records() I can check the tap's cached version of the logs from ADF and emit records when appropriate?
Ideally I don't want to wait for the whole pipeline to finish before I start emitting records - some data is ready in seconds but others take minutes.
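A sketch of one way to do this with a plain background thread on the tap, refreshing a shared cache that streams poll from get_records; the tap name, the 10-second interval and fetch_adf_pipeline_logs are all placeholders:

import threading
import time

from singer_sdk import Tap


class TapADFFiles(Tap):
    """Illustrative tap that refreshes a shared cache of ADF pipeline logs in the background."""

    name = "tap-adf-files"  # placeholder

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.adf_log_cache: dict = {}
        self._stop_polling = threading.Event()
        threading.Thread(target=self._poll_adf_logs, daemon=True).start()

    def _poll_adf_logs(self) -> None:
        while not self._stop_polling.is_set():
            # fetch_adf_pipeline_logs is a placeholder for the slow REST call.
            self.adf_log_cache = fetch_adf_pipeline_logs(self.config)
            time.sleep(10)

Each stream's get_records() could then poll that cache (via the reference it holds to its tap, or a cache object passed into the stream constructor) and sleep briefly until its file shows up, so records start flowing as soon as each file is ready.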
Tanner Wilcox
02/27/2025, 11:38 PM
The docs for request_records() on RESTStream say: "If pagination is detected, pages will be recursed automatically." But I'm not seeing how it's detecting pagination in this case.
Jun Pei Liang
03/07/2025, 12:14 AM
- name: tap-oracle
  variant: s7clarke10
  pip_url: git+https://github.com/s7clarke10/pipelinewise-tap-oracle.git
  config:
    default_replication_method: LOG_BASED
    filter_schemas: IFSAPP
    filter_tables:
      - IFSAPP-ABC_CLASS_TAB
    host: xxxx
    port: 1521
    service_name: xxxx
    user: ifsapp
  metadata:
    IFSAPP-ABC_CLASS_TAB:
      replication-method: INCREMENTAL
      replication-key: ROWVERSION
hawkar_mahmod
03/23/2025, 3:06 PM
I'm building tap-growthbook using the Meltano SDK. When testing without Meltano it runs fine, but when I invoke it with meltano run via uv run I get a discovery-related error, and I don't know how to debug it. Here's the error:

hawkar_mahmod
03/24/2025, 3:48 PM
When I customise my parent stream (overriding its get_records method), the child stream produces the expected number of records but the parent stream just stops producing any data in the destination. I am only overriding get_records on the parent, not the child stream.
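One thing worth checking, sketched under the assumption the parent is a RESTStream: an overridden parent get_records still has to yield every record itself, because the SDK derives each child's context from the records the parent yields. Roughly the stock behaviour looks like this:

from singer_sdk.streams import RESTStream


class ParentStream(RESTStream):  # illustrative
    def get_records(self, context):
        # Yield every parent record; the SDK calls get_child_context() per
        # yielded record to drive the child stream sync.
        for record in self.request_records(context):
            row = self.post_process(record, context)
            if row is not None:
                yield row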
Gordon Klundt
03/24/2025, 7:57 PM

Stéphane Burwash
03/25/2025, 5:51 PM
I have a payload column which contains WAY too much PII, and I was hoping to be able to sync only the datapoints I need (only grab values from payload.xyz if payload.xyz exists).
Thanks!
cc @visch since you're my tap-postgres guru

hawkar_mahmod
03/28/2025, 5:25 PM

Reuben (Matatika)
04/10/2025, 3:19 AM
(In this tap, I've overridden request_records entirely.) I thought about setting up a date-range parent stream to pass dates through as context, which I think would solve the state-update part of my problem, but it felt counter-intuitive to how parent-child streams should work and to incremental replication in general (I would end up with a bunch of interim dates stored in state as context, as well as the final date as replication_key_value).

visch
04/10/2025, 8:21 PM

Stéphane Burwash
04/15/2025, 2:06 PM
I've started using the SDK's built-in test functions, but I'd like to understand a bit more how they work (source: https://sdk.meltano.com/en/latest/testing.html#singer_sdk.testing.get_tap_test_class).
What do the tests actually DO? My goal would be that it only tests that the tap CAN run, and not that it tries to run to completion. Most of my taps are full_table, so running their tests would take WAY too long.
Thanks 😄
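If it helps, the testing framework takes a SuiteConfig that can cap how many records each stream test pulls; a sketch assuming a recent singer-sdk where max_records_limit is available (TapFoo and the config dict are placeholders):

from singer_sdk.testing import SuiteConfig, get_tap_test_class

from tap_foo.tap import TapFoo  # placeholder tap

TestTapFoo = get_tap_test_class(
    tap_class=TapFoo,
    config={"start_date": "2025-01-01"},  # placeholder config
    suite_config=SuiteConfig(
        max_records_limit=20,  # stop each stream's sync test after a handful of records
    ),
)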
hawkar_mahmod
05/08/2025, 9:16 AM
- Parent stream (SegmentsStream) correctly fetches all segments, and my override of get_child_context(record) returns {"segment_id": record["id"]} for the one segment I'm targeting.
- Child stream (SegmentMembersStream) has a schema including "segment_id", no replication_key, and overrides parse_response(response) to yield only the member fields:
def parse_response(self, response):
    for identifier in response.json().get("identifiers", []):
        yield {
            "member_id": identifier["id"],
            "cio_id": identifier.get("cio_id"),
            "email": identifier.get("email"),
        }
- According to the docs, the SDK should automatically merge in the segment_id from context after parse_response (and before shipping the record out), as long as it's in the schema. But in practice I only see segment_id in the separate context argument — it never appears in the actual record unless I manually inject it in `post_process`:
def post_process(self, row, context):
    row["segment_id"] = context["segment_id"]
    return row

Has anyone else seen this? Should the SDK be automatically adding parent-context fields into the record dict before emit, or is manual injection (in post_process) the expected approach here? Any pointers or workaround suggestions are much appreciated! 🙏

Siddu Hussain
05/10/2025, 1:59 AM