# queues-beta
  • g

    Gary Somerhalder

    05/22/2023, 5:44 PM
    I'd like to try Queues
  • d

    Dani Foldi

    05/22/2023, 5:45 PM
    happy to brainstorm ideas, if you can share more of your use case - we can give you all possible options, but hard to recommend one without knowing what you want to achieve 😅
  • g

    Gary Somerhalder

    05/22/2023, 5:45 PM
    Right on
  • g

    Gary Somerhalder

    05/22/2023, 5:45 PM
    Can I dm you?
  • s

    Skye

    05/22/2023, 5:45 PM
    I'd recommend you go through the docs at https://developers.cloudflare.com/queues if you haven't already
  • g

    Gary Somerhalder

    05/22/2023, 5:46 PM
    Thanks Skye!
  • g

    Gary Somerhalder

    05/22/2023, 5:46 PM
    How are retries handled?
  • s

    Skye

    05/22/2023, 5:46 PM
    They go into that in depth in the docs!
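For reference, the behavior those docs describe: by default, a batch is retried when the consumer's `queue()` handler throws, up to the consumer's configured `max_retries`. A minimal hedged sketch (the validation rule is invented for illustration, and the type stand-ins replace `@cloudflare/workers-types`):

```typescript
// Minimal stand-in types so the sketch runs outside the Workers runtime;
// in a real Worker these come from @cloudflare/workers-types.
type Message = { body: unknown };
type MessageBatch = { messages: Message[] };

// If this handler throws, Queues redelivers the whole batch, up to the
// consumer's configured max_retries (then to a dead-letter queue, if set).
export async function queueHandler(batch: MessageBatch): Promise<void> {
  for (const message of batch.messages) {
    // Hypothetical validation: reject non-string bodies to force a retry.
    if (typeof message.body !== "string") {
      throw new Error("unprocessable message; batch will be retried");
    }
    console.log(`processed: ${message.body}`);
  }
}
```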
  • d

    Dani Foldi

    05/22/2023, 5:46 PM
    sure
  • g

    Gary Somerhalder

    05/22/2023, 5:47 PM
    You all rock!
  • r

    Rehman Afzal

    05/23/2023, 4:53 AM
    I am creating a chat application with different users and different features like personal messages, group messages, etc. Can I use Cloudflare Queues to send and receive messages, or is it not recommended? Please guide me
  • m

    mizchi

    05/23/2023, 2:27 PM
    I have a question about Queues and Durable Objects: I want to broadcast data received from Queues to a Durable Object's WebSocket, but I don't know how. Durable Objects cannot consume queues directly at the moment (am I right?). Here is the code I am trying to write.
    Copy code
    ts
    import { Env } from "./types";
    
    // websocket handler durable objects
    export { TestObject } from "./test_object";
    
    export default {
      async fetch(
        request: Request,
        env: Env,
        _ctx: ExecutionContext
      ): Promise<Response> {
        return await handleErrorsWithUpgrade(request, async () => {
          if (request.url.endsWith("/connect")) {
            const id = env.test.idFromName("test");
            const testObject = env.test.get(id);
            return testObject.fetch(request);
          }
          return Response.json({ ok: true });
        });
      },
      async queue(batch: MessageBatch<Error>, env: Env): Promise<void> {
        let messages = JSON.stringify(batch.messages);
        const id = env.test.idFromName("test");
        // Here: I want to reach the existing socket and emit the log
        // const testObject = env.test.get(id);
        // testObject.connect() ????
    
        console.log(`consumed from our queue: ${messages}`);
      },
    };
    
    async function handleErrorsWithUpgrade(request: Request, func: () => Promise<Response>) {
      try {
        return await func();
      } catch (err) {
        if (request.headers.get("Upgrade") === "websocket") {
          const pair = new WebSocketPair();
          pair[1].accept();
          // @ts-ignore
          pair[1].send(JSON.stringify({ error: err.stack }));
          pair[1].close(1011, "Uncaught exception during session setup");
          return new Response(null, { status: 101, webSocket: pair[0] });
        }
        // @ts-ignore
        return new Response(err.stack, { status: 500 });
      }
    }
  • y

    Yaron

    05/23/2023, 9:20 PM
    Hi, I love the idea of Queues in Cloudflare. I wrote one, and I can't get it to work, or at least can't test it successfully. I wrote a producer like so: [[queues.producers]] queue = "twitter-mentions-queue" binding = "TWITTER_MENTIONS_QUEUE"
    Copy code
    const queueTweet = async (tweetId: string, prompt: string, env: any) => {
      const log = {tweetId, prompt};
      await env.TWITTER_MENTIONS_QUEUE.send(log);
      console.log('added to queue', log) // Console log is fired successfully
    }
    And a consumer like this [[queues.consumers]] queue = "twitter-mentions-queue" max_batch_size = 10 max_batch_timeout = 5
    Copy code
    async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {
        for (const message of batch.messages) {
          console.log('Received', message); // This never runs, looked for it in tail, also the ui shows zero queue messages always
        }
      },
    Any ideas why I can't see messages?
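For readability, Yaron's inline config fragments laid out as they would appear in wrangler.toml (queue names, binding, and values are exactly as he quoted them):

```toml
# Producer side: gives the Worker a binding to send messages with.
[[queues.producers]]
queue = "twitter-mentions-queue"
binding = "TWITTER_MENTIONS_QUEUE"

# Consumer side: delivers batches to this Worker's queue() handler.
[[queues.consumers]]
queue = "twitter-mentions-queue"
max_batch_size = 10
max_batch_timeout = 5
```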
  • e

    emilio

    05/23/2023, 9:29 PM
    Looks like a type error in your consumer? Try changing the type of
    batch
    to just
    MessageBatch
    instead of
    MessageBatch<Error>
  • y

    Yaron

    05/23/2023, 10:53 PM
    Thanks Emilio, that's helpful, but it didn't solve it. Should I be able to see messages if the producer is running in dev? (I tested on the cloud as well.) Also, about the consumer: - The consumer also has fetch implemented alongside queue, is that ok? - Will I see the queue event if I am doing wrangler tail on the consumer? UPDATE: Ok, managed to get it working. Guess I couldn't add items to the queue from wrangler dev
  • m

    mizchi

    05/24/2023, 5:06 AM
    Yes, you are right, the type there is wrong. But it does not matter at runtime in this case. (fixed)
  • r

    rohanbabuam

    05/24/2023, 6:30 AM
    Hi, I'm trying to implement a consumer queue which consumes messages on-demand rather than automatically. I want to make it such that messages are consumed only when requested by a worker. Is this possible?
  • e

    elithrar

    05/24/2023, 12:30 PM
    On the roadmap: https://blog.cloudflare.com/messages-at-your-speed-with-concurrency-and-explicit-acknowledgement/
  • r

    rohanbabuam

    05/24/2023, 2:39 PM
    Great. Thanks @elithrar.
  • l

    Larry

    05/24/2023, 10:00 PM
    Why do you want to use a websocket for this? Why not just use the normal approach to communicating with the DO from inside your queue() handler? Assuming you have a good reason, I can see one concrete problem with the code you have above and another likely problem. The concrete problem is that
    const testObject
    is scoped to just the worker fetch handler. You can't even see it inside your queue handler. The second problem, which I suspect you will have, is that even if you declare testObject before the start of the main worker fetch() handler so it's in scope, you have no guarantee that the worker instance that created the connection will be the same one that's called by Cloudflare via your queue() handler. It's best to think of these two pieces of code as completely separate. Cloudflare will grab them and instantiate them as needed. Once they are done running, they can disappear. If you really have a good reason to use WebSockets from your queue() handler, you'll have to create the connection to the DO from inside there, but the only reason I can think of for doing that is that it might be slightly more efficient to make multiple round trips to a DO this way. For a single call, though, the traditional way of talking to a DO is going to be more efficient because you don't have the handshake overhead of establishing the WebSocket connection. That said, it would be a premature optimization to even try this.
  • m

    mizchi

    05/25/2023, 2:07 PM
    The reason I wanted to use WebSockets is that I wanted to prototype real-time notifications to users using Cloudflare Queues; imagine something like GitHub's notifications, which might be easier to understand. The architecture I had in mind was to send messages to a queue from an external server, fan them out to the necessary users in a DO sitting in front, and push them over an already established WebSocket connection with the browser. However, with a batch interval of a few seconds for Queues, it would not be very real-time, and it may not be practical to manage all user notifications with a single queue.
  • m

    mizchi

    05/25/2023, 2:13 PM
    > The second problem, that I suspect you will have is that even if you declare testObject before the start of the main worker fetch() handler so it's in scope, you have no guarantee that the worker instance that created the connection will be the same one as the one that's called by cloudflare via your queue() handler. Yes, that was my main concern.
  • m

    mizchi

    05/25/2023, 2:16 PM
    > the traditional way of talking to a DO is going to be more efficient because you don't have the handshake overhead of establishing the WebSocket connection It certainly seems like it would work in theory, but I don't know the specific implementation pattern.
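The "traditional way" Larry mentions can be sketched like this: call the DO stub's fetch() from inside queue(), and let the DO relay to WebSockets it already holds. This is a hedged sketch, not an official pattern; the /broadcast route is an assumed internal route on the DO, and the local type stand-ins replace `@cloudflare/workers-types` so the sketch is self-contained:

```typescript
// Local stand-in types so the sketch runs outside the Workers runtime.
type Message = { body: unknown };
type MessageBatch = { messages: Message[] };
type FetchInit = { method?: string; body?: string };
type DurableObjectStub = {
  fetch: (url: string, init?: FetchInit) => Promise<unknown>;
};
type DurableObjectNamespace = {
  idFromName: (name: string) => unknown;
  get: (id: unknown) => DurableObjectStub;
};
interface Env {
  test: DurableObjectNamespace;
}

// queue() handler body: forward the batch to the DO over plain fetch();
// the DO's own fetch() implementation can then send the payload down any
// WebSocket connections it is already holding.
export async function queueToDO(batch: MessageBatch, env: Env): Promise<void> {
  const id = env.test.idFromName("test"); // same singleton key as in the code above
  const stub = env.test.get(id);
  await stub.fetch("https://do/broadcast", { // assumed internal route on the DO
    method: "POST",
    body: JSON.stringify(batch.messages.map((m) => m.body)),
  });
}
```

No WebSocket handshake is needed between the queue consumer and the DO: the fan-out happens inside the DO, which is the only place that can see the open browser connections.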
  • l

    lvmcg

    05/26/2023, 9:50 AM
    Hello there! Sorry if my msg is in the wrong place, I have never used this before. I am working with Cloudflare Workers queues, and I find that if I test triggering a queued task that takes increasingly long to run, then if it runs in less than 30 seconds all is well, but if it takes longer than 30 seconds, it fails and I see a CPU limit exceeded error in the dashboard. However, according to the docs, a queue consumer invocation should be able to run for up to 15 minutes on the Workers Unbound paid plan, which I am on. Is there any way to confirm this with Cloudflare, and/or has anyone else tested this and found that queued tasks fail at 30 seconds anyway? I am assuming the 15 minutes means CPU time, as the documentation does not make sense if it means anything else. Thanks for any help you can provide! https://developers.cloudflare.com/workers/platform/limits/#cpu-runtime
  • r

    rodbs

    05/26/2023, 9:59 PM
    I want to generate TTS with Azure Cognitive Services. I send a text, it generates an mp3, and eventually you upload the file to R2. It works on Node. I've tried to implement it with a standard Worker using promises and
    waitUntil
    and it fails. Not sure of the reason; it might be because a Worker is not capable of handling it ... https://discord.com/channels/595317990191398933/1100814451151741028/1101567116890620015 My question is: would it work with a Queue? Or, because under the hood it's executing a Worker, will I experience the same issues?
  • k

    kian

    05/26/2023, 10:01 PM
    If it takes more than 30 seconds then
    waitUntil
    wouldn't be suitable
  • k

    kian

    05/26/2023, 10:01 PM
    by that I mean, waitUntil will just close 30 seconds after a response is returned (currently)
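To make the distinction kian draws concrete: waitUntil only extends the Worker for a limited window after the response, while a queue consumer runs on its own clock. A hedged sketch of offloading the slow TTS + R2 work to a queue producer instead (the TTS_QUEUE binding name and job shape are assumptions, and the Queue type stands in for the real Workers binding):

```typescript
// Local stand-in type; in a real Worker the Queue binding type comes from
// @cloudflare/workers-types.
type Queue = { send: (message: unknown) => Promise<void> };
interface Env {
  TTS_QUEUE: Queue;
}

// Producer side: enqueue the job and return immediately, instead of trying
// to finish slow work inside ctx.waitUntil's limited post-response window.
export async function enqueueTTSJob(text: string, env: Env): Promise<number> {
  await env.TTS_QUEUE.send({ text, requestedAt: Date.now() });
  return 202; // accepted; a separate queue() consumer does the TTS + R2 upload
}
```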
  • r

    rodbs

    05/26/2023, 10:02 PM
    No, it doesn't take more than 30 sec. That's why I don't understand what's going on
  • Queue processing stops locally
    v

    Vâjhâtz

    12/18/2023, 3:27 PM
    Yeah, I have logging for each batch method call and it just stops outputting the log into the terminal. Yeah, I'll see about opening a bug report. Thanks for the answer 🙂
  • Some updates for @Queues users:
    t

    treedor

    12/24/2023, 1:59 PM
    Hi @elithrar do you think the
    delay
    feature will be out sometime soon? Similar timeline as HTTP pull?