Source code for sqlalchemy_postgresql_audit.event_listeners.alembic

from alembic.operations.ops import ExecuteSQLOp


[docs]class ReversableExecute(ExecuteSQLOp):
[docs] def __init__(self, sqltext, reverse_ddl, execution_options=None): super(ReversableExecute, self).__init__(sqltext, execution_options) self._reverse_ddl = reverse_ddl self._execution_options = execution_options
[docs] def reverse(self): return ExecuteSQLOp(self._reverse_ddl, self._execution_options)
[docs]def compare_for_table( autogen_context, modify_table_ops, schema, tname, conn_table, metadata_table ): if metadata_table.info.get("audit.is_audit_table"): # Case when the audit table is new if conn_table is None: modify_table_ops.ops.append( ReversableExecute( sqltext=metadata_table.info["audit.target_table"].info[ "audit.create_ddl" ], reverse_ddl=metadata_table.info["audit.target_table"].info[ "audit.drop_ddl" ], # TODO: Use reflected DDL from the database ) ) # Case when the audit table exists already else: existing_audit_table_columns = set( [col.name for col in conn_table.columns.values()] ) defined_audit_table_columns = set( [col.name for col in metadata_table.columns.values()] ) # Case when the audit table has new columns. # TODO: Check if the settings types have changed. # Not 100% sure how this would be done. if existing_audit_table_columns.symmetric_difference( defined_audit_table_columns ): modify_table_ops.ops.append( ReversableExecute( sqltext=metadata_table.info["audit.target_table"].info[ "audit.create_ddl" ], reverse_ddl=metadata_table.info["audit.target_table"].info[ "audit.drop_ddl" ], # TODO: Use reflected DDL from the database ) ) # Case when the audit table triggers _should_ already be installed else: # TODO: Check to ensure that the triggers are installed pass elif not metadata_table.info.get("audit.is_audited"): # TODO: Drop the triggers if they exist. pass