Tech

Guru: Get meaningful audit information from journals

July 26, 2021

Paul Tuohhi

Journaling is an invaluable tool used for data recovery, data replication, commitment control, and of course auditing. However, getting audit information in an easy-to-use way can be tedious. This article introduces an audit table for any table / physical file and a stored procedure that creates a corresponding view that you can use to easily audit changes.

For example, if you want to audit the EMPLOYEE table (which uses the EMPLOYEE table from the standard Db2 sample database) and modify the SALARY column, use the following simple SQL statement.

select audit_type, jodate, jotime, jojob, jouser, jonbr, jopgm,
       old_empno, new_empno, old_salary, new_salary
  from AUDIT_EMPLOYEE_VIEW 
 where audit_type in ('INSERT', 'DELETE') or
       old_salary $lt;%gt; new_salary 
 order by joseqn;

The result set of results looks like this:

process

The audit process has three stages.

  • Define the required audit tables and views. You need to create an audit table for each journal table you want to audit. This step should only be repeated if the journal table definition changes (for example, if a new column is added to the table).
  • Copy the journal entry from the journal to the audit table and populate the audit table. This can be done hourly, weekly, monthly, or sporadically, as needed.
  • Perform the required audit analysis (as in the example above).

Put journal audit information in a table

You can use the View Journal (DSPJRN) command to output the journaled data to a database file. Because I am interested in auditing record changes, the command to copy all record journals from a journal for a particular table on a particular date is:

DSPJRN JRN(SQLSTAND/QSQJRN) FILE((SQLSTAND/EMPLOYEE)) RCVRNG(*CURCHAIN)
       FROMTIME('03/01/21') ENTTYP(*RCD) OUTPUT(*OUTFILE) 
       OUTFILE(QTEMP/JRNOUT1) 

The resulting table has a row for each journal entry. Each row contains a column of journal information (such as the date and time of the change, the job and program that made the change), and one column (JOESD) that contains an image of the entire row. The Journal Entry Type (JOENTT) column indicates whether the row image is an inserted, updated, pre-update, or post-update image.

The journal information column is fine, but the contents of the JOESD column are of little use as they are.

Creating audit tables and views

The audit table is a copy of the file generated by the DSPJRN command, with the JOESD columns replaced by the column definitions in the journal table.

The audit view is a view of the audit table that combines the previous and next images so that each row provides old and new images of the affected row. Deleted rows appear in the old column and inserted rows appear in the new column.

We recommend that you have separate schemas for your audit tables and views. This makes it easier to protect audit information and control whether data is included in backups as needed. This may not be necessary if the journal is saved.

You can write a stored procedure to create an audit table and view any table. This is an example of calling the MAKE_AUDIT_TABLE () stored procedure to create a view of the audit table and table EMPLOYEE in the schema SQLSTAND. The audit table will be AUDIT_EMPLOYEE in the schema AUDITLIB. The system name of the table will be AUDITEMP. The audit view is automatically named AUDIT_EMPLOYEE_VIEW — the name of the audit table to which _VIEW was added.

call make_audit_table(JOURNAL_TABLE_SCHEMA => 'SQLSTAND', 
                      JOURNAL_TABLE => 'EMPLOYEE', 
                      AUDIT_SCHEMA => 'AUDITLIB', 
                      AUDIT_TABLE => 'AUDIT_EMPLOYEE', 
                      AUDIT_TABLE_SYSTEM => 'AUDITEMP'); 

This is the SQL for creating the MAKE_AUDIT_TABLE () stored procedure. Copy it and paste it into “Run SQL Script” to run it. These are the main points to keep in mind in the procedure. See Callouts in your code.

  • Make a copy of the * OUTFILE template file (QSYS.QADSPJRN) in QTEMP and remove the JOESD column definition from the definition. The call to the QSYS2.override_qaqqini () stored procedure is to ignore the query message that stops dropping columns. If desired, there are four other template files that can be used to provide more detailed journal columns. If you need this information, simply replace all references in QSYS.QADSPJRN with the name of the alternate template (for example, QSYS.QADSPJRN2).
  • Use dynamic SQL to create an audit table that consists of all the columns of the table defined in (A) above and all the columns of the table to be audited. The audit table is effectively a file generated by the DSPJRN command, but the column JOESD definition is replaced with the definition of every column in the audited table.
  • Use the LISTAGG () aggregate function to get the list of columns needed for the audit view. For Journal columns, the view does not include JOESD (because it is replaced by a column in the audited table) and JORES (for padding and no information).
  • Create an audit view using dynamic SQL. Each row in the view consists of a journal column in the front (OLD_) column of the audited table and the back (NEW_) column of the audited table. The view is a join of three select statements in the audit table, one for changes, one for inserts, and one for deletes. The AUDIT_TYPE column indicates the change type.
  create or replace procedure make_audit_table
    (
      journal_table_schema varchar(256),
      journal_table        varchar(256),
      audit_schema         varchar(256),
      audit_table          varchar(256),
      audit_table_system   varchar(10) default ''
    )
      set option dbgview = *source,                        
                 commit  = *none 
    BEGIN
    
      declare audit_qualified         varchar(512);
      declare journal_table_qualified varchar(512);
      declare for_system_name         varchar(100);
      
      declare execute_statement varchar(32000);
      
      declare journal_columns     varchar(32000);
      declare journal_columns_old varchar(32000);
      declare old_name_select     varchar(32000);
      declare old_name_list       varchar(32000);
      declare new_name_select     varchar(32000);
      declare new_name_list       varchar(32000);
      declare default_list        varchar(32000);
      
      -- Ignore File not found when deleting temporary table
      declare CONTINUE HANDLER for SQLSTATE '42704'
        BEGIN
        END;
    
      -- Set qualified names
      set audit_qualified = audit_schema concat 
                            '.' concat 
                            audit_table;
      set journal_table_qualified = journal_table_schema concat 
                                    '.' concat 
                                    journal_table;
      
      -- Copy the *OUTFILE template to a work file in QTEMP and
      --  drop the JOESD (record image) column                              
 -- (A)
      drop table QTEMP.MYAUDIT;
      create table QTEMP.MYAUDIT like QSYS.QADSPJRN;
      call QSYS2.override_qaqqini(1, '', '');
      call QSYS2.override_qaqqini(2, 'SUPPRESS_INQUIRY_MESSAGES', '*YES');
      alter table QTEMP.MYAUDIT drop column JOESD;
      call QSYS2.override_qaqqini(3, '', '');
    
 -- (B)  
      -- Create the Audit table.
      --   This is the *OUTFILE template but JOESD is replaced with the
      --   definition of the columns in the journaled table
      if (audit_table_system <> '') then
        set for_system_name=" for system name " concat audit_table_system;
      end if;
      set execute_statement="create or replace table " concat audit_qualified concat 
         for_system_name concat ' as ( ' concat
         'select MYAUDIT.*,  ' concat journal_table concat '.* ' concat
         'from QTEMP.MYAUDIT cross join ' concat 
         journal_table_qualified concat ') WITH NO DATA';
    
      prepare execute_make_table from execute_statement;                 
      execute execute_make_table;  
      
      drop table QTEMP.MYAUDIT;
      
      -- Create the required Column Name lists for the Journal Entry Columns, 
      --  the OLD and NEW columns in the journaled table and a set of defaut
      --  values for the columns in the journaled table
-- (C) 
      select LISTAGG(COLUMN_NAME, ', ') 
               WITHIN GROUP(ORDER BY ordinal_position)
        into journal_columns 
        from QSYS2.SYSCOLUMNS 
       where (table_schema, table_name) = ('QSYS', 'QADSPJRN') 
         and column_name not in ('JORES', 'JOESD');
    
      select LISTAGG('OLD.' concat COLUMN_NAME, ', ') 
                WITHIN GROUP(ORDER BY ordinal_position)
        into journal_columns_old 
        from QSYS2.SYSCOLUMNS 
       where (table_schema, table_name) = ('QSYS', 'QADSPJRN') 
         and column_name not in ('JORES', 'JOESD');
    
      select LISTAGG('OLD.' concat COLUMN_NAME, ', ') 
                WITHIN GROUP(ORDER BY ordinal_position) 
        into old_name_select
        from QSYS2.SYSCOLUMNS 
      where (table_schema, table_name) 
          = (journal_table_schema, journal_table);
    
      select LISTAGG('NEW.' concat COLUMN_NAME, ', ') 
               WITHIN GROUP(ORDER BY ordinal_position)
        into new_name_select
        from QSYS2.SYSCOLUMNS 
      where (table_schema, table_name) 
          = (journal_table_schema, journal_table);               
    
      select LISTAGG('OLD_' concat COLUMN_NAME, ', ') 
               WITHIN GROUP(ORDER BY ordinal_position) 
        into old_name_list
        from QSYS2.SYSCOLUMNS 
      where (table_schema, table_name) 
          = (journal_table_schema, journal_table);
    
      select LISTAGG('NEW_' concat COLUMN_NAME, ', ') 
               WITHIN GROUP(ORDER BY ordinal_position)
        into new_name_list
        from QSYS2.SYSCOLUMNS 
      where (table_schema, table_name) 
          = (journal_table_schema, journal_table); 
      
      select LISTAGG(CASE
                       WHEN NUMERIC_SCALE IS NOT NULL THEN '0'
                       WHEN DATA_TYPE = 'DATE' THEN 'CURRENT_DATE'
                       WHEN DATA_TYPE = 'TIME' THEN 'CURRENT_TIME'
                       WHEN DATA_TYPE = 'TIMESTAMP' THEN 'CURRENT_TIMESTAMP'
                       ELSE ''''''
                     END, ', ') WITHIN GROUP(ORDER BY ordinal_position)
        into default_list 
        from QSYS2.SYSCOLUMNS 
       where (table_schema, table_name) 
           = (journal_table_schema, journal_table);
    
      -- Create the Audit View over the Audit Table
      --  Each row in the view contains:
      --  - an AUDIT_TYPE column (contains CHANGE, INSERT or DELETE)
      --  - the Journal entry columns
      --  - the values of the Before Change/Delete columns 
      --  - the values of the After Change/Insert columns     
      set execute_statement="create or replace view " concat audit_schema concat 
         '.' concat audit_table concat '_view ' concat
         '( AUDIT_TYPE, ' concat journal_columns concat ', ' concat
         old_name_list concat ', ' concat new_name_list concat 
         ') as select ''CHANGE'' as AUDIT_TYPE, ' concat
         journal_columns_old concat ', ' concat
         old_name_select concat ', ' concat
         new_name_select concat ' from ' concat 
         audit_qualified concat ' OLD inner join ' concat
         audit_qualified concat ' NEW on ' concat
         '(OLD.JODATE, OLD.JOTIME, OLD.JOJOB, OLD.JOUSER, OLD.JONBR, ' concat 
         'OLD.JOPGM, OLD.JOOBJ, OLD.JOLIB, OLD.JOMBR, OLD.JOCTRR) = ' concat
         '(NEW.JODATE, NEW.JOTIME, NEW.JOJOB, NEW.JOUSER, NEW.JONBR, ' concat
         'NEW.JOPGM, NEW.JOOBJ, NEW.JOLIB, NEW.JOMBR, NEW.JOCTRR) ' concat
         'where (OLD.JOENTT, NEW.JOENTT) = (''UB'', ''UP'') or ' concat
         '(OLD.JOENTT, NEW.JOENTT) = (''BR'', ''UR'') ' concat
         'UNION select ''INSERT'' as AUDIT_TYPE, ' concat
         journal_columns concat ', ' concat
         default_list concat ', ' concat
         old_name_select concat ' from ' concat  
         audit_qualified concat ' OLD ' concat
         'where JOENTT IN (''PT'', ''PX'') '  concat
         'UNION select ''DELETE'' as AUDIT_TYPE, ' concat
         journal_columns concat ', ' concat
         old_name_select concat ', ' concat
         default_list concat ' from ' concat  
         audit_qualified concat ' OLD ' concat
         'where JOENTT IN (''DL'', ''DR'') '
         ;
    
      prepare execute_make_view from execute_statement;                 
      execute execute_make_view;  
      
                   
    END; 

Input to audit table

Getting information from the journal to the audit table is a two-step process. Use the View Journals (DSPJRN) command to write the required journals to the output file, and then use the Copy File (CPYF) command to copy the output file. Audit table.

This is an example of how to do this by running a SQL script. Please note the following about CPYF:

The TOFILE parameter uses the system name of the audit table.

The value of the Format Option (FMTOPT) parameter is * NOCHK. This means that the value of the JOESD column (record image) will be copied / overlaid on all columns of the audited table (defined in the audit table) — performing a byte-by-byte copy. ..

CL: DSPJRN JRN(SQLSTAND/QSQJRN) FILE((SQLSTAND/EMPLOYEE)) RCVRNG(*CURCHAIN)
           FROMTIME('03/01/21') ENTTYP(*RCD) 
           OUTPUT(*OUTFILE) OUTFILE(QTEMP/JRNOUT1);                                                                                       
CL: CPYF FROMFILE(QTEMP/JRNOUT1) TOFILE(AUDITLIB/AUDITEMP) MBROPT(*REPLACE)
         FMTOPT(*NOCHK) ;              

In a production environment, this is done in a scheduled CL program and the start date and time is set programmatically as needed. This CL program also runs the required audit program that you have created.

However, as in the previous example, we now have a view that can be easily queried.

Other considerations

Some items you may want to consider:

  • Instead of using existing column names for journal columns (JODATE, JOTIME, etc.), it provides a hard-coded list of more meaningful names in the view. This list replaces the use of journal_columns in the view’s column list.
  • Use the value in the COLUMN_DEFAULT column (in SYSCOLUMNS) instead of using CASE to determine the default value. The reason for using CASE is to avoid null values.
  • If you are auditing a “busy” table, it is recommended that you perform extraction / auditing more frequently rather than copying large amounts of data from the journal. That is, copy in small chunks.
  • Examine what the DSPJRN command provides for other output file formats to determine if any of the additional information is useful. This example uses the default blue * TYPE1.

I hope this stored procedure is useful.

Related story

journal Forensics 101

Guru: Get meaningful audit information from journals

Source link Guru: Get meaningful audit information from journals

Related Articles

Back to top button