8i | 9i | 10g | 11g | 12c | 13c | 18c | 19c | 21c | 23c | Misc | PL/SQL | SQL | RAC | WebLogic | Linux

Home » Articles » 9i » Here

Oracle Streams

Oracle Streams are a generic mechanism for sharing data which can be used as the basis of many processes including messaging, replication and warehouse ETL processes. They are an extension of a number of existing technologies including Advanced Queuing, LogMiner and Job Scheduling. This article presents a simple replication scenario as an example of their use.

Basic Architecture

The processing of streams is divided into three main processes (Capture, Staging and Apply):

Both the capture and apply processes can use table, schema and database level rules to determine their actions.

Instance Setup

In order to begin the following parameters should be set in the spfiles of participating databases.

ALTER SYSTEM SET JOB_QUEUE_PROCESSES=1;
ALTER SYSTEM SET AQ_TM_PROCESSES=1;
ALTER SYSTEM SET GLOBAL_NAMES=TRUE;
ALTER SYSTEM SET COMPATIBLE='9.2.0' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_PARALLELISM=1 SCOPE=SPFILE;
SHUTDOWN IMMEDIATE;
STARTUP;

In addition, any databases involved in capture (DBA1) must be in ARCHIVELOG mode.

Stream Administrator Setup

Next we create a stream administrator, a stream queue table and a database link on the source database.

CONN sys/password@DBA1 AS SYSDBA

CREATE USER strmadmin IDENTIFIED BY strmadminpw
DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;

GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE TO strmadmin;

GRANT EXECUTE ON DBMS_AQADM            TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM  TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_APPLY_ADM        TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK        TO strmadmin;

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

CONNECT strmadmin/strmadminpw@DBA1
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();

CREATE DATABASE LINK dba2 CONNECT TO strmadmin IDENTIFIED BY strmadminpw USING 'DBA2';

This process must be repeated on the destination database (DBA2). The reverse database link is not necessary in this example but the following grant must be added.

GRANT ALL ON scott.dept TO strmadmin;

LogMiner Tablespace Setup

Next we create a new tablespace to hold the logminer tables on the source database.

CONN sys/password@DBA1 AS SYSDBA

CREATE TABLESPACE logmnr_ts DATAFILE '/u01/app/oracle/oradata/DBA1/logmnr01.dbf' 
  SIZE 25 M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
   
EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('logmnr_ts');

Supplemental Logging

The apply process requires additional information for some actions so we must configure suplimental logging of primary key information for tables of interest.

CONN sys/password@DBA1 AS SYSDBA
ALTER TABLE scott.dept ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (deptno) ALWAYS;

Configure Propagation Process

Configure the propagation process on DBA1.

CONNECT strmadmin/strmadminpw@DBA1
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'scott.dept', 
    streams_name            => 'dba1_to_dba2', 
    source_queue_name       => 'strmadmin.streams_queue',
    destination_queue_name  => 'strmadmin.streams_queue@dba2',
    include_dml             =>  true,
    include_ddl             =>  true,
    source_database         => 'dba1');
END;
/

The propagation is performed using a job which can be monitored.

SELECT job,
       TO_CHAR(last_date, 'DD-Mon-YYYY HH24:MI:SS') last_date,
       TO_CHAR(next_date, 'DD-Mon-YYYY HH24:MI:SS') next_date,
       what
FROM   dba_jobs;

Configure Capture Process

Configure the capture process on DBA1.

CONNECT strmadmin/strmadminpw@DBA1
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'scott.dept',   
    streams_type   => 'capture',
    streams_name   => 'capture_simp',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    =>  true,
    include_ddl    =>  true);
END;
/

Configure Instantiation SCN

The instantiation SCN of the source table must be configured in the destination table before the apply process will work. If the destination table is already present this can be accomplished using a metadata only export/import.

exp userid=scott/tiger@dba1 FILE=dept_instant.dmp TABLES=dept OBJECT_CONSISTENT=y ROWS=n
imp userid=scott/tiger@dba2 FILE=dept_instant.dmp IGNORE=y COMMIT=y LOG=import.log STREAMS_INSTANTIATION=y

During the transfer of the meta information the supplematal logging was also transferred. Since no capture is done on DBA2 this can be removed.

CONN sys/password@DBA2 AS SYSDBA
ALTER TABLE scott.dept DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;

Alternatively the instantiation SCN can be set using the DBMS_APPLY_ADM package.

CONNECT strmadmin/strmadminpw@dba1
DECLARE
  v_scn  NUMBER;
BEGIN
  v_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@DBA2(
    source_object_name    => 'scott.dept',
    source_database_name  => 'dba1',
    instantiation_scn     => v_scn);
END;
/

Configure Apply Process

Configure the apply process on the destination database (DBA2).

CONNECT strmadmin/strmadminpw@DBA2
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'scott.dept',
    streams_type    => 'apply', 
    streams_name    => 'apply_simp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     =>  true,
    include_ddl     =>  true,
    source_database => 'dba1');
END;
/

Start Apply Process

Start the apply process on destination database (DBA2) and prevent errors stopping the process.

CONNECT strmadmin/strmadminpw@DBA2
BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_simp', 
    parameter   => 'disable_on_error', 
    value       => 'n');

  DBMS_APPLY_ADM.START_APPLY(
    apply_name  => 'apply_simp');
END;
/

Start Capture Process

Start the capture process on the source database (DBA1).

CONNECT strmadmin/strmadminpw@DBA1
BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name  => 'capture_simp');
END;
/

Test It

With the streams activated we can see that DML changes to the source table are visible in the destination table.

CONNECT scott/tiger@dba1
INSERT INTO dept (deptno, dname, loc) VALUES (99, 'Test Dept', 'UK');
COMMIT;

SELECT * FROM dept;

    DEPTNO DNAME          LOC
---------- -------------- -------------
        10 ACCOUNTING     NEW YORK
        20 RESEARCH       DALLAS
        30 SALES          CHICAGO
        40 OPERATIONS     BOSTON
        99 Test Dept      UK

5 rows selected.

CONNECT scott/tiger@dba2
SELECT * FROM dept;

    DEPTNO DNAME          LOC
---------- -------------- -------------
        10 ACCOUNTING     NEW YORK
        20 RESEARCH       DALLAS
        30 SALES          CHICAGO
        40 OPERATIONS     BOSTON
        99 Test Dept      UK

5 rows selected.

We can also see that DDL changes to the source table are reflected in the destination table.

CONNECT scott/tiger@dba1
ALTER TABLE dept ADD (
  new_col NUMBER(10)
)
/
DESC dept

 Name                         Null?    Type
 ---------------------------- -------- --------------
 DEPTNO                       NOT NULL NUMBER(2)
 DNAME                                 VARCHAR2(14)
 LOC                                   VARCHAR2(13)
 NEW_COL                               NUMBER(10)

CONNECT scott/tiger@dba2
DESC dept

 Name                         Null?    Type
 ---------------------------- -------- --------------
 DEPTNO                       NOT NULL NUMBER(2)
 DNAME                                 VARCHAR2(14)
 LOC                                   VARCHAR2(13)
 NEW_COL                               NUMBER(10)

The contents of the streams can be viewed.

SELECT s.user_data.getTypeName()
FROM   streams_queue_table s;

SET SERVEROUTPUT ON
DECLARE
  v_anydata  SYS.ANYDATA;
  v_lcr      SYS.LCR$_ROW_RECORD;  
  v_row_list SYS.LCR$_ROW_LIST;
  v_result   PLS_INTEGER;
BEGIN
  
  SELECT user_data
  INTO   v_anydata
  FROM   strmadmin.streams_queue_table
  WHERE  rownum < 2;
  
  v_result := ANYDATA.GetObject(
                self  => v_anydata,
                obj   => v_lcr);
                
  DBMS_OUTPUT.PUT_LINE('Command Type         : ' || v_lcr.Get_Command_Type);
  DBMS_OUTPUT.PUT_LINE('Object Owner         : ' || v_lcr.Get_Object_Owner);
  DBMS_OUTPUT.PUT_LINE('Object Name          : ' || v_lcr.Get_Object_Name);
  DBMS_OUTPUT.PUT_LINE('Source Database Name : ' || v_lcr.Get_Source_Database_Name);
END;
/

Clean Up

All rules can be identified and removed using the following statements.

BEGIN
  FOR cur_rec IN (SELECT rule_owner, rule_name FROM dba_rules) LOOP
    DBMS_RULE_ADM.DROP_RULE(
      rule_name => cur_rec.rule_owner || '.' || cur_rec.rule_name,
      force => TRUE);
  END LOOP;
END;
/

All capture and apply processes can be identified, stopped and dropped.

BEGIN
  FOR cur_rec IN (SELECT capture_name FROM dba_capture) LOOP
    DBMS_CAPTURE_ADM.STOP_CAPTURE(
      capture_name => cur_rec.capture_name);
    DBMS_CAPTURE_ADM.DROP_CAPTURE(
      capture_name => cur_rec.capture_name);
  END LOOP;

  FOR cur_rec IN (SELECT apply_name FROM dba_apply) LOOP
    DBMS_APPLY_ADM.STOP_APPLY(
      apply_name => cur_rec.apply_name);
    DBMS_APPLY_ADM.DROP_APPLY(
      apply_name => cur_rec.apply_name);
  END LOOP;
END;
/

All streams information relating to a specific object can be purged.

BEGIN
  DBMS_STREAMS_ADM.PURGE_SOURCE_CATALOG(
    source_database        => 'dba1',
    source_object_name     => 'scott.dept',
    source_object_type     => 'TABLE');
END;
/

For more information see:

Hope this helps. Regards Tim...

Back to the Top.