laila ribke
12/22/2022, 10:03 PM

laila ribke
12/27/2022, 6:06 AM

Jamie Turner
01/01/2023, 12:44 AM

Till Blesik
01/03/2023, 8:44 PM
… spec.yaml file. If I understand the tutorial correctly (Step 3: Connecting to the API | Airbyte Documentation), the spec is part of the source_connector-name/connector-name.yaml file. Am I supposed to manually copy the spec section from that file into its own file, or is there a step I am missing?
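For reference, a standalone spec.yaml is essentially the spec block lifted out of the connector's YAML into its own file; a minimal sketch (field names are placeholders, not from the thread):

documentationUrl: https://docs.airbyte.com/integrations/sources/example
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Example Source Spec
  type: object
  required:
    - api_key
  properties:
    api_key:
      type: string
      airbyte_secret: true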
Akash Ghadge
01/06/2023, 12:01 PM

Ben Greene
01/10/2023, 2:24 PM

Ben Greene
01/10/2023, 7:45 PM

Till Blesik
01/10/2023, 9:17 PM
… the /modules/{module_id}/data stream offers you a selection of modules provided by /modules. Is that even supported by the low-code CDK, or would I need to switch to the Python CDK?
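The low-code CDK's SubstreamSlicer covers this kind of parent/child dependency; a minimal sketch, assuming a modules_stream is already defined in definitions and that records from /modules carry an id field:

module_data_stream:
  $ref: "*ref(definitions.base_stream)"
  $options:
    name: "module_data"
    path: "/modules/{{ stream_slice.module_id }}/data"
  retriever:
    $ref: "*ref(definitions.retriever)"
    stream_slicer:
      type: SubstreamSlicer
      parent_stream_configs:
        - stream: "*ref(definitions.modules_stream)"
          parent_key: "id"
          stream_slice_field: "module_id"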
Sushant
01/12/2023, 3:57 PM

Matheus Pinheiro
01/12/2023, 6:54 PM

Antony Ede
01/16/2023, 3:35 AM
[
  {
    "status": 1,
    "uuid": "jwy3kwppp4yuqg86",
    ...
    "Q1r1": "8",
    "Q2a": "I thought it was great.",
    "Q2b": "I thought it was not so great."
  }
]
Note that the status and uuid keys are returned in every response, while the Q* keys are dynamic and vary depending on the survey. Ideally I think I'd restructure the response to something like this; it would then be easy to represent in the schema and should normalise how I want.
[
  {
    "status": 1,
    "uuid": "jwy3kwppp4yuqg86",
    ...
    "answers": [
      { "question": "Q1r1", "answer": "8" },
      { "question": "Q2a", "answer": "I thought it was great." },
      { "question": "Q2b", "answer": "I thought it was not so great." }
    ]
  }
]
How should these be handled?
Can/Should I restructure the JSON Schema as above?
Do I need to move to the Python CDK instead to do this?
Can I get away with just ingesting the full raw responses and doing the normalisation?
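If the stream moves to the Python CDK, the reshaping can happen in parse_response; a minimal sketch, assuming status and uuid (plus any other fixed keys) are the only static fields:

from typing import Any, Iterable, Mapping

import requests

STATIC_KEYS = {"status", "uuid"}  # extend with the other fixed keys

def restructure(record: Mapping[str, Any]) -> Mapping[str, Any]:
    # Keep the static fields; fold every dynamic Q* key into an answers list.
    base = {k: v for k, v in record.items() if k in STATIC_KEYS}
    base["answers"] = [
        {"question": k, "answer": v}
        for k, v in record.items()
        if k not in STATIC_KEYS
    ]
    return base

# Inside an HttpStream subclass:
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
    for record in response.json():
        yield restructure(record)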
Iván Alberquilla
01/17/2023, 12:23 PM
response = requests.post(endpoint, json={'query': query})
Is that possible using the HTTP source, or are only GET requests allowed? How can I add the json parameter to the body?
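In the Python CDK an HttpStream can switch to POST and supply a JSON body; a minimal sketch (the class, URL, query, and response shape are placeholders):

from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream

class GraphqlStream(HttpStream):
    url_base = "https://example.com/api/"  # placeholder
    primary_key = None

    @property
    def http_method(self) -> str:
        # Switch the request from the default GET to POST.
        return "POST"

    def request_body_json(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> Optional[Mapping[str, Any]]:
        # Sent as the JSON body, equivalent to requests.post(url, json=...).
        return {"query": "{ items { id } }"}  # placeholder query

    def path(self, **kwargs) -> str:
        return "graphql"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json()["data"]["items"]  # placeholder shape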
Iván Alberquilla
01/17/2023, 1:28 PM
… http_method?

Iván Alberquilla
01/17/2023, 1:58 PM

Rachel RIZK
01/20/2023, 11:46 AM

VITOR OLIVEIRA DOS SANTOS
01/20/2023, 4:58 PM
{
  "historical_ocupation": {
    "city1_historical_occupation": {
      "date": "2023-01-20T00:00:00Z"
    },
    "city2_historical_occupation": {
      "date": "2023-01-18T00:00:00Z"
    },
    "city3_historical_occupation": {
      "date": "2023-01-20T00:00:00Z"
    }
  },
  "historical_daily_public": {
    "city1_historical_daily_public": {
      "date": "2023-01-20T00:00:00Z"
    },
    "city2_historical_daily_public": {
      "date": "2023-01-20T00:00:00Z"
    },
    "city3_historical_daily_public": {
      "date": "2023-01-20T00:00:00Z"
    }
  }
}
from datetime import datetime
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.utils import casing


class KeyAccessStream(HttpStream):
    def __init__(self, url, sites: List[str], **kwargs):
        super().__init__(**kwargs)
        self.url = url
        self.sites = sites
        self.localizacao = self.sites[0]

    @property
    def url_base(self) -> str:
        return f"{self.url}/dashboards/"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        :param response: the most recent response from the API
        :return: If there is another page in the result, a mapping (e.g. dict)
                 containing information needed to query the next page.
                 If there are no more pages in the result, return None.
        """
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        :return: an iterable containing each record in the response
        """
        data = response.json()
        # Tag every record with the site it came from.
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    item.update({"site": self.localizacao})
                    yield item
        if isinstance(data, dict):
            for key, item in data.items():
                if isinstance(item, list):
                    for value in item:
                        if isinstance(value, dict):
                            value.update({"site": self.localizacao})
                            yield value
        return

    def stream_slices(self, *, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        # One slice per site; path() receives the site name as the slice.
        return self.sites


class IncrementalKeyAccessStream(KeyAccessStream, IncrementalMixin):
    # Fill in to checkpoint stream reads after N records.
    # This prevents re-reading of data if the stream fails for any reason.
    state_checkpoint_interval = None
    _cursor_value = None

    def __init__(self, url, sites, start_date: datetime, **kwargs):
        super().__init__(url, sites, **kwargs)
        self.start_date = start_date
        self._cursor_value = None

    @property
    def cursor_field(self) -> str:
        """
        :return str: The name of the cursor field.
        """
        return "date"

    @property
    def state(self) -> Mapping[str, Any]:
        """
        :return: A dictionary representing the current state of the stream,
                 keyed per site and stream class.
        """
        key = casing.camel_to_snake(self.localizacao + self.__class__.__name__)
        if self._cursor_value:
            return {key: {self.cursor_field: self._cursor_value}}
        return {key: {self.cursor_field: self.start_date.strftime("%Y-%m-%dT%H:%M:%SZ")}}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        """
        :param value: A dictionary representing the new state of the stream.
        """
        # Read the cursor back from the same shape the state getter produces.
        key = casing.camel_to_snake(self.localizacao + self.__class__.__name__)
        self._cursor_value = value[key][self.cursor_field]

    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(*args, **kwargs):
            latest_record_date = datetime.strptime(record[self.cursor_field], "%Y-%m-%dT%H:%M:%SZ")
            # Store the cursor in the same format the API returns,
            # so request_body_json can parse it back with strptime.
            candidate = latest_record_date.strftime("%Y-%m-%dT%H:%M:%SZ")
            if self._cursor_value is None or self._cursor_value < candidate:
                self._cursor_value = candidate
            yield record


class HistoricalOccupation(IncrementalKeyAccessStream):
    primary_key = None

    @property
    def http_method(self) -> str:
        return "POST"

    def request_body_json(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Override when creating POST/PUT/PATCH requests to populate the body
        of the request with a JSON payload. Only one of request_body_data
        and request_body_json may be overridden.
        """
        key = casing.camel_to_snake(self.localizacao + self.__class__.__name__)
        if stream_state:
            start_at = datetime.strptime(stream_state[key][self.cursor_field], "%Y-%m-%dT%H:%M:%SZ")
            start_at = start_at.strftime("%Y-%m-%dT%H:%M:%SZ")
        else:
            start_at = self.start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
        return {
            "companies": None,
            "category": None,
            "scale": "HOURS",
            "endAt": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
            "startAt": start_at,
        }

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        self.localizacao = stream_slice
        return stream_slice + "/historical-occupation"
Elliot Trabac
01/23/2023, 10:45 AM

questions_stream:
  $ref: "*ref(definitions.base_stream)"
  $options:
    name: "questions"
    primary_key: "id"
    path: "/question/{{ stream_slice.parent_id }}"
  retriever:
    $ref: "*ref(definitions.retriever)"
    record_selector:
      extractor:
        field_pointer: []
    stream_slicer:
      type: SubstreamSlicer
      parent_stream_configs:
        - stream: "*ref(definitions.reviews_stream)"
          parent_key: "question_id"
          stream_slice_field: "parent_id"
The problem is that in the parent stream, most of the reviews have the same question_id, so in the questions stream I'm getting many duplicates and the extract takes a lot of time. What would you recommend to fetch only the unique question_id values in this stream?
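One option, if this stream moves to the Python CDK, is to deduplicate the parent keys while building slices; a minimal sketch, where the class, URL, and the way the parent stream is wired in are assumptions:

from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream

class QuestionsStream(HttpStream):  # hypothetical Python-CDK port of questions_stream
    url_base = "https://example.com/api/"  # placeholder
    primary_key = "id"

    def __init__(self, parent: HttpStream, **kwargs):
        super().__init__(**kwargs)
        self.parent = parent  # the reviews stream

    def stream_slices(self, *, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        # Emit one slice per distinct question_id seen in the parent stream.
        seen = set()
        for review in self.parent.read_records(sync_mode=SyncMode.full_refresh):
            question_id = review["question_id"]
            if question_id not in seen:
                seen.add(question_id)
                yield {"parent_id": question_id}

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        return f"question/{stream_slice['parent_id']}"

    def next_page_token(self, response, **kwargs):
        return None

    def parse_response(self, response, **kwargs):
        yield response.json()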
Theodor Sjöstedt
01/23/2023, 7:09 PM
1. Authenticate via the /auth endpoint
2. POST request with a CSV file as payload, with the list of ids that should be exported
3. Request a status endpoint and wait for the result file to be ready
4. Download the file through another endpoint
We wanna fetch data from this API at least nightly.
Is this workflow worth implementing a custom connector for, or should I just hack something together in Airflow? :)
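For scale, the whole flow above is only a few calls; a rough sketch with plain requests (the host, payload shapes, and every path beyond /auth are placeholders, not from the thread):

import time

import requests

BASE_URL = "https://api.example.com"  # placeholder host

def fetch_export(ids_csv: bytes, user: str, password: str) -> bytes:
    # 1. Authenticate via the /auth endpoint (payload shape is an assumption).
    token = requests.post(f"{BASE_URL}/auth", json={"user": user, "password": password}).json()["token"]
    headers = {"Authorization": f"Bearer {token}"}
    # 2. Submit the CSV of ids to export ("/export" is a placeholder path).
    job = requests.post(f"{BASE_URL}/export", headers=headers, files={"ids": ("ids.csv", ids_csv)}).json()
    # 3. Poll the status endpoint until the result file is ready.
    while requests.get(f"{BASE_URL}/status/{job['id']}", headers=headers).json()["state"] != "done":
        time.sleep(30)
    # 4. Download the finished file through the download endpoint.
    return requests.get(f"{BASE_URL}/download/{job['id']}", headers=headers).content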
Aman Satya
01/26/2023, 10:57 AM

Aman Satya
01/26/2023, 10:58 AM

Aman Satya
01/26/2023, 10:58 AM

Thomas Pedot
01/27/2023, 5:02 PM

Akash Ghadge
01/30/2023, 3:57 PM
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
I am getting data on the command line, but at the end of the response I get an error:
"failure_type": "system_error"}}
I am not aware of the reason for this error. I have attached screenshots of the response below; please help me understand the reason behind this error.
Thank you.

Ryan (Airbyte)
01/30/2023, 5:29 PM

Ryan (Airbyte)
01/31/2023, 8:06 PM
… spec.yaml, where it looks like the latest version uses a manifest.yaml. See below for my error:
E FileNotFoundError: [Errno 2] No such file or directory: '/Users/rbernstein/airbyte/airbyte-integrations/connectors/source-shelter-luv/source_shelter_luv/spec.yaml'
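If the connector was generated from the newer low-code template, its source class points at manifest.yaml rather than a separate spec.yaml; a minimal sketch of what that source.py typically looks like (the class name is inferred from the error path):

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

class SourceShelterLuv(YamlDeclarativeSource):
    def __init__(self):
        # The declarative source loads the spec from the manifest itself.
        super().__init__(**{"path_to_yaml": "manifest.yaml"})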
Kacper Adler
02/01/2023, 11:10 AM

Kacper Adler
02/01/2023, 11:11 AM
features_stream:
  $ref: "*ref(definitions.base_stream)"
  retriever:
    $ref: "*ref(definitions.retriever)"
    record_selector:
      $ref: "*ref(definitions.selector_features)"
I want this to accept a 403 response as INFO/WARNING. In aha.io it's not required to have features with products; it's possible to have only one, but the source forces both.
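In the low-code CDK a response filter on the requester's error handler can treat 403 as ignorable; a minimal sketch, assuming the stream's requester is defined in definitions:

features_stream:
  $ref: "*ref(definitions.base_stream)"
  retriever:
    $ref: "*ref(definitions.retriever)"
    record_selector:
      $ref: "*ref(definitions.selector_features)"
    requester:
      $ref: "*ref(definitions.requester)"
      error_handler:
        response_filters:
          - http_codes: [403]
            action: IGNORE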
Ben Greene
02/01/2023, 9:41 PM

Jimmy McBroom
02/03/2023, 11:40 PM