# flyte-connectors
  • g

    glamorous-carpet-83516

    09/29/2024, 7:21 AM
    cc @steep-jackal-21573, would like to know your thoughts too.
  • f

    freezing-airport-6809

    09/29/2024, 1:59 PM
    I like the idea of managing agents externally. cc @high-park-82026. Just a question: in multi-cluster deployments we will have to replicate this ConfigMap everywhere. The Flyte config system can handle multiple configs, but currently propeller does not auto-reload the config (this is possible).
  • s

    steep-jackal-21573

    09/30/2024, 7:44 AM
    I also think this is a great idea! I don't remember the details of the agent configuration, but I think that if the AgentMetadataService is implemented by a gRPC service, most of the configuration could be omitted. Is that correct? A quick question: how will propeller discover the ConfigMaps?
  • d

    dry-pizza-97077

    09/30/2024, 8:47 AM
    I think that's a great improvement. My only concern would be YAML sanity. As long as there is a ConfigMap merge operation, a single team uploading a bad YAML can break every other team's config, so propeller needs a good mechanism to keep bad agent YAMLs out of the final ConfigMap. Ideally, propeller should use the latest known good config for every agent: a broken config YAML for one agent should be ignored and shouldn't block other teams from updating other agents' configs.
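    A minimal sketch of the "last known good per agent" idea, assuming each team's agent YAML has already been parsed into a dict, with `None` standing for a file that failed to parse. The validity rule and every name below are hypothetical, not part of Flyte; a real check would validate against propeller's agent config schema.

```python
from typing import Any, Dict, Optional

AgentConfig = Dict[str, Any]


def is_valid_agent_config(cfg: Any) -> bool:
    """Hypothetical rule: a mapping with a non-empty string endpoint and a boolean `insecure`."""
    return (
        isinstance(cfg, dict)
        and isinstance(cfg.get("endpoint"), str)
        and cfg["endpoint"] != ""
        and isinstance(cfg.get("insecure", False), bool)
    )


def merge_agent_configs(
    candidates: Dict[str, Optional[AgentConfig]],
    last_good: Dict[str, AgentConfig],
) -> Dict[str, AgentConfig]:
    """Build the final agents map one agent at a time.

    A valid new config replaces the old one. An invalid or unparsable one falls
    back to that agent's last known good config, so one team's broken file never
    affects another team's agent. A brand-new agent with a bad config is skipped.
    """
    merged: Dict[str, AgentConfig] = {}
    for name, cfg in candidates.items():
        if is_valid_agent_config(cfg):
            merged[name] = cfg
        elif name in last_good:
            merged[name] = last_good[name]
    return merged  # persist this as the next `last_good`
```

    Keying the fallback per agent rather than per file is what keeps one bad upload from freezing everyone else's updates.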
  • a

    alert-oil-1341

    10/01/2024, 12:08 AM
    Are there any `flyteagent` dashboards people are using to monitor the agents? We're just using default k8s deployment monitoring now, but it would be nice to have something a bit more agent-specific.
  • h

    high-park-82026

    10/02/2024, 10:22 PM
    @billowy-church-83438, as others have said, you can mount multiple YAML files as in your example to get propeller to merge them. The merge logic is important to understand, though. We use viper to load and merge config files. YAML maps (everything except lists, i.e. entries containing `- <something>`) are merged additively: existing keys are merged, not replaced. Lists, however, are replaced: if you have a list of 2 elements in one file and 3 elements in another, the last one wins, and the final config has those 3 elements. I could be wrong, though; I've not touched this code in a while. Can you show me the exact ConfigMap you are hoping to use? The one posted above doesn't have any overlap that I could see...
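    The merge rule described above can be modeled in a few lines of Python. This is a sketch that assumes the behavior is exactly as stated (maps merged key by key, any list replaced by the later file); it is not viper's implementation. The example shows the consequence for a shared list such as `supportedTaskTypes`: the later file's list wins outright.

```python
from typing import Any, Dict


def merge_config(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return base merged with override.

    Nested maps are merged recursively; any other value from override,
    including a list, replaces the value in base.
    """
    result = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge_config(result[key], value)
        else:
            result[key] = value
    return result


central = {
    "plugins": {
        "agent-service": {
            "defaultAgent": {"endpoint": "flyteagent:8000", "insecure": True},
            "supportedTaskTypes": ["sensor"],
        }
    }
}
team = {"plugins": {"agent-service": {"supportedTaskTypes": ["customtask"]}}}

merged = merge_config(central, team)
# The defaultAgent map survives, but the list is replaced: "sensor" is gone.
```

    Under this rule, each file that touches a shared list has to repeat the full list, while entries kept in maps (such as `agents` or `default-for-task-types`) merge on their own.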
  • b

    billowy-church-83438

    10/02/2024, 10:40 PM
    Thanks for the follow-up. I can provide the centralized configuration and the minimum changes for a custom agent so you can compare.
  • b

    billowy-church-83438

    10/02/2024, 10:40 PM
    The centralized configuration:
    • Propeller reads its config from `/etc/flyte/config` (mounted from a ConfigMap):
```console
[flyte@flytepropeller-f6d98fbd8-bqdgc /]$ ls -ls /etc/flyte/config/
total 0
0 lrwxrwxrwx 1 root 65534 17 Oct  2 04:54 admin.yaml -> ..data/admin.yaml
0 lrwxrwxrwx 1 root 65534 25 Oct  2 04:54 agent_service.yaml -> ..data/agent_service.yaml
0 lrwxrwxrwx 1 root 65534 17 Oct  2 04:54 cache.yaml -> ..data/cache.yaml
0 lrwxrwxrwx 1 root 65534 19 Oct  2 04:54 catalog.yaml -> ..data/catalog.yaml
0 lrwxrwxrwx 1 root 65534 19 Oct  2 04:54 copilot.yaml -> ..data/copilot.yaml
0 lrwxrwxrwx 1 root 65534 16 Oct  2 04:54 core.yaml -> ..data/core.yaml
0 lrwxrwxrwx 1 root 65534 27 Oct  2 04:54 enabled_plugins.yaml -> ..data/enabled_plugins.yaml
0 lrwxrwxrwx 1 root 65534 15 Oct  2 04:54 k8s.yaml -> ..data/k8s.yaml
0 lrwxrwxrwx 1 root 65534 20 Oct  2 04:54 kingkong.yaml -> ..data/kingkong.yaml
0 lrwxrwxrwx 1 root 65534 18 Oct  2 04:54 logger.yaml -> ..data/logger.yaml
0 lrwxrwxrwx 1 root 65534 16 Oct  2 04:54 mufn.yaml -> ..data/mufn.yaml
0 lrwxrwxrwx 1 root 65534 28 Oct  2 04:54 resource_manager.yaml -> ..data/resource_manager.yaml
0 lrwxrwxrwx 1 root 65534 19 Oct  2 04:54 storage.yaml -> ..data/storage.yaml
0 lrwxrwxrwx 1 root 65534 21 Oct  2 04:54 task_logs.yaml -> ..data/task_logs.yaml
```
    • Centralized `agent_service.yaml`:
```console
[flyte@flytepropeller-f6d98fbd8-bqdgc /]$ cat /etc/flyte/config/agent_service.yaml
plugins:
  agent-service:
    defaultAgent:
      endpoint: flyteagent:8000
      insecure: true
    supportedTaskTypes:
    - sensor
```
    • Centralized `enabled_plugins.yaml`:
```console
[flyte@flytepropeller-f6d98fbd8-bqdgc /]$ cat /etc/flyte/config/enabled_plugins.yaml
plugins:
  agent-service:
    supportedTaskTypes:
    - sensor
tasks:
  task-plugins:
    default-for-task-types:
      container: container
      container_array: k8s-array
      echo: echo
      mpi: mpi
      pytorch: pytorch
      ray: ray
      sensor: agent-service
      sidecar: sidecar
      tensorflow: tensorflow
    enabled-plugins:
    - container
    - sidecar
    - k8s-array
    - tensorflow
    - mufn
    - mpi
    - pytorch
    - ray
    - echo
    - agent-service
```
  • b

    billowy-church-83438

    10/02/2024, 10:40 PM
    Custom agent configuration:
    • The minimum configuration needed for the custom agent under `agent-service`:
```yaml
plugins:
  agent-service:
    agents:
      custom-agent:
        endpoint: custom_agent_end_point_FQDN:8000
        insecure: true
    supportedTaskTypes:
    - custom_sensor
    - customtask
```
    • Custom plugin settings (minimum):
```yaml
plugins:
  agent-service:
    supportedTaskTypes:
    - customsensor
    - customtask
tasks:
  task-plugins:
    default-for-task-types:
      customtask: agent-service
      customsensor: agent-service
      # Other task types are configured by the central Flyte team
```
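    As posted, the two snippets spell one task type differently: `custom_sensor` in the agent block, but `customsensor` in the plugin block and the task mapping. Assuming the lists are meant to agree, as `sensor` does in the centralized config, a small pre-merge check like the hypothetical one below can catch such mismatches. It works on already-parsed dicts and is not part of Flyte.

```python
from typing import Any, Dict, List


def unlisted_agent_task_types(config: Dict[str, Any]) -> List[str]:
    """Task types routed to agent-service that agent-service does not list as supported."""
    routing = (
        config.get("tasks", {})
        .get("task-plugins", {})
        .get("default-for-task-types", {})
    )
    routed = {task_type for task_type, plugin in routing.items() if plugin == "agent-service"}
    supported = set(
        config.get("plugins", {}).get("agent-service", {}).get("supportedTaskTypes", [])
    )
    return sorted(routed - supported)


# The agent block's supportedTaskTypes combined with the posted task mapping.
custom = {
    "plugins": {"agent-service": {"supportedTaskTypes": ["custom_sensor", "customtask"]}},
    "tasks": {
        "task-plugins": {
            "default-for-task-types": {
                "customtask": "agent-service",
                "customsensor": "agent-service",
            }
        }
    },
}
print(unlisted_agent_task_types(custom))  # ['customsensor']
```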
  • f

    freezing-airport-6809

    10/06/2024, 2:04 AM
    Shall we add Python task execution on Databricks through the Databricks agent?
  • a

    alert-oil-1341

    10/07/2024, 2:45 PM
    We're looking to add locking capabilities to our workflows (mostly for efficiency gains), like so:
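```python
from flytekit import conditional, workflow

# acquire_lock and release_lock are the lock agents' tasks; task1 and task2 are
# ordinary tasks. All four are defined elsewhere (task1's parameter is assumed
# to be named lock_handle).


@workflow
def locking_wf() -> None:
    acquired, lock_handle = acquire_lock(lock_handle="my_lock", duration_in_seconds=10)

    cond = (
        conditional("test lock")
        .if_(acquired.is_true())
        .then(task1(lock_handle=lock_handle))
        .else_()
        .then(task2())
    )

    # ">>" declares that release_lock runs after the conditional;
    # ">" is a comparison operator, not an ordering one.
    cond >> release_lock(lock_handle=lock_handle)
```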
    Here, `acquire_lock` and `release_lock` are agents responsible for maintaining the lock. Curious whether others have similar use cases or implementations?
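    The semantics such a pair of agents would need can be sketched independently of Flyte, assuming a lock is identified by name, expires after `duration_in_seconds`, and is released with the handle returned at acquisition. `LeaseLockStore` is hypothetical and in-memory for illustration; agents running as a separate deployment would need a shared store with an atomic check-and-set (for example a database row or a Redis key with a TTL).

```python
import time
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass
class Lease:
    token: str
    expires_at: float


class LeaseLockStore:
    """Hypothetical in-memory lease store: a lock is held until released or until its TTL expires."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._leases: Dict[str, Lease] = {}

    def acquire(self, name: str, duration_in_seconds: float) -> Tuple[bool, Optional[str]]:
        """Return (acquired, handle); the handle is needed to release the lock."""
        now = self._clock()
        lease = self._leases.get(name)
        if lease is not None and lease.expires_at > now:
            return False, None  # still held by someone else
        token = uuid.uuid4().hex
        self._leases[name] = Lease(token, now + duration_in_seconds)
        return True, f"{name}:{token}"

    def release(self, handle: Optional[str]) -> bool:
        """Release only if the handle still owns the lease; a no-op otherwise."""
        if not handle:
            return False  # acquisition failed, nothing to release
        name, _, token = handle.rpartition(":")
        lease = self._leases.get(name)
        if lease is None or lease.token != token:
            return False  # expired and possibly re-acquired by another run
        del self._leases[name]
        return True
```

    The token check matters because the conditional always reaches `release_lock`: a run whose lease has already expired must not release a lock that another run has since acquired.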
  • a

    alert-oil-1341

    10/09/2024, 5:26 PM
    What's the best way to introduce a non-backward-compatible change to an agent, for instance, changed outputs? Our thought is that we'll have to do it at the package level, i.e. `my.agent` and `my.v2.agent`, and also give the tasks unique names, i.e. `my_task` and `my_task_v2`. Any other thoughts?
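    Another option is to keep one task type and bump a version. The `AgentRegistry.get_agent(task_template.type, task_template.task_type_version)` call in the `AsyncAgentExecutorMixin` code later in this channel suggests that flytekit's registry lookup already takes a task type version, which is worth checking against your flytekit version. The toy registry below only illustrates the idea (old task templates keep resolving to the version 0 handler while new ones pick up version 1); it is not flytekit's `AgentRegistry`.

```python
from typing import Any, Callable, Dict, Tuple

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


class VersionedRegistry:
    """Toy agent registry keyed by (task_type, task_type_version)."""

    def __init__(self) -> None:
        self._handlers: Dict[Tuple[str, int], Handler] = {}

    def register(self, task_type: str, version: int, handler: Handler) -> None:
        key = (task_type, version)
        if key in self._handlers:
            raise ValueError(f"duplicate registration for {key}")
        self._handlers[key] = handler

    def get(self, task_type: str, version: int = 0) -> Handler:
        try:
            return self._handlers[(task_type, version)]
        except KeyError:
            raise LookupError(f"no agent for {task_type!r} version {version}") from None


registry = VersionedRegistry()
# Version 0 serves the old output schema; version 1 serves the changed one.
registry.register("my_task", 0, lambda inputs: {"result": inputs["x"]})
registry.register("my_task", 1, lambda inputs: {"result": inputs["x"], "status": "ok"})
```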
  • b

    brief-window-55364

    10/21/2024, 2:05 PM
    Hey friends cc: @damp-lion-88352. I'm trying the agent stuff out and I've defined my agent configs like so:
```yaml
apiVersion: v1
data:
  config.yaml: |
    plugins:
      agent-service:
        agents:
          mockAgent:
            endpoint: 'dns:///flyte-mock-agent'
            insecure: true
            defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
            timeouts:
              ExecuteTaskSync: 300s
              GetTask: 100s
            defaultTimeout: 30s
kind: ConfigMap
```
    Note the lack of `defaultAgent`. My executions are failing with:
    `[1/1] currentAttempt done. Last Error: USER::failed to get grpc connection with error: failed to exit idle mode: passthrough: received empty target in Build()`
    This suggests the config isn't being picked up correctly (even though I know it's listing the agents just fine). Do I need to specify the `defaultAgent` field?
  • b

    brief-window-55364

    10/22/2024, 9:31 AM
    Hey folks, we found a problem in how the SyncAgent gRPC client streams to the gRPC server. I've proposed a fix here: https://github.com/flyteorg/flyte/pull/5884 (issue: https://github.com/flyteorg/flyte/issues/5885). cc: @damp-lion-88352
  • d

    damp-lion-88352

    10/24/2024, 4:46 AM
    Rafael and I just discussed the idea of using `ContainerError` in Flytekit's agent service, and I completely agree with the suggestion. I'd love to hear what others think about it. For reference:
    • ContainerError: https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/core/errors.proto#L13-L24
    • Agent service: https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/agent_service.py
  • a

    alert-oil-1341

    11/08/2024, 9:12 PM
    In the `AsyncAgentExecutorMixin` code, the `delete` method is scheduled in `signal_handler`, which is only called when a SIGINT signal is received. This prevents resource cleanup during normal execution (without any interruption) and is blocking one of our test cases. What do you think about simply making cleanup the final step of execution, so we reach that code every time? Something like:
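```python
import asyncio
from collections import OrderedDict

from flyteidl.core import literals_pb2
from flyteidl.core.execution_pb2 import TaskExecution

from flytekit import FlyteContext, logger
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core import utils
from flytekit.core.base_task import PythonTask
from flytekit.core.type_engine import TypeEngine
from flytekit.exceptions.user import FlyteUserException
from flytekit.extend.backend.base_agent import AgentRegistry
from flytekit.models.literals import LiteralMap


# Proposed AsyncAgentExecutorMixin.execute; import paths follow
# flytekit/extend/backend/base_agent.py and may differ between versions.
def execute(self: PythonTask, **kwargs) -> LiteralMap:
    ctx = FlyteContext.current_context()
    ss = ctx.serialization_settings or SerializationSettings(ImageConfig())
    output_prefix = ctx.file_access.get_random_remote_directory()

    from flytekit.tools.translator import get_serializable

    task_template = get_serializable(OrderedDict(), ss, self).template
    self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)

    # Created outside the try: if creation fails there is nothing to clean up.
    resource_meta = asyncio.run(
        self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
    )

    try:
        # Main execution logic
        resource = asyncio.run(self._get(resource_meta=resource_meta))

        if resource.phase != TaskExecution.SUCCEEDED:
            raise FlyteUserException(f"Failed to run the task {self.name} with error: {resource.message}")

        # Outputs were written to remote storage instead of being returned inline
        if task_template.interface.outputs and resource.outputs is None:
            local_outputs_file = ctx.file_access.get_random_local_path()
            ctx.file_access.get_data(f"{output_prefix}/outputs.pb", local_outputs_file)
            output_proto = utils.load_proto_from_file(literals_pb2.LiteralMap, local_outputs_file)
            return LiteralMap.from_flyte_idl(output_proto)

        if resource.outputs and not isinstance(resource.outputs, LiteralMap):
            return TypeEngine.dict_to_literal_map(ctx, resource.outputs)

        return resource.outputs
    finally:
        # Runs after the try block, whether it returns or raises
        try:
            asyncio.run(self._agent.delete(resource_meta=resource_meta))
        except Exception as e:
            logger.error(f"Error during resource cleanup: {e}")
```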
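    The control flow of the proposal, reduced to its core: with `try`/`finally`, `delete` runs whether `get` returns normally or raises, and because `create` sits outside the `try`, cleanup is only attempted once a resource exists. `FakeAgent` is a hypothetical stand-in that records calls, not a flytekit class.

```python
import asyncio
from typing import List


class FakeAgent:
    """Hypothetical async agent that records which methods were called."""

    def __init__(self, fail_get: bool) -> None:
        self.fail_get = fail_get
        self.calls: List[str] = []

    async def create(self) -> str:
        self.calls.append("create")
        return "job-1"

    async def get(self, resource_meta: str) -> str:
        self.calls.append("get")
        if self.fail_get:
            raise RuntimeError("backend error")
        return "SUCCEEDED"

    async def delete(self, resource_meta: str) -> None:
        self.calls.append("delete")


def execute(agent: FakeAgent) -> str:
    resource_meta = asyncio.run(agent.create())
    try:
        return asyncio.run(agent.get(resource_meta))
    finally:
        # Runs after the return value is computed or after the exception is raised.
        asyncio.run(agent.delete(resource_meta))
```

    `finally` does not run if the process is killed outright (for example with SIGKILL), so this complements rather than replaces interrupt handling.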
  • d

    damp-lion-88352

    11/13/2024, 1:31 PM
    @brief-window-55364 wants to add a new field for the agent's error message. He suggests creating a new error IDL specifically for agents and thinks we should avoid using `ContainerError` for this, because most agent failures are caused by backend API errors, not by issues within the agent's container. What do you all think? cc @glamorous-carpet-83516 https://github.com/flyteorg/flyte/pull/5916#pullrequestreview-2402463075
  • d

    dry-egg-91175

    11/22/2024, 8:35 PM
    Hi, we need some clarification on Flyte Agents. We recently wrote a custom Flyte Agent to create a RayJob on our GKE cluster. Following the examples, we added a custom task (inheriting from `PythonFunctionTask`) that maps to the custom agent and passes the parameters to the agent via the `get_custom` method. We haven't deployed the Agent Service yet, but we're confused: won't FlytePropeller create a container for a custom task that inherits from `PythonFunctionTask`? Do we even need the custom task?
  • d

    damp-lion-88352

    11/28/2024, 4:22 PM
    Hi all, I just wrote a template for building your own custom agent in Flyte. If this has bothered you before, would you take a look and give me some feedback to make it better? I'll work on it next week. Happy Thanksgiving <3 https://github.com/Future-Outlier/flyte-custom-agent-template/tree/main
  • g

    gray-ram-51379

    12/03/2024, 1:02 AM
    Hey everyone, we are seeing a lot of restarts in the agent pod that don't happen in any of our other pods. Have you seen this before, or do you have any idea why it might happen?
```console
yaou@yaou-mn1 ~ % k get pods
NAME                               READY   STATUS    RESTARTS       AGE
ambassador-69d4998797-2rf9r        1/1     Running   0              4d23h
datacatalog-864796d889-ngqx2       1/1     Running   0              4d23h
flyte-pod-webhook-76d88c67-6x7rb   1/1     Running   0              4d23h
flyteadmin-5cf46b95c7-8dnnk        1/1     Running   0              4d23h
flyteagent-56bd59b5d4-9vjcg        1/1     Running   37 (33m ago)   6d3h
flyteconsole-69c4c8795d-hnhg8      1/1     Running   0              67d
```
  • c

    colossal-nightfall-74781

    03/04/2025, 5:32 PM
    Has anyone successfully changed the polling rate of a custom agent? I tried different configurations as specified here, but I haven't been able to. The agent's `get` method still runs every 10 seconds. Any advice would be appreciated.
  • c

    colossal-nightfall-74781

    03/18/2025, 2:17 PM
    Is there a way to save state for an agent task? I believe the ResourceMetadata is immutable and won't change after the agent's `create` method is called. We currently use a custom metastore to save attributes for individual tasks and update them on each invocation. It would be great if we could alter the resource metadata instead.
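    A minimal version of that workaround, assuming the resource metadata carries a stable job ID to key on. SQLite keeps the sketch self-contained; an agent deployment with several replicas would need a store they all share. `TaskStateStore` and the `job_id` key are hypothetical.

```python
import json
import sqlite3
from typing import Any, Dict


class TaskStateStore:
    """Hypothetical per-task state, keyed by a job ID taken from the resource metadata."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS task_state "
                "(job_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
            )

    def load(self, job_id: str) -> Dict[str, Any]:
        row = self._conn.execute(
            "SELECT state FROM task_state WHERE job_id = ?", (job_id,)
        ).fetchone()
        return json.loads(row[0]) if row else {}

    def update(self, job_id: str, **changes: Any) -> Dict[str, Any]:
        """Merge changes into the stored state and return the new state."""
        state = self.load(job_id)
        state.update(changes)
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT OR REPLACE INTO task_state (job_id, state) VALUES (?, ?)",
                (job_id, json.dumps(state)),
            )
        return state
```

    Inside `get`, the agent would call `store.update(job_id, ...)` with whatever changed since the previous poll, leaving the resource metadata itself untouched.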
  • a

    alert-oil-1341

    04/01/2025, 2:42 PM
    After upgrading from 1.13.x to 1.14.1, we've noticed that our async agents sometimes seem to call `get` in rapid succession, causing issues with database lookups. We expected `get` to be called at a regular interval, but that doesn't always seem to be the case. Is this interval configurable, and if so, how?
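    Independently of how the interval is configured, an agent can protect its database by reusing a recent lookup result when `get` is called again within a short window. A minimal sketch, assuming lookups are keyed by a job ID from the resource metadata. `ThrottledLookup` is hypothetical and shown synchronously for brevity (a real `get` is async); its cache is per process, so each agent replica keeps its own.

```python
import time
from typing import Callable, Dict, Generic, Tuple, TypeVar

R = TypeVar("R")


class ThrottledLookup(Generic[R]):
    """Reuse the last result for a key if it was fetched less than min_interval seconds ago."""

    def __init__(
        self,
        fetch: Callable[[str], R],
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._min_interval = min_interval
        self._clock = clock
        self._cache: Dict[str, Tuple[float, R]] = {}

    def __call__(self, key: str) -> R:
        now = self._clock()
        cached = self._cache.get(key)
        if cached is not None and now - cached[0] < self._min_interval:
            return cached[1]  # recent enough: skip the database
        value = self._fetch(key)
        self._cache[key] = (now, value)
        return value
```

    The trade-off is staleness: a status change can be reported up to `min_interval` seconds late.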
  • c

    colossal-nightfall-74781

    04/17/2025, 3:52 PM
    I noticed that long-running agents are no longer called after ~1.5 days. Flyte still shows the active tasks, but there are no more logs for the specific agent task instance. Is this expected behavior?
  • f

    freezing-airport-6809

    04/17/2025, 4:55 PM
    has renamed the channel from "flyte-agents" to "flyte-connectors"
  • f

    freezing-airport-6809

    04/17/2025, 4:55 PM
    set the channel topic: Previously called agents
  • h

    hallowed-toothbrush-42565

    04/29/2025, 8:58 PM
    Is there a way to pass the Flyte context to Flyte connectors? I see that the topic was discussed last year, and I'm curious whether there have been any updates since then. (We are building our connectors with flytekit==1.15.3, and the context is still empty.) Specifically, we are trying to retrieve the task identifier (which should be `task_id=TASK:{project}:{domain}:{launch_plan}.{workflow}.{task}:{version}`) from the context, but calling `current_context()` inside the connector returns `task_id=TASK:local:local:local:local`. If this is not supported yet, is there a recommended workaround?
  • a

    alert-oil-1341

    05/27/2025, 9:33 PM
    We use conditional blocks a lot, and one thing we're considering is creating simple utility connectors that do things like date comparisons, to avoid spinning up a pod just for that check. This feels weird, so we're curious whether there's another approach. Basically, there seems to be a group of very simple pieces of work like this that would benefit from not having to spin up infra.
  • c

    colossal-nightfall-74781

    06/25/2025, 9:16 PM
    Is there a way to use Decks in connectors? My connector polls a long-running job in a different backend, and I'd like to create a streaming Deck that shows the job's progress. Would I have to override the `async _get` method of `AsyncConnectorExecutorMixin` and use the resource output as input for my Deck? If I invoke `Deck.publish()` inside that method, will it show up in the Flyte Console?
  • a

    alert-oil-1341

    08/06/2025, 3:31 PM
    We've got a fun one... We created a custom `SyncAgentBase` agent with Flytekit 1.15.4 and ran about 1000 workflows. Of those, a handful gave us this error:
```text
currentAttempt done. Last Error: USER::failed to create task from agent with rpc error: code = Internal desc = failed to create edge_artifact task with error:
 Trace:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.11/site-packages/flytekit/extend/backend/agent_service.py", line 102, in wrapper
        res = await func(self, request, context, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/flytekit/extend/backend/agent_service.py", line 121, in CreateTask
        agent.create,
        ^^^^^^^^^^^^
    AttributeError: 'EdgeArtifactAgent' object has no attribute 'create'

Message:

    AttributeError: 'EdgeArtifactAgent' object has no attribute 'create'.
```
    What's weird is that this is in the `AsyncAgentBase` code path. So I'm wondering whether there's an issue with looking up the agent in the registry, or what else could cause this strange behavior.