Devin Bost
11/10/2023, 7:33 PM
Devin Bost
11/13/2023, 3:07 PM
modified: langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/AnyscaleConfig.java
modified: langstream-cli/src/main/java/ai/langstream/cli/commands/applications/MermaidAppDiagramGenerator.java
modified: langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java
modified: langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java
modified: langstream-core/src/test/java/ai/langstream/impl/resources/ResourceNodeProviderTest.java
modified: langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java
Is there a way we could consolidate these to make it easier to add new configurations? It's easier for people in the community to add new configurations if they only need to add them in one place. It seems like there should be a pattern that we could leverage here.
Devin Bost
11/14/2023, 3:50 PM
Roberto Prudhomme
11/17/2023, 3:33 PM
Marcin Dobosz
12/09/2023, 10:59 AM
- name: "Google Drive Source"
  id: "google-drive-src"
  type: "python-source"
  configuration:
    className: "application.GoogleDriveFileLangChain"
    driveId: "[redacted]"
    idleTime: 60
    pageSize: 20
    environment:
      - key: "DRIVE_CREDENTIALS"
        value: "${secrets.drive.drive_credentials}"
  resources:
    disk:
      enabled: true
      size: 10M
      type: "google-drive-src"
While deploying on GKE I'm having an issue with persistent-state/google-drive-src: the directory has permissions 755 and is owned by `id=0`, while the agent runs as user `id=10000`, so my custom agent has no write permission. I was able to apply a dirty workaround, but I'm wondering if I messed something up. Any ideas why I'm having this issue?
Armin Woworsky
12/12/2023, 12:42 PM
12:29:57.049 [main] INFO c.d.o.c.sink.state.LifeCycleManager -- CassandraSinkTask starting with config:
Global configuration:
contactPoints: [dev01-dc1-service.genai-langstream.svc.cluster.local]
port: 9042
maxConcurrentRequests: 500
maxNumberOfRecordsInBatch: 32
jmx: true
SSL configuration:
provider: None
hostnameValidation: true
keystore.path:
keystore.password: [hidden]
truststore.path:
truststore.password: [hidden]
openssl.keyCertChain:
openssl.privateKey:
Authentication configuration:
provider: PLAIN
username: dev01-superuser
password: [hidden]
gssapi.keyTab:
gssapi.principal:
gssapi.service: dse
Topic configurations:
name: langstreaminputtopic, codec settings: locale: en_US, timeZone: UTC, timestamp: CQL_TIMESTAMP, date: ISO_LOCAL_DATE, time: ISO_LOCAL_TIME, unit: MILLISECONDS
Table configurations:
{keyspace: langstreamapp5, table: documents, cl: LOCAL_ONE, ttl: -1, nullToUnset: true, deletesEnabled: true, mapping:
filename=value.filename
chunk_id=value.chunk_id
embeddings_vector=value.embeddings_vector
lang=value.language
text=value.text
num_tokens=value.chunk_num_tokens
}
datastax-java-driver configuration:
datastax-java-driver.advanced.metrics.session.cql-requests.refresh-interval=30 seconds
datastax-java-driver.advanced.connection.pool.local.size=4
datastax-java-driver.advanced.metrics.node.cql-messages.highest-latency=35 seconds
datastax-java-driver.basic.request.timeout=30 seconds
datastax-java-driver.basic.load-balancing-policy.local-datacenter=dc1
datastax-java-driver.advanced.protocol.compression=none
datastax-java-driver.advanced.metrics.session.enabled.1=cql-client-timeouts
datastax-java-driver.advanced.metrics.session.enabled.0=cql-requests
12:29:59.240 [main] INFO c.d.o.d.i.c.DefaultMavenCoordinates -- DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.15.0
12:29:59.736 [s0-admin-0] INFO c.d.o.d.internal.core.time.Clock -- Using native clock for microsecond precision
12:30:01.042 [s0-admin-0] WARN c.d.o.d.i.c.session.DefaultSession -- [s0] Unexpected error while refreshing schema during initialization, proceeding without schema metadata (CompletionException: java.lang.IllegalArgumentException: Could not parse type name vector<float, 5>)
12:30:01.262 [main] INFO a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-pulsar-runtime-0.5.0-nar.nar'}
12:30:01.262 [main] INFO a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-ai-agents-0.5.0-nar.nar'}
12:30:01.262 [main] INFO a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-agent-s3-0.5.0-nar.nar'}
12:30:01.263 [main] INFO a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-vector-agents-0.5.0-nar.nar'}
12:30:01.333 [main] INFO a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-agents-text-processing-0.5.0-nar.nar'}
12:30:01.456 [main] INFO a.l.runtime.agent.AgentRunnerStarter -- Error, NOW SLEEPING
com.datastax.oss.common.sink.ConfigException: Keyspace langstreamapp5 does not exist.
at com.datastax.oss.common.sink.state.LifeCycleManager.getTableMetadata(LifeCycleManager.java:384)
at com.datastax.oss.common.sink.state.LifeCycleManager.lambda$buildInstanceState$8(LifeCycleManager.java:455)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at com.datastax.oss.common.sink.state.LifeCycleManager.lambda$buildInstanceState$9(LifeCycleManager.java:476)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1779)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at com.datastax.oss.common.sink.state.LifeCycleManager.buildInstanceState(LifeCycleManager.java:479)
at com.datastax.oss.common.sink.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:134)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
at com.datastax.oss.common.sink.state.LifeCycleManager.startTask(LifeCycleManager.java:129)
at com.datastax.oss.common.sink.AbstractSinkTask.start(AbstractSinkTask.java:67)
at ai.langstream.agents.vector.cassandra.CassandraWriter$CassandraVectorDatabaseWriter.initialise(CassandraWriter.java:182)
at ai.langstream.agents.vector.VectorDBSinkAgent.init(VectorDBSinkAgent.java:37)
at ai.langstream.runtime.agent.AgentRunner.lambda$initAgent$12(AgentRunner.java:978)
at ai.langstream.api.runner.code.AgentCodeAndLoader.executeWithContextClassloader(AgentCodeAndLoader.java:50)
at ai.langstream.runtime.agent.AgentRunner.initAgent(AgentRunner.java:971)
at ai.langstream.runtime.agent.CompositeAgentProcessor.init(CompositeAgentProcessor.java:105)
at ai.langstream.runtime.agent.AgentRunner.lambda$initAgent$12(AgentRunner.java:978)
at ai.langstream.api.runner.code.AgentCodeAndLoader.executeWithContextClassloader(AgentCodeAndLoader.java:50)
at ai.langstream.runtime.agent.AgentRunner.initAgent(AgentRunner.java:971)
at ai.langstream.runtime.agent.AgentRunner.initAgent(AgentRunner.java:955)
at ai.langstream.runtime.agent.AgentRunner.run(AgentRunner.java:187)
at ai.langstream.runtime.agent.AgentRunnerStarter.start(AgentRunnerStarter.java:123)
at ai.langstream.runtime.agent.AgentRunnerStarter.main(AgentRunnerStarter.java:58)
at ai.langstream.runtime.Main.main(Main.java:42)
Enrico Olivelli
12/13/2023, 7:16 AM
Armin Woworsky
12/13/2023, 8:37 PM
SELECT text,embeddings_vector FROM langstreamapp5.documents ORDER BY similarity_cosine(embeddings_vector, CAST(? as FLOAT ARRAY)) DESC LIMIT 20
and the statement fails with:
com.datastax.oss.driver.api.core.servererrors.SyntaxError: line 1:86 mismatched input '(' expecting EOF (... langstreamapp5.documents ORDER BY similarity_cosine[(]...)
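The parser stops at the `(` right after `similarity_cosine`, i.e. this server does not accept a function call in `ORDER BY`. On Cassandra 5.0 and Astra DB, vector search is written as `ORDER BY <column> ANN OF <vector>` (which needs a vector index on the column), and `similarity_cosine` can go in the select list to return a score. The Python sketch below only builds such a statement as a string; the helper `build_ann_query` is hypothetical, and whether the target cluster supports `ANN OF` is an assumption to verify.

```python
def build_ann_query(keyspace: str, table: str, vector_column: str,
                    columns: list[str], limit: int) -> str:
    """Build a CQL vector-search statement (assumes Cassandra 5.0 / Astra DB syntax).

    Both `?` bind markers are meant to receive the same query vector.
    """
    if limit <= 0:
        raise ValueError("limit must be positive")
    # The similarity score is computed in the select list, not in ORDER BY.
    select_list = ", ".join(columns + [f"similarity_cosine({vector_column}, ?) AS score"])
    # Ordering uses ANN OF on the indexed vector column instead of a function call.
    return (
        f"SELECT {select_list} FROM {keyspace}.{table} "
        f"ORDER BY {vector_column} ANN OF ? LIMIT {limit}"
    )


query = build_ann_query("langstreamapp5", "documents", "embeddings_vector", ["text"], 20)
print(query)
```

When executing it, bind the query embedding to both markers; the `DESC` of the original statement is dropped because `ANN OF` already returns the nearest rows first.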
Thanks again for your support.
Marcin Dobosz
12/15/2023, 11:29 AM
60s Warning OOMKilling node/gk3-langstream-cluster-1-pool-2-c939c962-aw9j Memory cgroup out of memory: Killed process 488573 (python3) total-vm:1006260kB, anon-rss:215684kB, file-rss:20864kB, shmem-rss:0kB, UID:10000 pgtables:700kB oom_score_adj:-997
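The sizes in this kernel OOM event are in kB (KiB) and are easier to compare with the 512MB limit mentioned below once converted. The Python helper here is a hypothetical sketch written for this thread, not part of LangStream or Kubernetes tooling. By its arithmetic the killed `python3` process had roughly 231 MiB resident (anon-rss + file-rss + shmem-rss). A memory-cgroup OOM kill is triggered by the total usage of the cgroup, not only by the victim process, so the gap to 512MB suggests other memory in the same container may account for the rest; that is a hypothesis to check, not a diagnosis.

```python
import re

# Matches "name:<number>kB" fields such as "anon-rss:215684kB".
# Fields without a kB suffix (UID, oom_score_adj) are not matched.
_KB_FIELD = re.compile(r"([a-z-]+):(\d+)kB")


def parse_oom_kb_fields(message: str) -> dict[str, int]:
    """Return the kB-valued fields of a kernel OOM-kill message."""
    return {name: int(value) for name, value in _KB_FIELD.findall(message)}


def resident_mib(fields: dict[str, int]) -> float:
    """Resident set size (anon + file + shmem) in MiB; the kernel's kB are KiB."""
    total_kib = sum(fields.get(key, 0) for key in ("anon-rss", "file-rss", "shmem-rss"))
    return total_kib / 1024


event = ("Memory cgroup out of memory: Killed process 488573 (python3) "
         "total-vm:1006260kB, anon-rss:215684kB, file-rss:20864kB, "
         "shmem-rss:0kB, UID:10000 pgtables:700kB oom_score_adj:-997")
fields = parse_oom_kb_fields(event)
print(f"resident: {resident_mib(fields):.1f} MiB")  # resident: 231.0 MiB
```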
I know this is not strictly related to LangStream, but I've been wondering if you have any ideas on how to solve it. The pipeline `resources.size` is set to 1, which sets the memory requests and limits in my cluster to 512MB. Increasing the limit does not help.
Enrico Olivelli
12/20/2023, 12:06 PM
Marcin Dobosz
01/09/2024, 10:39 PM
`langstream app ui`. Does anyone have some experience to share?
David Dieruf
01/25/2024, 12:04 PM
Upgrading and Downtime
When you update your application, all impacted agents in the pipeline are restarted at the same time, and if an agent has more than 1 replica, 1 replica is restarted at a time.
In Kubernetes terms, each agent is a StatefulSet, and updating agents behaves like a rolling update of StatefulSets.
And then this:
The `resources` property allows you to specify the `size` (scale vertically) and the `parallelism` (scale horizontally) for the pipeline.
topics:
  ...
resources:
  parallelism: 3
  size: 2
pipeline:
  ...
Setting `parallelism` to 3 will deploy the pipeline with 3 different replicas. Each replica will use the configured `size` for CPU and memory.
So if I set `parallelism: 3` on a pipeline with 5 agents, then I have 5 StatefulSets, each with 3 pods. Thus, when the app is updated, each StatefulSet is rolled, updating one pod at a time. This ensures there is no downtime, because there is always a pod available, BUT it also means 2 different versions of an agent run together for a very short span.
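The reasoning above can be checked with a toy model. The Python below is a hypothetical simulation, not LangStream code: it assumes one StatefulSet per agent with `parallelism` pods each, and that pods are replaced one at a time in reverse ordinal order (the default for a StatefulSet `RollingUpdate`). `deploy` and `rolling_update` are illustrative names.

```python
from dataclasses import dataclass


@dataclass
class StatefulSet:
    """Toy model of one agent's StatefulSet: each pod is just its version label."""
    agent: str
    pods: list[str]


def deploy(agents: list[str], parallelism: int, version: str) -> list[StatefulSet]:
    """One StatefulSet per agent, each with `parallelism` pods on the same version."""
    return [StatefulSet(agent, [version] * parallelism) for agent in agents]


def rolling_update(sts: StatefulSet, new_version: str) -> list[list[str]]:
    """Update pods one at a time, highest ordinal first, and record which
    versions keep serving while each pod restarts."""
    serving_per_step = []
    for ordinal in reversed(range(len(sts.pods))):
        # While pod `ordinal` restarts, all the other pods keep serving.
        serving_per_step.append([v for i, v in enumerate(sts.pods) if i != ordinal])
        sts.pods[ordinal] = new_version
    return serving_per_step


statefulsets = deploy([f"agent-{n}" for n in range(5)], parallelism=3, version="v1")
print(len(statefulsets), sum(len(s.pods) for s in statefulsets))  # 5 15

steps = rolling_update(statefulsets[0], "v2")
for serving in steps:
    print(serving)
# ['v1', 'v1']
# ['v1', 'v2']
# ['v2', 'v2']
```

With 5 agents and `parallelism: 3` this gives 5 StatefulSets and 15 pods. At every step two pods keep serving, and in the middle step a `v1` pod and a `v2` pod serve side by side: the short mixed-version window described above.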
Has anyone considered a mechanism for canary or blue/green deployments? My pipeline will be seeing a ton of continuous live traffic. Among other things, I'd like to watch for hallucinations. Ideally I could do this by allowing only a small subset of the traffic to reach the new pods before replacing all the pods.
David Dieruf
01/25/2024, 4:20 PM
Enrico Olivelli
01/29/2024, 7:40 AM
Marcin Dobosz
01/30/2024, 10:10 AM
LangStream CLI 0.5.6 (d691a66f)
David Dieruf
01/31/2024, 2:19 PM
David Dieruf
02/01/2024, 12:39 PM
David Dieruf
02/01/2024, 1:45 PM
Chris Bartholomew
02/01/2024, 3:35 PM
-XX:MaxRAMPercentage=50.0
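`-XX:MaxRAMPercentage` sets the JVM's maximum heap as a percentage of the memory available to it; with container support (enabled by default in modern JDKs) that is the container's memory limit. As a rough worked example, assuming a 512MB container limit like the one reported earlier in this channel, 50.0 would cap the heap at about 256 MiB and leave the other half for metaspace, thread stacks, direct buffers and anything else in the container. The helper below is only that arithmetic (a hypothetical sketch; the JVM may round the result slightly).

```python
MIB = 1024 * 1024


def max_heap_bytes(container_limit_bytes: int, max_ram_percentage: float) -> int:
    """Approximate heap cap implied by -XX:MaxRAMPercentage (ignores JVM alignment)."""
    if not 0 < max_ram_percentage <= 100:
        raise ValueError("MaxRAMPercentage must be in (0, 100]")
    return int(container_limit_bytes * max_ram_percentage / 100)


limit = 512 * MIB
heap = max_heap_bytes(limit, 50.0)
print(heap // MIB, "MiB heap,", (limit - heap) // MIB, "MiB left for everything else")
# 256 MiB heap, 256 MiB left for everything else
```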
David Dieruf
02/02/2024, 2:59 PM
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PATCH at: https://10.43.0.1:443/apis/langstream.ai/v1alpha1/namespaces/langstream-smart-logs/agents/smart-logs-search-store-tester?fieldManager=fabric8. Message: failed to create typed patch object (/smart-logs-search-store-tester; langstream.ai/v1alpha1, Kind=Agent): .spec.options: field not declared in schema. Received status: Status(apiVersion=v1, code=500, details=null, kind=Status, message=failed to create typed patch object (/smart-logs-search-store-tester; langstream.ai/v1alpha1, Kind=Agent): .spec.options: field not declared in schema, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=null, status=Failure, additionalProperties={}).
David Dieruf
02/02/2024, 3:23 PM
David Dieruf
02/02/2024, 3:47 PM
David Dieruf
02/26/2024, 2:01 PM
Record of...
HEADERS:
id=1
action_message="something"
action_topic="an-action-topic"
VALUE (string): "yes"
I want to:
• drop any message with the value "no"
• reroute any message with a value of "yes" to a topic name provided in the headers
• default to the topic "default-action-topic"
Here's my first attempt:
- name: Dispatch Evaluation Results
  id: dispatch-evaluation-results
  type: dispatch
  output: default-action-topic
  configuration:
    routes:
      - when: fn:contains(value, "no")
        action: drop
      - when: fn:contains(value, "yes")
        destination: headers.action_topic
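The three requirements can be pinned down as a plain-Python function, which is handy for checking route conditions against sample records before encoding them in the pipeline. This models the intended behavior only; it is not how the `dispatch` agent is implemented, and whether `destination` can take its value from a header (as `headers.action_topic` assumes) is not confirmed in this thread. One difference worth noticing: `fn:contains(value, "no")` also matches values such as "not sure" or "unknown", whereas the requirement is about the value "no", so the sketch compares for equality.

```python
from typing import Optional

DEFAULT_TOPIC = "default-action-topic"


def route(value: str, headers: dict[str, str]) -> Optional[str]:
    """Return the destination topic for a record, or None to drop it.

    Drop "no", send "yes" to the topic named in the `action_topic` header,
    and fall back to the default topic otherwise (including "yes" without the header).
    """
    if value == "no":
        return None  # drop the record
    if value == "yes" and headers.get("action_topic"):
        return headers["action_topic"]
    return DEFAULT_TOPIC


headers = {"id": "1", "action_message": "something", "action_topic": "an-action-topic"}
print(route("yes", headers))    # an-action-topic
print(route("no", headers))     # None
print(route("maybe", headers))  # default-action-topic
```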
Enrico Olivelli
02/26/2024, 2:22 PM
Enrico Olivelli
02/26/2024, 2:25 PM
David Dieruf
02/26/2024, 2:57 PM
Enrico Olivelli
02/26/2024, 2:58 PM
David Dieruf
02/26/2024, 4:54 PM
David Dieruf
02/27/2024, 8:38 PM
Nicolò Boschi
02/27/2024, 8:39 PM