Tasks
Concept
So you want to run a background task at a specified interval? Thankfully, that's a common thing to do. Whether you're working with an API or handling some background logic, don't worry: the Pycord tasks extension is here to make life easier for you!
With the tasks extension, you can write background tasks without worrying about asyncio hell or mind-bogglers like "what if my internet dies?" and "how do I handle reconnecting?", and you get a lot of useful additions that are one line of code away!
Example Usage
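import discord
from discord.ext import tasks

bot = discord.Bot()

@bot.event
async def on_ready():
    # on_ready can fire more than once (for example after a reconnect),
    # and starting a task that is already running raises RuntimeError.
    if not very_useful_task.is_running():
        very_useful_task.start()

@tasks.loop(seconds=5)
async def very_useful_task():
    print('doing very useful stuff.')

bot.run("TOKEN")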
If you run this code in your terminal, you should see something like this about 10 seconds after the bot is ready, since the first iteration runs as soon as the task starts:
doing very useful stuff.
doing very useful stuff.
doing very useful stuff.
For a more useful example, here's a task in a cog context, ripped straight from the docs:
from discord.ext import tasks, commands

class MyCog(commands.Cog):
    def __init__(self):
        self.index = 0
        self.printer.start()

    def cog_unload(self):
        self.printer.cancel()

    @tasks.loop(seconds=5)
    async def printer(self):
        print(self.index)
        self.index += 1
As you'd expect, this will increment a number and print it out every 5 seconds, like so:
0
# 5 seconds later
1
# 5 more seconds later
2
# ...
Tasks
Now let's get into the nitty-gritty of tasks.
As you've seen, tasks work in both module-level and class contexts, and the usage is roughly the same: you define a task with the tasks.loop decorator and call its start method to start it. The only difference is that you add the self argument in a class context. Since most people keep all of their bot's code in cogs, all the following code blocks use a cog context, so you can copy them and apply whatever modifications you need!
Creating a task
A look at discord.ext.tasks.loop's definition in the documentation reveals that:
- As you've seen before, it expects the time between each execution. That doesn't have to be in seconds, however, as there are seconds, minutes and hours parameters to make it easier for us. They can also be floats in case you want ultra-precision.
- You can also pass a datetime.time object, or a sequence of them, in the time parameter, which makes the task run at the specified time(s).
- You can set how many times a loop runs before it stops by passing the count parameter. Use None to make it run forever, which is also the default.
- The loop function must be a coroutine.
These are all really useful, and they aren't the only parameters, so if you want to know more about them, check out the docs!
Attributes
A task has the following attributes:
- current_loop: The current loop the task is on.
- hours, minutes, seconds: Attributes that represent the time between each execution.
- time: A list of datetime.time objects that represent the times the task will run; it is None if no datetime.time objects were passed.
- next_iteration: A datetime.datetime object that represents when the next iteration of the task will run; it can be None if the task has stopped running.
These attributes serve as a really powerful asset to get info about your loop.
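As a rough illustration of putting next_iteration to use, the sketch below turns it into a human-readable countdown. The helper describe_next_run is hypothetical (it is not part of Pycord) and assumes the value is either None or a timezone-aware datetime, as described above.

```python
import datetime


def describe_next_run(next_iteration, now=None):
    """Format the time left until a task's next iteration.

    next_iteration is expected to be the value of a task's
    next_iteration attribute: an aware datetime, or None.
    """
    if next_iteration is None:
        return "not scheduled"
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    remaining = (next_iteration - now).total_seconds()
    if remaining <= 0:
        return "due now"
    minutes, seconds = divmod(int(remaining), 60)
    return f"in {minutes}m {seconds}s"


# Example with fixed timestamps so the result does not depend on the clock.
now = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)
later = now + datetime.timedelta(minutes=7, seconds=30)
print(describe_next_run(later, now))
print(describe_next_run(None, now))
```

Inside a cog you could call it as describe_next_run(self.printer.next_iteration), which also covers the case where the task has already stopped.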
Example
Now let's create a cog that handles a leaderboard in our bot using tasks. Afterwards, we'll explain what we did, which also serves as a refresher on how slash command groups work in cogs.
For the sake of this example, let's pretend that we have a leaderboard module that does all the leaderboard handling for us, and that computing the leaderboard is very expensive. We therefore want to store one version of it that gets regularly updated instead of generating it every time someone calls the /leaderboard view command.
import discord
from discord.ext import tasks, commands
from discord.commands import SlashCommandGroup

from leaderboard import *  # Mock leaderboard module.


class LeaderboardCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.update_leaderboard.start()
        self.leaderboard_embed = generate_leaderboard_embed()

    leaderboard = SlashCommandGroup(name='leaderboard', description='Leaderboard commands.')

    @tasks.loop(minutes=10)
    async def update_leaderboard(self):
        print('Updating leaderboard...')
        # Inside a method, this name resolves to the function from the mock
        # leaderboard module, not to the task defined on this class.
        await update_leaderboard()
        self.leaderboard_embed = generate_leaderboard_embed()

    @update_leaderboard.before_loop
    async def before_update_leaderboard(self):
        await self.bot.wait_until_ready()
        print("About to start updating leaderboard.")

    @update_leaderboard.after_loop
    async def after_update_leaderboard(self):
        leaderboard_cleanup()
        print("Stopped updating leaderboard.")

    @update_leaderboard.error
    async def update_leaderboard_error(self, error):
        print(f"Oh no, an error occurred while updating the leaderboard. Error: {error}")

    @leaderboard.command()
    async def view(self, ctx):
        await ctx.respond(embed=self.leaderboard_embed)

    @leaderboard.command()
    async def update_info(self, ctx):
        task = self.update_leaderboard
        next_run = task.next_iteration  # None once the task has stopped.
        if next_run is None:
            timing = "Last update: unknown.\nNext update: not scheduled."
        else:
            # next_iteration is a timezone-aware UTC datetime, so compare it with utcnow().
            until_next = (next_run - discord.utils.utcnow()).total_seconds() / 60
            timing = (
                f"Last update: about {task.minutes - until_next:.0f} minutes ago.\n"
                f"Next update: in about {until_next:.0f} minutes."
            )
        await ctx.respond(
            "***Leaderboard Info***\n"
            f"{timing}\n"
            f"Time between loops: {task.minutes} minutes.\n"
            f"Times updated this session: {task.current_loop}.\n"
            f"Running? {task.is_running()}\n"
            f"Failed? {task.failed()}",
            ephemeral=True,
        )

    # Now for the owner-only commands:
    leaderboard_config = SlashCommandGroup(
        name="leaderboard_config",
        description="Leaderboard configuration commands",
        # Only the bot owner passes this check.
        checks=[commands.is_owner().predicate],
    )

    @leaderboard_config.command()
    async def set_time_between_updates(self, ctx, minutes: int):
        self.update_leaderboard.change_interval(minutes=minutes)
        await ctx.respond(f"Set time between updates to {minutes} minutes.")

    @leaderboard_config.command()
    async def stop(self, ctx):
        self.update_leaderboard.stop()
        await ctx.respond("Stopped the leaderboard update.")

    @leaderboard_config.command()
    async def restart(self, ctx):
        self.update_leaderboard.restart()
        await ctx.respond("Restarted the leaderboard update.")


def setup(bot):
    bot.add_cog(LeaderboardCog(bot))
Phew, that was quite a bit of code.
Now to explain what's going on:
First things first, we create a cog, and in its __init__ function we start the update_leaderboard task and generate the first instance of our leaderboard's embed.
After that, we define the update_leaderboard task using the loop decorator, and we make it run every ten minutes by passing 10 to the minutes parameter.
Then we define the before_update_leaderboard hook using the before_loop decorator, and we make it wait until the bot is ready before starting the task.
Next up, we define the after_update_leaderboard hook using the after_loop decorator, and we make it clean up the leaderboard when the loop finally stops running.
Then we define the update_leaderboard_error handler using the error decorator, and we make it print to the console any errors that occur while we update our leaderboard.
Then we get into the fun stuff: we create a slash command group using the SlashCommandGroup class and name it leaderboard. We then create the view sub-command, which sends the embed generated when the leaderboard is updated, and update_info, which shows a lot of data about the task using some of the attributes and functions available to us.
We also create a slash command group called leaderboard_config, which is only available to the owner of the bot.
Then we create the set_time_between_updates sub-command, which takes in the time in minutes and changes the time between updates of the leaderboard using the change_interval method; the stop sub-command, which stops the task using the stop method; and restart, which restarts the task using the restart method.
Finally, we add the LeaderboardCog cog to the bot, and we're done!
I'd highly recommend you check the tasks extension documentation for more info on how to use them and what other functions they have.