Source code for ievv_opensource.ievv_customsql.customsql_registry

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import inspect
import os
import re
from collections import OrderedDict

import htmls
from django import template
from django.db import connection
from ievv_opensource.utils.singleton import Singleton


[docs]class AbstractCustomSql(object): """ Defines custom SQL that can be executed by the ``ievv_customsql`` framework. You typically override :meth:`.initialize` and use :meth:`.execute_sql` to add triggers and functions, and override :meth:`.recreate_data` to rebuild the data maintained by the triggers, but many other use-cases are also possible. """ def __init__(self, appname=None): """ Args: appname: Not required - it is added automatically by :class:`.Registry`, and used by :meth:`.__str__`` for easier debugging / prettier output. """ self.appname = appname def execute_sql(self, sql): """ Execute the provided SQL. Args: sql (str): String of SQL. """ cursor = connection.cursor() cursor.execute(sql) def execute_sql_multiple(self, sql_iterable): for sql in sql_iterable: self.execute_sql(sql) def make_sql_file_path(self, path): this_file_path = inspect.getfile(self.__class__) sqldirectory = os.path.join( os.path.dirname(inspect.getfile(self.__class__)), '{}_sqlcode'.format(os.path.splitext(this_file_path)[0])) return os.path.join(sqldirectory, path) def get_sql_from_file(self, path): """ Get SQL from a file as a string. Args: path (str): A path relative to a directory named the same as the module where this class is located without the filename extension and suffixed with ``_sqlcode``. So if the file with this class is in ``path/to/customsql.py``, path will be relative to ``path/to/customsqlcode/``. """ with open(self.make_sql_file_path(path), 'rb') as f: sql = f.read().decode('utf-8') return sql def execute_sql_from_file(self, path): """ Execute SQL from the provided file. Args: path: See :meth:`.get_sql_from_file`. """ self.execute_sql(self.get_sql_from_file(path)) def execute_sql_from_files(self, paths): """ Shortcut for calling :meth:`.execute_sql_from_file` with multiple files. Calls :meth:`.execute_sql_from_file` once for each file in ``paths``. Args: paths (list): A list of paths. See :meth:`.get_sql_from_file` for the format of each path. """ for path in paths: self.execute_sql_from_file(path=path) def render_sql_from_template(self, path, context_data): template_sql = self.get_sql_from_file(path) context_data = context_data or {} djangotemplate = template.Template(template_sql) return djangotemplate.render(template.Context(context_data)) def execute_sql_from_template_file(self, path, context_data=None): """ Execute SQL from the provided Django template file. The SQL is retrieved from the file, then processed as a Django template with the provided ``context_data``, and the result is executed using :meth:`.execute_sql`. Args: path: See :meth:`.get_sql_from_file`. context_data (dict): Template context data. Can be ``None``. """ self.execute_sql(self.render_sql_from_template(path=path, context_data=context_data)) def execute_sql_from_template_files(self, paths, context_data=None): """ Shortcut for calling :meth:`.execute_sql_from_template_file` with multiple files. Calls :meth:`.execute_sql_from_template_file` once for each file in ``paths``. Args: paths (list): A list of paths. See :meth:`.get_sql_from_file` for the format of each path. context_data (dict): Forwarded to :meth:`.execute_sql_from_template_file`. """ for path in paths: self.execute_sql_from_template_file(path=path, context_data=context_data) def initialize(self): """ Code to initialize the custom SQL. You should create triggers, functions, columns, indexes, etc. in this method, using :meth:`.execute_sql`, or using plain Django code. Make sure to write everything in a manner that updates or creates everything in a self-contained manner. This method is called both for the first initialization, and to update code after updates/changes. Must be overridden in subclasses. """ raise NotImplementedError() def _normalize_whitespace(self, sql): return re.sub(r'\s+', ' ', sql).strip() CREATE_TRIGGER_PATTERN = re.compile(r'^\s*CREATE\s+TRIGGER\s+' r'(?P<trigger_name>[a-zA-Z0-9_]+)\s+' r'(?:BEFORE|AFTER|INSTEAD\s+OF)\s+' r'(?:[^\s]+\s+OR\s+)*(?:[^\s]+)\s+' r'ON\s+(?P<table_name>[a-zA-Z0-9_]+)', re.IGNORECASE | re.MULTILINE) def make_drop_trigger_statements_from_sql_code(self, sql): matches = self.CREATE_TRIGGER_PATTERN.finditer(sql) drop_statements = [] for match in matches: drop_statement = 'DROP TRIGGER IF EXISTS {trigger_name} ON {table_name}'.format( **match.groupdict()) drop_statements.append(drop_statement) return drop_statements CREATE_FUNCTION_PATTERN = re.compile(r'^\s*CREATE\s+(?:OR\s+REPLACE\s+)?FUNCTION\s+' r'(?P<function_name>[a-zA-Z0-9_]+)\s*' r'\((?P<arguments>[^)]*)\)', re.IGNORECASE | re.MULTILINE) def make_drop_function_statements_from_sql_code(self, sql): matches = self.CREATE_FUNCTION_PATTERN.finditer(sql) drop_statements = [] for match in matches: function_name = match.groupdict()['function_name'] arguments = self._normalize_whitespace(match.groupdict()['arguments']) drop_statement = 'DROP FUNCTION IF EXISTS {function_name} ({arguments}) RESTRICT'.format( function_name=function_name, arguments=arguments) drop_statements.append(drop_statement) return drop_statements CREATE_INDEX_PATTERN = re.compile(r'^\s*CREATE\s+(?:UNIQUE\s+)?INDEX\s+' r'(?:CONCURRENTLY\s+)?(?:IF\s+NOT\s+EXISTS\s+)?' r'(?P<index_name>[a-zA-Z0-9_]+)\s*ON', re.IGNORECASE | re.MULTILINE) def make_drop_index_statements_from_sql_code(self, sql): matches = self.CREATE_INDEX_PATTERN.finditer(sql) drop_statements = [] for match in matches: drop_statement = 'DROP INDEX IF EXISTS {index_name} RESTRICT'.format( **match.groupdict()) drop_statements.append(drop_statement) return drop_statements CREATE_TYPE_PATTERN = re.compile(r'^\s*CREATE\s+TYPE\s+' r'(?P<type_name>[a-zA-Z0-9_]+)', re.IGNORECASE | re.MULTILINE) def make_drop_type_statements_from_sql_code(self, sql): matches = self.CREATE_TYPE_PATTERN.finditer(sql) drop_statements = [] for match in matches: drop_statement = 'DROP TYPE IF EXISTS {type_name} RESTRICT'.format( **match.groupdict()) drop_statements.append(drop_statement) return drop_statements def make_drop_statements_from_sql_code( self, sql, sql_types=('index', 'type', 'function', 'trigger')): output_sql = [] for sql_type in sql_types: methodname = 'make_drop_{}_statements_from_sql_code'.format(sql_type) output_sql.extend(getattr(self, methodname)(sql)) return output_sql def make_drop_statements_from_sql_file(self, path): return self.make_drop_statements_from_sql_code(self.get_sql_from_file(path)) def make_drop_statements_from_sql_files(self, paths): sql_statements = [] for path in paths: sql_statements.extend(self.make_drop_statements_from_sql_file(path)) return sql_statements def make_drop_statements_from_sql_template_file(self, path, context_data): sql = self.render_sql_from_template(path=path, context_data=context_data) return self.make_drop_statements_from_sql_code(sql) def make_drop_statements_from_sql_template_files(self, paths, context_data): sql_statements = [] for path in paths: sql = self.render_sql_from_template(path=path, context_data=context_data) sql_statements.extend(self.make_drop_statements_from_sql_code(sql)) return sql_statements def clear(self): """ Revert :meth:`.initialize` and remove any data created by the triggers. Drop/delete triggers, functions, columns, indexes, etc. created in :meth:`.initialize`. """ raise NotImplementedError() def recreate_data(self): """ Recreate all data that any triggers created in :meth:`.initialize` would normally keep in sync automatically. Can not be used unless :meth:`.initialize` has already be run (at some point). This restriction is here to make it possible to create SQL functions in :meth:`.initialize` that this method uses to recreate the data. Without this restriction, code-reuse between :meth:`.initialize` and this function would be very difficult. """ pass def run(self): """ Run both :meth:`.initialize` and :meth:`.recreate_data`. """ self.initialize() self.recreate_data() def __str__(self): return '{} in {}'.format(self.__class__.__name__, self.appname)
[docs]class Registry(Singleton): """ Registry of :class:`.AbstractCustomSql` objects. Examples: First, define a subclass of :class:`.AbstractCustomSql`. Register the custom SQL class with the registry via an AppConfig for your Django app:: from django.apps import AppConfig from ievv_opensource.ievv_customsql import customsql_registry from myapp import customsql class MyAppConfig(AppConfig): name = 'myapp' def ready(self): customsql_registry.Registry.get_instance().add(customsql.MyCustomSql) See ``ievv_opensource/demo/customsql/apps.py`` for a complete demo. """ def __init__(self): super(Registry, self).__init__() self._customsql_classes = [] self._customsql_classes_by_appname_map = OrderedDict() def add(self, appname, customsql_class): """ Add the given ``customsql_class`` to the registry. Parameters: appname: The django appname where the ``customsql_class`` belongs. customsql_class: A subclass of :class:`.AbstractCustomSql`. """ if customsql_class in self._customsql_classes: raise ValueError('{}.{} is already in the custom SQL registry.'.format( customsql_class.__module__, customsql_class.__name__)) self._customsql_classes.append(customsql_class) if appname not in self._customsql_classes_by_appname_map: self._customsql_classes_by_appname_map[appname] = [] self._customsql_classes_by_appname_map[appname].append(customsql_class) def remove(self, appname, customsql_class): self._customsql_classes.remove(customsql_class) self._customsql_classes_by_appname_map[appname].remove(customsql_class) if len(self._customsql_classes_by_appname_map[appname]) == 0: del self._customsql_classes_by_appname_map[appname] def __contains__(self, customsql_class): """ Returns ``True`` if the provided customsql_class is in the registry. Parameters: customsql_class: A subclass of :class:`.AbstractCustomSql`. """ return customsql_class in self._customsql_classes def __iter__(self): """ Iterate over all :class:`.AbstractCustomSql` subclasses registered in the registry. The yielded values are objects of the classes initialized with no arguments. """ for appname in self.iter_appnames(): for customsql in self.iter_customsql_in_app(appname): yield customsql def iter_appnames(self): """ Returns an iterator over all the appnames in the registry. Each item in the iterator is an appname (a string). """ return iter(self._customsql_classes_by_appname_map.keys()) def iter_customsql_in_app(self, appname): """ Iterate over all :class:`.AbstractCustomSql` subclasses registered in the provided appname. The yielded values are objects of the classes initialized with no arguments. """ for customsql_class in self._customsql_classes_by_appname_map[appname]: yield customsql_class(appname) def run_all_in_app(self, appname): """ Loops through all the :class:`.AbstractCustomSql` classes registered in the registry with the provided ``appname``, and call :meth:`.AbstractCustomSql.run` for each of them. """ for customsql in self.iter_customsql_in_app(appname): customsql.run() def run_all(self): """ Loops through all the :class:`.AbstractCustomSql` classes in the registry, and call :meth:`.AbstractCustomSql.run` for each of them. """ for customsql in self: customsql.run()
[docs]class MockableRegistry(Registry): """ A non-singleton version of :class:`.Registry`. For tests. Typical usage in a test:: from ievv_opensource.ievv_customsql import customsql_registry class MockCustomSql(customsql_registry.AbstractCustomSql): # ... mockregistry = customsql_registry.MockableRegistry() mockregistry.add(MockCustomSql()) with mock.patch('ievv_opensource.ievv_customsql.customsql_registry.Registry.get_instance', lambda: mockregistry): pass # ... your code here ... """ def __init__(self): self._instance = None # Ensure the singleton-check is not triggered super(MockableRegistry, self).__init__()