# databricks-integration
  • h

    high-park-82026

    07/06/2022, 10:14 PM
    set the channel description: Integration w/ Databricks
  • f

    flaky-kite-6530

    07/07/2022, 12:35 AM
    Hello all. Chris from Databricks here. Nothing official yet, just scoping.
  • f

    freezing-airport-6809

    09/12/2022, 10:47 PM
    cc @abundant-hamburger-66584
  • f

    freezing-airport-6809

    09/24/2022, 2:08 PM
    @handsome-noon-32363 can you paste your question here?
  • h

    handsome-noon-32363

    09/25/2022, 6:06 AM
    I wrote the following code to integrate Flyte with Databricks. Could you please help me resolve it?

        import datetime
        import random,json
        from operator import add

        import flytekit
        from flytekit import Resources, task, workflow
        from pyspark import SparkContext
        from operator import add
        from flytekitplugins.spark import Spark
        from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator


        @task(
            task_config=DatabricksRunNowOperator(task_id='notebook_run', tasks=[{
                "spark.driver.memory": "1000M",
                "spark.executor.memory": "1000M",
                "spark.executor.cores": "1",
                "spark.executor.instances": "2",
                "spark.driver.cores": "1",
                "new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
                "notebook_task": {"notebook_path": "/Users/airflow@example.com/PrepareData"},
            }]))
        def hello_spark(partitions: int) -> float:
            print("Starting Spark with Partitions: {}".format(partitions))
            # n = 100000 * partitions
            sess = flytekit.current_context().spark_session
  • u

    user

    09/28/2022, 7:26 AM
    This message was deleted.
  • h

    handsome-noon-32363

    10/04/2022, 11:32 AM
    Hi, I need help writing the Databricks workflow. The following code is working fine:

        from dataclasses import dataclass
        from typing import Dict, Optional, Type

        from flytekit.configuration import SerializationSettings
        from flytekit.extend import SQLTask
        from flytekit.models import task as _task_model
        from flytekit.types.schema import FlyteSchema

        _SERVER_HOSTNAME_FIELD = "server_hostname"
        _HTTP_PATH = "http_path"
        _ACCESS_TOKEN = "access_token"
        _WAREHOUSE_FIELD = "warehouse"


        @dataclass
        class DatabricksConfig(object):
            server_hostname: Optional[str] = None
            http_path: Optional[str] = None
            access_token: Optional[str] = None
            warehouse: Optional[str] = None


        class DatabricksTask(SQLTask[DatabricksConfig]):
            _TASK_TYPE = "databricks"

            def __init__(
                self,
                name: str,
                query_template: str,
                task_config: Optional[DatabricksConfig] = None,
                inputs: Optional[Dict[str, Type]] = None,
                output_schema_type: Optional[Type[FlyteSchema]] = None,
                **kwargs,
            ):
                outputs = {
                    "results": output_schema_type,
                }
                if task_config is None:
                    task_config = DatabricksConfig()
                super().__init__(
                    name=name,
                    task_config=task_config,
                    query_template=query_template,
                    inputs=inputs,
                    outputs=outputs,
                    task_type=self._TASK_TYPE,
                    **kwargs,
                )
                self._output_schema_type = output_schema_type

            def get_config(self, settings: SerializationSettings) -> Dict[str, str]:
                return {
                    _SERVER_HOSTNAME_FIELD: self.task_config.server_hostname,
                    _HTTP_PATH: self.task_config.http_path,
                    _ACCESS_TOKEN: self.task_config.access_token,
                    _WAREHOUSE_FIELD: self.task_config.warehouse,
                }

            def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
                with sql.connect(server_hostname = "XXXXXXX", http_path ="XXXXXX", access_token = "XXXXXXX") as connection:
                    sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
                    return sql
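
Note on `get_sql` in the message above: the method assigns to the name `sql`, so Python treats `sql` as a local variable for the whole function body, and the earlier `sql.connect(...)` call would raise `UnboundLocalError` whenever the method runs, even if a module named `sql` were imported. The sketch below reproduces the pitfall in isolation. The stand-in `sql` namespace and the functions `shadowed` and `renamed` are hypothetical names for illustration, not part of flytekit or the Databricks connector.

```python
from types import SimpleNamespace

# Hypothetical stand-in for an imported module such as `databricks.sql`.
sql = SimpleNamespace(connect=lambda **kwargs: "connection")


def shadowed() -> str:
    # The assignment to `sql` further down makes `sql` local to this function,
    # so this first use fails before the assignment is ever reached.
    conn = sql.connect(server_hostname="XXXXXXX")
    sql = "select * from student"
    return f"{conn}: {sql}"


def renamed() -> str:
    # A different local name leaves the module-level `sql` visible.
    conn = sql.connect(server_hostname="XXXXXXX")
    statement = "select * from student"
    return f"{conn}: {statement}"


if __name__ == "__main__":
    try:
        shadowed()
    except UnboundLocalError as exc:
        print(f"shadowed() failed: {exc}")
    print(f"renamed() returned: {renamed()}")
```

Renaming the local variable (for example to `statement`) is one way to avoid the clash; whether a connection should be opened inside `get_sql` at all is a separate design question that this sketch does not address.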
  • h

    handsome-noon-32363

    10/04/2022, 12:45 PM
    The following is my Flyte workflow file. Can anyone help me understand what I am doing wrong?

        import typing
        import pandas as pd
        import numpy as np
        #from databricks import sql
        import os
        from flytekit import task, workflow
        import flyte_db_plugin as fdp

        #@task
        def generate_normal_df() -> pd.DataFrame:
            with fdp.DatabricksTask("","").get_sql("select * from student") as con:
                # result=con.sql_task("select * from student")
                return con
                # print(result)
                # result= pd.DataFrame(i,columns=[""])
                # return result
                # for row in result:
                #print(row)
                # return row

        @task
        def compute_stats(df:pd.DataFrame) -> pd.DataFrame:
            return df

        @workflow
        def wf():
            return compute_stats(df=generate_normal_df())
  • h

    handsome-noon-32363

    10/06/2022, 9:06 AM
    I am receiving the following error in the UI, although it works on my local system:

        [1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
        [fd93672939cc74521a93-n0-0] terminated with exit code (1). Reason [Error]. Message:
        , line 728, in exec_module
          File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
          File "/root/myfly.py", line 7, in <module>
            import flyte_db_plugin as fdp
        ModuleNotFoundError: No module named 'flyte_db_plugin'
        Traceback (most recent call last):
          File "/usr/local/bin/pyflyte-fast-execute", line 8, in <module>
            sys.exit(fast_execute_task_cmd())
          File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1130, in __call__
            return self.main(*args, **kwargs)
          File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1055, in main
            rv = self.invoke(ctx)
          File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1404, in invoke
            return ctx.invoke(self.callback, **ctx.params)
          File "/usr/local/lib/python3.7/site-packages/click/core.py", line 760, in invoke
            return __callback(*args, **kwargs)
          File "/usr/local/lib/python3.7/site-packages/flytekit/bin/entrypoint.py", line 507, in fast_execute_task_cmd
            subprocess.run(cmd, check=True)
          File "/usr/local/lib/python3.7/subprocess.py", line 512, in run
            output=stdout, stderr=stderr)
        subprocess.CalledProcessError: Command '['pyflyte-execute', '--inputs', 's3://dev-wm-max-ml-flyte-us-east-1/metadata/propeller/cloudops-max-flyte-demo-development-fd93672939cc74521a93/n0/data/inputs.pb', '--output-prefix', 's3://dev-wm-max-ml-flyte-us-east-1/metadata/propeller/cloudops-max-flyte-demo-development-fd93672939cc74521a93/n0/data/0', '--raw-output-data-prefix', 's3://dev-wm-max-ml-flyte-us-east-1/jh/fd93672939cc74521a93-n0-0', '--checkpoint-path', 's3://dev-wm-max-ml-flyte-us-east-1/jh/fd93672939cc74521a93-n0-0/_flytecheckpoints', '--prev-checkpoint', '""', '--dynamic-addl-distro', 's3://dev-wm-max-ml-flyte-us-east-1/z4/cloudops-max-flyte-demo/development/GYDH57ORDPDZUM46RHB6UB2UNE======/scriptmode.tar.gz', '--dynamic-dest-dir', '/root', '--resolver', 'flytekit.core.python_auto_container.default_task_resolver', '--', 'task-module', 'myfly', 'task-name', 'generate_normal_df']' returned non-zero exit status 1.
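
Note on the traceback above: the Python interpreter inside the pod could not import `flyte_db_plugin` when loading `/root/myfly.py`. One way to narrow this down is to run a small check inside the same container image and see whether the module is discoverable on `sys.path`. The following is a minimal sketch using only the standard library; `describe_module` is a hypothetical helper name, and the module names are taken from the messages above.

```python
import importlib.util
import sys


def describe_module(name: str) -> str:
    """Report where a top-level module would be imported from, if anywhere."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return f"{name}: not found on sys.path"
    return f"{name}: found at {spec.origin}"


if __name__ == "__main__":
    # Module names taken from the traceback and workflow file above.
    for module_name in ("flytekit", "flyte_db_plugin"):
        print(describe_module(module_name))
    print("sys.path:", sys.path)
```

If the module is reported as not found, the file is most likely missing from the image or from the uploaded code archive, or it sits in a directory that is not on `sys.path`.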
  • h

    handsome-noon-32363

    10/11/2022, 11:37 PM
    Hi team, our Databricks plugin works fine on localhost, but when I try it in our development environment it shows the following error:

        grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
            status = StatusCode.UNAVAILABLE
            details = "failed to connect to all addresses"
            debug_error_string = "{"created":"@1665531182.235300256","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3217,"referenced_errors":[{"created":"@1665531182.235299134","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":165,"grpc_status":14}]}"

    Can anyone please help me solve this error?
  • h

    handsome-noon-32363

    10/11/2022, 11:38 PM
    I changed the endpoint URL in our config.yaml file.
  • h

    handsome-noon-32363

    10/12/2022, 12:05 AM
    OK, this is working now.
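
Note on the gRPC error above: `StatusCode.UNAVAILABLE` with "failed to connect to all addresses" means the client could not open a connection to the configured endpoint, which is consistent with the fix being a corrected endpoint URL. A plain TCP check can confirm whether a given endpoint is reachable at all before involving gRPC. The sketch below assumes the endpoint is written as `host:port`, optionally with a `dns:///` prefix; the default port of 443, the helper names, and the example endpoint are assumptions for illustration, not values from this thread.

```python
import socket
from typing import Tuple


def split_endpoint(endpoint: str, default_port: int = 443) -> Tuple[str, int]:
    """Split 'dns:///host:port' or 'host:port' into (host, port)."""
    target = endpoint.split("///", 1)[-1]
    host, sep, port = target.rpartition(":")
    if not sep:
        # No port given; fall back to the assumed default.
        return target, default_port
    return host, int(port)


def can_connect(endpoint: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the endpoint can be opened."""
    host, port = split_endpoint(endpoint)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Hypothetical endpoint; replace with the value from your own config.yaml.
    print(can_connect("dns:///localhost:30081"))
```

A successful TCP connection only shows that something is listening on that address; it does not verify TLS settings or that the listener is the Flyte admin service.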
  • h

    handsome-noon-32363

    10/27/2022, 8:38 AM
    Hi team, I created an ECR image of our plugin and am trying to deploy it, but the following error appears:

        [1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
        [fa55da184477f4208bc9-n0-0] terminated with exit code (127). Reason [ContainerCannotRun]. Message: OCI runtime create failed: container_linux.go:380: starting container process caused: exec: "pyflyte-fast-execute": executable file not found in $PATH: unknown.

    Can anyone help me retrieve the UI?
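
Note on the error above: exit code 127 with "executable file not found in $PATH" indicates that the container runtime could not find `pyflyte-fast-execute` in the image. The earlier traceback in this channel shows that script living at `/usr/local/bin/pyflyte-fast-execute` and calling into `flytekit/bin/entrypoint.py`, so a likely cause is that flytekit is not installed in the environment that is on `PATH` in the new image. The sketch below, meant to be run inside the image, reports whether the entrypoints named in this thread resolve on `PATH`; `find_executables` is a hypothetical helper name.

```python
import shutil
from typing import Dict, Iterable, Optional


def find_executables(names: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each executable name to its resolved path, or None if not on PATH."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    # Entrypoint names taken from the error messages in this channel.
    results = find_executables(["pyflyte-fast-execute", "pyflyte-execute"])
    for name, path in results.items():
        print(f"{name}: {path or 'NOT FOUND on PATH'}")
```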
  • h

    handsome-noon-32363

    10/27/2022, 1:35 PM
    This is our Dockerfile:

        FROM python:3.8-slim-buster

        WORKDIR /root
        ENV VENV /opt/venv
        ENV LANG C.UTF-8
        ENV LC_ALL C.UTF-8
        ENV PYTHONPATH /root

        RUN apt-get update && apt-get install -y build-essential

        # Install the AWS cli separately to prevent issues with boto being written over
        RUN pip3 install awscli
        # Similarly, if you're using GCP be sure to update this command to install gsutil
        # RUN curl -sSL https://sdk.cloud.google.com | bash
        # ENV PATH="$PATH:/root/google-cloud-sdk/bin"

        ENV VENV /opt/venv
        # Virtual environment
        RUN python3 -m venv ${VENV}
        ENV PATH="${VENV}/bin:$PATH"

        # Install Python dependencies
        COPY ./requirements.txt /root
        COPY ./flyte_db_plugin.py /root
        COPY ./myfly.py /root
        RUN pip install -r /root/requirements.txt

        # Copy the actual code
        COPY . /root

        # This tag is supplied by the build script and will be used to determine the version
        # when registering tasks, workflows, and launch plans
        ARG tag
        ENV FLYTE_INTERNAL_IMAGE $tag
  • f

    freezing-airport-6809

    05/14/2023, 9:23 PM
    @freezing-airport-6809 has left the channel
  • f

    freezing-airport-6809

    05/14/2023, 9:23 PM
    archived the channel