# questions
  • Gabriel Bandeira

    03/21/2025, 1:48 PM
    Hi team, is there a way I can get environment variables in `globals.yml`? Something like this:
    ```
    {import os}
    file_path_01_raw: /Volumes/{os.environ.get("environment")}/01_raw
    ```
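    One possible approach (a sketch; the variable name is a placeholder, and if your Kedro version only enables `oc.env` for credentials, registering it as a custom resolver makes it usable in globals too): expose OmegaConf's standard `oc.env` resolver through the `OmegaConfigLoader`.
    ```python
    # settings.py -- illustrative sketch
    from omegaconf.resolvers import oc

    CONFIG_LOADER_ARGS = {
        # expose the built-in OmegaConf environment-variable resolver outside credentials
        "custom_resolvers": {"oc.env": oc.env},
    }
    ```
    ```yaml
    # conf/base/globals.yml
    file_path_01_raw: /Volumes/${oc.env:environment}/01_raw
    ```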
  • Pietro Peterlongo

    03/21/2025, 5:33 PM
    A question on hooks: can hooks be used to modify a parameter in the catalog? If I look at the signature of the hook
    `def after_catalog_created(self, catalog: DataCatalog) -> None:`
    it seems to me there is no way to make a change in the catalog, correct? (I can load the parameter, but I do not see a way to change it.) This might very well be on purpose (hooks being only for logging and profiling). My use case (just so I don't fall into the XY-problem trap, https://en.wikipedia.org/wiki/XY_problem) is that I have a parameter whose default value should be computed from the value of another parameter, and this parameter is used in many nodes, so I would like not to repeat that computation everywhere.
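    One possible workaround (a sketch; `add_feed_dict` is the DataCatalog method in Kedro 0.19.x, and the parameter names are made up): the catalog object passed to the hook is mutable, so a derived parameter can be injected right after the catalog is created and then consumed by nodes like any other `params:` entry.
    ```python
    # hooks.py -- illustrative sketch
    from kedro.framework.hooks import hook_impl
    from kedro.io import DataCatalog


    class DerivedParamsHook:
        @hook_impl
        def after_catalog_created(self, catalog: DataCatalog) -> None:
            base = catalog.load("params:base_value")
            # register the computed default so nodes can declare "params:derived_value" as input
            catalog.add_feed_dict({"params:derived_value": base * 2}, replace=True)
    ```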
  • Richard Purvis

    03/21/2025, 7:57 PM
    Has anyone ever used one of the kedro polars datasets to load Excel files from S3? It seems the calamine engine doesn't like `fsspec` objects and expects `str`, `os.PathLike`, or `bytes`.
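    A possible workaround (a sketch; the bucket and key are placeholders): read the raw bytes through fsspec yourself and hand them to polars, since `pl.read_excel` accepts in-memory bytes.
    ```python
    # sketch: bypass the dataset and feed calamine raw bytes
    import fsspec
    import polars as pl

    with fsspec.open("s3://my-bucket/raw/data.xlsx", mode="rb") as f:
        df = pl.read_excel(f.read(), engine="calamine")
    ```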
  • Viktoriia

    03/24/2025, 8:41 AM
    Hi Team, is there a way to access the CLI arguments passed to the `kedro run` command when creating a pipeline, i.e. within the function `def create_pipeline(**kwargs)`? I'm most interested in `conf-source` and `env`.
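    One possible pattern (a sketch, not an official mechanism for passing CLI arguments into `create_pipeline`; whether the hook fires before your pipelines are materialised is worth verifying for your setup): capture the resolved values in an `after_context_created` hook and stash them somewhere the pipeline factory can read.
    ```python
    # hooks.py -- illustrative sketch; the module-level RUNTIME dict is an assumption, not a Kedro feature
    from kedro.framework.hooks import hook_impl

    RUNTIME = {}


    class RuntimeInfoHook:
        @hook_impl
        def after_context_created(self, context) -> None:
            RUNTIME["env"] = context.env  # reflects --env / KEDRO_ENV
            RUNTIME["conf_source"] = context.config_loader.conf_source
    ```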
  • Gauthier Pierard

    03/25/2025, 7:52 AM
    Hello, I am using `joblib` to run some nodes in parallel. So far it works fine, but it breaks my logging config, since each separate joblib process overwrites the file defined in `logging.yml`:
    ```yaml
    version: 1
    disable_existing_loggers: False
    formatters:
      simple:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    handlers:
      console:
        class: logging.StreamHandler
        level: INFO
        formatter: simple
        stream: ext://sys.stdout
      file:
        class: logging.FileHandler
        level: INFO
        formatter: simple
        filename: "logs/kedro_run.log"
        mode: 'w'
    ```
    I could change the mode to append, but first I'd need to define a single logfile per kedro run (with a timestamp, for example) so that I don't end up with the results of multiple runs in one log file. How can I do this? Another solution would be to clear the log file on every run, but I'd need to dynamically retrieve the logfile path, since the pipeline will run in several environments.
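    One possible approach (a sketch using only the standard library; the `logs/` folder and filename pattern are assumptions): attach a timestamped file handler from a hook at startup, so each run writes to its own file.
    ```python
    # hooks.py -- illustrative sketch, not an official Kedro feature
    import logging
    import time
    from pathlib import Path

    from kedro.framework.hooks import hook_impl


    class TimestampedLogHook:
        @hook_impl
        def after_context_created(self, context) -> None:
            Path("logs").mkdir(exist_ok=True)
            log_file = Path("logs") / f"kedro_run_{time.strftime('%Y%m%d_%H%M%S')}.log"
            handler = logging.FileHandler(log_file, mode="w")
            handler.setLevel(logging.INFO)
            handler.setFormatter(
                logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            )
            # attach to the root logger; the static "file" handler can then be dropped from logging.yml
            logging.getLogger().addHandler(handler)
    ```
    Note that with process-based joblib backends each worker still configures logging independently, so this only covers logging done in the main process.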
  • Puneet Saini

    03/25/2025, 5:29 PM
    Hey team! Do we have any plans for PyCharm extensions for Kedro?
  • Lucas Fiorini

    03/26/2025, 6:31 PM
    Hey everybody! Is there any way to stop the execution without yielding an error? I have tried `sys.exit(0)`, but the output still results in an ERROR. Is it possible to stop the execution without generating that type of error output?
  • minmin

    03/27/2025, 11:07 AM
    Hi kedro team, is there a way of specifying tags using AND? e.g. `kedro run --tags="tag1" and "tag2"` (only run a node if it has both tag1 and tag2). As far as I can tell, doing `kedro run --tags=tag1,tag2` is akin to saying "run all nodes that have tag1 OR tag2". AND would be useful if you put a pipeline's namespace in the pipeline-level tags and then tag a single node in the pipeline, so you can run one node for just one namespace.
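    There is no AND syntax on the CLI as far as I know, but one workaround (a sketch; pipeline and tag names are placeholders) is to register an intersection pipeline yourself: `Pipeline.only_nodes_with_tags` keeps nodes matching any of the given tags, so chaining two calls gives AND behaviour, and the result can be run with `kedro run --pipeline tag1_and_tag2`.
    ```python
    # pipeline_registry.py -- illustrative sketch
    from kedro.framework.project import find_pipelines
    from kedro.pipeline import Pipeline


    def register_pipelines() -> dict[str, Pipeline]:
        pipelines = find_pipelines()
        default = sum(pipelines.values())
        return {
            **pipelines,
            "__default__": default,
            # nodes carrying BOTH tag1 and tag2: each call filters by "any of", chaining intersects
            "tag1_and_tag2": default.only_nodes_with_tags("tag1").only_nodes_with_tags("tag2"),
        }
    ```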
  • Viktoriia

    03/28/2025, 8:35 AM
    Hi Team, I want to make use of templating for the catalog. My idea is to define the catalog in `conf/base` and then the respective parameters in each environment, like `conf/dev`, `conf/local`, etc. I imagine something like this:
    ```yaml
    # conf/base/catalog.yaml

    companies:
      type: pandas.CSVDataset
      filepath: ${_raw}/companies.csv
    ```
    ```yaml
    # conf/local/catalog_globals.yaml

    _raw: data/raw
    ```
    ```yaml
    # conf/dev/catalog_globals.yaml

    _raw: cloud-path/raw
    ```
    The problem is that it only works if I have the `catalog.yaml` in each of the environments, which means a lot of duplication for me. Is there a better way I could do that?
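    A possible setup (a sketch; file and key names are illustrative): with `OmegaConfigLoader`, environment-specific values can live in `globals.yml`, which is read from every environment and merged (the run env overriding base), so the catalog only needs to exist once in `conf/base` and can reference the values through the `${globals:...}` resolver.
    ```yaml
    # conf/base/catalog.yml -- single copy of the catalog
    companies:
      type: pandas.CSVDataset
      filepath: ${globals:paths.raw}/companies.csv
    ```
    ```yaml
    # conf/base/globals.yml -- default values
    paths:
      raw: data/raw
    ```
    ```yaml
    # conf/dev/globals.yml -- override only what differs per environment
    paths:
      raw: cloud-path/raw
    ```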
  • Gauthier Pierard

    03/28/2025, 2:37 PM
    Hello, I am using pipelines with namespaces and I need to retrieve the value of the namespace in the nodes. I have a hook that works fine when I specify the namespace (`kedro run --namespace xx`), but when I run all my pipelines without specifying `--namespace`, this hook returns None. How can I access the namespace (or the full pipeline name, `namespace.my_pipeline`) from inside a node? My current hook:
    ```python
    class NamespaceHook:
        namespace = None

        @hook_impl
        def before_pipeline_run(self, run_params, pipeline, catalog):
            NamespaceHook.namespace = run_params.get("namespace")
            logger.info(f"Running pipeline with namespace: {NamespaceHook.namespace}")

        @staticmethod
        def get_namespace():
            return NamespaceHook.namespace
    ```
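    A possible alternative (a sketch; the class attribute is just one way to expose the value): `run_params["namespace"]` is only set when `--namespace` is passed on the CLI, but each `Node` object carries its own `namespace`, so a node-level hook sees it regardless of how the run was launched.
    ```python
    # hooks.py -- illustrative sketch
    from kedro.framework.hooks import hook_impl


    class NodeNamespaceHook:
        current_namespace = None

        @hook_impl
        def before_node_run(self, node, catalog, inputs) -> None:
            # node.namespace is e.g. "xx" for node "xx.my_node", or None when not namespaced
            NodeNamespaceHook.current_namespace = node.namespace
    ```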
  • Vinicius Albert

    03/28/2025, 4:59 PM
    Hello, everyone! I'd like to understand whether Kedro has a way to translate Kedro pipelines into Databricks Workflows, similar to what Kedro has for Vertex AI.
  • Mohamed El Guendouz

    03/31/2025, 2:03 PM
    Hello 🙂 In Kedro, can we use different dataset types for reading and writing the same dataset declared in the catalog? For example, reading with Spark and writing with Delta.
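    One pattern that may fit (a sketch of Kedro's transcoding feature; the paths and dataset types are illustrative): declare the same location twice with an `@` suffix, and let each node pick the flavour it needs, e.g. `my_table@spark` for reads and `my_table@delta` for Delta-specific writes.
    ```yaml
    # catalog.yml -- transcoding sketch
    my_table@spark:
      type: spark.SparkDataset
      filepath: data/02_intermediate/my_table
      file_format: delta

    my_table@delta:
      type: spark.DeltaTableDataset
      filepath: data/02_intermediate/my_table
    ```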
  • Puneet Saini

    04/01/2025, 5:33 AM
    Hey team! Where can I find Kedro's YAML schema for the catalog that I can set in my IDE?
  • Robert Kwiatkowski

    04/01/2025, 7:39 AM
    Hello Team! Is there a way to use a conditional statement to run `pipeline_1` if condition A is True, and `pipeline_2` if condition B is True?
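    Kedro pipelines are static DAGs, so there is no built-in branching that I know of; one workaround (a sketch; the pipeline names and the `use_pipeline_1` parameter are made up) is to decide which registered pipeline to run from a small driver script:
    ```python
    # run_conditional.py -- illustrative sketch
    from pathlib import Path

    from kedro.framework.session import KedroSession
    from kedro.framework.startup import bootstrap_project

    bootstrap_project(Path.cwd())
    with KedroSession.create(project_path=Path.cwd()) as session:
        context = session.load_context()
        # read the condition from parameters (could equally come from an env var or CLI flag)
        condition_a = context.params.get("use_pipeline_1", False)
        session.run(pipeline_name="pipeline_1" if condition_a else "pipeline_2")
    ```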
  • Gauthier Pierard

    04/01/2025, 12:14 PM
    Ladies & gents, any plan to make partitioned datasets compatible with versioning?
  • Bibo Bobo

    04/01/2025, 1:31 PM
    Hello guys, is there a way to reference parameters from the catalog (the same way it works with credentials)? In my case I am running an experiment with langchain, and I am loading the instance of the class that creates a chain via the catalog (similar to how it is done in `kedro_datasets_experimental`, or to how partitioned datasets do it). So I need to pass some credentials to initialise the langchain instance of the model (OpenAI, for example), which I can do just fine. The problem is that I also want to have the model name inside the parameters, because I use the kedro-mlflow plugin, which automatically logs parameters to MLflow, and I want the model name and probably other params (e.g. temperature) to be logged too.
  • Gauthier Pierard

    04/01/2025, 2:25 PM
    What is the best way to access the catalog as defined in `catalog.yml` from a node? I'd like to add a dynamically defined dataset to it.
  • Rakib Sheikh

    04/02/2025, 6:34 AM
    Is there any recent documentation about using Kedro with uv? I only found some GitHub PRs while researching on Google.
  • Matthias Roels

    04/02/2025, 8:40 AM
    Does kedro still require git? I know that in the past I ran into issues when git was not installed in my prod environment.
  • Nicolas Betancourt Cardona

    04/02/2025, 2:24 PM
    Hi, I'm having trouble when combining yielding nodes and overwriting partitioned datasets. I am using a partitioned dataset of a custom dataset like this:
    ```yaml
    partitioned_audio_dataset:
      type: partitions.PartitionedDataset
      path: data/output/mainfolder
      dataset:
        type: my_kedro_project.datasets.audio_dataset.SoundDataset
      filename_suffix: ".WAV"
    ```
    The node whose outputs correspond to this catalog entry yields several dictionaries with keys of the form "subfolder_name/file.wav", so that when the node is done the output main folder should look like this:
    ```
    mainfolder:
          subfolder_1
          subfolder_2
          subfolder_3
          ....
          subfolder_n
    ```
    and inside each subfolder_i there must be several .wav files. This works fine, but the problem appears when I run the node a second time: I would like the option to overwrite instead of adding new files to each subfolder. I thought the `overwrite` parameter of partitioned datasets would help, but it does not quite work as desired when yielding. If I change the catalog entry to
    ```yaml
    partitioned_audio_dataset:
      type: partitions.PartitionedDataset
      path: data/output/mainfolder
      overwrite: True
      dataset:
        type: my_kedro_project.datasets.audio_dataset.SoundDataset
      filename_suffix: ".WAV"
    ```
    then the main folder looks like this:
    ```
    mainfolder:
          subfolder_n
    ```
    with only one single WAV file in subfolder_n, because each time the node yields, the previously yielded files and folders are deleted. Is there a way I can use the overwrite parameter of a partitioned dataset when yielding and still obtain the desired folder structure?
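    One possible workaround (a sketch; the function and key names are made up): `overwrite: True` clears the path on every save, and a generator node triggers one save per yield, so accumulating the partitions and returning a single dict keeps the "overwrite once" behaviour. PartitionedDataset also accepts callables as values for lazy saving, which avoids holding all the audio in memory at once.
    ```python
    # nodes.py -- illustrative sketch; process_audio() and the input structure are placeholders
    def build_partitions(audio_files: dict) -> dict:
        partitions = {}
        for name, load_func in audio_files.items():
            # bind loop variables as defaults so each callable is evaluated lazily at save time
            partitions[f"{name.split('_')[0]}/{name}"] = (
                lambda load_func=load_func: process_audio(load_func())
            )
        return partitions  # a single save, so the target path is wiped exactly once
    ```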
  • Gabriel Aguiar

    04/02/2025, 3:51 PM
    Hello Kedro community, I'm encountering issues while trying to measure the execution time of my pipelines and manage namespaces dynamically, and I'm facing two distinct problems depending on whether or not I use namespaces.
    Context:
    • Project structure: my pipelines are organized into folders within `src/peloptmize/pipelines/`, where the folder name corresponds to the desired namespace. For example:
      • `src/peloptmize/pipelines/data_processing/pipeline.py` -> namespace: `data_processing`
      • `src/peloptmize/pipelines/data_science/pipeline.py` -> namespace: `data_science`
    • Goal: I want Kedro to dynamically infer the namespace of each pipeline based on the project's folder structure, without explicitly defining namespaces in nodes or pipelines. I also want to measure the execution time of each pipeline.
    Hook code: to measure execution time and infer namespaces, I've implemented the following hook:
    ```python
    from kedro.framework.context import KedroContext
    from kedro.framework.hooks import hook_impl
    from kedro.framework.project import pipelines
    from kedro.io import DataCatalog
    import os
    import time
    import pandas as pd
    from collections import defaultdict
    from kedro.pipeline import Pipeline
    from pathlib import Path
    
    class ProjectHooks:
        def __init__(self):
            self._pipeline_times = defaultdict(float)
            self._start_node_time = {}
            self._node_to_pipeline = {}
            self._printed = False
    
        @hook_impl
        def after_context_created(self, context: KedroContext) -> None:
            # ... (your databricks code) ...
            context.catalog
    
        @hook_impl
        def after_catalog_created(self, catalog: DataCatalog, conf_catalog) -> None:
            pipeline_registry.register_pipelines = pipeline_registry.register_dynamic_pipelines(catalog)
            pipelines.configure("peloptmize.pipeline_registry")
    
        @hook_impl
        def before_pipeline_run(self, pipeline: Pipeline, run_params, catalog):
            filepath = pipeline.filepath
            path = Path(filepath)
            parts = path.parts
            if "pipelines" in parts:
                namespace_index = parts.index("pipelines") + 1
                if namespace_index < len(parts) - 1:
                    namespace = parts[namespace_index]
                else:
                    namespace = "default"
            else:
                namespace = "default"
    
            for node in pipeline.nodes:
                node_name = node.name
                self._node_to_pipeline[node_name] = namespace
                print(f"Node: {node_name}, Namespace: {namespace}") # Added logs
    
        @hook_impl
        def before_node_run(self, node, catalog, inputs):
            self._start_node_time[node.name] = time.time()
    
        @hook_impl
        def after_node_run(self, node, catalog, inputs, outputs):
            start_time = self._start_node_time.get(node.name)
            if start_time:
                duration = time.time() - start_time
                subpipeline_name = self._node_to_pipeline.get(node.name, "unknown")
                self._pipeline_times[subpipeline_name] += duration
    
        @hook_impl
        def after_pipeline_run(self, pipeline, run_params, catalog):
            if not self._printed:
                self._printed = True
                df = pd.DataFrame.from_dict(
                    self._pipeline_times, orient="index", columns=["execution_time_seconds"]
                ).reset_index(names="subpipeline")
                df = df.sort_values("execution_time_seconds", ascending=False)
    
                print("\n" + "=" * 60)
                print("TEMPOS DE EXECUÇÃO POR SUBPIPELINE (dentro de __default__ ou All)")
                print("=" * 60)
                print(df.to_string(index=False, float_format="%.2f"))
                print("=" * 60 + "\n")
    ```
    Problems:
    • Namespace issue (without explicit namespaces): when I do not explicitly define namespaces in my pipelines or nodes, execution times are aggregated under the name "no_namespace", indicating that nodes are not being correctly associated with their inferred namespaces.
    • Catalog issue (with namespaces): when I do use namespaces in my pipelines, I encounter a "dataset not found" error when executing `kedro run`, even though the dataset is listed in my `catalog.yml`:
    ```
    ValueError: Pipeline input(s) {'generate_constraints.constraints_US8',...
    ### generate_constraints in this case is the name of the namespace.
    ```
    Questions:
    • How can I resolve the "dataset not found" problem in the `catalog.yml` when using namespaces?
    • Are there more robust approaches to handling dynamic namespaces and time measurement in different environments?
    Any help or suggestions would be greatly appreciated! kedro 0.19.5, kedro-datasets 3.0.1
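    On the namespace-inference part, a sketch of a simpler source of truth (an assumption on my side, not the project's existing code): as far as I can tell `Pipeline` objects have no `filepath` attribute, but each `Node` exposes its own `namespace`, so timings can be grouped per namespace directly in the node hooks.
    ```python
    # hooks.py -- sketch of the grouping logic only
    import time
    from collections import defaultdict

    from kedro.framework.hooks import hook_impl


    class NodeTimingHook:
        def __init__(self):
            self._times = defaultdict(float)
            self._starts = {}

        @hook_impl
        def before_node_run(self, node) -> None:
            self._starts[node.name] = time.time()

        @hook_impl
        def after_node_run(self, node) -> None:
            # node.namespace is e.g. "data_processing" for node "data_processing.my_node"
            group = node.namespace or "no_namespace"
            self._times[group] += time.time() - self._starts.pop(node.name)
    ```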
  • Vinicius Albert

    04/02/2025, 5:53 PM
    Hi everyone, I'm working with Kedro and using `databricks.ManagedTableDataset` to manage Delta tables in Databricks. I need to parameterize the catalog so that it not only defines the schema (column names and types) but also allows storing metadata such as: column type, comment (description of the column), and tags (custom metadata like domain, etc.). Here's an example of my current catalog.yml:
    ```yaml
    test_save:
      type: databricks.ManagedTableDataset
      catalog: blabla
      database: blabla
      table: blabla
      dataframe_type: spark
      write_mode: "overwrite"
      schema:
        fields:
          - name: "column_name"
            type: "column_type"
            nullable: false
            comment: "the description of column_name"
            tags:
              first_tag: "first_tag value"
              second_tag: "second_tag value"
    ```
  • Matthias Roels

    04/02/2025, 8:50 PM
    Does anyone have experience using kedro together with ray? If so, how do you use the two together?
  • Galen Seilis

    04/02/2025, 9:32 PM
    Is there a reference for what all the allowed hook arguments are? I see various allowed inputs depending on the type of hook, but I feel unsure about what is allowed in general. • https://docs.kedro.org/en/stable/hooks/introduction.html • https://docs.kedro.org/en/stable/hooks/common_use_cases.html • https://docs.kedro.org/en/stable/hooks/examples.html
  • Bibo Bobo

    04/03/2025, 12:17 PM
    Hey guys, could you clarify whether it's expected behavior that when you provide custom `CONFIG_LOADER_ARGS` in `settings.py`, some default keys get overwritten, even if you don't explicitly override them? For example, if you set `CONFIG_LOADER_ARGS` to an empty dict, or only update something (e.g. the `config_patterns`), then `base_env` becomes empty. So something like:
    ```python
    CONFIG_LOADER_ARGS = {}
    # or
    CONFIG_LOADER_ARGS = {
        "config_patterns": {
            "globals": ["globals*", "globals*/**", "**/globals*"],
        }
    }
    ```
    breaks the configuration loading, because `base_env` ends up being `None`. I'm asking because I expected `CONFIG_LOADER_ARGS` to act as an update to the default values, not a full replacement. From what I've seen with other keys, it seems like that is how it works; for example, other patterns remain intact even if you don't include them in your custom `CONFIG_LOADER_ARGS`.
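    For what it's worth, a defensive pattern (a sketch; whether the overwrite is intended behaviour is exactly the open question here) is to spell out `base_env` and `default_run_env` whenever `CONFIG_LOADER_ARGS` is overridden:
    ```python
    # settings.py -- illustrative sketch
    CONFIG_LOADER_ARGS = {
        "base_env": "base",
        "default_run_env": "local",
        "config_patterns": {
            "globals": ["globals*", "globals*/**", "**/globals*"],
        },
    }
    ```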
  • Gauthier Pierard

    04/04/2025, 3:26 AM
    I have a regular, non-namespaced pipeline followed by namespaced pipelines that depend on the outputs of the first one. `kedro run` runs everything fine, but `kedro run --namespace xx` only executes the namespaced pipelines and skips the initial one, relying on outdated outputs. How can I also execute the initial pipeline when specifying `--namespace`?
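    One option (a sketch; "preprocessing" and "xx" are placeholder pipeline names) is to register a combined pipeline that sums the shared stage with the namespaced one, and run it by name instead of filtering with `--namespace`:
    ```python
    # pipeline_registry.py -- illustrative sketch
    from kedro.framework.project import find_pipelines
    from kedro.pipeline import Pipeline


    def register_pipelines() -> dict[str, Pipeline]:
        pipelines = find_pipelines()
        pipelines["__default__"] = sum(pipelines.values())
        # run the shared (non-namespaced) stage together with one namespaced pipeline
        pipelines["xx_full"] = pipelines["preprocessing"] + pipelines["xx"]
        return pipelines
    ```
    Then `kedro run --pipeline xx_full` executes both stages in dependency order.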
  • Galen Seilis

    04/04/2025, 9:26 PM
    Assuming a fresh Kedro project without any custom starters/templates, what should I put for the type annotation of kwargs in `create_pipeline`?
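    For reference, the default starter leaves `**kwargs` unannotated; a sketch of one reasonable annotation (a stylistic choice, not an official convention):
    ```python
    # pipeline.py -- illustrative sketch
    from typing import Any

    from kedro.pipeline import Pipeline, pipeline


    def create_pipeline(**kwargs: Any) -> Pipeline:
        return pipeline([])
    ```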
  • Galen Seilis

    04/05/2025, 2:13 PM
    Is using `kedro run --load-versions` a supported way to reproduce a previous run using Kedro (and none of the integrations with DVC/Iceberg/etc.)?
  • Elvira Salakhova

    04/07/2025, 8:29 AM
    Hello, everyone! How do you manage model versioning in MLflow inside a Kedro pipeline?
  • Chee Ming Siow

    04/07/2025, 8:45 AM
    Hi, I need some clarification on `OmegaConfigLoader()`. I have duplicated keys across the base and local conf environments. How do I retrieve the configs without `ValueError: Duplicate keys found in ...`? In my code, I have a function that runs before the actual Kedro pipeline; I wish to retrieve the config in that function and prioritize the config attributes defined in the local env. Sample code:
    ```python
    ###### main.py #####
    if __name__ == "__main__":
        # Bootstrap the project to make the config loader available
        project_path = Path.cwd()
        bootstrap_project(project_path)

        # Create a Kedro session
        with KedroSession.create(project_path=project_path) as session:
            # You can now access the catalog, pipeline, etc. from the session
            # For example, to run the pipeline:
            conf_eda()  # <------------- function
            session.run()


    ##### myfunc.py #####

    def conf_eda():
        project_path = Path.cwd()
        conf_path = str(project_path / "conf")
        conf_loader = OmegaConfigLoader(
            conf_source=conf_path,
        )
        parameters = conf_loader["parameters"]  # <----------- error

        print(parameters["model_options"])
    ```
    ```yaml
    ##### conf/base/parameters_data_science.yml #####
    model_options:
      test_size: 100
      random_state: 3

    ##### conf/local/parameters_data_science.yml #####
    model_options:
      test_size: 300
      random_state: 3
    ```
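    A possible explanation and fix (a sketch; worth verifying against your Kedro version): when `OmegaConfigLoader` is built with only `conf_source`, no environments are configured, so the files under `base/` and `local/` are read as one config source and the identical keys trip the duplicate-key check. Passing the environments explicitly restores the usual "local overrides base" merge:
    ```python
    # myfunc.py -- illustrative sketch; mirrors the defaults a Kedro project normally uses
    from pathlib import Path

    from kedro.config import OmegaConfigLoader


    def conf_eda():
        conf_loader = OmegaConfigLoader(
            conf_source=str(Path.cwd() / "conf"),
            base_env="base",
            default_run_env="local",  # values in conf/local take precedence
        )
        print(conf_loader["parameters"]["model_options"])
    ```
    Alternatively, reusing `session.load_context().config_loader` avoids constructing a second loader at all.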