Gabriel Bandeira
03/21/2025, 1:48 PM
globals.yml?
Something like this:
{import os}
file_path_01_raw: /Volumes/{os.environ.get("environment")}/01_raw
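For what it's worth, a minimal sketch of one way to do this with the OmegaConfigLoader: register a custom resolver through CONFIG_LOADER_ARGS and reference it from globals.yml (the resolver name env_or and the variable name environment are illustrative, not Kedro built-ins):
##### settings.py (sketch) #####
import os

CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # usable in conf files as ${env_or:VAR_NAME,default}
        "env_or": lambda var, default="": os.environ.get(var, default),
    },
}
##### conf/base/globals.yml (sketch) #####
file_path_01_raw: /Volumes/${env_or:environment,dev}/01_raw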
Pietro Peterlongo
03/21/2025, 5:33 PM
def after_catalog_created(self, catalog: DataCatalog) -> None:
It seems to me there is no way to make a change to the catalog here, correct? (I can load a parameter, but I do not see a way to make a change.) This might very well be on purpose (with hooks meant only for logging and profiling). My use case (just so I do not fall into the XY-problem trap, https://en.wikipedia.org/wiki/XY_problem) is that I have a parameter whose default value should be computed from the value of another parameter, and since that parameter is used in many nodes I would rather not repeat the computation every time.
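One possible workaround, sketched under the assumption that touching the catalog in a hook is acceptable: parameters are exposed in the catalog as params:... entries, so a hook can register a derived one (base_param, derived_param and compute_default are illustrative names):
##### hooks.py (sketch) #####
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog, MemoryDataset

class DerivedParameterHook:
    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        # load the parameter the default should be derived from (illustrative name)
        base = catalog.load("params:base_param")
        derived = compute_default(base)  # hypothetical derivation function
        # register the derived value so nodes can simply declare "params:derived_param"
        catalog.add("params:derived_param", MemoryDataset(derived), replace=True)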
Richard Purvis
03/21/2025, 7:57 PM
...fsspec objects, and expects str, os.PathLike, or bytes.
Viktoriia
03/24/2025, 8:41 AM
...kedro run command when creating a pipeline, i.e. within the function def create_pipeline(**kwargs)? I'm most interested in conf-source and env.
Gauthier Pierard
03/25/2025, 7:52 AM
...joblib to run some nodes in parallel. So far it works fine, but it breaks my logging config, since each separate joblib process overwrites the log file defined in logging.yml:
version: 1
disable_existing_loggers: False
formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout
  file:
    class: logging.FileHandler
    level: INFO
    formatter: simple
    filename: "logs/kedro_run.log"
    mode: 'w'
I could change mode to 'a' (append), but first I'd need to define a single log file per kedro run (with a timestamp, for example) so that I don't end up with the results of multiple runs in one log file. How can I do this?
Another solution would be to clear the log file on every run, but then I'd need to retrieve the log file path dynamically, since the pipeline will run in several environments.
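One way to get a per-run log file, sketched as an assumption rather than a recommendation: point logging.yml at a small custom handler that builds a timestamped filename when it is constructed (the module path my_project.logging_utils is illustrative):
##### my_project/logging_utils.py (sketch) #####
import logging
from datetime import datetime
from pathlib import Path

class TimestampedFileHandler(logging.FileHandler):
    """FileHandler that writes to <directory>/<prefix>_<timestamp>.log."""
    def __init__(self, directory="logs", prefix="kedro_run", **kwargs):
        Path(directory).mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = Path(directory) / f"{prefix}_{stamp}.log"
        super().__init__(filename, **kwargs)
##### logging.yml (sketch, replacing the plain FileHandler) #####
#  file:
#    class: my_project.logging_utils.TimestampedFileHandler
#    level: INFO
#    formatter: simple
#    directory: "logs"
#    mode: "a"
Note that each joblib worker process that re-initialises logging would still create its own timestamped file, which avoids the overwriting but spreads one run across several files.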
Puneet Saini
03/25/2025, 5:29 PM
Lucas Fiorini
03/26/2025, 6:31 PM
...sys.exit(0), but the output still results in an ERROR. Is it possible to stop the execution without generating that kind of error output?
minmin
03/27/2025, 11:07 AM
kedro run --tags="tag1" and "tag2"  # (only run a node if it has both tag1 and tag2)
As far as I can tell, doing:
kedro run --tags=tag1,tag2
is akin to saying "run all nodes that have tag1 OR tag2".
This would be useful if you put the namespace of a pipeline in the pipeline-level tags and then tag a single node in the pipeline: you could then run one node for just one namespace.
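As far as I know there is no AND syntax on the CLI, but a sketch of one workaround is to register a pre-filtered pipeline, since chaining only_nodes_with_tags gives the intersection (the pipeline and tag names below are illustrative):
##### pipeline_registry.py (sketch) #####
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline

def register_pipelines() -> dict[str, Pipeline]:
    pipelines = find_pipelines()
    default = sum(pipelines.values())
    # chaining the filter twice keeps only nodes carrying BOTH tags
    pipelines["tag1_and_tag2"] = default.only_nodes_with_tags("tag1").only_nodes_with_tags("tag2")
    pipelines["__default__"] = default
    return pipelines
# then: kedro run --pipeline=tag1_and_tag2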
Viktoriia
03/28/2025, 8:35 AM
...conf/base and then the respective parameters in each environment, like conf/dev, conf/local, etc. I imagine something like this:
# conf/base/catalog.yaml
companies:
  type: pandas.CSVDataset
  filepath: {_raw}/companies.csv
# conf/local/catalog_globals.yaml
_raw: data/raw
# conf/dev/catalog_globals.yaml
_raw: cloud-path/raw
The problem is that it only works if I have the catalog.yaml in each of the environments, which means a lot of duplication for me. Is there a better way I could do that?
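A hedged sketch of how this is usually expressed with the OmegaConfigLoader: keep a single catalog in conf/base that references a globals key, and override only globals.yml per environment (the key name paths.raw is illustrative):
##### conf/base/catalog.yml (sketch, only defined here) #####
companies:
  type: pandas.CSVDataset
  filepath: ${globals:paths.raw}/companies.csv
##### conf/base/globals.yml (sketch) #####
paths:
  raw: data/raw
##### conf/dev/globals.yml (sketch, overrides only the value) #####
paths:
  raw: cloud-path/raw
Note that the files need to match the globals config pattern (globals*), so catalog_globals.yaml would not be picked up by default.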
Gauthier Pierard
03/28/2025, 2:37 PM
...namespace.my_pipeline) from inside a node?
my current hook:
import logging
from kedro.framework.hooks import hook_impl

logger = logging.getLogger(__name__)

class NamespaceHook:
    namespace = None

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        NamespaceHook.namespace = run_params.get("namespace")
        logger.info(f"Running pipeline with namespace: {NamespaceHook.namespace}")

    @staticmethod
    def get_namespace():
        return NamespaceHook.namespace
Vinicius Albert
03/28/2025, 4:59 PM
Mohamed El Guendouz
03/31/2025, 2:03 PM
Puneet Saini
04/01/2025, 5:33 AM
Robert Kwiatkowski
04/01/2025, 7:39 AM
...pipeline_1 if condition A is True, and pipeline_2 if condition B is True?
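A hedged sketch of one way to express this kind of branching: decide at registration time which pipelines make up the default run, based on a flag (condition sources, module paths and names here are illustrative):
##### pipeline_registry.py (sketch) #####
import os
from kedro.pipeline import Pipeline
from my_project.pipelines import pipeline_1, pipeline_2  # illustrative imports

def register_pipelines() -> dict[str, Pipeline]:
    p1 = pipeline_1.create_pipeline()
    p2 = pipeline_2.create_pipeline()
    # the conditions could come from an env var, a config file, an orchestrator, etc.
    condition_a = os.environ.get("RUN_PIPELINE_1") == "1"
    condition_b = os.environ.get("RUN_PIPELINE_2") == "1"
    selected = (p1 if condition_a else Pipeline([])) + (p2 if condition_b else Pipeline([]))
    return {"__default__": selected, "pipeline_1": p1, "pipeline_2": p2}
Branching at registration time keeps each run's DAG static; per-node conditional logic is generally better handled outside Kedro (for example by an orchestrator).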
Gauthier Pierard
04/01/2025, 12:14 PM
Bibo Bobo
04/01/2025, 1:31 PM
...kedro_datasets_experimental, or similar to how partitioned datasets do).
So I need to pass some credentials to initialize the langchain instance of the model (OpenAI, for example), which I can do just fine. The problem is that I also want to have the model name inside the parameters, because I use the kedro-mlflow plugin, which automatically logs parameters to mlflow, and I want the model name and probably other params (e.g. temperature) to be logged too.
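A hedged sketch of a custom dataset for this, written from scratch rather than against the actual kedro_datasets_experimental API (class name, argument names and the credentials key are all illustrative):
##### my_project/datasets/chat_model_dataset.py (sketch) #####
from kedro.io import AbstractDataset
from langchain_openai import ChatOpenAI  # assumes langchain-openai is installed

class ChatModelDataset(AbstractDataset):
    def __init__(self, model_name: str, temperature: float = 0.0, credentials: dict = None):
        self._model_name = model_name
        self._temperature = temperature
        self._api_key = (credentials or {}).get("openai_api_key")

    def _load(self) -> ChatOpenAI:
        # build the configured client lazily, so credentials never live in parameters
        return ChatOpenAI(model=self._model_name, temperature=self._temperature, api_key=self._api_key)

    def _save(self, data) -> None:
        raise NotImplementedError("read-only dataset")

    def _describe(self) -> dict:
        return {"model_name": self._model_name, "temperature": self._temperature}
If the concern is duplicating model_name between the catalog entry and parameters.yml, one option (assuming globals interpolation is available for both in your Kedro version) is to define it once in globals.yml and reference it from both places.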
Gauthier Pierard
04/01/2025, 2:25 PM
...catalog.yml from a node? I'd like to add a dynamically defined dataset to it.
Rakib Sheikh
04/02/2025, 6:34 AM
Matthias Roels
04/02/2025, 8:40 AM
Nicolas Betancourt Cardona
04/02/2025, 2:24 PM
partitioned_audio_dataset:
  type: partitions.PartitionedDataset
  path: data/output/mainfolder
  dataset:
    type: my_kedro_project.datasets.audio_dataset.SoundDataset
  filename_suffix: ".WAV"
The node whose outputs correspond to this catalog entry yields several dictionaries with keys of the form "subfolder_name/file.wav", so that when the node is done the output main folder should look like this:
mainfolder:
  subfolder_1
  subfolder_2
  subfolder_3
  ....
  subfolder_n
and inside each subfolder_i there must be several .wav files. This works fine, but the problem is when I run the node a second time. I would like the possibility to overwrite instead of adding new files to each subfolder. I thought the overwrite parameter of partitioned datasets would help, but it does not quite work as desired when yielding. If I change the catalog entry to
partitioned_audio_dataset:
  type: partitions.PartitionedDataset
  path: data/output/mainfolder
  overwrite: True
  dataset:
    type: my_kedro_project.datasets.audio_dataset.SoundDataset
  filename_suffix: ".WAV"
then the main folder looks like this:
mainfolder:
  subfolder_n
with only one single WAV file in subfolder_n, because each time the node yields, it deletes the previously yielded files and folders. Is there a way I can use the overwrite parameter of a partitioned dataset when yielding and obtain the desired folder structure?
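A hedged sketch of one possible workaround, assuming subclassing is acceptable: a small PartitionedDataset subclass that clears the target folder only on the first save of a run, so later yields from the same node append instead of wiping the folder again. The attribute names below follow the current kedro-datasets implementation and may change between versions:
##### my_kedro_project/datasets/overwrite_once.py (sketch) #####
from kedro_datasets.partitions import PartitionedDataset

class OverwriteOncePartitionedDataset(PartitionedDataset):
    """Deletes existing partitions only on the first _save call of a run."""

    def __init__(self, *args, **kwargs):
        # keep the parent's overwrite behaviour off; we handle clearing ourselves
        kwargs["overwrite"] = False
        super().__init__(*args, **kwargs)
        self._cleared = False

    def _save(self, data) -> None:
        if not self._cleared:
            # clear the folder once per run, mimicking overwrite=True for the whole run
            if self._filesystem.exists(self._normalized_path):
                self._filesystem.rm(self._normalized_path, recursive=True)
            self._cleared = True
        super()._save(data)
The catalog entry would then point type: at my_kedro_project.datasets.overwrite_once.OverwriteOncePartitionedDataset instead of partitions.PartitionedDataset.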
Gabriel Aguiar
04/02/2025, 3:51 PM
...src/peloptmize/pipelines/, where the folder name corresponds to the desired namespace.
• Example:
  ▪︎ src/peloptmize/pipelines/data_processing/pipeline.py -> namespace: data_processing
  ◦ src/peloptmize/pipelines/data_science/pipeline.py -> namespace: data_science
◦ *Goal:* I want Kedro to dynamically infer the namespace of each pipeline based on the project's folder structure, without explicitly defining namespaces in nodes or pipelines.
◦ Also, I want to measure the execution time of each pipeline.
Hook Code:
To measure execution time and infer namespaces, I've implemented the following hook:
from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.framework.project import pipelines
from kedro.io import DataCatalog
import os
import time
import pandas as pd
from collections import defaultdict
from kedro.pipeline import Pipeline
from pathlib import Path
class ProjectHooks:
    def __init__(self):
        self._pipeline_times = defaultdict(float)
        self._start_node_time = {}
        self._node_to_pipeline = {}
        self._printed = False

    @hook_impl
    def after_context_created(self, context: KedroContext) -> None:
        # ... (your databricks code) ...
        context.catalog

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, conf_catalog) -> None:
        pipeline_registry.register_pipelines = pipeline_registry.register_dynamic_pipelines(catalog)
        pipelines.configure("peloptmize.pipeline_registry")

    @hook_impl
    def before_pipeline_run(self, pipeline: Pipeline, run_params, catalog):
        filepath = pipeline.filepath
        path = Path(filepath)
        parts = path.parts
        if "pipelines" in parts:
            namespace_index = parts.index("pipelines") + 1
            if namespace_index < len(parts) - 1:
                namespace = parts[namespace_index]
            else:
                namespace = "default"
        else:
            namespace = "default"
        for node in pipeline.nodes:
            node_name = node.name
            self._node_to_pipeline[node_name] = namespace
            print(f"Node: {node_name}, Namespace: {namespace}")  # Added logs

    @hook_impl
    def before_node_run(self, node, catalog, inputs):
        self._start_node_time[node.name] = time.time()

    @hook_impl
    def after_node_run(self, node, catalog, inputs, outputs):
        start_time = self._start_node_time.get(node.name)
        if start_time:
            duration = time.time() - start_time
            subpipeline_name = self._node_to_pipeline.get(node.name, "unknown")
            self._pipeline_times[subpipeline_name] += duration

    @hook_impl
    def after_pipeline_run(self, pipeline, run_params, catalog):
        if not self._printed:
            self._printed = True
            df = pd.DataFrame.from_dict(
                self._pipeline_times, orient="index", columns=["execution_time_seconds"]
            ).reset_index(names="subpipeline")
            df = df.sort_values("execution_time_seconds", ascending=False)
            print("\n" + "=" * 60)
            print("TEMPOS DE EXECUÇÃO POR SUBPIPELINE (dentro de __default__ ou All)")
            print("=" * 60)
            print(df.to_string(index=False, float_format="%.2f"))
            print("=" * 60 + "\n")
Problems:
◦ *Namespace Issue (Without Explicit Namespaces):* When I do not explicitly define namespaces in my pipelines or nodes, execution times are aggregated under the name "no_namespace", indicating that nodes are not being correctly associated with their inferred namespaces.
◦ *Catalog Issue (With Namespaces):* However, when I do use namespaces in my pipelines, I encounter a "dataset not found" error when executing kedro run, even though the dataset is listed in my catalog.yml.
ValueError: Pipeline input(s) {'generate_constraints.constraints_US8',...
### The generate_constraints in this case is the name of the namespace.
Questions:
• How can I resolve the "dataset not found" problem in the catalog.yml when using namespaces?
• Are there more robust approaches to handling dynamic namespaces and time measurement in different environments?
Any help or suggestions would be greatly appreciated!
kedro 0.19.5
kedro-datasets 3.0.1
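On the ValueError specifically, a hedged sketch of the usual cause: wrapping a pipeline with namespace=... prefixes its free inputs and outputs, so generate_constraints.constraints_US8 is looked up in the catalog instead of constraints_US8. Declaring the dataset in inputs keeps its original name (node and dataset names below are illustrative):
##### pipeline.py (sketch) #####
from kedro.pipeline import Pipeline, node, pipeline

def passthrough(df):
    return df

def create_pipeline(**kwargs) -> Pipeline:
    base = pipeline([node(passthrough, inputs="constraints_US8", outputs="constraints_out")])
    return pipeline(
        base,
        namespace="generate_constraints",
        # datasets listed here keep their un-prefixed catalog names
        inputs={"constraints_US8"},
    )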
Vinicius Albert
04/02/2025, 5:53 PM
test_save:
  type: databricks.ManagedTableDataset
  catalog: blabla
  database: blabla
  table: blabla
  dataframe_type: spark
  write_mode: "overwrite"
  schema:
    fields:
      - name: "column_name"
        type: "column_type"
        nullable: false
        comment: "the description of column_name"
  tags:
    first_tag: "first_tag value"
    second_tag: "second_tag value"
Matthias Roels
04/02/2025, 8:50 PM
Galen Seilis
04/02/2025, 9:32 PM
Bibo Bobo
04/03/2025, 12:17 PM
...CONFIG_LOADER_ARGS in settings.py, some default keys get overwritten, even if you don't explicitly override them?
For example, if you set CONFIG_LOADER_ARGS to an empty dict, or only update something (e.g. the config_patterns), the base_env becomes empty. So something like:
CONFIG_LOADER_ARGS = {}
# or
CONFIG_LOADER_ARGS = {
    "config_patterns": {
        "globals": ["globals*", "globals*/**", "**/globals*"],
    }
}
breaks the configuration loading because base_env ends up being None.
I’m asking because I expected CONFIG_LOADER_ARGS to act as an update to the default values, not a full replacement. From what I’ve seen with other keys, it seems like that is how it works: for example, other patterns remain intact even if you don’t include them in your custom CONFIG_LOADER_ARGS.
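A hedged workaround sketch, assuming the defaults really are replaced rather than merged in your version: set base_env and default_run_env explicitly alongside whatever else you override:
##### settings.py (sketch) #####
CONFIG_LOADER_ARGS = {
    "base_env": "base",
    "default_run_env": "local",
    "config_patterns": {
        "globals": ["globals*", "globals*/**", "**/globals*"],
    },
}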
Gauthier Pierard
04/04/2025, 3:26 AM
...kedro run runs everything fine, but kedro run --namespace xx only executes the namespaced pipelines and skips the initial one, relying on outdated outputs. How do I execute the initial one as well when specifying --namespace?
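A hedged sketch of one workaround, assuming the upstream (non-namespaced) pipeline is registered separately: expose a combined pipeline in pipeline_registry.py and run that instead of filtering by namespace ("preprocessing" and "xx" are illustrative names):
##### pipeline_registry.py (sketch) #####
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline

def register_pipelines() -> dict[str, Pipeline]:
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    # combine the initial pipeline with the namespaced one
    pipelines["xx_full"] = pipelines["preprocessing"] + pipelines["xx"]
    return pipelines
# then: kedro run --pipeline=xx_full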
Galen Seilis
04/04/2025, 9:26 PM
Galen Seilis
04/05/2025, 2:13 PM
Is kedro run --load-versions a supported way to reproduce a previous run using Kedro (and none of the integrations with DVC/Iceberg/etc.)?
Elvira Salakhova
04/07/2025, 8:29 AM
Chee Ming Siow
04/07/2025, 8:45 AM
ValueError: Duplicate keys found in ...?
In my code, I have a function that runs before the actual Kedro pipeline. I wish to retrieve the config in that function and prioritize the config attributes defined in the local env.
sample code
###### main.py #####
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

from myfunc import conf_eda

if __name__ == "__main__":
    # Bootstrap the project to make the config loader available
    project_path = Path.cwd()
    bootstrap_project(project_path)
    # Create a Kedro session
    with KedroSession.create(project_path=project_path) as session:
        # You can now access the catalog, pipeline, etc. from the session
        # For example, to run the pipeline:
        conf_eda()  # <------------- function
        session.run()

##### myfunc.py #####
from pathlib import Path

from kedro.config import OmegaConfigLoader

def conf_eda():
    project_path = Path.cwd()
    conf_path = str(project_path / "conf")
    conf_loader = OmegaConfigLoader(
        conf_source=conf_path,
    )
    parameters = conf_loader["parameters"]  # <----------- error
    print(parameters["model_options"])

##### conf/base/parameters_data_science.yml #####
model_options:
  test_size: 100
  random_state: 3

##### conf/local/parameters_data_science.yml #####
model_options:
  test_size: 300
  random_state: 3
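A hedged guess at the cause, for comparison: when OmegaConfigLoader is built by hand without environment settings, base and local may be scanned as a single environment, so identical top-level keys collide instead of local overriding base. A sketch that passes the environments explicitly (assuming your Kedro version exposes these arguments):
##### myfunc.py (sketch) #####
from pathlib import Path
from kedro.config import OmegaConfigLoader

def conf_eda():
    conf_path = str(Path.cwd() / "conf")
    conf_loader = OmegaConfigLoader(
        conf_source=conf_path,
        base_env="base",          # values loaded first
        default_run_env="local",  # values here override base instead of colliding
    )
    print(conf_loader["parameters"]["model_options"])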