ievv_batchframework — Framework for batch/bulk tasks

The intention of this module is to make it easier to write code for background tasks and some kinds of bulk operations.

Configuration

Add the following to your INSTALLED_APPS setting:

'ievv_opensource.ievv_batchframework.apps.BatchOperationAppConfig'
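In a settings module this could look roughly like the sketch below. The two django.contrib entries are only placeholders for apps your project probably lists already (the content types app is assumed here because BatchOperation refers to a content type); only the last entry comes from this guide.

```python
# settings.py (sketch): the django.contrib entries are placeholders for
# whatever your project already has installed.
INSTALLED_APPS = [
    'django.contrib.contenttypes',
    'django.contrib.auth',
    # The batch framework app from this guide.
    'ievv_opensource.ievv_batchframework.apps.BatchOperationAppConfig',
]
```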

Batchregistry - the high level API

Developing with asynchronous actions

When developing with asynchronous tasks with the setup from the introduction guide above, you need to restart ievv devrun each time you change code used by an asynchronous action. This means that if you add an Action or ActionGroup, or change any code used within an Action or ActionGroup, you have to stop ievv devrun and start it again. We provide two options for avoiding this.

Option 1: Run all actions synchronously

This is great for unit tests, and for developing and debugging code in your ievv_opensource.ievv_batchframework.batchregistry.Action.execute() methods. To enable this, add the following to your settings:

IEVV_BATCHFRAMEWORK_ALWAYS_SYNCRONOUS = True

Option 2: Run celery worker manually

Restarting ievv devrun can take some time if you have lots of commands that have to stop and start again. You can save some time for each change if you remove/comment out the ievvdevrun.runnables.celery_worker.RunnableThread line from the IEVVTASKS_DEVRUN_RUNNABLES setting (restart ievv devrun after this change), and run Celery manually with the following command instead:

$ celery -A myproject worker -l debug

Now you can start and stop only the Celery worker instead of restarting ievv devrun.

Batchregistry API

exception ievv_opensource.ievv_batchframework.batchregistry.ActionGroupSynchronousExecutionError(actiongroupresult)[source]

Bases: Exception

exception ievv_opensource.ievv_batchframework.batchregistry.ActionError(error_data_dict)[source]

Bases: Exception

ievv_opensource.ievv_batchframework.batchregistry.action_factory(baseclass, name)[source]

Factory for creating Action classes. This is simply a thin wrapper around type to dynamically create a subclass of the given baseclass with a different name.

There are two use cases for using this:

  • Give re-usable action classes a better name.
  • Use the same Action subclass multiple times in the same ActionGroup.

Both of these cases can also be solved with subclasses, and that is normally a better solution.

Parameters:
  • baseclass – The class to create a subclass of.
  • name – The name of the subclass.
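The "thin wrapper around type" idea can be sketched as follows. This is not the library's source: action_factory_sketch and ResizeImageAction are hypothetical names used only to show how type() creates a renamed subclass at runtime.

```python
def action_factory_sketch(baseclass, name):
    """Return a new subclass of ``baseclass`` called ``name``."""
    # type(name, bases, namespace) builds a class at runtime.
    return type(name, (baseclass,), {})


class ResizeImageAction:
    """Hypothetical stand-in for a re-usable Action subclass."""


# Two distinct classes backed by the same implementation, so both
# could be added to the same ActionGroup under different names.
ResizeThumbnailAction = action_factory_sketch(
    ResizeImageAction, 'ResizeThumbnailAction')
ResizeBannerAction = action_factory_sketch(
    ResizeImageAction, 'ResizeBannerAction')
```

Writing "class ResizeThumbnailAction(ResizeImageAction): pass" gives the same result, which is why plain subclasses are normally the better choice.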
class ievv_opensource.ievv_batchframework.batchregistry.Action(**kwargs)[source]

Bases: object

An action is the base class for code that can be executed as part of an ActionGroup.

You create a subclass of this class, and override execute() to implement an action, and you add your subclass to an ActionGroup to use your action class.

classmethod run(**kwargs)[source]

Run the action - used internally.

Parameters:
  • kwargs – Kwargs for the action.
  • executed_by_celery – Must be True if the action is executed by a celery task. This is required to configure the correct logger.
classmethod get_name()[source]

Get the name of this action.

logger

Get the logger for this action.

execute()[source]

Execute the action. Must be overridden in subclasses.

class ievv_opensource.ievv_batchframework.batchregistry.ActionGroupExecutionInfo(actiongroup, mode, route_to_alias, actiongroupresult=None, batchoperation=None)[source]

Bases: object

Return value from ActionGroup.run(), with information about how the ActionGroup was executed.

actiongroup
The ActionGroup object that was executed.
mode
The mode the ActionGroup was executed with.
route_to_alias
The route_to_alias that was used to route/prioritize the execution of the ActionGroup.
is_asynchronous

Property that returns True if the ActionGroup was executed asynchronously.

actiongroupresult

Property for getting the ActionGroupResult if the ActionGroup was executed in synchronous mode.

Raises:AttributeError – If mode is ActionGroup.MODE_ASYNCHRONOUS.
batchoperation

Property for getting the ievv_opensource.ievv_batchframework.models.BatchOperation object that was created if the ActionGroup was executed in asynchronous mode.

Raises:AttributeError – If mode is ActionGroup.MODE_SYNCHRONOUS.
class ievv_opensource.ievv_batchframework.batchregistry.ActionGroup(name, actions=None, mode=None, route_to_alias=None)[source]

Bases: object

An ActionGroup is a list of actions that can be executed both synchronously and asynchronously.

Parameters:
  • name – The name of the ActionGroup.
  • actions – A list of actions.
  • mode – The default mode of the ActionGroup. Defaults to MODE_ASYNCHRONOUS. You will often want to determine this from the input (e.g. use asynchronous mode if sending more than 500 newsletters), and this can be done by extending this class and overriding get_mode().
  • route_to_alias – Defines where to route this ActionGroup when it is executed in asynchronous mode. Defaults to Registry.ROUTE_TO_ALIAS_DEFAULT. You can determine the route dynamically each time the ActionGroup is executed by overriding get_route_to_alias().
MODE_ASYNCHRONOUS = 'asynchronous'

Constant for asynchronous (background/Celery) mode of execution.

MODE_SYNCHRONOUS = 'synchronous'

Constant for synchronous (blocking) mode of execution.

add_action(action)[source]

Add an action.

Parameters:action – A subclass of Action (not an object, but a class).
add_actions(actions)[source]

Add actions.

Parameters:actions – A list of Action subclasses (classes, not objects).
get_mode(**kwargs)[source]

Get the mode to run the ActionGroup in. Must return one of MODE_ASYNCHRONOUS or MODE_SYNCHRONOUS.

The use case for overriding this method is optimization. Let's say you have to re-index your blogposts in a search engine each time they are updated. If you update just a few blogposts, you may want to do that in synchronous mode, but if you update 500 blogposts, you will probably want to re-index in asynchronous mode (i.e. in Celery).

Parameters:kwargs – The kwargs the user provided to run().
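The re-indexing scenario above could be sketched like this. The ActionGroup class below is only a minimal stand-in so that the snippet runs without the framework installed; in a real project you would subclass ievv_opensource.ievv_batchframework.batchregistry.ActionGroup instead. The blogpost_ids kwarg and the threshold of 50 are assumptions made for illustration.

```python
class ActionGroup:
    """Minimal stand-in for the real ActionGroup (illustration only)."""
    MODE_ASYNCHRONOUS = 'asynchronous'
    MODE_SYNCHRONOUS = 'synchronous'

    def __init__(self, name, actions=None, mode=None, route_to_alias=None):
        self.name = name
        self.mode = mode or self.MODE_ASYNCHRONOUS

    def get_mode(self, **kwargs):
        return self.mode


class ReindexBlogpostsActionGroup(ActionGroup):
    # Hypothetical threshold; tune it for your own workload.
    asynchronous_threshold = 50

    def get_mode(self, blogpost_ids=(), **kwargs):
        # ``blogpost_ids`` is an assumed kwarg that callers pass to run().
        if len(blogpost_ids) >= self.asynchronous_threshold:
            return self.MODE_ASYNCHRONOUS
        return self.MODE_SYNCHRONOUS


actiongroup = ReindexBlogpostsActionGroup(name='reindex_blogposts')
few_mode = actiongroup.get_mode(blogpost_ids=[1, 2, 3])
many_mode = actiongroup.get_mode(blogpost_ids=list(range(500)))
```

With these assumptions, a handful of blogposts is re-indexed in the request itself, while a large update is handed to the background worker.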
get_route_to_alias(**kwargs)[source]

Define where to route this ActionGroup when it is executed in asynchronous mode.

This is the method you want to override to handle priority of your asynchronously executed ActionGroups.

Let's say you have a huge blog, with lots of traffic. After updating a blogpost, you need to do some heavy postprocessing (image optimization, video transcoding, etc.). If you update a newly posted blogpost, this postprocessing should be placed in a high-priority queue, and if you update an old blogpost, it should be placed in a low-priority queue. To achieve this, you simply need to create a subclass of ActionGroup, and override this method to return Registry.ROUTE_TO_ALIAS_HIGHPRIORITY for recently created blogposts, and Registry.ROUTE_TO_ALIAS_DEFAULT for old blogposts.

Parameters:kwargs – The kwargs the user provided to run_asynchronous().
Returns:One of the route-to aliases added to the Registry using Registry.add_route_to_alias(). This will always include Registry.ROUTE_TO_ALIAS_DEFAULT and Registry.ROUTE_TO_ALIAS_HIGHPRIORITY.
Return type:str
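A rough sketch of the priority routing described above follows. Registry and ActionGroup are stand-ins so the snippet runs on its own, and the string values of the two constants are placeholders that may differ from the real ones. The created_datetime and now kwargs, and the one-day limit, are assumptions for illustration.

```python
from datetime import datetime, timedelta


class Registry:
    """Stand-in; the real constants live on batchregistry.Registry and
    their values may differ from these placeholders."""
    ROUTE_TO_ALIAS_DEFAULT = 'default'
    ROUTE_TO_ALIAS_HIGHPRIORITY = 'highpriority'


class ActionGroup:
    """Minimal stand-in for the real ActionGroup (illustration only)."""

    def get_route_to_alias(self, **kwargs):
        return Registry.ROUTE_TO_ALIAS_DEFAULT


class PostprocessBlogpostActionGroup(ActionGroup):
    # Hypothetical definition of a "recently created" blogpost.
    recent_limit = timedelta(days=1)

    def get_route_to_alias(self, created_datetime=None, now=None, **kwargs):
        # ``created_datetime`` and ``now`` are assumed kwargs from the caller.
        now = now or datetime.now()
        if (created_datetime is not None
                and now - created_datetime <= self.recent_limit):
            return Registry.ROUTE_TO_ALIAS_HIGHPRIORITY
        return Registry.ROUTE_TO_ALIAS_DEFAULT


now = datetime(2024, 1, 10, 12, 0)
actiongroup = PostprocessBlogpostActionGroup()
new_route = actiongroup.get_route_to_alias(
    created_datetime=now - timedelta(hours=2), now=now)
old_route = actiongroup.get_route_to_alias(
    created_datetime=now - timedelta(days=400), now=now)
```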
run_synchronous(**kwargs)[source]

Run the ActionGroup in blocking/synchronous mode.

Parameters:kwargs – Kwargs for Action.
get_batchoperation_options(**kwargs)[source]

You can override this if you create a re-usable ActionGroup subclass that sets options for the ievv_opensource.ievv_batchframework.models.BatchOperation based on kwargs.

Called by run_asynchronous() to get the kwargs for ievv_opensource.ievv_batchframework.models.BatchOperationManager.create_asynchronous().

If you override this, you should normally call super(), and update the kwargs returned by super().

Parameters:kwargs – The kwargs the user provided to run_asynchronous().
Returns:Kwargs for ievv_opensource.ievv_batchframework.models.BatchOperationManager.create_asynchronous()
Return type:dict
create_batchoperation(**kwargs)[source]

Used by run_asynchronous() to create the ievv_opensource.ievv_batchframework.models.BatchOperation object.

You normally do not need to override this - override get_batchoperation_options() instead.

Warning

Overriding this may lead to breaking code if the inner workings of this framework are changed/optimized in the future.

Parameters:kwargs – See run_asynchronous().
Returns:The created ievv_opensource.ievv_batchframework.models.BatchOperation.
Return type:BatchOperation
run_asynchronous(**kwargs)[source]

Run the ActionGroup in asynchronous mode.

Parameters:kwargs – Kwargs for Action.
run(**kwargs)[source]

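Runs one of run_asynchronous() and run_synchronous(). The method to run is determined by the return value of get_mode(): run_asynchronous() for MODE_ASYNCHRONOUS, and run_synchronous() for MODE_SYNCHRONOUS.

Parameters:kwargs – Kwargs for Action.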
class ievv_opensource.ievv_batchframework.batchregistry.Registry[source]

Bases: ievv_opensource.utils.singleton.Singleton

The registry of ActionGroup objects.

add_route_to_alias(route_to_alias, task_callable)[source]

Add a route-to alias.

Parameters:
  • route_to_alias (str) – The alias.
  • task_callable (func) – The callable rq task.
add_actiongroup(actiongroup)[source]

Add an ActionGroup to the registry.

Parameters:actiongroup – The ActionGroup object to add.
remove_actiongroup(actiongroup_name)[source]

Remove an ActionGroup from the registry.

Parameters:actiongroup_name – The name of the actiongroup.
Raises:KeyError – If no ActionGroup with the provided actiongroup_name exists in the registry.
get_actiongroup(actiongroup_name)[source]

Get an ActionGroup object from the registry.

Parameters:actiongroup_name – The name of the actiongroup.
Returns:An ActionGroup object.
Return type:ActionGroup
Raises:KeyError – If no ActionGroup with the provided actiongroup_name exists in the registry.
run(actiongroup_name, **kwargs)[source]

Shortcut for:

Registry.get_instance().get_actiongroup(actiongroup_name)\
    .run(**kwargs)

The BatchOperation model

The BatchOperation model is at the heart of ievv_batchframework. Each time you start a batch process, you create an object of ievv_opensource.ievv_batchframework.models.BatchOperation and use that to communicate the status, success/error data and other metadata about the batch operation.

Asynchronous operations

An asynchronous operation is the most common use case for the BatchOperation model. It is used to track a task that is handled (e.g. completed) by some kind of asynchronous service such as a cron job or a Celery task.

Let's say you want to send an email 15 minutes after a blog post has been created, unless the user cancels the email sending within those 15 minutes. You would then need to:

  • Create a BatchOperation object each time a blog post is created.
  • Use some kind of batching service, like Celery, to poll for BatchOperation objects that ask it to send out email.
  • Delete the BatchOperation if a user clicks “cancel” within 15 minutes of the creation timestamp.

The code for this would look something like this:

from ievv_opensource.ievv_batchframework.models import BatchOperation

myblogpost = Blog.objects.get(...)
BatchOperation.objects.create_asynchronous(
    context_object=myblogpost,
    operationtype='new-blogpost-email')
# code to send the batch operation to the batching service (like celery)
# with a 15 minute delay, or just a service that polls for
# BatchOperation.objects.filter(operationtype='new-blogpost-email',
#                               created_datetime__lt=timezone.now() - timedelta(minutes=15))


# The batching service code
def my_batching_service(...):
    batchoperation = BatchOperation.objects.get(...)
    batchoperation.mark_as_running()
    # ... send out the emails ...
    batchoperation.finish()


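# In the view for cancelling email sending. Django cannot filter directly
# on a generic foreign key, so match on content type and object id.
from django.contrib.contenttypes.models import ContentType

BatchOperation.objects\
    .filter(operationtype='new-blogpost-email',
            context_content_type=ContentType.objects.get_for_model(myblogpost),
            context_object_id=myblogpost.id)\
    .delete()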

Synchronous operations

You may also want to use BatchOperation for synchronous operations. This is mainly useful for complex bulk create and bulk update operations.

Let's say you have Game objects with a one-to-many relationship to Player objects with a one-to-many relationship to Card objects. You want to start all players in a game with a card. How do you batch create all the players with a single card?

You can easily batch create players with bulk_create, but you cannot batch create the cards because they require a Player. So you need a way of retrieving the players you just batch created.

If you create a BatchOperation with context_object set to the Game, you will get a unique identifier for the operation (the id of the BatchOperation). Then you can set that identifier as an attribute on all the batch-created Player objects (preferably as a foreign key), and retrieve the batch-created objects by filtering on the id of the BatchOperation. After this, you can iterate through all the created Player objects, and create a list of Card objects for your batch create operation for the cards.

Example:

import random

game = Game.objects.get(...)
batchoperation = BatchOperation.objects.create_synchronous(
    context_object=game)

players = []
for index in range(1000):
    player = Player(
        game=game,
        name='player{}'.format(index),
        batchoperation=batchoperation)
    players.append(player)
Player.objects.bulk_create(players)
created_players = Player.objects.filter(batchoperation=batchoperation)

cards = []
available_cards = [...]  # A list of available card IDs
for player in created_players:
    card = Card(
        player=player,
        cardid=random.choice(available_cards)
    )
    cards.append(card)
Card.objects.bulk_create(cards)
batchoperation.finish()

As you can see in the example above, instead of having to perform 2000 queries (one for each player, and one for each card), we now only need 5 queries no matter how many players we have (or a few more on database servers that cannot bulk create 1000 items at a time).

Data model API

class ievv_opensource.ievv_batchframework.models.BatchOperationManager[source]

Bases: django.db.models.manager.Manager

Manager for BatchOperation.

create_synchronous(input_data=None, **kwargs)[source]

Create a synchronous BatchOperation.

A synchronous batch operation starts with BatchOperation.status set to BatchOperation.STATUS_RUNNING and started_running_datetime set just as if BatchOperation.mark_as_running() was called. So calling this would have the same result as calling create_asynchronous() and then calling BatchOperation.mark_as_running(), but this will just use one database query instead of two.

The BatchOperation is cleaned before it is saved.

Parameters:
  • input_data – The input data. A python object to set as the input data using the BatchOperation.input_data() property.
  • **kwargs – Forwarded to the constructor for BatchOperation.
Returns:

The created BatchOperation object.

Return type:

BatchOperation

create_asynchronous(input_data=None, **kwargs)[source]

Create an asynchronous BatchOperation. An asynchronous batch operation starts with BatchOperation.status set to BatchOperation.STATUS_UNPROCESSED.

The BatchOperation is cleaned before it is saved.

Parameters:
  • input_data – The input data. A python object to set as the input data using the BatchOperation.input_data() property.
  • **kwargs – Forwarded to the constructor for BatchOperation.
Returns:

The created BatchOperation object.

Return type:

BatchOperation

class ievv_opensource.ievv_batchframework.models.BatchOperation(*args, **kwargs)[source]

Bases: django.db.models.base.Model

Defines a batch operation.

STATUS_UNPROCESSED = 'unprocessed'

One of the possible values for status. Defines the BatchOperation as unprocessed (not yet started). This only makes sense for background tasks. They will typically be created with the unprocessed status, and then set to STATUS_RUNNING when the batching service starts running the operation.

STATUS_RUNNING = 'running'

One of the possible values for status. Defines the BatchOperation as running (in progress).

STATUS_FINISHED = 'finished'

One of the possible values for status. Defines the BatchOperation as finished.

STATUS_CHOICES = [('unprocessed', 'unprocessed'), ('running', 'running'), ('finished', 'finished')]

Allowed values for status. Possible values are:

  • STATUS_UNPROCESSED
  • STATUS_RUNNING
  • STATUS_FINISHED

RESULT_NOT_AVAILABLE = 'not-available'

One of the possible values for result. This is used when we have no result yet (the operation is not finished).

RESULT_SUCCESSFUL = 'successful'

One of the possible values for result. Defines the BatchOperation as successful. This is set if the operation completed without errors. Any details about the result of the operation can be stored in output_data_json.

RESULT_FAILED = 'failed'

One of the possible values for result. Defines the BatchOperation as failed. This is set if the operation could not be completed because of an error. Any error message(s) should be stored in output_data_json.

RESULT_CHOICES = [('not-available', 'not available yet (processing not finished)'), ('successful', 'successful'), ('failed', 'failed')]

Allowed values for result. Possible values are:

  • RESULT_NOT_AVAILABLE
  • RESULT_SUCCESSFUL
  • RESULT_FAILED

started_by

The user that started this batch operation. Optional, but it is good metadata to add for debugging.

created_datetime

The datetime when this batch operation was created. Defaults to timezone.now().

started_running_datetime

The datetime when this batch operation started running. This is not the same as created_datetime, this is the time when the operation started processing.

finished_datetime

The datetime when this batch operation was finished.

context_content_type

The content type for context_object.

context_object_id

The id field for context_object.

context_object

Generic foreign key that identifies the context this operation runs in. This is optional.

operationtype

The type of operation. This is application specific - you typically use this if you allow multiple different batch operations on the same context_object. This is not required, and defaults to empty string.

status

The status of the operation. The allowed values for this field are documented in STATUS_CHOICES. Defaults to STATUS_UNPROCESSED.

result

The result of the operation. The allowed values for this field are documented in RESULT_CHOICES. Defaults to RESULT_NOT_AVAILABLE.

input_data_json

Input data for the BatchOperation. You should not use this directly, use the input_data property instead.

output_data_json

Output data for the BatchOperation. You should not use this directly, use the output_data property instead.

input_data

Decode BatchOperation.input_data_json and return the result.

Return None if input_data_json is empty.

output_data

Decode BatchOperation.output_data_json and return the result.

Returns:None if output_data_json is empty, or the decoded json data if the output_data is not empty.
Return type:object
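The documented behaviour of input_data and output_data (None for an empty field, decoded JSON otherwise) can be pictured with a small helper. This is a sketch of the described behaviour, not the model's implementation, and decode_json_field is a hypothetical name.

```python
import json


def decode_json_field(raw_json):
    """Return None for an empty field, otherwise the decoded JSON."""
    if not raw_json:
        return None
    return json.loads(raw_json)


empty_result = decode_json_field('')
decoded_result = decode_json_field('{"sent": 42, "errors": []}')
```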
mark_as_running()[source]

Mark the batch operation as running.

Sets the status to STATUS_RUNNING and started_running_datetime to the current datetime, then cleans and saves the object.

finish(failed=False, output_data=None)[source]

Mark the batch operation as finished.

Sets result as documented in the failed parameter below. Sets finished_datetime to the current datetime. Sets output_data_json as documented in the output_data parameter below.

Parameters:
  • failed (boolean) – Set this to True to set result to RESULT_FAILED. The default is False, which means that result is set to RESULT_SUCCESSFUL.
  • output_data – The output data. A python object to set as the output data using the BatchOperation.output_data() property.
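A batching service would typically call mark_as_running() before the work and finish() afterwards, passing failed=True when something goes wrong. The sketch below shows that pattern with FakeBatchOperation, an in-memory stand-in that only mimics the documented status and result changes (setting status to STATUS_FINISHED in finish() is an assumption). The run_operation helper is also hypothetical.

```python
class FakeBatchOperation:
    """In-memory stand-in for the BatchOperation model (illustration only)."""
    STATUS_UNPROCESSED = 'unprocessed'
    STATUS_RUNNING = 'running'
    STATUS_FINISHED = 'finished'
    RESULT_NOT_AVAILABLE = 'not-available'
    RESULT_SUCCESSFUL = 'successful'
    RESULT_FAILED = 'failed'

    def __init__(self):
        self.status = self.STATUS_UNPROCESSED
        self.result = self.RESULT_NOT_AVAILABLE
        self.output_data = None

    def mark_as_running(self):
        self.status = self.STATUS_RUNNING

    def finish(self, failed=False, output_data=None):
        # Assumed: finishing also moves the status to "finished".
        self.status = self.STATUS_FINISHED
        self.result = self.RESULT_FAILED if failed else self.RESULT_SUCCESSFUL
        self.output_data = output_data


def run_operation(batchoperation, work):
    """Run ``work`` and record success or failure on the operation."""
    batchoperation.mark_as_running()
    try:
        output_data = work()
    except Exception as error:
        # Store the error message as output data, as the docs suggest.
        batchoperation.finish(failed=True, output_data={'error': str(error)})
    else:
        batchoperation.finish(output_data=output_data)
    return batchoperation


def failing_work():
    raise ValueError('SMTP server unavailable')


ok_operation = run_operation(FakeBatchOperation(), lambda: {'sent': 3})
failed_operation = run_operation(FakeBatchOperation(), failing_work)
```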
exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

clean()[source]

Hook for doing any extra model-wide validation after clean() has been called on every field by self.clean_fields. Any ValidationError raised by this method will not be associated with a particular field; it will have a special-case association with the field defined by NON_FIELD_ERRORS.