Source code for sqlalchemy_postgresql_audit.ddl

from .templates import make_audit_procedure, make_drop_audit_procedure


def get_audit_spec(table):
    """Return the audit options attached to ``table`` with defaults filled in.

    The options are read from ``table.info["audit.options"]``; when that key
    is absent, a new ``{"enabled": False}`` dict is used instead. The
    ``"schema"`` and ``"session_settings"`` keys are then written onto that
    same dict, so options stored on the table are updated in place.

    :param table: An object exposing ``info`` (a mapping) and ``schema``.
    :return: The options dict, with ``"schema"`` taken from its
        ``"schema_name"`` entry (falling back to ``table.schema``) and
        ``"session_settings"`` defaulting to an empty list.
    """
    options = table.info.get("audit.options", {"enabled": False})

    schema = options.get("schema_name", table.schema)
    settings = options.get("session_settings", [])

    # Written back onto the dict that was looked up, not onto a copy.
    options["schema"] = schema
    options["session_settings"] = settings
    return options
def get_create_trigger_ddl(
    target_columns,
    audit_columns,
    function_name,
    trigger_name,
    table_full_name,
    audit_table_full_name,
    session_settings=None,
):
    """Build the DDL for the audit procedure and trigger of one table.

    For every audit column, one SQL expression is collected per operation
    (delete, update, insert) and the result is handed to
    :func:`make_audit_procedure`.

    :param target_columns: Container of the audited table's columns; only
        membership tests by column name are performed on it.
    :param audit_columns: Mapping whose values are the audit table's columns
        (objects with a ``name`` attribute), iterated in order.
    :param function_name: Passed to the template as ``procedure_name``.
    :param trigger_name: Passed through to the template.
    :param table_full_name: Passed through to the template.
    :param audit_table_full_name: Passed through to the template.
    :param session_settings: Optional iterable of column-like objects exposing
        ``name``, ``type`` (with a ``compile()`` method) and ``nullable``.
    :return: Whatever :func:`make_audit_procedure` returns for the collected
        elements.
    :raises KeyError: If an audit column is neither a target column, one of
        the three built-in audit columns, nor a known session setting.
    """
    settings_by_name = {
        setting.name: setting for setting in (session_settings or [])
    }

    # Columns that are skipped below; presumably their values are the three
    # leading entries of each element list (operation, timestamp, user).
    builtin_columns = (
        "audit_operation",
        "audit_operation_timestamp",
        "audit_current_user",
    )

    on_delete = ["'D'", "now()", "current_user"]
    on_update = ["'U'", "now()", "current_user"]
    on_insert = ["'I'", "now()", "current_user"]
    column_names = []
    setting_checks = []

    for column in audit_columns.values():
        column_name = column.name

        # Every audit column is referenced explicitly in the procedure.
        column_names.append(column_name)

        # Values that exist on the target table are read from the row itself.
        if column_name in target_columns:
            new_reference = "NEW.{}".format(column_name)
            on_delete.append("OLD.{}".format(column_name))
            on_update.append(new_reference)
            on_insert.append(new_reference)
            continue

        if column_name in builtin_columns:
            continue

        # Anything left must be backed by a session setting.
        setting = settings_by_name[column_name]
        sql_type = setting.type.compile()
        setting_key = setting.name.split("audit_", 1)[-1]
        nullable = setting.nullable

        lookup = "current_setting('audit.{}', {})::{}".format(
            setting_key, "true" if nullable else "false", sql_type
        )
        for elements in (on_delete, on_update, on_insert):
            elements.append(lookup)

        # A session setting that was set and then committed reads back as an
        # empty string, which would let the procedure succeed with an "empty"
        # value; required settings therefore get an explicit guard.
        if not nullable:
            setting_checks.append(
                "IF {}::VARCHAR = '' THEN RAISE EXCEPTION "
                "'audit.{} session setting must be set to a non null/empty value'; "
                "END IF;".format(lookup, setting_key)
            )

    return make_audit_procedure(
        audit_table_full_name=audit_table_full_name,
        table_full_name=table_full_name,
        procedure_name=function_name,
        trigger_name=trigger_name,
        deletion_elements=on_delete,
        updation_elements=on_update,
        insertion_elements=on_insert,
        audit_columns=column_names,
        check_settings=setting_checks,
    )
def get_drop_trigger_ddl(function_name, trigger_name, table_full_name):
    """Build the drop DDL for an audit function/trigger pair.

    Thin wrapper that forwards its arguments, in the same order, to
    :func:`make_drop_audit_procedure`.

    :param function_name: First argument forwarded to the template.
    :param trigger_name: Second argument forwarded to the template.
    :param table_full_name: Third argument forwarded to the template.
    :return: Whatever :func:`make_drop_audit_procedure` returns.
    """
    drop_ddl = make_drop_audit_procedure(
        function_name,
        trigger_name,
        table_full_name,
    )
    return drop_ddl
def install_audit_triggers(metadata, engine=None):
    """Install all audit triggers.

    This can be used after calling ``metadata.create_all()`` to create
    all the procedures and triggers.

    The statements are taken from ``table.info["audit.create_ddl"]`` of every
    table in ``metadata.tables`` whose ``info`` has a truthy
    ``"audit.is_audited"`` entry.

    :param metadata: A :class:`sqlalchemy.sql.schema.MetaData`
    :param engine: A :class:`sqlalchemy.engine.Engine` or None. When omitted,
        ``metadata.bind`` is used if the metadata has such an attribute.
    :return: None when the statements were executed through an engine,
        otherwise a :class:`str` with the DDL needed to install all audit
        triggers, joined with ``"; "``.
    """
    create_statements = [
        table.info["audit.create_ddl"]
        for table in metadata.tables.values()
        if table.info.get("audit.is_audited")
    ]

    # Metadata objects without a ``bind`` attribute (bound metadata was
    # removed in SQLAlchemy 2.0) are treated as "no engine available" so the
    # DDL string can still be produced instead of raising AttributeError.
    engine = engine or getattr(metadata, "bind", None)

    if not engine:
        return "; ".join(create_statements)

    for statement in create_statements:
        engine.execute(statement)
    return None
def uninstall_audit_triggers(metadata, engine=None):
    """Uninstall all audit triggers.

    This can be used to remove all audit triggers.

    The statements are taken from ``table.info["audit.drop_ddl"]`` of every
    table in ``metadata.tables`` whose ``info`` has a truthy
    ``"audit.is_audited"`` entry.

    :param metadata: A :class:`sqlalchemy.sql.schema.MetaData`
    :param engine: A :class:`sqlalchemy.engine.Engine` or None. When omitted,
        ``metadata.bind`` is used if the metadata has such an attribute.
    :return: None when the statements were executed through an engine,
        otherwise a :class:`str` with the DDL needed to uninstall all audit
        triggers, joined with ``";\\n "``.
    """
    drop_statements = [
        table.info["audit.drop_ddl"]
        for table in metadata.tables.values()
        if table.info.get("audit.is_audited")
    ]

    # Metadata objects without a ``bind`` attribute (bound metadata was
    # removed in SQLAlchemy 2.0) are treated as "no engine available" so the
    # DDL string can still be produced instead of raising AttributeError.
    engine = engine or getattr(metadata, "bind", None)

    if not engine:
        return ";\n ".join(drop_statements)

    for statement in drop_statements:
        engine.execute(statement)
    return None