ievv_batchframework — Framework for batch/bulk tasks¶
The intention of this module is to make it easier to write code for background tasks and some kinds of bulk operations.
Configuration¶
Add the following to your INSTALLED_APPS setting:
'ievv_opensource.ievv_batchframework.apps.BatchOperationAppConfig'
Batchregistry - the high level API¶
Recommended Celery setup¶
Install Redis¶
Redis is easy to install and use, and it is one of the recommended brokers and result backends for Celery, so we recommend it when developing with Celery. You can use the Django database instead, but that leaves you with a setup that is further from a real production environment, and running Redis takes little effort if you use ievv devrun as shown below.
On Mac OSX, you can install redis with Homebrew using:
$ brew install redis
and most Linux systems have Redis in their package repository. For other systems, go to http://redis.io/ and follow their install guides.
Configure Celery¶
First, you have to create a Celery application for your project. Create a file named celery.py within a module that you know is loaded when Django starts. The safest place is in the root of your project module. So if you have:
myproject/
    __init__.py
    myapp/
        __init__.py
        models.py
    mysettings/
        settings.py
You should add the Celery configuration in myproject/celery.py. The rest of this guide assumes that you put it at this location.
Put the following code in myproject/celery.py:
from __future__ import absolute_import
import os
from celery import Celery
# Ensure this matches your Django settings module.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# The ``main``-argument is used as prefix for celery task names.
app = Celery(main='myproject')
# We put all the celery settings in our Django settings so we use
# this line to load Celery settings from Django settings.
# You could also add configuration for celery directly in this
# file using app.conf.update(...)
app.config_from_object('django.conf:settings')
# This debug task is only here to make it easier to verify that
# celery is working properly.
@app.task(bind=True)
def debug_add_task(self, a, b):
    # Use only automatic field numbering: mixing {0!r} with {} raises ValueError.
    print('Request: {!r} - Running {} + {}, and returning the result.'.format(
        self.request, a, b))
    return a + b
And put the following code in myproject/__init__.py:
from __future__ import absolute_import
# This will make sure the Celery app is always imported when
# Django starts so that @shared_task will use this app.
from .celery import app as celery_app
Add the following to your Django settings:
# Celery settings
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Oslo' # Change to your preferred timezone!
CELERY_IMPORTS = [
'ievv_opensource.ievv_batchframework.celery_tasks',
]
CELERYD_TASK_LOG_FORMAT = '[%(asctime)s: %(levelname)s/%(processName)s] ' \
'[%(name)s] ' \
'[%(task_name)s(%(task_id)s)] ' \
'%(message)s'
# ievv_batchframework settings
IEVV_BATCHFRAMEWORK_CELERY_APP = 'myproject.celery_app'
Set up ievv devrun (see "ievv devrun: All your development servers in one command"), and add ievvdevrun.runnables.redis_server.RunnableThread() and ievvdevrun.runnables.celery_worker.RunnableThread(app='myproject') to your IEVVTASKS_DEVRUN_RUNNABLES. You should end up with something like this:
IEVVTASKS_DEVRUN_RUNNABLES = {
    'default': ievvdevrun.config.RunnableThreadList(
        # ievvdevrun.runnables.dbdev_runserver.RunnableThread(),  # Uncomment if using django_dbdev
        ievvdevrun.runnables.django_runserver.RunnableThread(),
        ievvdevrun.runnables.redis_server.RunnableThread(),
        ievvdevrun.runnables.celery_worker.RunnableThread(app='myproject'),
    ),
}
At this point, you should be able to run:
$ ievv devrun
to start the Django server, Redis and the Celery worker. To test that everything is working:
- Take a look at the output from ievv devrun, and make sure that your debug_add_task (from celery.py) is listed as a task in the [tasks] list printed by the Celery worker on startup. If it is not, this probably means that you did not put the code from the myproject/__init__.py example above in a place that Django reads at startup. You may want to try moving it into the same module as your settings.py and restarting ievv devrun.
- Start up the Django shell and run the debug_add_task:
  $ python manage.py shell
  >>> from myproject.celery import debug_add_task
  >>> result = debug_add_task.delay(10, 20)
  >>> result.wait()
  30
If this works, Celery is configured correctly.
Developing with asynchronous actions¶
When developing with asynchronous tasks with the setup from the introduction guide above, you need to restart ievv devrun each time you change some code used by an asynchronous action. This means that if you add an Action or ActionGroup, or change any code used within an Action or ActionGroup, you have to stop ievv devrun and start it again. We provide two options for avoiding this.
Option 1: Run all actions synchronously¶
This is great for unit tests, and for developing and debugging code in your
ievv_opensource.ievv_batchframework.batchregistry.Action.executable()
methods. To enable this, add the following to your settings:
IEVV_BATCHFRAMEWORK_ALWAYS_SYNCRONOUS = True
Option 2: Run celery worker manually¶
Restarting ievv devrun can take some time if you have lots of commands that have to stop and start again. You can save some time for each change if you remove or comment out the ievvdevrun.runnables.celery_worker.RunnableThread line from the IEVVTASKS_DEVRUN_RUNNABLES setting (restart ievv devrun after this change), and run Celery manually with the following command instead:
$ celery -A myproject worker -l debug
Now you can start and stop only the Celery worker instead of restarting ievv devrun.
Batchregistry API¶
exception ievv_opensource.ievv_batchframework.batchregistry.ActionGroupSynchronousExecutionError(actiongroupresult)¶
    Bases: Exception
exception ievv_opensource.ievv_batchframework.batchregistry.ActionError(error_data_dict)¶
    Bases: Exception
ievv_opensource.ievv_batchframework.batchregistry.action_factory(baseclass, name)¶
    Factory for creating Action classes. This is simply a thin wrapper around type to dynamically create a subclass of the given baseclass with a different name.
    There are two use cases for using this:
    - Give re-usable action classes a better name.
    - Use the same Action subclass multiple times in the same ActionGroup.
    Both of these cases can also be solved with subclasses, and that is normally a better solution.
    Parameters:
    - baseclass – The class to create a subclass of.
    - name – The name of the subclass.
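The "thin wrapper around type" idea can be illustrated with a minimal sketch. This is not the library's implementation: action_factory_sketch and SendEmailAction are hypothetical names used only for illustration, and the sketch runs without Django or ievv_opensource installed.

```python
def action_factory_sketch(baseclass, name):
    """Create a subclass of ``baseclass`` called ``name`` using ``type``."""
    return type(name, (baseclass,), {})


class SendEmailAction:
    """Hypothetical stand-in for a re-usable Action subclass."""

    def execute(self):
        return 'executed by {}'.format(type(self).__name__)


# Two distinct classes with the same behavior, so both could be
# added to the same group under different names.
WelcomeEmailAction = action_factory_sketch(SendEmailAction, 'WelcomeEmailAction')
ReminderEmailAction = action_factory_sketch(SendEmailAction, 'ReminderEmailAction')
```

Each call returns a new class object, which is why the same base class can appear several times in one group.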
class ievv_opensource.ievv_batchframework.batchregistry.Action(**kwargs)¶
    Bases: object
    An action is the base class for code that can be executed as part of an ActionGroup. You create a subclass of this class and override execute() to implement an action, and you add your subclass to an ActionGroup to use your action class.
    classmethod run(**kwargs)¶
        Run the action - used internally.
        Parameters:
        - kwargs – Kwargs for the action.
        - executed_by_celery – Must be True if the action is executed by a celery task. This is required to configure the correct logger.
    logger¶
        Get the logger for this action.
class ievv_opensource.ievv_batchframework.batchregistry.ActionGroupExecutionInfo(actiongroup, mode, route_to_alias, actiongroupresult=None, batchoperation=None)¶
    Bases: object
    Return value from ActionGroup.run(), with information about how the ActionGroup was executed.
    actiongroup¶
        The ActionGroup object that was executed.
    mode¶
        The mode the ActionGroup was executed with.
    route_to_alias¶
        The route_to_alias that was used to route/prioritize the execution of the ActionGroup.
    is_asynchronous¶
        Property that returns True if the ActionGroup was executed asynchronously.
    actiongroupresult¶
        Property for getting the ActionGroupResult if the ActionGroup was executed in synchronous mode.
        Raises: AttributeError – If mode is ActionGroup.MODE_ASYNCHRONOUS.
    batchoperation¶
        Property for getting the ievv_opensource.ievv_batchframework.models.BatchOperation object that was created if the ActionGroup was executed in asynchronous mode.
        Raises: AttributeError – If mode is ActionGroup.MODE_SYNCHRONOUS.
class ievv_opensource.ievv_batchframework.batchregistry.ActionGroup(name, actions=None, mode=None, route_to_alias=None)¶
    Bases: object
    An ActionGroup is a list of actions that can be executed both synchronously and asynchronously.
    Parameters:
    - name – The name of the ActionGroup.
    - actions – A list of actions.
    - mode – The default mode of the ActionGroup. Defaults to MODE_ASYNCHRONOUS. You will often want to determine this from the input (e.g., use asynchronous if sending more than 500 newsletters), and this can be done by extending this class and overriding get_mode().
    - route_to_alias – Defines where to route this ActionGroup when it is executed in asynchronous mode. Defaults to Registry.ROUTE_TO_ALIAS_DEFAULT. You can determine the route dynamically each time the ActionGroup is executed by overriding get_route_to_alias().
    MODE_ASYNCHRONOUS = 'asynchronous'¶
        Constant for asynchronous (background/Celery) mode of execution.
    MODE_SYNCHRONOUS = 'synchronous'¶
        Constant for synchronous (blocking) mode of execution.
    add_action(action)¶
        Add an action.
        Parameters: action – A subclass of Action (not an object, but a class).
    add_actions(actions)¶
        Add actions.
        Parameters: actions – A list of Action subclasses (classes, not objects).
    get_mode(**kwargs)¶
        Get the mode to run the ActionGroup in. Must return one of MODE_ASYNCHRONOUS or MODE_SYNCHRONOUS.
        The use case for overriding this method is optimization. Let's say you have to re-index your blogposts in a search engine each time they are updated. If you update just a few blogposts, you may want to do that in synchronous mode, but if you update 500 blogposts, you will probably want to re-index in asynchronous mode (i.e., in Celery).
        Parameters: kwargs – The kwargs the user provided to run().
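The get_mode() override described above could look roughly like the sketch below. To keep it runnable without Django, it uses a hypothetical ActionGroupSketch stand-in that only mirrors the documented mode constants and the dispatch described for run(). The blogpost_ids kwarg and the threshold of 500 are assumptions made for illustration.

```python
class ActionGroupSketch:
    """Hypothetical stand-in mirroring the documented ActionGroup hooks."""
    MODE_ASYNCHRONOUS = 'asynchronous'
    MODE_SYNCHRONOUS = 'synchronous'

    def get_mode(self, **kwargs):
        return self.MODE_ASYNCHRONOUS

    def run_asynchronous(self, **kwargs):
        return 'queued for background execution'

    def run_synchronous(self, **kwargs):
        return 'executed immediately'

    def run(self, **kwargs):
        # Dispatch on the mode returned by get_mode().
        if self.get_mode(**kwargs) == self.MODE_ASYNCHRONOUS:
            return self.run_asynchronous(**kwargs)
        return self.run_synchronous(**kwargs)


class ReindexBlogpostsGroup(ActionGroupSketch):
    ASYNCHRONOUS_THRESHOLD = 500  # assumed cut-off, choose your own

    def get_mode(self, **kwargs):
        # ``blogpost_ids`` is a hypothetical kwarg passed to run().
        if len(kwargs['blogpost_ids']) >= self.ASYNCHRONOUS_THRESHOLD:
            return self.MODE_ASYNCHRONOUS
        return self.MODE_SYNCHRONOUS


group = ReindexBlogpostsGroup()
small_update = group.run(blogpost_ids=[1, 2, 3])
large_update = group.run(blogpost_ids=list(range(500)))
```

With a real ActionGroup subclass, only the get_mode() override would be needed; the base class already provides run().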
    get_route_to_alias(**kwargs)¶
        Define where to route this ActionGroup when it is executed in asynchronous mode.
        This is the method you want to override to handle priority of your asynchronously executed ActionGroups.
        Let's say you have a huge blog with lots of traffic. After updating a blogpost, you need to do some heavy postprocessing (image optimization, video transcoding, etc.). If you update a newly posted blogpost, this postprocessing should be placed in a high-priority queue, and if you update an old blogpost, it should be placed in a low-priority queue. To achieve this, you create a subclass of ActionGroup and override this method to return Registry.ROUTE_TO_ALIAS_HIGHPRIORITY for recently created blogposts, and Registry.ROUTE_TO_ALIAS_DEFAULT for old blogposts.
        Parameters: kwargs – The kwargs the user provided to run_asynchronous().
        Returns: One of the route-to aliases added to the Registry using Registry.add_route_to_alias(). This will always include Registry.ROUTE_TO_ALIAS_DEFAULT and Registry.ROUTE_TO_ALIAS_HIGHPRIORITY.
        Return type: str
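The routing decision from the blog example can be sketched as a plain function. The string values of the two constants and the one-day definition of "recent" are assumptions; in real code you would return Registry.ROUTE_TO_ALIAS_HIGHPRIORITY or Registry.ROUTE_TO_ALIAS_DEFAULT from your get_route_to_alias() override.

```python
from datetime import datetime, timedelta, timezone

# Assumed placeholder values; the real constants live on Registry.
ROUTE_TO_ALIAS_DEFAULT = 'default'
ROUTE_TO_ALIAS_HIGHPRIORITY = 'highpriority'


def choose_route_to_alias(created_datetime, now, recent=timedelta(days=1)):
    """Route recently created blogposts to the high-priority queue."""
    if now - created_datetime <= recent:
        return ROUTE_TO_ALIAS_HIGHPRIORITY
    return ROUTE_TO_ALIAS_DEFAULT


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
new_post_route = choose_route_to_alias(now - timedelta(hours=2), now)
old_post_route = choose_route_to_alias(now - timedelta(days=30), now)
```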
    run_synchronous(**kwargs)¶
        Run the ActionGroup in blocking/synchronous mode.
        Parameters: kwargs – Kwargs for Action.
    get_batchoperation_options(**kwargs)¶
        You can override this if you create a re-usable ActionGroup subclass that sets options for the ievv_opensource.ievv_batchframework.models.BatchOperation based on kwargs.
        Called by run_asynchronous() to get the kwargs for ievv_opensource.ievv_batchframework.models.BatchOperationManager.create_asynchronous().
        If you override this, you should normally call super(), and update the kwargs returned by super.
        Parameters: kwargs – The kwargs the user provided to run_asynchronous().
        Returns: Kwargs for ievv_opensource.ievv_batchframework.models.BatchOperationManager.create_asynchronous()
        Return type: dict
    create_batchoperation(**kwargs)¶
        Used by run_asynchronous() to create the ievv_opensource.ievv_batchframework.models.BatchOperation object.
        You normally do not need to override this - override get_batchoperation_options() instead.
        Warning
        Overriding this may lead to breaking code if the inner workings of this framework are changed/optimized in the future.
        Parameters: kwargs – See run_asynchronous().
        Returns: The created ievv_opensource.ievv_batchframework.models.BatchOperation.
        Return type: BatchOperation
    run(**kwargs)¶
        Runs one of run_asynchronous() and run_synchronous(). The method to run is determined by the return value of get_mode():
        - If get_mode() returns MODE_ASYNCHRONOUS, run_asynchronous() is called.
        - If get_mode() returns MODE_SYNCHRONOUS, run_synchronous() is called.
        Parameters:
        - context_object – context_object for ievv_opensource.ievv_batchframework.models.BatchOperation.
        - started_by – started_by for ievv_opensource.ievv_batchframework.models.BatchOperation.
        - **kwargs – Kwargs for Action. Forwarded to run_asynchronous() and run_synchronous().
class ievv_opensource.ievv_batchframework.batchregistry.Registry¶
    Bases: ievv_opensource.utils.singleton.Singleton
    The registry of ActionGroup objects.
    add_route_to_alias(route_to_alias, task_callable)¶
        Add a route-to alias.
        Parameters:
        - route_to_alias (str) – The alias.
        - task_callable (func) – The callable rq task.
    add_actiongroup(actiongroup)¶
        Add an ActionGroup to the registry.
        Parameters: actiongroup – The ActionGroup object to add.
    remove_actiongroup(actiongroup_name)¶
        Remove an ActionGroup from the registry.
        Parameters: actiongroup_name – The name of the actiongroup.
        Raises: KeyError – If no ActionGroup with the provided actiongroup_name exists in the registry.
    get_actiongroup(actiongroup_name)¶
        Get an ActionGroup object from the registry.
        Parameters: actiongroup_name – The name of the actiongroup.
        Returns: An ActionGroup object.
        Return type: ActionGroup
        Raises: KeyError – If no ActionGroup with the provided actiongroup_name exists in the registry.
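The add/get/remove contract documented for the registry, including the KeyError on unknown names, can be sketched with a dict. RegistrySketch and NamedGroup are hypothetical stand-ins, and the dict-backed storage is an assumption about the behavior, not a description of the real Registry internals.

```python
class RegistrySketch:
    """Hypothetical dict-backed stand-in for the documented Registry."""

    def __init__(self):
        self._actiongroups = {}

    def add_actiongroup(self, actiongroup):
        self._actiongroups[actiongroup.name] = actiongroup

    def remove_actiongroup(self, actiongroup_name):
        del self._actiongroups[actiongroup_name]  # KeyError if missing

    def get_actiongroup(self, actiongroup_name):
        return self._actiongroups[actiongroup_name]  # KeyError if missing


class NamedGroup:
    """Minimal object with the ``name`` attribute the sketch relies on."""

    def __init__(self, name):
        self.name = name


registry = RegistrySketch()
registry.add_actiongroup(NamedGroup('send_newsletter'))
found = registry.get_actiongroup('send_newsletter')
registry.remove_actiongroup('send_newsletter')
try:
    registry.get_actiongroup('send_newsletter')
    lookup_failed = False
except KeyError:
    lookup_failed = True
```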
The BatchOperation model¶
The BatchOperation model is at the heart of ievv_batchframework. Each time you start a batch process, you create an object of ievv_opensource.ievv_batchframework.models.BatchOperation and use that to communicate the status, success/error data and other metadata about the batch operation.
Asynchronous operations¶
An asynchronous operation is the most common use case for the BatchOperation model. It is used to track a task that is handled (e.g., completed) by some kind of asynchronous service such as a cron job or a Celery task.
Let's say you want to send an email 15 minutes after a blog post has been created unless the user cancels the email sending within those 15 minutes. You would then need to:
- Create a BatchOperation object each time a blog post is created.
- Use some kind of batching service, like Celery, to poll for BatchOperation objects that ask it to send out email.
- Delete the BatchOperation if a user clicks “cancel” within 15 minutes of the creation timestamp.
The code for this would look something like this:
from django.contrib.contenttypes.models import ContentType
from ievv_opensource.ievv_batchframework.models import BatchOperation
myblogpost = Blog.objects.get(...)
BatchOperation.objects.create_asynchronous(
    context_object=myblogpost,
    operationtype='new-blogpost-email')
# code to send the batch operation to the batching service (like celery)
# with a 15 minute delay, or just a service that polls for
# BatchOperation.objects.filter(operationtype='new-blogpost-email',
#                               created_datetime__lt=timezone.now() - timedelta(minutes=15))
# The batching service code
def my_batching_service(batchoperation_id):  # hypothetical parameter
    batchoperation = BatchOperation.objects.get(id=batchoperation_id)
    batchoperation.mark_as_running()
    # ... send out the emails ...
    batchoperation.finish()
# In the view for cancelling email sending.
# A generic foreign key cannot be used directly in filter(), so we filter
# on the content type and object id, and delete() the matching rows.
BatchOperation.objects\
    .filter(operationtype='new-blogpost-email',
            context_content_type=ContentType.objects.get_for_model(myblogpost),
            context_object_id=myblogpost.id)\
    .delete()
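The polling condition mentioned in the comments above (operations created more than 15 minutes ago) can be checked in isolation with a small sketch. PendingOperation is a hypothetical in-memory stand-in for a BatchOperation row, so the example runs without a database.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class PendingOperation:
    """Hypothetical in-memory stand-in for a BatchOperation row."""
    operationtype: str
    created_datetime: datetime


def find_due_operations(operations, now, delay=timedelta(minutes=15)):
    """Return the operations created more than ``delay`` before ``now``."""
    cutoff = now - delay
    return [op for op in operations if op.created_datetime < cutoff]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
operations = [
    PendingOperation('new-blogpost-email', now - timedelta(minutes=20)),
    PendingOperation('new-blogpost-email', now - timedelta(minutes=5)),
]
due = find_due_operations(operations, now)
```

Only the operation created 20 minutes ago is due; the one created 5 minutes ago can still be cancelled by the user.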
Synchronous operations¶
You may also want to use BatchOperation for synchronous operations. This is mainly useful for complex bulk create and bulk update operations.
Let's say you have Game objects with a one-to-many relationship to Player objects, which in turn have a one-to-many relationship to Card objects. You want to start all players in a game with a card. How do you batch create all the players with a single card?
You can easily batch create players with bulk_create, but you cannot batch create the cards because they require a Player. So you need a way of retrieving the players you just batch created.
If you create a BatchOperation with context_object set to the Game, you will get a unique identifier for the operation (the id of the BatchOperation). Then you can set that identifier as an attribute on all the batch-created Player objects (preferably as a foreign key), and retrieve the batch-created objects by filtering on the id of the BatchOperation. After this, you can iterate through all the created Player objects, and create a list of Card objects for your batch create operation for the cards.
Example:
import random
from ievv_opensource.ievv_batchframework.models import BatchOperation
game = Game.objects.get(...)
batchoperation = BatchOperation.objects.create_synchronous(
    context_object=game)
# Tag every new player with the batch operation so we can find them again.
players = []
for index in range(1000):
    player = Player(
        game=game,
        name='player{}'.format(index),
        batchoperation=batchoperation)
    players.append(player)
Player.objects.bulk_create(players)
created_players = Player.objects.filter(batchoperation=batchoperation)
cards = []
available_cards = [...]  # A list of available card IDs
for player in created_players:
    card = Card(
        player=player,
        cardid=random.choice(available_cards)
    )
    cards.append(card)
Card.objects.bulk_create(cards)
batchoperation.finish()
As the example above shows, instead of performing 2000 queries (one for each player, and one for each card), we only need 5 queries no matter how many players we have (or a few more on database servers that cannot bulk create 1000 items at a time).
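The same pattern, tagging bulk-inserted rows with a batch identifier so that they can be fetched back in one query, can be tried outside Django. The sketch below uses the standard library sqlite3 module. The table layout and the card numbering are assumptions chosen for illustration, and batch_id plays the role of the BatchOperation id.

```python
import sqlite3

connection = sqlite3.connect(':memory:')
connection.executescript("""
    CREATE TABLE player (id INTEGER PRIMARY KEY, name TEXT, batch_id INTEGER);
    CREATE TABLE card (id INTEGER PRIMARY KEY, player_id INTEGER, cardid INTEGER);
""")

batch_id = 1  # plays the role of the BatchOperation id

# A single executemany() call inserts all players, each tagged with the batch id.
connection.executemany(
    'INSERT INTO player (name, batch_id) VALUES (?, ?)',
    [('player{}'.format(index), batch_id) for index in range(1000)])

# One query retrieves exactly the players created by this batch.
player_ids = [row[0] for row in connection.execute(
    'SELECT id FROM player WHERE batch_id = ? ORDER BY id', (batch_id,))]

# A single executemany() call inserts one card per retrieved player.
connection.executemany(
    'INSERT INTO card (player_id, cardid) VALUES (?, ?)',
    [(player_id, 1 + player_id % 52) for player_id in player_ids])

player_count = connection.execute('SELECT COUNT(*) FROM player').fetchone()[0]
card_count = connection.execute('SELECT COUNT(*) FROM card').fetchone()[0]
```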
Data model API¶
class ievv_opensource.ievv_batchframework.models.BatchOperationManager¶
    Bases: django.db.models.manager.Manager
    Manager for BatchOperation.
    create_synchronous(input_data=None, **kwargs)¶
        Create a synchronous BatchOperation.
        A synchronous batch operation starts with BatchOperation.status set to BatchOperation.STATUS_RUNNING and started_running_datetime set just as if BatchOperation.mark_as_running() was called. So calling this has the same result as calling create_asynchronous() and then calling BatchOperation.mark_as_running(), but it uses one database query instead of two.
        The BatchOperation is cleaned before it is saved.
        Parameters:
        - input_data – The input data. A python object to set as the input data using the BatchOperation.input_data() property.
        - **kwargs – Forwarded to the constructor for BatchOperation.
        Returns: The created BatchOperation object.
        Return type: BatchOperation
    create_asynchronous(input_data=None, **kwargs)¶
        Create an asynchronous BatchOperation. An asynchronous batch operation starts with BatchOperation.status set to BatchOperation.STATUS_UNPROCESSED.
        The BatchOperation is cleaned before it is saved.
        Parameters:
        - input_data – The input data. A python object to set as the input data using the BatchOperation.input_data() property.
        - **kwargs – Forwarded to the constructor for BatchOperation.
        Returns: The created BatchOperation object.
        Return type: BatchOperation
class ievv_opensource.ievv_batchframework.models.BatchOperation(*args, **kwargs)¶
    Bases: django.db.models.base.Model
    Defines a batch operation.
    STATUS_UNPROCESSED = 'unprocessed'¶
        One of the possible values for status. Defines the BatchOperation as unprocessed (not yet started). This only makes sense for background tasks. They will typically be created with the unprocessed status, and then set to STATUS_RUNNING when the batching service starts running the operation.
    STATUS_RUNNING = 'running'¶
        One of the possible values for status. Defines the BatchOperation as running (in progress).
    STATUS_FINISHED = 'finished'¶
        One of the possible values for status. Defines the BatchOperation as finished.
    STATUS_CHOICES = [('unprocessed', 'unprocessed'), ('running', 'running'), ('finished', 'finished')]¶
        Allowed values for status. Possible values are STATUS_UNPROCESSED, STATUS_RUNNING and STATUS_FINISHED.
    RESULT_NOT_AVAILABLE = 'not-available'¶
        One of the possible values for result. This is used when we have no result yet (the operation is not finished).
    RESULT_SUCCESSFUL = 'successful'¶
        One of the possible values for result. Defines the BatchOperation as successful. This is set if the operation completed without errors. Any details about the result of the operation can be stored in output_data_json.
    RESULT_FAILED = 'failed'¶
        One of the possible values for result. Defines the BatchOperation as failed. This is set if the operation could not be completed because of an error. Any error message(s) should be stored in output_data_json.
    RESULT_CHOICES = [('not-available', 'not available yet (processing not finished)'), ('successful', 'successful'), ('failed', 'failed')]¶
        Allowed values for result. Possible values are RESULT_NOT_AVAILABLE, RESULT_SUCCESSFUL and RESULT_FAILED.
    started_by¶
        The user that started this batch operation. Optional, but it is good metadata to add for debugging.
    created_datetime¶
        The datetime when this batch operation was created. Defaults to timezone.now().
    started_running_datetime¶
        The datetime when this batch operation started running. This is not the same as created_datetime; it is the time when the operation started processing.
    finished_datetime¶
        The datetime when this batch operation was finished.
    context_content_type¶
        The content type for context_object.
    context_object_id¶
        The id field for context_object.
    context_object¶
        Generic foreign key that identifies the context this operation runs in. This is optional.
    operationtype¶
        The type of operation. This is application specific - you typically use this if you allow multiple different batch operations on the same context_object. This is not required, and defaults to an empty string.
    status¶
        The status of the operation. The allowed values for this field are documented in STATUS_CHOICES. Defaults to STATUS_UNPROCESSED.
    result¶
        The result of the operation. The allowed values for this field are documented in RESULT_CHOICES. Defaults to RESULT_NOT_AVAILABLE.
    input_data_json¶
        Input data for the BatchOperation. You should not use this directly, use the input_data property instead.
    output_data_json¶
        Output data for the BatchOperation. You should not use this directly, use the output_data property instead.
    input_data¶
        Decode BatchOperation.input_data_json and return the result.
        Returns None if input_data_json is empty.
    output_data¶
        Decode BatchOperation.output_data_json and return the result.
        Returns: None if output_data_json is empty, or the decoded json data if output_data_json is not empty.
        Return type: object
    mark_as_running()¶
        Mark the batch operation as running.
        Sets the status to STATUS_RUNNING and started_running_datetime to the current datetime, then cleans and saves.
    finish(failed=False, output_data=None)¶
        Mark the batch operation as finished.
        Sets result as documented in the failed parameter below. Sets finished_datetime to the current datetime. Sets output_data_json as documented in the output_data parameter below.
        Parameters:
        - failed (boolean) – Set this to True to set result to RESULT_FAILED. The default is False, which means that result is set to RESULT_SUCCESSFUL.
        - output_data – The output data. A python object to set as the output data using the BatchOperation.output_data() property.
    exception DoesNotExist¶
    exception MultipleObjectsReturned¶
    clean()¶
        Hook for doing any extra model-wide validation after clean() has been called on every field by self.clean_fields. Any ValidationError raised by this method will not be associated with a particular field; it will have a special-case association with the field defined by NON_FIELD_ERRORS.
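The status and result transitions documented for mark_as_running() and finish() can be followed in a small in-memory sketch. BatchOperationSketch is a hypothetical stand-in, not the Django model. It skips cleaning and saving, and setting status to STATUS_FINISHED inside finish() is an assumption, since the reference above only lists result, finished_datetime and output_data_json.

```python
import json
from datetime import datetime, timezone


class BatchOperationSketch:
    """Hypothetical in-memory stand-in for the BatchOperation model."""
    STATUS_UNPROCESSED = 'unprocessed'
    STATUS_RUNNING = 'running'
    STATUS_FINISHED = 'finished'
    RESULT_NOT_AVAILABLE = 'not-available'
    RESULT_SUCCESSFUL = 'successful'
    RESULT_FAILED = 'failed'

    def __init__(self):
        self.status = self.STATUS_UNPROCESSED
        self.result = self.RESULT_NOT_AVAILABLE
        self.started_running_datetime = None
        self.finished_datetime = None
        self.output_data_json = ''

    @property
    def output_data(self):
        # Return None when nothing is stored, else the decoded JSON.
        if not self.output_data_json:
            return None
        return json.loads(self.output_data_json)

    def mark_as_running(self):
        self.status = self.STATUS_RUNNING
        self.started_running_datetime = datetime.now(timezone.utc)

    def finish(self, failed=False, output_data=None):
        self.status = self.STATUS_FINISHED  # assumption, see lead-in
        self.result = self.RESULT_FAILED if failed else self.RESULT_SUCCESSFUL
        self.finished_datetime = datetime.now(timezone.utc)
        if output_data is not None:
            self.output_data_json = json.dumps(output_data)


operation = BatchOperationSketch()
operation.mark_as_running()
operation.finish(failed=True, output_data={'errors': ['SMTP server unreachable']})
```

Calling finish() without arguments would instead leave result at RESULT_SUCCESSFUL and output_data at None.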