# questions

• Juan Luis (02/23/2023, 3:45 AM)

A colleague not in this Slack brings this to my attention: Kedro seems to play not so nicely with in-tree venvs. The workflow would need to be something like this:

1. Install Kedro somehow to have the `kedro` CLI.
2. `kedro new` creates a new directory.
3. `cd {newdir} && python -m venv .venv`
4. Install Kedro inside the `.venv` again.

This seems not to be a problem if one uses out-of-tree environments, like conda does. Is there a way around it for the in-tree case? Something like:

1. `mkdir {newdir} && cd {newdir} && python3 -m venv .venv`
2. Install Kedro in the new `.venv`
3. (from `{newdir}`) `kedro new --here`
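
For comparison, a minimal sketch of the out-of-tree workflow that avoids the double install (the environment path is arbitrary, and `kedro new --here` above is a proposed flag, not an existing one):

```bash
# create the environment outside the project tree, then keep reusing it inside the project
python3 -m venv ~/.venvs/kedro-env
source ~/.venvs/kedro-env/bin/activate
pip install kedro
kedro new        # scaffolds {newdir}
cd {newdir}      # the already-activated environment keeps working here
```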

• Vici (02/23/2023, 12:48 PM)

Hey, I'm trying to get some stuff done with `kedro micropkg`, but failing so far 😕. I've built a custom dataset class, `MyDataset`, and I'd love to share it as a micro-package with a colleague of mine. This should be possible, as the micro-packaging docs say: "A micro-package can be any part of Python code in a Kedro project [...]". But running `kedro micropkg package src/my_project/extras/datasets/my_folder/my_dataset.py` only yields the following error message:

```
kedro.framework.cli.utils.KedroCliError: The micro-package location you provided is not a valid Python module path
Run with --verbose to see the full exception
Error: The micro-package location you provided is not a valid Python module path
```

I tried running with `--verbose`, but it's not a valid argument for any of kedro, micropkg or package, so that didn't help. Nor do I understand how `my_dataset.py` is not a valid Python module 😢. Does any of you have an idea how to resolve this? Thanks in advance!
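
For what it's worth, the error hints that the command expects a dotted Python module path rather than a file-system path, and the verbose flag sits on the top-level `kedro` command. A hedged sketch of what that could look like (the exact module root may differ by Kedro version, so treat this as a guess to verify against the micro-packaging docs):

```bash
# module-path form, relative to src/my_project/
kedro micropkg package extras.datasets.my_folder.my_dataset

# verbose output goes before the subcommand
kedro --verbose micropkg package extras.datasets.my_folder.my_dataset
```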

• Rafał Nowak (02/24/2023, 8:20 AM)

Hi, I am trying to run a Kedro pipeline with `--params "section1.section2.name:value"`, where `section1.section2` is defined in `parameters.yml`, so there is a tree `section1.section2` with some parameters and I would like to change only one of them. As far as I know, Kedro cannot override just one parameter in the tree; I have to override the full root, which is not user friendly in the CLI. I see that since Kedro 0.18.5 one can use OmegaConf. Does that change this limitation? If so, is it possible to use `globals_pattern` like in TemplatedConfigLoader?
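
As background on the dotted-key mechanics (this only illustrates what OmegaConf itself can do, not what the Kedro CLI exposes in any given version), a minimal sketch:

```python
# OmegaConf can update a single leaf of a nested config via a dotted key
from omegaconf import OmegaConf

params = OmegaConf.create({"section1": {"section2": {"name": "old", "other": 1}}})
OmegaConf.update(params, "section1.section2.name", "value", merge=True)

print(OmegaConf.to_container(params))
# {'section1': {'section2': {'name': 'value', 'other': 1}}}  <- only the one leaf changed
```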

• Robertqs (02/24/2023, 9:34 AM)

Hello everyone, I'm just wondering whether Kedro has any built-in capability for scheduling a simple daily pipeline run. I know the proper way is probably to rely on a scheduler such as Airflow, but I'm curious about the minimal steps required: is there any cron-like Kedro CLI magic for this? I couldn't find any info, so I'd really appreciate it if you could point me to the relevant documentation.
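
There is no scheduler built into Kedro itself; the minimal setup is usually an external scheduler (cron, for example) invoking either `kedro run` or a small driver script. A hedged sketch of such a driver, using the public session API (file name and pipeline name are assumptions):

```python
# run_daily.py -- invoked by cron, e.g. "0 2 * * * /path/to/venv/bin/python /path/to/project/run_daily.py"
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path(__file__).resolve().parent  # assumes the script sits in the project root
bootstrap_project(project_path)

with KedroSession.create(project_path=project_path) as session:
    session.run(pipeline_name="__default__")
```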

• Bailey (02/24/2023, 10:26 AM)

Hi all, I've recently begun exploring Kedro as a way to better structure our ML projects. Our company is pretty involved with Databricks for our data processes, so I'm quite excited to see the upcoming changes related to Kedro and Databricks. I have a question about loading Delta tables in Databricks through the Kedro data catalog. I see there is a DeltaTableDataSet class: is it possible to provide just a table name to this class to load the data, or must it be an absolute path? And is it then exposed as a Spark DataFrame to pass into pipelines? Thanks, and I'm very keen to learn more about Kedro.
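
For reference, a hedged sketch of how `spark.DeltaTableDataSet` is typically declared in the catalog (path-based; the path below is invented). As of Kedro 0.18 it loads a `delta.tables.DeltaTable` object rather than a plain Spark DataFrame, so it is worth double-checking the dataset's API docs for the exact behaviour:

```yaml
my_delta_table:
  type: spark.DeltaTableDataSet
  filepath: dbfs:/delta/my_schema/my_table
```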

• Balachandran Ponnusamy (02/25/2023, 12:58 AM)

Hi team, we are trying to run a PySpark job in a Dataproc cluster. The following steps were followed (please refer to the screenshot): 1. a wheel file was generated for the project; 2. the wheel file and the conf and logs folders/files were pushed into the Dataproc cluster; 3. `pip install` of the wheel; 4. run kedro. When running kedro, it throws the error below. Can you please help with what we are missing here?

```
ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/root/.sparkStaging/application_1677266242748_0002/pyspark.zip could only be written to 0 of the 1 minReplication nodes. There are 0 datanode(s) running and 0 node(s) are excluded in this operation.
```

• Sebastian Cardona Lozano (02/25/2023, 1:21 AM)

Hi all. I'm trying to use Kedro to develop an ML pipeline with Spark on a Dataproc cluster in GCP. I'd like to load a table from BigQuery into a Spark dataset: how could I define that in the catalog? I know I can use "plain" PySpark to read the table, but I'd like to use the catalog. Thanks!
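
One possible direction (a sketch, not from the thread): wrap the Spark BigQuery connector in a small custom dataset so the table can live in the catalog. All names are invented, and the spark-bigquery connector is assumed to be available on the cluster:

```python
# custom dataset reading a BigQuery table through the Spark BigQuery connector
from typing import Any, Dict

from kedro.io import AbstractDataSet
from pyspark.sql import DataFrame, SparkSession


class SparkBigQueryDataSet(AbstractDataSet):
    def __init__(self, table: str):
        self._table = table  # e.g. "my-project.my_dataset.my_table"

    def _load(self) -> DataFrame:
        spark = SparkSession.builder.getOrCreate()
        return spark.read.format("bigquery").option("table", self._table).load()

    def _save(self, data: DataFrame) -> None:
        # writing usually needs extra connector options (e.g. a temporary GCS bucket); omitted here
        raise NotImplementedError("Read-only sketch")

    def _describe(self) -> Dict[str, Any]:
        return {"table": self._table}
```

A catalog entry would then just reference the class path and the table name, e.g. `type: my_project.extras.datasets.SparkBigQueryDataSet` with `table: my-project.my_dataset.my_table`.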

• Zoran (02/26/2023, 8:27 PM)

Hi all. I'm trying to create multiple pipelines that are all the same; the only difference is the parameters each one gets. Does there have to be as many pipeline definitions as there are parameter sets?
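
A common pattern for this (a sketch, with invented names) is to define the pipeline once and instantiate it under several namespaces, so only the parameters change:

```python
# one pipeline definition, several namespaced instances with different parameters
from kedro.pipeline import Pipeline, node, pipeline


def process(data, options):
    ...


base = pipeline([node(process, inputs=["input_data", "params:options"], outputs="result")])


def create_pipeline(**kwargs) -> Pipeline:
    # expects parameters.yml to define variant_a.options, variant_b.options, ...
    return sum(
        (
            pipeline(base, namespace=variant, inputs={"input_data": "input_data"})
            for variant in ("variant_a", "variant_b")
        ),
        Pipeline([]),
    )
```

Under `namespace="variant_a"`, `params:options` is resolved as `params:variant_a.options`, so each instance picks up its own block of parameters.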

• Xinghong Fang (02/27/2023, 3:14 AM)

[SOLVED] Hi all, I have a simple Kedro pipeline which reads an input file from S3 and updates a Postgres database. We want to run this on AWS Lambda (containerized), since it is the simplest and cheapest option. However, we hit the `_multiprocessing.SemLock is not implemented` issue when launching the pipeline. A quick Google search led me to this question: https://stackoverflow.com/questions/34005930/multiprocessing-semlock-is-not-implemented-when-running-on-aws-lambda. It looks like AWS Lambda's Python runtime is missing `/dev/shm`, which seems to be needed by the `KedroSession`. Has anyone successfully run a Kedro pipeline on AWS Lambda? Thanks in advance!

• Hugo Evers (02/27/2023, 10:17 AM)

Hi guys, how can I change the default dataset from MemoryDataset to a kedro-mlflow dataset, or provide some rule to map such datasets, without having to maintain two sources of datasets? Currently I am using a modular pipeline to create namespaces for the different experiments I want to run in a single session. For example, I am testing the accuracy of several prediction methods in two ways:

1. a random train-test split
2. a date-based train-test split, to check performance on the latest data and detect drift.

I can very easily create multiple pipelines by remapping some inputs and outputs using the modular pipeline concept; however, I want to cache some of the training steps, since these are very big (and costly) multi-modal models. I use kedro-mlflow to log the artefacts and metrics to MLflow and S3, but this requires such datasets to be described in catalog.yml. I used the TemplatedConfigLoader and Jinja2 syntax to create a list of datasets, but now I have to maintain these lists in two different places, which is begging for bugs. My preferred solution would be a single parameters file where I specify all the parameters I want to run in a grid (ParameterGrid). This could look like:

```yaml
# parameters.yml
ParameterGrid:
  name_of_parameter:
    version_1:
      - value1
      - value2
    version_2:
      - value1
# etc.
```

and then I could run through these options with the namespace. However, now I need dataset entries in catalog.yml that match these `version_1` and `version_2` names, since I don't want them to be stored in memory and then destroyed; instead I want to use the kedro-mlflow datasets. For example, for the parquet files I would use something like:

```yaml
X_test_{{ split_crit }}:
  type: kedro_mlflow.io.artifacts.MlflowArtifactDataSet
  data_set:
    type: pandas.ParquetDataSet
    filepath: s3://sagemaker-vertex/data/05_model_input/X_test_{{ split_crit }}.parquet
```

for the metrics:

```yaml
my_model_metrics_{{ split_crit }}:
  type: kedro_mlflow.io.metrics.MlflowMetricDataSet
  key: accuracy
```

and for the models:

```yaml
multi_modal_model:
  type: kedro_mlflow.io.models.MlflowModelLoggerDataSet
  flavor: mlflow.pyfunc
  pyfunc_workflow: python_model
  save_args:
    conda_env:
      python: "3.9.10"
      dependencies:
        - "mlflow==1.27.0"
```

However, in Kedro these output datasets cannot be shared (even though in MLflow this would be fine).
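
One assumption-heavy sketch of a way to keep a single source of truth: generate the namespaced kedro-mlflow entries from the same ParameterGrid parameters inside an `after_catalog_created` hook, instead of templating catalog.yml. The constructor arguments for `MlflowArtifactDataSet` mirror the YAML entry above but should be verified against the kedro-mlflow API:

```python
# hooks.py -- register grid-dependent datasets programmatically (register the hook in settings.py)
from kedro.framework.hooks import hook_impl
from kedro_mlflow.io.artifacts import MlflowArtifactDataSet


class GridCatalogHook:
    @hook_impl
    def after_catalog_created(self, catalog):
        grid = catalog.load("parameters").get("ParameterGrid", {})
        for split_crit in grid.get("name_of_parameter", {}):
            catalog.add(
                f"X_test_{split_crit}",
                MlflowArtifactDataSet(
                    data_set={
                        "type": "pandas.ParquetDataSet",
                        "filepath": f"s3://sagemaker-vertex/data/05_model_input/X_test_{split_crit}.parquet",
                    }
                ),
                replace=True,
            )
```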

• Tomás Rojas (02/27/2023, 7:25 PM)

Hi everyone, I am currently using Kedro to analyze some data from experiments (experimental physics) and I managed to make a `CustomDataSet` for this purpose. The problem is that I want to make a `PartitionedDataSet` out of it, but I run into complications. Here is the class I made:

```python
    import glob
    import os
    from typing import Any, Dict, Tuple

    import pandas as pd

    from kedro.io import AbstractDataSet


    class LedExperiment(AbstractDataSet):
        def __init__(self, filepath: str):
            breakpoint()
            self.path = filepath
            self.files = glob.glob(os.path.join(filepath, "*"))
            self.files.sort()
            self.gate_voltage = self.get_gate_voltage(self.path)
            self.info_path, self.voltages_path, self.data_path = self.files
    
        @staticmethod
        def get_gate_voltage(path: str) -> float:
            """
            This is a function that is able to get the gate voltage from the folder name
            that is the root of the data
            :param path: path of the data, ussualy but not restricted to self.path
            :return: the voltage from the Dirac Point used as gate voltage
            """
            # note: sometimes there is more than one measurement for one voltage from the DP, it should
            # be always separed by an underscore "_".
            breakpoint()
            folder_name = os.path.split(path)[-1]
            gate_voltage = float(folder_name)
            return gate_voltage
    
    
        @staticmethod
        def get_info(path: str, gate_voltage: float) -> pd.DataFrame:
            """
            This method takes a path to the info file and returns a pandas
            datatrame of one row and the info in each column
            :param path: path to the info file of the experiment
            :param gate_voltage: this is the gate voltage with respect to the Dirac Point
            :return: a pandas dataframe with the parsed information
            """
            with open(path, "r") as f:
                r = f.read()
    
            r = r.split("\n")[1:-2]
            r = [i.split(",") for i in r]
            r = [item for sublist in r for item in sublist]
            r = [i.replace(" ", "") for i in r]
            r = {i.split("=")[0]: i.split("=")[1] for i in r}
    
            r["Vmin"] = float(r["Vmin"][:-1])
            r["Vmax"] = float(r["Vmax"][:-1])
            r["Vstep"] = float(r["Vstep"][:-1])
            r["Cycles"] = int(r["Cycles"])
            r["waitingtime"] = float(r["waitingtime"][:-1])
            r["timeatlight"] = float(r["timeatlight"][:-1])
            r["timeatdark"] = float(r["timeatdark"][:-1])
            r["wavelength"] = float(r["wavelength"][:-2])
            r["gate_voltage"] = gate_voltage
            info = pd.DataFrame(r, index=["value"])
            return info
    
        @staticmethod
        def get_led_voltage_list(voltage_list_path: str) -> pd.DataFrame:
            """
            This funtion takes the path to the file containing the list of the voltages to the led driver
            and returns a pandas dataframe containing all the voltages in the order they appear in the file
            which is the same order as they were used.
            :param voltage_list_path: path to the file containing the voltage list.
            :return: a pandas dataframe with all the information.
            """
    
            with open(voltage_list_path, "r") as f:
                r = f.read()
            r = r.split("\n")[:-1][::2]
    
            voltages = [float(i) for i in r]
            voltages = pd.DataFrame(voltages, columns=["LED driver voltages"])
            return voltages
    
        @staticmethod
        def get_data(data_path: str) -> pd.DataFrame:
            """
            This function reads the data from the experiment
            :param data_path: path to the file containing the time series data
            :return: a pandas dataframe with the time series data of the currents
            """
            return pd.read_csv(data_path, sep="\t")
    
        def _load(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
            """
            This function loads the data using the information provided in the init
            :return: A tuple with the information, LED voltages and data DataFrames in
            that order.
            """
            breakpoint()
            info = self.get_info(self.info_path, self.gate_voltage)
            led_voltages = self.get_led_voltage_list(self.voltages_path)
            data = self.get_data(self.data_path)
    
            return info, led_voltages, data
    
        def _save(self, data) -> None:
            # TODO: finish saving method
            pass
    
        def _describe(self) -> Dict[str, Any]:
            """
            Returns a dict that describes the attributes of the dataset.
            :return: Returns a dict that describes the attributes of the dataset.
            """
            return dict(
                information_path=self.info_path,
                voltages_path=self.voltages_path,
                data_path=self.data_path,
                gate_voltage=self.gate_voltage  # note that this is w respect to the DP
            )
```

The thing is that when I make a `PartitionedDataSet` from it, the paths get all messed up, which results in the class having errors; this is not ideal. Can anyone help me with this? EDIT: I added 3 replies to the thread explaining the issue further.
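
For context on why the paths look wrong (a sketch of the mechanics, not an answer from the thread): `PartitionedDataSet` constructs the underlying dataset once per file it finds under its `path`, passing each file's own path as `filepath`, so a dataset whose `__init__` expects a whole experiment folder receives individual file paths instead. The node then receives a dictionary of partition ids mapped to load callables, e.g.:

```python
# what a node wired to a PartitionedDataSet receives (names invented)
from typing import Any, Callable, Dict

import pandas as pd


def summarise_experiments(partitions: Dict[str, Callable[[], Any]]) -> pd.DataFrame:
    rows = []
    for partition_id, load in partitions.items():
        info, led_voltages, data = load()  # lazily loads one partition
        rows.append({"partition": partition_id, "n_samples": len(data)})
    return pd.DataFrame(rows)
```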

• Zoran (02/28/2023, 4:53 PM)

Hi all. Has anyone created a custom dataset for MongoDB (and if yes, could you share it)?
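
As a possible starting point, a minimal hedged sketch of such a dataset built on pymongo (all names, the connection handling and the list-of-dicts interface are assumptions):

```python
# minimal MongoDB collection dataset backed by pymongo
from typing import Any, Dict, List

from kedro.io import AbstractDataSet
from pymongo import MongoClient


class MongoCollectionDataSet(AbstractDataSet):
    def __init__(self, uri: str, database: str, collection: str):
        self._uri = uri
        self._database = database
        self._collection = collection

    def _load(self) -> List[Dict[str, Any]]:
        with MongoClient(self._uri) as client:
            return list(client[self._database][self._collection].find({}, {"_id": 0}))

    def _save(self, data: List[Dict[str, Any]]) -> None:
        with MongoClient(self._uri) as client:
            client[self._database][self._collection].insert_many(data)

    def _describe(self) -> Dict[str, Any]:
        return {"database": self._database, "collection": self._collection}
```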

• tomohiko kato (03/01/2023, 9:24 AM)

Hi team! I tried the "Experiment tracking in Kedro-Viz" tutorial, and I cannot open the experiment tracking screen at the "Access run data and compare runs" step (data fetching does not seem to be working and nothing is displayed). Error message (`kedro_viz\api\graphql\serializers.py`, line 46):

```
run_command=run_blob.get("cli", {}).get("command_path"),
AttributeError: 'str' object has no attribute 'get'
```

The environment and versions are as follows:

```
OS: windows
IDE: pycharm
venv: pyenv
python: 3.9.10
kedro: 0.18.5
kedro-viz: 0.5.3
```

The error log seems to indicate that parsing of the `run_blob` is not working. I checked with the debugger, and `run_blob.get("cli", {})` was indeed recognised as a str, not a dict. Is this a problem specific to my environment? (attachment: error.log)

• Matheus Pinto (03/01/2023, 12:23 PM)

Hi team, we have a custom class that creates a sklearn-compatible pipeline and performs the fit and predict methods. The thing is, in this class the last estimator is a TensorFlow model (with custom objects as well, such as loss functions) and because of that the object is not serializable: the TensorFlow and pickle Kedro datasets don't work. Do you have advice on how to save this object? We were thinking of a custom dataset that saves everything that is not the TF model as pickle and then saves the model as a TF object dataset. Is this the most elegant solution? Do you have any other ideas on how to solve it? Class:

```python
    class ModelPipeline(BaseEstimator):
        """A pipeline to train and make predictions using a machine learning model.
    
        Args:
            params (dict): A dictionary containing the parameters required to build the pipeline.
    
        Attributes:
            pipeline (list): A list of transformers and an estimator built using the given parameters.
            data_prepocessing_pipe (list): A list of transformers in the pipeline
                used for data pre-processing.
            estimator: The estimator in the pipeline used for making predictions.
            target_names (list): A list of column names in the target variable.
            is_fitted (bool): A flag indicating if the estimator is fitted or not.
    
        """
    
        def __init__(self, params: dict):
            """Initializes the ModelPipeline object.
    
            The object is initialized with the given parameters to create a pipeline.
    
            Args:
                params (dict): A dictionary containing the parameters required to build the pipeline.
            """
            self.params = params
            self.target_params = list(
                params["model"]["data_preparation"]["target_builder"]["kwargs"].values()
            )[0]
            func_path = self.params["builder_function"]
            module_name, func_name = func_path.rsplit(".", 1)
            module = importlib.import_module(module_name)
            func = getattr(module, func_name)
            self.pipeline = func(self.params)
    
        def fit(self, X, y):
            """Fits the pipeline to the given data.
    
            Fits each transformer in the pre-processing pipeline
            to the data and then fits the estimator to the transformed data.
    
            Args:
                X (array-like or sparse matrix): Input data of shape (n_samples, n_features)
                y (array-like or sparse matrix): Target values of shape (n_samples,) or
                (n_samples, n_targets)
    
            Returns:
                self: Returns an instance of self.
            """
            X_ = self.pipeline[:-1].fit_transform(X, y)
            self.pipeline[-1:].fit(X_, y)
            self.is_fitted = True
            return self
    
        @check_is_fitted
        @ensure_data_quality
        def predict(self, X: tp.Union[Matrix, Vector]) -> tp.Union[Matrix, Vector]:
            """Predicts the target variable using the fitted pipeline.
    
            Transforms the input data using the pre-processing pipeline and then makes predictions
            using the fitted estimator.
    
            Args:
                X (array-like or sparse matrix): Input data of shape (n_samples, n_features)
    
            Returns:
                pandas.DataFrame: A dataframe containing the predicted values of
                    shape (n_samples, n_targets).
            """
            X_ = self.pipeline[:-1].transform(X)
            y_pred = self.pipeline[-1].predict(X_)
            return y_pred
    
        @check_is_fitted
        @ensure_data_quality
        def inference(
            self, X: tp.Union[Matrix, Vector], y: tp.Union[Matrix, Vector] = None
        ) -> pd.DataFrame:
            """Custom inference using the fitted pipeline.
    
            Transforms the input data using the pre-processing pipeline and then makes inference
            using the fitted estimator and for the specific problem., classification and regression
    
            Args:
                X (array-like or sparse matrix): Input data of shape (n_samples, n_features)
    
            Returns:
                pandas.DataFrame: A dataframe containing the predicted values of
                    shape (n_samples, n_targets).
            """
            X_ = self.pipeline[:-1].transform(X)
            inference = self.pipeline[-1].inference(X_, y)
            return inference
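```

The split-save approach described above could look roughly like this (a sketch under assumptions: the last pipeline step is a `tf.keras` model and everything else pickles cleanly; all names are invented):

```python
# custom dataset: save the Keras estimator natively, pickle the rest of the ModelPipeline
import pickle
from pathlib import Path
from typing import Any, Dict

import tensorflow as tf
from kedro.io import AbstractDataSet


class ModelPipelineDataSet(AbstractDataSet):
    def __init__(self, dirpath: str):
        self._dir = Path(dirpath)

    def _save(self, model_pipeline) -> None:
        self._dir.mkdir(parents=True, exist_ok=True)
        name, estimator = model_pipeline.pipeline.steps.pop()    # detach the final Keras step
        estimator.save(str(self._dir / "estimator"))             # native TF SavedModel format
        with open(self._dir / "rest.pkl", "wb") as f:
            pickle.dump(model_pipeline, f)                       # everything except the Keras model
        model_pipeline.pipeline.steps.append((name, estimator))  # restore the in-memory object

    def _load(self):
        with open(self._dir / "rest.pkl", "rb") as f:
            model_pipeline = pickle.load(f)
        estimator = tf.keras.models.load_model(str(self._dir / "estimator"))  # pass custom_objects if needed
        model_pipeline.pipeline.steps.append(("estimator", estimator))
        return model_pipeline

    def _describe(self) -> Dict[str, Any]:
        return {"dirpath": str(self._dir)}
```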

• nawaz ahmad (03/02/2023, 6:26 AM)

Hi team, are there any good tutorials on using Kedro with Flask?

• Sergei Benkovich (03/02/2023, 12:48 PM)

Is there a way to use the same node with different sets of inputs/outputs? my_node(a) -> b, my_node(c) -> d. I think I saw something related in the documentation, but it looked too complicated. Is there an easy way to do it?
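
For what it's worth, the same function can simply be wrapped in two `node()` calls, since a node is just a function plus dataset names (a small sketch, names invented):

```python
from kedro.pipeline import node, pipeline


def my_node(df):
    return df


pipe = pipeline(
    [
        node(my_node, inputs="a", outputs="b", name="my_node_ab"),
        node(my_node, inputs="c", outputs="d", name="my_node_cd"),
    ]
)
```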

• Nicolas Rosso (03/02/2023, 1:27 PM)

Hello friends, I am using Kedro 0.18.4 with Python 3.7. When running this pipeline with the `kedro run` command, I get this error message: `TypeError: pipeline() got an unexpected keyword argument 'tags_hierarchy'`. Does anyone have any idea how to fix this?

```python
    from kedro.pipeline import Pipeline, node, pipeline
    from .nodes import medium_posts_extract_file, medium_posts_transform_file, medium_posts_upload_transformed_file_to_gcp, medium_posts_persist_file_in_gcp, delete_files
    from datetime import datetime
    
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # Define the nodes within the pipeline and their execution order. Each node can have one or more functions (defined in nodes.py).
    def create_pipeline(**kwargs) -> Pipeline:
        return pipeline(
            [
                node( 
                    func=medium_posts_extract_file,
                    inputs=None,
                    outputs="medium_posts_raw_file",
                    name="medium_posts_extract_file_node",
                    tags=["extract"]
                ),
                node(
                    func=medium_posts_transform_file,
                    inputs="medium_posts_raw_file",
                    outputs="medium_posts_transformed_file",
                    name="medium_posts_transform_file_node",
                    tags=["transform"]
                ),
                node(
                    func=medium_posts_upload_transformed_file_to_gcp,
                    inputs="medium_posts_transformed_file",
                    outputs=None,
                    name="medium_posts_upload_transformed_file_to_gcp_node",
                    tags=["upload"]
                ),
                node(
                    func=medium_posts_persist_file_in_gcp,
                    inputs="medium_posts_raw_file",
                    outputs=None,
                    name="medium_posts_persist_file_in_gcp_node",
                    tags=["persist"]
                ),
                node(
                    func=delete_files,
                    inputs="medium_posts_transformed_file",
                    outputs=None,
                    name="delete_files_node",
                    tags=["delete"]
                )
            ],
            tags_hierarchy={
                "extract": [],
                "transform": ["extract"],
                "upload": ["transform"],
                "persist": ["upload"],
                "delete": ["persist"]
            }
        )
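```

For reference (not from the thread): `pipeline()` has no `tags_hierarchy` argument; execution order is inferred from each node's inputs and outputs, and tags are only labels used to select nodes at run time. A minimal sketch of the same call without that argument:

```python
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=medium_posts_extract_file,
                inputs=None,
                outputs="medium_posts_raw_file",
                name="medium_posts_extract_file_node",
                tags=["extract"],
            ),
            # ... the remaining nodes unchanged ...
        ]
    )
```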

• Juan Diego (03/02/2023, 2:39 PM)

Hi all! Just a quick one: is it possible to reference the CONF_SOURCE value in the templates? `data_folder: ${CONF_SOURCE}/data` This obviously doesn't work, but you get the idea. Many thanks! ☺️
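
One way this is sometimes handled (a sketch, not from the thread) is to inject the value yourself through the config loader's globals in `settings.py`; the exact loader arguments should be checked against the Kedro version in use:

```python
# settings.py -- expose CONF_SOURCE to templated config as a global
from kedro.config import TemplatedConfigLoader

CONF_SOURCE = "conf"

CONFIG_LOADER_CLASS = TemplatedConfigLoader
CONFIG_LOADER_ARGS = {
    "globals_dict": {"CONF_SOURCE": CONF_SOURCE},
}
```

With that in place, `data_folder: ${CONF_SOURCE}/data` in the templated files would resolve against the injected value.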

• Ricardo Araújo (03/02/2023, 6:26 PM)

Hi y'all! Say I have a very standard pipeline like this: `get-data -> train-model -> evaluate-model`. Now, the model can be any of sklearn's models, all with the same interface. What I'd like to do is, from a list of models specified in `parameters`, run many instances of this pipeline, each with one model from the list (and of course I'd like the pipelines to run in parallel). I can use modular pipelines to instantiate the pipeline many times, but I'm not sure how to use the model list from the parameters file. Any ideas?
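
A hedged sketch of one way to wire this up (names invented): keep one parameter block per model in `parameters.yml` and instantiate the modular pipeline once per model name; `kedro run --runner=ParallelRunner` can then execute the independent branches in parallel:

```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, get_data, train_model

MODELS = ["random_forest", "svm"]  # mirrors the top-level keys in parameters.yml


def create_pipeline(**kwargs) -> Pipeline:
    base = pipeline(
        [
            node(get_data, inputs=None, outputs="data"),
            node(train_model, inputs=["data", "params:model_options"], outputs="model"),
            node(evaluate_model, inputs=["model", "data"], outputs="metrics"),
        ]
    )
    return sum(
        (pipeline(base, namespace=model_name) for model_name in MODELS),
        Pipeline([]),
    )
```

Under `namespace="random_forest"`, `params:model_options` resolves to `params:random_forest.model_options`, so each instance trains with its own options block.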

• Balachandran Ponnusamy (03/02/2023, 8:01 PM)

Hi team, when we do `kedro package`, it packages all the standard folders, but we have custom folders like `utils` etc. which it is not including. Can you please help with how this can be done?

• Sebastian Pehle (03/03/2023, 9:40 AM)

Has anyone ever tried to 'deploy' a Kedro project into Snowpark (as a stored procedure)? With the restriction to the Snowflake conda channel in an otherwise sandboxed environment, getting Kedro itself, as well as all of the project's dependencies, in there seems like quite a lot to wrap one's head around.

• Ricardo Araújo (03/03/2023, 11:53 PM)

Is it possible to have access to the namespace that a node function is running in?

• azazel daiki (03/04/2023, 9:10 AM)

Hi guys, how do we pass the current date in the globals.yml file? I want to pass the current date as a parameter, but sometimes we need to push some other required date instead.
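
One pattern for this (a sketch, not from the thread): compute the default date in `settings.py` and feed it to the templated config as a global, with an environment variable as the override; all names here are assumptions:

```python
# settings.py -- default run_date to today, allow an override via RUN_DATE
import os
from datetime import date

from kedro.config import TemplatedConfigLoader

CONFIG_LOADER_CLASS = TemplatedConfigLoader
CONFIG_LOADER_ARGS = {
    "globals_pattern": "*globals.yml",
    "globals_dict": {"run_date": os.getenv("RUN_DATE", date.today().isoformat())},
}
```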

• David (03/05/2023, 4:44 PM)

[Kedro pipeline, deployment with Prefect] Hey everyone, has anyone been able to successfully create a Prefect 2 deployment from a Kedro project? If so, could you share some insights or tips on how to go about it? I'm currently working on a project and would love to integrate Prefect into my Kedro pipeline, but I'm not sure where to start. Any help would be greatly appreciated! I am trying to convert the `register_prefect_flow.py` script given in the Kedro documentation. Unfortunately, I don't seem to be succeeding, because the Prefect 1.0 API is different from Prefect 2.0. Thanks in advance!
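
Not an answer from the thread, but as a starting point: the coarsest Prefect 2 integration is a single `@flow` that opens a KedroSession and runs the whole project (at the cost of per-node observability). A hedged sketch:

```python
# minimal Prefect 2 flow wrapping a full Kedro run
from pathlib import Path

from prefect import flow

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project


@flow(name="kedro-run")
def kedro_flow(pipeline_name: str = "__default__"):
    project_path = Path.cwd()  # assumes the flow starts from the project root
    bootstrap_project(project_path)
    with KedroSession.create(project_path=project_path) as session:
        session.run(pipeline_name=pipeline_name)


if __name__ == "__main__":
    kedro_flow()
```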

• Ofir (03/05/2023, 10:46 PM)

Hi everyone, does anyone know whether Kedro supports Prefect 2.0? The deployment configuration with Prefect in the official documentation seems to refer to Prefect 1.0.

• Zoran (03/06/2023, 2:11 PM)

Hi everyone, I was wondering whether it is possible to repeat a pipeline, or call another pipeline depending on the output of the current pipeline, at run time?
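
Kedro pipelines are static DAGs, so conditional or repeated execution is usually driven from outside the run; a hedged sketch of a driver script that inspects a persisted output and decides what to run next (dataset names, pipeline names and the threshold are all invented):

```python
# driver script: conditional follow-up run, decided outside the DAG
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path.cwd()
bootstrap_project(project_path)

with KedroSession.create(project_path=project_path) as session:
    session.run(pipeline_name="scoring")
    quality = session.load_context().catalog.load("quality_metric")  # assumes a persisted output

if quality < 0.8:  # a session can only run once, so open a new one for the follow-up
    with KedroSession.create(project_path=project_path) as session:
        session.run(pipeline_name="retraining")
```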

• Ziren Lin (03/06/2023, 8:55 PM)

Hi team, I followed the instructions for Kedro experiment tracking and set up everything as described HERE. But when I run `kedro viz`, it shows the error message in the screenshot, and when I click the experiment tracking tab I can't see anything. I am wondering how to fix this so I can see the results. Can anyone please help? Thanks!

• Ziren Lin (03/06/2023, 10:34 PM)

Hi team, a general question about Kedro experiment tracking: if I have a bunch of potential models and preprocessing techniques, how can we use experiment tracking to compare the performance of the different preprocessing techniques/models? Thanks in advance.
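
For context, the usual building block is a versioned tracking dataset in the catalog: a node returns a flat dict of metrics, the tracking dataset stores it per run, and the experiment tracking tab in Kedro-Viz compares runs side by side. A hedged sketch (names invented):

```yaml
# catalog.yml
model_metrics:
  type: tracking.MetricsDataSet
  filepath: data/09_tracking/model_metrics.json
```

Running the pipeline once per model/preprocessing variant then yields one tracked run per combination, each with its own `model_metrics` values to compare.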

• Tomás Rojas (03/06/2023, 11:32 PM)

Hi team, a very basic question about `PartitionedDataSet`s. I noticed they return a dictionary of bound methods for loading each partition. My question is: is there a way to write the nodes simply as functions of the object returned by the bound method, or should I write the nodes around the dictionary?
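
As far as I know the node always receives the dictionary, so the per-partition logic is usually factored into a plain helper that the node maps over the loaders; a small sketch:

```python
def process_one(experiment):
    # works on a single loaded partition
    info, led_voltages, data = experiment
    return data.mean()


def process_all(partitions):
    # node input: dict of partition id -> load callable
    return {pid: process_one(load()) for pid, load in partitions.items()}
```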

• Brian Gereke (03/07/2023, 6:34 PM)

Is it possible to use the same `globals.yml` to template both `parameters.yml` and `catalog.yml` with the new `OmegaConfigLoader`?