# dev

    Devin Bost

    11/10/2023, 7:33 PM
    What would be the recommended path for ingesting documents from an on-prem location?

    Devin Bost

    11/13/2023, 3:07 PM
    I'm noticing that there are quite a few touchpoints required to add a new Configuration type. For example, to add a configuration for Anyscale, here are all the places that must be updated:
    modified:   langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/AnyscaleConfig.java
    modified:   langstream-cli/src/main/java/ai/langstream/cli/commands/applications/MermaidAppDiagramGenerator.java
    modified:   langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java
    modified:   langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java
    modified:   langstream-core/src/test/java/ai/langstream/impl/resources/ResourceNodeProviderTest.java
    modified:   langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java
    Is there a way we could consolidate these to make it easier to add new configurations? It's easier for people in the community to add new configurations if they only need to add them in one place. It seems like there should be a pattern that we could leverage here.

    Devin Bost

    11/14/2023, 3:50 PM
    Thoughts on how we could get LangStream to be part of the CNCF landscape? https://landscape.cncf.io/

    Roberto Prudhomme

    11/17/2023, 3:33 PM
    Community, does anyone know about this error that I'm getting after deploying the "get ready" on Linux: "[boundedElastic-1] ERROR a.l.a.a.s.i.OpenAICompletionService -- Internal error while processing the streaming response com.azure.core.exception.HttpResponseException: Status code 429, "{ "error": { "message": "You exceeded your current quota, please check your plan and billing details.", "type": "insufficient_quota", "param": null, "code": "insufficient_quota" } }" Do I need some special account for OpenAI or Azure?

    Marcin Dobosz

    12/09/2023, 10:59 AM
    Hi everyone! I'm working on a custom agent for Google Drive. I'm leveraging persistence to keep a timestamp, as per https://docs.langstream.ai/building-applications/development-environment/stateful-agents My agent:
    - name: "Google Drive Source"
      id: "google-drive-src"
      type: "python-source"
      configuration:
        className: "application.GoogleDriveFileLangChain"
        driveId: "[redacted]"
        idleTime: 60
        pageSize: 20
        environment:
          - key: "DRIVE_CREDENTIALS"
            value: "${secrets.drive.drive_credentials}"
      resources:
        disk:
          enabled: true
          size: 10M
          type: "google-drive-src"
    While deploying on GKE I'm having an issue with persistent-state/google-drive-src permissions being 755, owned by `id=0`, while the user is id=10000. So no write permission for my custom agent 😞 I was able to run a dirty workaround, but I'm wondering if I messed something up. Any ideas why I'm having this issue?
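    For anyone hitting the same thing: this kind of volume-ownership mismatch is commonly addressed in Kubernetes with a pod-level securityContext. A generic sketch, not LangStream-specific; whether the LangStream operator lets you set this on agent pods is an assumption:
    # Generic Kubernetes pod spec fragment, not LangStream pipeline syntax:
    # fsGroup makes the kubelet chown mounted volumes to the given group at
    # mount time, so a non-root user (id=10000 here) can write to them.
    securityContext:
      runAsUser: 10000
      fsGroup: 10000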

    Armin Woworsky

    12/12/2023, 12:42 PM
    Hi everyone, I reworked the example https://github.com/LangStream/langstream/tree/main/examples/applications/s3-source to use a locally deployed Cassandra DB instead of HerdDB. We are running a single-node instance of Cassandra (datastax/dse-server:7.0.0-alpha.2). The database is working as it should. When deploying my reworked app to our LangStream cluster I get the following error, and one of the two pods starts to loop. It seems that LangStream is not able to process schema metadata if a table with a vector column definition is already present in the DB. If I drop all existing tables with a vector column defined, the pod works as expected. How can I mitigate this behaviour? Is the DataStax Java Driver too old to handle this new type of column? Thanks, Armin
    12:29:57.049 [main] INFO  c.d.o.c.sink.state.LifeCycleManager -- CassandraSinkTask starting with config:
    Global configuration:
            contactPoints: [dev01-dc1-service.genai-langstream.svc.cluster.local]
            port: 9042
            maxConcurrentRequests: 500
            maxNumberOfRecordsInBatch: 32
            jmx: true
    SSL configuration:
            provider: None
            hostnameValidation: true
            keystore.path: 
            keystore.password: [hidden]
            truststore.path: 
            truststore.password: [hidden]
            openssl.keyCertChain: 
            openssl.privateKey: 
    Authentication configuration:
            provider: PLAIN
            username: dev01-superuser
            password: [hidden]
            gssapi.keyTab: 
            gssapi.principal: 
            gssapi.service: dse
    Topic configurations:
            name: langstreaminputtopic, codec settings: locale: en_US, timeZone: UTC, timestamp: CQL_TIMESTAMP, date: ISO_LOCAL_DATE, time: ISO_LOCAL_TIME, unit: MILLISECONDS
            Table configurations:
                    {keyspace: langstreamapp5, table: documents, cl: LOCAL_ONE, ttl: -1, nullToUnset: true, deletesEnabled: true, mapping:
                          filename=value.filename
                          chunk_id=value.chunk_id
                          embeddings_vector=value.embeddings_vector
                          lang=value.language
                          text=value.text
                          num_tokens=value.chunk_num_tokens
                    }
    datastax-java-driver configuration: 
          datastax-java-driver.advanced.metrics.session.cql-requests.refresh-interval=30 seconds
          datastax-java-driver.advanced.connection.pool.local.size=4
          datastax-java-driver.advanced.metrics.node.cql-messages.highest-latency=35 seconds
          datastax-java-driver.basic.request.timeout=30 seconds
          datastax-java-driver.basic.load-balancing-policy.local-datacenter=dc1
          datastax-java-driver.advanced.protocol.compression=none
          datastax-java-driver.advanced.metrics.session.enabled.1=cql-client-timeouts
          datastax-java-driver.advanced.metrics.session.enabled.0=cql-requests
    
    12:29:59.240 [main] INFO  c.d.o.d.i.c.DefaultMavenCoordinates -- DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.15.0
    12:29:59.736 [s0-admin-0] INFO  c.d.o.d.internal.core.time.Clock -- Using native clock for microsecond precision
    12:30:01.042 [s0-admin-0] WARN  c.d.o.d.i.c.session.DefaultSession -- [s0] Unexpected error while refreshing schema during initialization, proceeding without schema metadata (CompletionException: java.lang.IllegalArgumentException: Could not parse type name vector<float, 5>)
    12:30:01.262 [main] INFO  a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-pulsar-runtime-0.5.0-nar.nar'}
    12:30:01.262 [main] INFO  a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-ai-agents-0.5.0-nar.nar'}
    12:30:01.262 [main] INFO  a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-agent-s3-0.5.0-nar.nar'}
    12:30:01.263 [main] INFO  a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-vector-agents-0.5.0-nar.nar'}
    12:30:01.333 [main] INFO  a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-agents-text-processing-0.5.0-nar.nar'}
    12:30:01.456 [main] INFO  a.l.runtime.agent.AgentRunnerStarter -- Error, NOW SLEEPING
    com.datastax.oss.common.sink.ConfigException: Keyspace langstreamapp5 does not exist.
    	at com.datastax.oss.common.sink.state.LifeCycleManager.getTableMetadata(LifeCycleManager.java:384)
    	at com.datastax.oss.common.sink.state.LifeCycleManager.lambda$buildInstanceState$8(LifeCycleManager.java:455)
    	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    	at com.datastax.oss.common.sink.state.LifeCycleManager.lambda$buildInstanceState$9(LifeCycleManager.java:476)
    	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    	at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1779)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    	at com.datastax.oss.common.sink.state.LifeCycleManager.buildInstanceState(LifeCycleManager.java:479)
    	at com.datastax.oss.common.sink.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:134)
    	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
    	at com.datastax.oss.common.sink.state.LifeCycleManager.startTask(LifeCycleManager.java:129)
    	at com.datastax.oss.common.sink.AbstractSinkTask.start(AbstractSinkTask.java:67)
    	at ai.langstream.agents.vector.cassandra.CassandraWriter$CassandraVectorDatabaseWriter.initialise(CassandraWriter.java:182)
    	at ai.langstream.agents.vector.VectorDBSinkAgent.init(VectorDBSinkAgent.java:37)
    	at ai.langstream.runtime.agent.AgentRunner.lambda$initAgent$12(AgentRunner.java:978)
    	at ai.langstream.api.runner.code.AgentCodeAndLoader.executeWithContextClassloader(AgentCodeAndLoader.java:50)
    	at ai.langstream.runtime.agent.AgentRunner.initAgent(AgentRunner.java:971)
    	at ai.langstream.runtime.agent.CompositeAgentProcessor.init(CompositeAgentProcessor.java:105)
    	at ai.langstream.runtime.agent.AgentRunner.lambda$initAgent$12(AgentRunner.java:978)
    	at ai.langstream.api.runner.code.AgentCodeAndLoader.executeWithContextClassloader(AgentCodeAndLoader.java:50)
    	at ai.langstream.runtime.agent.AgentRunner.initAgent(AgentRunner.java:971)
    	at ai.langstream.runtime.agent.AgentRunner.initAgent(AgentRunner.java:955)
    	at ai.langstream.runtime.agent.AgentRunner.run(AgentRunner.java:187)
    	at ai.langstream.runtime.agent.AgentRunnerStarter.start(AgentRunnerStarter.java:123)
    	at ai.langstream.runtime.agent.AgentRunnerStarter.main(AgentRunnerStarter.java:58)
    	at ai.langstream.runtime.Main.main(Main.java:42)

    Enrico Olivelli

    12/13/2023, 7:16 AM
    @Nicolò Boschi could we cut a release please? Now the limit on the size of messages for Python custom agents has been removed https://github.com/LangStream/langstream/pull/746 (@Marcin Dobosz if you build from the main branch you can verify that the fix solves your problem)

    Armin Woworsky

    12/13/2023, 8:37 PM
    Hi all, is there a sample application available showing how to do a similarity search on a vector table, on top of either AstraDB or Cassandra? I am trying to port the s3-source sample from HerdDB to Cassandra, but I'm struggling to find the right syntax for the SELECT statement. Right now I use:
    SELECT text,embeddings_vector FROM langstreamapp5.documents ORDER BY similarity_cosine(embeddings_vector, CAST(? as FLOAT ARRAY)) DESC LIMIT 20
    and the statement fails with: com.datastax.oss.driver.api.core.servererrors.SyntaxError: line 1:86 mismatched input '(' expecting EOF (... langstreamapp5.documents ORDER BY similarity_cosine[(]...) Thanks again for your support.
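    For reference, the similarity_cosine(...) form is HerdDB syntax and is not valid CQL. A sketch of the equivalent query in CQL's native vector-search syntax, assuming a Cassandra/DSE/Astra DB version with vector search and a storage-attached index on the column (the index name is illustrative):
    -- Hypothetical CQL; requires a SAI index on the vector column first:
    -- CREATE CUSTOM INDEX ann_idx ON langstreamapp5.documents (embeddings_vector)
    --   USING 'StorageAttachedIndex';
    SELECT text, embeddings_vector
    FROM langstreamapp5.documents
    ORDER BY embeddings_vector ANN OF ?   -- bind the query vector here
    LIMIT 20;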

    Marcin Dobosz

    12/15/2023, 11:29 AM
    Hey! I'm running a custom python agent on GKE and having some issues:
    60s         Warning   OOMKilling                               node/gk3-langstream-cluster-1-pool-2-c939c962-aw9j   Memory cgroup out of memory: Killed process 488573 (python3) total-vm:1006260kB, anon-rss:215684kB, file-rss:20864kB, shmem-rss:0kB, UID:10000 pgtables:700kB oom_score_adj:-997
    I know that this is not strictly related to LangStream, but I've been wondering if you have any ideas how this can be solved. The pipeline resources.size is set to 1, which sets my cluster requests and limits for memory to 512MB. Increasing the limit does not help 😞

    Enrico Olivelli

    12/20/2023, 12:06 PM
    @Nicolò Boschi what about cutting a new release with an update of LangChain with support for Google Gemini?

    Marcin Dobosz

    01/09/2024, 10:39 PM
    Hey. Is there any reason for a WebSocket (https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) connection with the gateway to close after 30 seconds? This does not occur if I'm running through langstream app ui. Does anyone have experience to share?
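    A guess worth testing: idle proxies and gateways often drop quiet connections around the 30-60 second mark, and protocol-level pings keep them alive. A minimal client-side keepalive sketch, assuming Python's third-party websockets library (the gateway URL is a placeholder):
    import asyncio
    import websockets  # pip install websockets

    async def consume():
        # ping_interval sends a protocol-level ping every 20 s, which keeps
        # idle-timeout proxies from closing an otherwise quiet socket.
        uri = "ws://localhost:8091/v1/consume/my-tenant/my-app/output-topic"  # placeholder
        async with websockets.connect(uri, ping_interval=20, ping_timeout=20) as ws:
            async for message in ws:
                print(message)

    asyncio.run(consume())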

    David Dieruf

    01/25/2024, 12:04 PM
    I am thinking about zero downtime deployments and reading this in the docs:
    Upgrading and Downtime
    When you update your application, all impacted agents in the pipeline are restarted at the same time, and if an agent has more than 1 replica, 1 replica is restarted at a time.
    In Kubernetes terms, each agent is a StatefulSet, and updating agents behaves like a rolling update of StatefulSets.
    And then this:
    The resources property allows you to specify the size (scale vertically) and the parallelism (scale horizontally) for the pipeline.
    topics:
      ...
    resources:
      parallelism: 3
      size: 2
    pipeline:
      ...
    Setting parallelism to 3 will deploy the pipeline with 3 different replicas. Each replica will use the configured size for CPU and memory.
    So if I set parallelism: 3 on a pipeline with 5 agents, then I have 5 StatefulSets, each having 3 pods. Thus when the app is updated, each StatefulSet will be rolled, updating one pod at a time. This ensures there is no downtime because there is always a pod available, BUT it also means 2 different versions of an agent run together for a very short span. Has anyone considered a mechanism for canary or blue/green deployments? My pipeline will be seeing a ton of continuous live traffic. Among other things, I'd like to watch for hallucinations. Ideally I could do this by only allowing a small subset of the traffic to reach new pods before replacing all the pods.

    David Dieruf

    01/25/2024, 4:20 PM
    Are there any examples of using schemas in a custom python processor agent?

    Enrico Olivelli

    01/29/2024, 7:40 AM
    @Nicolò Boschi I have pushed the fix about the annoying error with “docker run” https://github.com/LangStream/langstream/pull/756 could you please kickstart a release ?

    Marcin Dobosz

    01/30/2024, 10:10 AM
    Hi all! I'm having a weird issue. While running two applications on a single tenant, I'm seeing cross-over between their jobs. The result I'm getting is obviously from the other application, despite being addressed correctly on the gateway. The applications are quite similar in structure. Should I make sure that topic names are different? Is this designed behavior? I'm running LangStream CLI 0.5.6 (d691a66f)

    David Dieruf

    01/31/2024, 2:19 PM
    I wanted to include my LLM with my python processor. It's a 2 GB file. When doing a dry run I get a "Maximum upload size exceeded" message. Are there any recommendations to work around this?

    David Dieruf

    02/01/2024, 12:39 PM
    I would like to use the gpu in my K8s cluster with my LS application. Is there a way to do this in a pipeline manifest? Maybe something in the resources area?
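    For comparison, plain Kubernetes expresses a GPU request like the snippet below (a generic sketch assuming an NVIDIA device plugin on the cluster); whether LangStream's resources block can pass this through to the agent's StatefulSet is exactly the open question:
    # Standard Kubernetes container resources, not LangStream pipeline syntax;
    # the nvidia.com/gpu resource name assumes the NVIDIA device plugin.
    resources:
      limits:
        nvidia.com/gpu: 1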

    David Dieruf

    02/01/2024, 1:45 PM
    Is there a place in the docs that tells me what packages and versions are pre-loaded in a python processor environment? My /lib is 247 MB at the moment, which makes deploying difficult.

    Chris Bartholomew

    02/01/2024, 3:35 PM
    Is there any way to pass JVM options to the agent? For example, I would like to change the percentages of RAM used for heap on some agents, like this:
    -XX:MaxRAMPercentage=50.0
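    If the agent doesn't expose a dedicated JVM-options setting, one generic mechanism worth trying is the JAVA_TOOL_OPTIONS environment variable, which every HotSpot JVM picks up at startup. A sketch; whether the agent's environment configuration is forwarded to the runner container is an assumption, not confirmed:
    # Hypothetical: relies on the standard JAVA_TOOL_OPTIONS mechanism,
    # honored by any HotSpot JVM at startup.
    configuration:
      environment:
        - key: "JAVA_TOOL_OPTIONS"
          value: "-XX:MaxRAMPercentage=50.0"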

    David Dieruf

    02/02/2024, 2:59 PM
    Could I get a little help on this error?
    io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PATCH at: https://10.43.0.1:443/apis/langstream.ai/v1alpha1/namespaces/langstream-smart-logs/agents/smart-logs-search-store-tester?fieldManager=fabric8. Message: failed to create typed patch object (/smart-logs-search-store-tester; langstream.ai/v1alpha1, Kind=Agent): .spec.options: field not declared in schema. Received status: Status(apiVersion=v1, code=500, details=null, kind=Status, message=failed to create typed patch object (/smart-logs-search-store-tester; langstream.ai/v1alpha1, Kind=Agent): .spec.options: field not declared in schema, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=null, status=Failure, additionalProperties={}).

    David Dieruf

    02/02/2024, 3:23 PM
    My python processor is erroring during init. Where should I see the exceptions for debugging?
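    A generic way to get at them, assuming each agent runs as a pod in the tenant's namespace (the namespace and pod names below are placeholders):
    # Find the agent pod, then tail its logs; init-time Python exceptions
    # normally end up on the pod's stdout/stderr.
    kubectl -n <tenant-namespace> get pods
    kubectl -n <tenant-namespace> logs <agent-pod-name> -f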

    David Dieruf

    02/02/2024, 3:47 PM
    My random thought of the day -- dependency management in a custom python agent is... interesting. It took all my know-how to line things up.

    David Dieruf

    02/26/2024, 2:01 PM
    Question about the dispatch agent. Given a Record of...
    HEADERS: id=1 action_message="something" action_topic="an-action-topic"
    VALUE (string): "yes"
    I want to:
    • drop any message with the value "no"
    • reroute any message with a value of "yes" to a topic name provided in the headers
    • default to the topic "default-action-topic"
    Here's my first attempt:
    - name: Dispatch Evaluation Results
      id: dispatch-evaluation-results
      type: dispatch
      output: default-action-topic
      configuration:
        routes:
          - when: fn:contains(value, "no")
            action: drop
          - when: fn:contains(value, "yes")
            destination: headers.action_topic

    Enrico Olivelli

    02/26/2024, 2:22 PM
    Did you try value eq 'no', with single quotes?
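    Applied to the config above, that suggestion would look roughly like this (a sketch; it assumes the dispatch agent's when clause accepts eq comparisons on value, as suggested):
    routes:
      - when: value eq 'no'
        action: drop
      - when: value eq 'yes'
        destination: headers.action_topic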

    Enrico Olivelli

    02/26/2024, 2:25 PM
    You can also add a 'log-event' agent before your agent; this way you can log the contents of the message.

    David Dieruf

    02/26/2024, 2:57 PM
    Can I have two agents use the same input topic?

    Enrico Olivelli

    02/26/2024, 2:58 PM
    Of course; you can explicitly set "input" on every agent. They will run in different pods for sure, as each pod starts only 1 Consumer.

    David Dieruf

    02/26/2024, 4:54 PM
    Is there an example somewhere of setting an agent (or pipeline) log level?

    David Dieruf

    02/27/2024, 8:38 PM
    Any ideas of how to set logging level within a custom python processor?

    Nicolò Boschi

    02/27/2024, 8:39 PM
    I think you can use the logging library methods
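    For example, a minimal sketch using the standard-library logging module inside a custom processor (the logger name and processor class are illustrative, not a confirmed LangStream API):
    import logging

    # Fall back to a basic stdout handler if the runtime hasn't configured one.
    logging.basicConfig(level=logging.INFO)

    # Per-agent logger; raising its level to DEBUG makes debug lines visible
    # wherever the pod's stdout goes.
    logger = logging.getLogger("my-processor")
    logger.setLevel(logging.DEBUG)

    class MyProcessor(object):
        def process(self, record):
            logger.debug("processing record: %s", record)
            return [record]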