
Alessandro Duico

02/02/2022, 3:15 PM
Hello, I am trying to implement incremental streams or slices to improve sync performance, since pulling everything in a single block takes a lot of resources and the AWS instance crashes. Following other sources, I set state_checkpoint_interval, but it keeps bringing back all the records in one go; I think it should go 100 by 100.
Copy code
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class ServicesnowApi(HttpStream):
    url_base = "https://xxx.service-now.com/api/now/v1/"

    # Set this as a noop.
    primary_key = None
    # Save the state every 100 records
    state_checkpoint_interval = 100
    page_size = 100
    cursor_field = "sys_updated_on"

    def __init__(self, limit: str, sys_created_from: str, **kwargs):
        super().__init__(**kwargs)
        # Here's where we set the variable from our input to pass it down to the source.
        self.limit = limit
        self.sys_created_from = sys_created_from


    def path(self, **kwargs) -> str:
        # This defines the path to the endpoint that we want to hit.
        limit = self.limit
        sys_created_from = self.sys_created_from
        return f"table/incident?sysparm_offset=0&sysparm_limit={limit}&sysparm_query=sys_created_on>={sys_created_from} 00:00^active=ISNOTEMPTY"


    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        limit = self.limit
        sys_created_from = self.sys_created_from
        return {"limit": limit, "sys_created_from":sys_created_from}


    def parse_response(
            self,
            response: requests.Response,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        result = response.json()['result']
        return result


    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None
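One likely reason everything comes back in a single request is that next_page_token returns None, so the CDK never asks for a second page. Below is a minimal sketch of offset-based pagination for a stream like this, assuming the ServiceNow sysparm_offset / sysparm_limit parameters; the class name ServicesnowApiPaginated and the short-page stop condition are illustrative, not the actual connector.

# Sketch only: returning a token from next_page_token makes the CDK keep
# requesting pages; returning None stops pagination.
from typing import Any, Iterable, Mapping, MutableMapping, Optional
from urllib.parse import parse_qs, urlparse

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class ServicesnowApiPaginated(HttpStream):
    url_base = "https://xxx.service-now.com/api/now/v1/"
    primary_key = None
    page_size = 100

    def path(self, **kwargs) -> str:
        return "table/incident"

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        params = {"sysparm_limit": self.page_size, "sysparm_offset": 0}
        if next_page_token:
            # The token returned below carries the next offset to request.
            params.update(next_page_token)
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json().get("result", [])

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Stop once a page comes back shorter than page_size (assumption about the Table API).
        records = response.json().get("result", [])
        if len(records) < self.page_size:
            return None
        query = parse_qs(urlparse(response.request.url).query)
        current_offset = int(query.get("sysparm_offset", ["0"])[0])
        return {"sysparm_offset": current_offset + self.page_size}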

Daniel Eduardo Portugal Revilla

02/02/2022, 4:00 PM
I am following this code https://github.com/airbytehq/airbyte/blob/b88fc446529b06b35c88793aabcb92b6806a04d1[…]ntegrations/connectors/source-amplitude/source_amplitude/api.py and I see that Events() uses that:
Copy code
class Events(IncrementalAmplitudeStream):
    cursor_field = "event_time"
    date_template = "%Y%m%dT%H"
    primary_key = "uuid"
    state_checkpoint_interval = 1000
    time_interval = {"days": 3}
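The incremental behaviour in that connector comes from more than those class attributes; the stream also overrides get_updated_state so the cursor advances as records are read. Here is a simplified sketch of the usual pattern (not the Amplitude code verbatim; the class name EventsSketch, the placeholder URL, and the response shape are assumptions):

from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class EventsSketch(HttpStream):
    url_base = "https://example.invalid/api/"  # placeholder base URL
    primary_key = "uuid"
    cursor_field = "event_time"
    state_checkpoint_interval = 1000

    def path(self, **kwargs) -> str:
        return "events"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json().get("data", [])

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        # Keep whichever cursor value is newer: the saved state or the latest record.
        # Assumes the cursor is a lexicographically sortable timestamp string.
        latest = latest_record.get(self.cursor_field, "")
        previous = current_stream_state.get(self.cursor_field, "")
        return {self.cursor_field: max(latest, previous)}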

Nathan Gille

02/03/2022, 4:43 AM
Hey, those numbers 1000, 2000 are just logs and are not really related to this variable:
Copy code
if (recordsRead % 1000 == 0) {
    LOGGER.info("Records read: {}", recordsRead);
}
And I think get_updated_state is called here for every 100 records.
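Roughly, the relationship between state_checkpoint_interval and get_updated_state can be pictured like this (a simplified illustration of the idea only, not the actual CDK internals; read_with_checkpoints is a hypothetical helper):

from typing import Any, Iterable, Mapping


def read_with_checkpoints(stream, records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
    """Illustration only: update the state on every record and emit a STATE
    checkpoint every `state_checkpoint_interval` records."""
    state: Mapping[str, Any] = {}
    for count, record in enumerate(records, start=1):
        yield record
        state = stream.get_updated_state(state, record)
        if count % stream.state_checkpoint_interval == 0:
            print({"type": "STATE", "state": state})  # checkpoint happens here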

Famezilla Channel

02/03/2022, 6:01 AM
yes, because I put:
Copy code
state_checkpoint_interval = 100
page_size = 100
but nothing changed
Is it working?
Nope,
state_checkpoint_interval = 100
the last job has 10000 records, and on S3 there is only one .json file of 64 MB. I imagine the expected output is 10000/100 = 100 files.
Can you share the complete logs?

Khristina Rustanovich

02/03/2022, 3:36 PM
What more do I need to implement to have incremental streams or slices?

Maxime edfeed

02/04/2022, 4:45 AM
Hey, I can see that there are 10000 records created. Also, the state checkpoint is more about when the source saves its state, not about the S3 side. Can you confirm whether S3 has all the records?
Hello @Harshith (Airbyte), with
state_checkpoint_interval = 100
on S3 there is only one file with 10000 records.
In addition, I am setting
cursor_field = "sys_updated_on"
but I cannot see it here.
Since you have defined the cursor field inside the source, we should see it as source-defined. Also, I think the files are not created based on the checkpoint; rather, they are created on every sync. We don't pass this source config to the destination.
I am looking for two solutions. 1. I have 4000000 records and I need all the data on S3. Using the basic connector, the EC2 instance crashes (a t3.large), so I want to pull data in chunks, with pagination, into multiple small files for better performance. 2. After that, I want to pull only the records that are new since the last sync. I think I need a watermark to know which record was the last one.
I asked this in the other Slack channels a few days ago and they recommended that I use an incremental stream.
This is the data in my bucket at the moment, from the last Airbyte execution:
Yeah, incremental looks good. I would kindly suggest looking into sources like google-ads and shopify to understand how incremental sync works.
There should be a basic example, like a speedrun.
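A basic example of the slicing idea could look like the sketch below (illustrative only; date_slices is a hypothetical helper, and the 3-day window simply echoes the Amplitude time_interval approach). Each window would become one stream slice, and the upper bound of the last completed window acts as the watermark for the next sync.

from datetime import datetime, timedelta
from typing import Any, Iterable, Mapping


def date_slices(
    start: datetime,
    end: datetime,
    interval: timedelta = timedelta(days=3),
) -> Iterable[Mapping[str, Any]]:
    """Yield {"from": ..., "to": ...} windows that a stream_slices() override
    could return, so each request covers a small date range instead of the
    whole table and a crash only loses the current window."""
    cursor = start
    while cursor < end:
        window_end = min(cursor + interval, end)
        yield {"from": cursor.isoformat(), "to": window_end.isoformat()}
        cursor = window_end


# Example: 3-day windows across January 2022.
for window in date_slices(datetime(2022, 1, 1), datetime(2022, 1, 31)):
    print(window)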