![]() The code for polling our delayed queue is in the following listing. ![]() By moving items into queues instead of executing them directly, we only need to have one or two of these running at any time (instead of as many as we have workers), so our polling overhead is kept low. If there is an item, we’ll acquire a lock based on the identifier in the item (a fine-grained lock), remove the item from the ZSET, and add the item to the proper queue. If there’s no item, or if the item still needs to wait, we’ll wait a brief period and try again. Because delayed items are only going into a single queue, we can just fetch the first item with the score. Unfortunately, there isn’t a convenient method in Redis to block on ZSETs until a score is lower than the current Unix timestamp, so we need to manually poll. Figure 6.10 A delayed task queue using a ZSET An example of the delayed queue emails to be sent can be seen in figure 6.10. But if we need to delay the item, we add the item to the delayed ZSET. When the queue item is to be executed without delay, we continue to use the old listbased queue. Listing 6.22 The execute_later() function The code to create an (optionally) delayed task can be seen next. For our unique identifier, we’ll again use a 128-bit randomly generated UUID. If the item can be executed immediately, we’ll insert the item into the list queue instead. The score of the item will be the time when the item should be executed. We include the unique identifier in order to differentiate all calls easily, and to allow us to add possible reporting features later if we so choose. We’ll instead use a secondary ZSET as described in the third option, because it’s simple, straightforward, and we can use a lock from section 6.2 to ensure that the move is safe.Įach delayed item in the ZSET queue will be a JSON-encoded list of four items: a unique identifier, the queue where the item should be inserted, the name of the callback to call, and the arguments to pass to the callback. We also can’t create a local waiting list as described in the second option, because if the worker process crashes for an unrelated reason, we lose any pending work items it knew about. We can’t wait/re-enqueue items as described in the first, because that’ll waste the worker process’s time. What if, for any item we wanted to execute in the future, we added it to a ZSET instead of a LIST, with its score being the time when we want it to execute? We then have a process that checks for items that should be executed now, and if there are any, the process removes it from the ZSET, adding it to the proper LIST queue. Normally when we talk about times, we usually start talking about ZSETs.The worker process could have a local waiting list for any items it has seen that need to be executed in the future, and every time it makes a pass through its while loop, it could check that list for any outstanding items that need to be executed.We could include an execution time as part of queue items, and if a worker process sees an item with an execution time later than now, it can wait for a brief period and then re-enqueue the item.Here are the three most straightforward ones: There are a few different ways that we could potentially add delays to our queue items. It’s our job to change or replace our task queue with something that can offer this feature. Rather than putting an item up for sale now, players can tell the game to put an item up for sale in the future. Fake Game Company has decided that they’re going to add a new feature in their game: delayed selling. With list-based queues, we can handle single-call per queue, multiple callbacks per queue, and we can handle simple priorities.
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. ArchivesCategories |