Elvira Salakhova
04/07/2025, 8:29 AM

Chee Ming Siow
04/07/2025, 8:45 AM
ValueError: Duplicate keys found in ...
?
In my code, I have a function that runs before the actual Kedro pipeline. I want to retrieve the config in that function and prioritize the config attributes defined in the local env.
Sample code:
###### main.py #####
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

from myfunc import conf_eda

if __name__ == "__main__":
    # Bootstrap the project to make the config loader available
    project_path = Path.cwd()
    bootstrap_project(project_path)
    # Create a Kedro session
    with KedroSession.create(project_path=project_path) as session:
        # You can now access the catalog, pipeline, etc. from the session
        # For example, to run the pipeline:
        conf_eda()  # <------------- function
        session.run()

##### myfunc.py #####
from pathlib import Path

from kedro.config import OmegaConfigLoader

def conf_eda():
    project_path = Path.cwd()
    conf_path = str(project_path / "conf")
    conf_loader = OmegaConfigLoader(
        conf_source=conf_path,
    )
    parameters = conf_loader["parameters"]  # <----------- error
    print(parameters["model_options"])
##### conf/base/parameters_data_science.yml #####
model_options:
  test_size: 100
  random_state: 3

##### conf/local/parameters_data_science.yml #####
model_options:
  test_size: 300
  random_state: 3
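A hedged note on the error above: the duplicate-keys ValueError typically appears when the loader is not told which environments to merge, so the conf/base and conf/local copies of model_options are read as a single environment. A minimal sketch of conf_eda, assuming a standard conf/ layout, where local values override base ones:

from pathlib import Path

from kedro.config import OmegaConfigLoader

def conf_eda():
    conf_path = str(Path.cwd() / "conf")
    conf_loader = OmegaConfigLoader(
        conf_source=conf_path,
        base_env="base",          # conf/base holds the defaults
        default_run_env="local",  # conf/local overrides them
    )
    parameters = conf_loader["parameters"]
    print(parameters["model_options"])  # expected: test_size taken from conf/local

Alternatively, inside the KedroSession block, session.load_context().config_loader should already be configured with the base and local environments.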
Puneet Saini
04/07/2025, 10:59 AM

Robert Kwiatkowski
04/08/2025, 10:53 AM

Winston Ong
04/08/2025, 3:43 PM
kedro run --pipeline data_processing --env=production
from the spaceflights-pandas starter.
DatasetError: Failed while loading data from dataset CSVDataset(filepath=bucket-name/companies.csv, load_args={}, protocol=s3,
save_args={'index': False}).
Forbidden
conf/production/catalog.yml:
companies:
  type: pandas.CSVDataset
  filepath: s3://bucket-name/companies.csv
  credentials: prod_s3

reviews:
  type: pandas.CSVDataset
  filepath: s3://bucket-name/reviews.csv
  credentials: prod_s3

shuttles:
  type: pandas.ExcelDataset
  filepath: s3://bucket-name/shuttles.xlsx
  load_args:
    engine: openpyxl
  credentials: prod_s3
conf/production/credentials.yml:
prod_s3:
  client_kwargs:
    aws_access_key_id: <<access_key>>
    aws_secret_access_key: <<secret_access_key>>
I'm quite sure my credentials are correct and bucket access is okay, because I ran the following script and I am able to retrieve the file.
import boto3

s3 = boto3.client(
    's3',
    aws_access_key_id='<<access_key>>',
    aws_secret_access_key='<<secret_access_key>>'
)
response = s3.get_object(Bucket='bucket-name', Key='companies.csv')
print(response['Body'].read().decode())
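A hedged sketch of one thing worth checking: Kedro's pandas.CSVDataset reads S3 through fsspec/s3fs rather than boto3 directly, and s3fs also accepts the keys at the top level of the credentials entry as key/secret. If the client_kwargs form is not being picked up in this setup, the equivalent entry below may behave differently; note too that fsspec can issue list/head calls that need more than s3:GetObject, which the boto3 test alone would not surface.

# conf/production/credentials.yml, alternative s3fs-style form (same placeholders as above)
prod_s3:
  key: <<access_key>>
  secret: <<secret_access_key>>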
Winston Ong
04/09/2025, 12:03 AM

Puneet Saini
04/09/2025, 1:32 PM
if "parameters" in key in the code for omegaconf_config.py. Imagine a scenario where we are trying to load some common parameters using common patterns and country parameters using country patterns in settings.py. In that case we can actually do country_parameters or common_parameters and it would still work as expected. Need your thoughts.
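For reference, a sketch of the scenario described, with hypothetical file patterns; both keys get parameter-style handling because they contain the word "parameters", which is what the check above produces:

# settings.py sketch (pattern values are made up for illustration)
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    "config_patterns": {
        "common_parameters": ["parameters_common*"],
        "country_parameters": ["parameters_country*"],
    }
}

# elsewhere: config_loader["common_parameters"] or config_loader["country_parameters"]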
Ralf Kowatsch
04/10/2025, 9:21 AM

Bibo Bobo
04/11/2025, 11:43 AM
"{namespace}.{layer}-{folder}#csv_all":
  type: "${globals:datasets.partitioned_dataset}"
  path: data/{layer}/{namespace}/{folder}
  dataset:
    type: "${globals:datasets.pandas_csv}"

"{namespace}.{layer}-{filename}#single_csv":
  type: "${globals:datasets.pandas_csv}"
  filepath: data/{layer}/{namespace}/{filename}.csv
And in pipeline definitions I can have either something like this
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "05_model_input-folder_name#csv_all",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
Or something like this, depending on whether I want to make a test run on a fraction of the data or on the full dataset:
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "05_model_input-filename#single_csv",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
And I want to have a configuration in yaml where I can easily change the type of the dataset that is used in the pipeline.
Ideally I would like to have a single config from which I can set all the parameters that are used in the pipeline, and have something like this as a result:
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "dataset",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
I see that when you create pipelines using the Kedro CLI it creates a function with the signature def create_pipeline(**kwargs) -> Pipeline, so I assume there is a way to provide params and have something like this:
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=do_stuff,
                inputs=[
                    # other params
                    kwargs.get("dataset"),
                ],
                outputs="some_output",
            )
        ],
        namespace="some_namespace",
    )
But I am not sure how to do it in the right way. I have several pipelines like this and I want all of them to be dynamic in this way. Should I change the default logic in pipeline_registry.py and pass those kwargs from there, or is there a simpler way to achieve something like this?
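A minimal sketch of the kwargs route, building only on the snippets above; the module, registry keys and dataset names are placeholders. Since register_pipelines() is an ordinary function, it can call create_pipeline() with whatever keyword arguments it likes, including values read from a config file:

# pipeline_registry.py sketch (names are hypothetical)
from kedro.pipeline import Pipeline

from my_project.pipelines import model_input  # hypothetical pipeline package

def register_pipelines() -> dict[str, Pipeline]:
    pipelines = {
        # full run reads the partitioned dataset
        "full": model_input.create_pipeline(dataset="05_model_input-folder_name#csv_all"),
        # test run reads a single csv with a fraction of the data
        "sample": model_input.create_pipeline(dataset="05_model_input-filename#single_csv"),
    }
    pipelines["__default__"] = pipelines["full"]
    return pipelines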
Davi Sales Barreira
04/11/2025, 5:42 PM
I'm having trouble using kedro with uv. If I start the package with PySpark, I get an error. Here are the steps to reproduce.
Start running:
uvx kedro new
When prompted, I choose the option to install all tools (this includes pyspark).
The project is created. I get into the directory and run:
uv run ipython
Inside ipython, if I try %load_ext kedro.ipython, then I get the error:
The operation couldn't be completed. Unable to locate a Java Runtime.
Please visit http://www.java.com for information on installing Java.
/Users/davi/test/.venv/lib/python3.11/site-packages/pyspark/bin/spark-class: line 97: CMD: bad array subscript
head: illegal line count -- -1
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ in <module>:1                                                                                     │
│                                                                                                   │
│ /Users/davi/test/.venv/lib/python3.11/site-packages/IPython/core/interactiveshell.py:2482         │
│ in run_line_magic                                                                                 │
│                                                                                                   │
│   2479 │   │   │   if getattr(fn, "needs_local_scope", False):                                    │
│   2480 │   │   │   │   kwargs['local_ns'] = self.get_local_scope(stack_depth)                     │
│   2481 │   │   │   with self.builtin_trap:                                                        │
│ ❱ 2482 │   │   │   │   result = fn(*args, **kwargs)                                               │
....
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
Any idea on what might be happening? BTW, I'm on a Mac.

Davi Sales Barreira
04/12/2025, 1:57 PM
polars.ParquetDataset does not exist.

Daniel Mesquita
04/14/2025, 3:27 PM
kedro run -p subpipe would need to rely on tags to execute a part of it. Is there any feature like this?
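A hedged sketch of the tag-based route mentioned above; tags are attached to nodes, and the run can then be filtered to them (function, dataset and tag names here are hypothetical):

from kedro.pipeline import node, pipeline

def clean(raw_data):
    return raw_data

def train(clean_data):
    return "model"

subpipe = pipeline(
    [
        node(func=clean, inputs="raw_data", outputs="clean_data", tags=["cleaning"]),
        node(func=train, inputs="clean_data", outputs="model"),
    ]
)

# then e.g.: kedro run --pipeline subpipe --tags cleaning  (the flag is --tag in older Kedro releases)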
Mohamed El Guendouz
04/14/2025, 4:00 PM
ValueError: Failed to find the pipeline named 'XXXXXX'. It needs to be generated and returned by the 'register_pipelines' function.
However, when I run kedro run --pipeline <pipeline> locally on my machine, the pipeline is correctly detected and executed.
Just to clarify, I do have an __init__.py file in the pipeline directory, and my register_pipelines() function uses find_pipelines() as shown below:
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline

def register_pipelines() -> dict[str, Pipeline]:
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines
Do you have any idea what could be causing this issue on the cluster? Any insights or suggestions would be greatly appreciated.
Thank you in advance!
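One hedged way to narrow this down: find_pipelines() skips (with a warning) any pipeline whose module fails to import, which on a cluster with a missing dependency can later surface as "Failed to find the pipeline named ...". Importing the module directly in the cluster environment exposes the real exception; the module path below is a placeholder:

# run this on the cluster to surface the underlying import error (module path is hypothetical)
import importlib

importlib.import_module("my_project.pipelines.my_pipeline")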
Sven-Arne Quist
04/15/2025, 9:23 AM

Manoel Pereira de Queiroz
04/15/2025, 6:32 PM
conf directory) so I can easily access parameters and datasets with catalog.load or the ConfigLoader class, instead of configuring my connection to GCP and recreating the parameters from the ground up in the new application?
Thanks in advance and keep up the good work, this project is awesome!

Łukasz Janiec
04/17/2025, 12:09 PM
import networkx as nx
import osmnx as ox

def get_shortest_path(
    G: nx.MultiDiGraph, origin: tuple[float, float], destination: tuple[float, float]
) -> list[tuple[int, int]]:
    """
    Get the shortest path between two points in the graph.

    :param G: The road network graph.
    :param origin: The (latitude, longitude) of the origin.
    :param destination: The (latitude, longitude) of the destination.
    :return: List of edges in the shortest path.
    """
    orig_node = ox.distance.nearest_nodes(G, origin[1], origin[0])
    dest_node = ox.distance.nearest_nodes(G, destination[1], destination[0])
    shortest_path = nx.shortest_path(G, orig_node, dest_node, weight="length")
    path_edges = list(zip(shortest_path[:-1], shortest_path[1:]))
    return path_edges
But it becomes a problem when I am trying to use it with CLI `kedro run`:
UserWarning: An error occurred while importing the
'networking_route_optimizer.pipelines.data_ingestion' module. Nothing defined therein will be
returned by 'find_pipelines'.
Traceback (most recent call last):
  File "/home/ljaniec/.local/lib/python3.10/site-packages/kedro/framework/project/__init__.py", line 442, in find_pipelines
    pipeline_module = importlib.import_module(pipeline_module_name)
  File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name, package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/home/ljaniec/workspace/networking-route-optimizer/src/networking_route_optimizer/pipelines/data_ingestion/__init__.py", line 1, in <module>
    from .pipeline import create_pipeline  # NOQA
  File "/home/ljaniec/workspace/networking-route-optimizer/src/networking_route_optimizer/pipelines/data_ingestion/pipeline.py", line 3, in <module>
    from networking_route_optimizer.pipelines.data_ingestion.nodes import (
  File "/home/ljaniec/workspace/networking-route-optimizer/src/networking_route_optimizer/pipelines/data_ingestion/nodes.py", line 3, in <module>
    import osmnx as ox
ModuleNotFoundError: No module named 'osmnx'
warnings.warn(
What is the problem there? I know that the standard OSMNx installation uses conda and I installed it with pip, but I would expect this to be a problem both in the script and in the pipeline...
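A hedged debugging sketch: the traceback shows kedro running from the user site-packages of the system Python 3.10, so one possibility is that osmnx was installed into a different interpreter or virtual environment than the one kedro run uses. Printing the interpreter and site-packages of the running process and comparing them with where pip put osmnx can confirm or rule that out:

# quick check of which interpreter and site-packages the `kedro run` process uses
import sys
import sysconfig

print(sys.executable)                    # interpreter running the pipeline
print(sysconfig.get_paths()["purelib"])  # its site-packages directory
# compare with the Location field of `pip show osmnx`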
Fazil Topal
04/20/2025, 2:13 PM

Hugo Acosta
04/21/2025, 7:52 AM

Sudip Bhandari
04/22/2025, 2:52 PM
mykedroproject/), as specified in my catalog.yml. However, I've noticed that when I implement MLflow, artifacts and metrics are logged in a different location (under the mlruns directory). This results in the same outputs being stored twice: once through Kedro and again via MLflow.
Do you have any advice on how to address this issue so that I store results only once? Ideally, I would like to have specific artifacts displayed in the MLflow UI, sourced directly from the mykedroproject/ folder.
Thanks in advance!!
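A hedged sketch with kedro-mlflow, if that plugin is in use: wrapping a catalog entry in MlflowArtifactDataset keeps the file at its usual Kedro filepath and logs that same file as an MLflow artifact, so only the entries you choose to wrap show up in the MLflow UI (MLflow still keeps its own copy in the artifact store, so it does not remove duplication entirely). Dataset name and path are placeholders:

# catalog.yml sketch, assuming the kedro-mlflow plugin (names and paths are hypothetical)
model_metrics_plot:
  type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
  dataset:
    type: matplotlib.MatplotlibWriter
    filepath: data/08_reporting/model_metrics_plot.png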
Puneet Saini
04/28/2025, 8:20 AM
polars.LazyPolarsDataset, for which I assume the filepath needs to be a glob pattern. But since kedro-datasets>=6.0.0, we are checking the availability of the file itself without expanding the glob pattern if it is passed in. Is this a bug or am I doing something wrong?

Fazil Topal
04/28/2025, 10:53 AM

Juan Luis
04/28/2025, 11:13 AM

Mikołaj Tym
04/28/2025, 1:44 PM

Jordan Barlow
04/28/2025, 4:51 PM
(ibis.FileDataset, kedro-datasets>=7.0.0).
Kedro seems to make an assumption with the filepath catalog key of a dataset: that the dataset can be read from and written to that same path.
However, Backend.read_parquet and Backend.to_parquet are different when load_args={'hive_partitioning': True}, as the corresponding DuckDB functions require a directory arg when writing, but a nested glob when reading:
https://duckdb.org/docs/stable/data/partitioning/hive_partitioning.html
This is reflected at the Ibis level as well:
https://github.com/ibis-project/ibis/issues/10939
Things still work if you have a catalog entry like this:
my_hive:
  type: ibis.FileDataset
  filepath: data/01_raw/my_hive/first_col=*/second_col=*/*.parquet
  table_name: my_hive
  file_format: parquet
  connection: ${_duckdb}
  load_args:
    hive_partitioning: true
  save_args:
    partition_by: ${tuple:first_col,second_col}
But the write operation will treat the entire filepath like a directory path, and you end up with something like:
my_hive
└── first_col=*
    └── second_col=*
        └── *.parquet
            ├── first_col=val_1
            │   ├── second_col=cat_1
            │   │   └── data_0.parquet
            │   └── second_col=cat_2
            │       └── data_0.parquet
            └── ...
This isn't really a Kedro design problem – perhaps the DuckDB API should be more symmetric. Has anyone else overcome this at the Kedro level?
Thanks.
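One hedged workaround at the Kedro level, under the same catalog assumptions as above: register two entries over the same data, a plain directory path used only by the node that writes, and the nested glob used only by the nodes that read (entry names are placeholders):

my_hive_write:
  type: ibis.FileDataset
  filepath: data/01_raw/my_hive
  table_name: my_hive
  file_format: parquet
  connection: ${_duckdb}
  save_args:
    partition_by: ${tuple:first_col,second_col}

my_hive_read:
  type: ibis.FileDataset
  filepath: data/01_raw/my_hive/first_col=*/second_col=*/*.parquet
  table_name: my_hive
  file_format: parquet
  connection: ${_duckdb}
  load_args:
    hive_partitioning: true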
Lino Fernandes
04/29/2025, 6:02 AM
before_dataset_loaded / before_dataset_saved
pandas.ParquetDataset and partitions.PartitionedDataset

Matthias Roels
04/29/2025, 4:26 PM
PythonModel, in case you want to store a model combined with its preprocessing steps (which you always have to do imo). How can you do that with kedro (or kedro-mlflow)?
The problem is that you probably fitted the preprocessors in earlier nodes and persisted the result. As far as I can tell from the docs, MLflow requires the artifacts of a custom model to be persisted on disk (which you can do with the catalog), but these path strings are not readily available in the kedro nodes to be passed to the constructor of pyfunc…
Any tips, ideas welcome 😀
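A hedged sketch of one pattern (not an official kedro-mlflow API): let the final node receive the fitted preprocessor and model objects themselves, dump them to a temporary directory inside the node, and hand those paths to mlflow.pyfunc.log_model as artifacts, so no catalog path strings are needed. All names below are hypothetical:

import tempfile
from pathlib import Path

import joblib
import mlflow

class ModelWithPreprocessing(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # artifacts are restored by MLflow when the model is loaded
        self.preprocessor = joblib.load(context.artifacts["preprocessor"])
        self.model = joblib.load(context.artifacts["model"])

    def predict(self, context, model_input):
        return self.model.predict(self.preprocessor.transform(model_input))

def log_pyfunc_model(preprocessor, model):  # an ordinary Kedro node
    with tempfile.TemporaryDirectory() as tmp:
        pre_path = Path(tmp) / "preprocessor.pkl"
        model_path = Path(tmp) / "model.pkl"
        joblib.dump(preprocessor, pre_path)
        joblib.dump(model, model_path)
        mlflow.pyfunc.log_model(
            artifact_path="model",
            python_model=ModelWithPreprocessing(),
            artifacts={"preprocessor": str(pre_path), "model": str(model_path)},
        )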
Pedro Sousa Silva
04/30/2025, 9:34 AM
root: ${oc.env:AWS_S3_ROOT}
AWS_S3_ROOT

Nicolas Betancourt Cardona
04/30/2025, 2:55 PM

Pedro Sousa Silva
05/05/2025, 1:29 PM
globals configuration.", so I wonder if there's any workaround to my requirement:
We have a project where the frontend action will trigger my kedro run in Databricks (via the Databricks Jobs REST API). Some parameters from the frontend will override some of my default kedro parameters (this works fine), but I also need to override a dataset definition based on one of these parameters. In particular, I want my dataset to be written to a specific location that depends on a runtime_param simulation_id:
my globals.yaml:
root: ${oc.env:AWS_S3_ROOT}
simulation_id: ${uuid:""} # ideally something like ${runtime_params:simulation_id}, but I know it's not possible
folders:
  m_frontend: "09_frontend_reporting/${..simulation_id}"
my catalog.yaml:
simulation_json:
  type: json.JSONDataset
  filepath: ${globals:root}/${globals:folders.m_frontend}/simulation_${globals:simulation_id}.json
What are my options to achieve this?
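One hedged workaround, assuming the Databricks job that triggers the run can set environment variables: pass simulation_id as an environment variable alongside the runtime params and read it in globals with the built-in oc.env resolver, with a fallback for local runs (SIMULATION_ID is a hypothetical variable name):

# globals.yml sketch; assumes the job exports SIMULATION_ID before `kedro run`
root: ${oc.env:AWS_S3_ROOT}
simulation_id: ${oc.env:SIMULATION_ID,local-dev}
folders:
  m_frontend: "09_frontend_reporting/${..simulation_id}"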
Joseph McLeish
05/07/2025, 1:00 PM
When I run kedro run in the directory of the project (after having run uv pip install -r requirements.txt), I get the following error:
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
It seems like I need to install Java for this to work, but there's no mention of Java anywhere in the docs, so this doesn't feel like the right option. (I'm on Windows, running locally in VS Code, and didn't encounter any issues with the requirements installation.) Is anyone able to help with this error? Thanks! 🙂