Oracle Streams are a generic mechanism for sharing data which can be used as the basis of many processes including messaging, replication and warehouse ETL processes. They are an extension of a number of existing technologies including Advanced Queuing, LogMiner and Job Scheduling. This article presents a simple replication scenario as an example of their use.
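The processing of streams is divided into three main stages (Capture, Staging and Apply). The capture process mines changes from the redo logs and packages them as Logical Change Records (LCRs). Staging stores the LCRs in queues of type SYS.AnyData. LCRs can be propagated between a source and destination staging area in different databases if necessary, and propagation is scheduled using job queues. The apply process dequeues the LCRs and applies them at the destination.
Both the capture and apply processes can use table, schema and database level rules to determine their actions.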
To begin, the following parameters should be set in the spfiles of the participating databases.
ALTER SYSTEM SET JOB_QUEUE_PROCESSES=1;
ALTER SYSTEM SET AQ_TM_PROCESSES=1;
ALTER SYSTEM SET GLOBAL_NAMES=TRUE;
ALTER SYSTEM SET COMPATIBLE='9.2.0' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_PARALLELISM=1 SCOPE=SPFILE;
SHUTDOWN IMMEDIATE;
STARTUP;
In addition, any databases involved in capture (DBA1) must be in ARCHIVELOG mode.
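Before continuing, it can help to confirm these settings. The queries below are a minimal sketch, assuming a SYSDBA connection to the capture database (DBA1); they only read the standard V$DATABASE and V$PARAMETER views.

```sql
CONN sys/password@DBA1 AS SYSDBA

-- Should return ARCHIVELOG on any database that runs a capture process.
SELECT log_mode FROM v$database;

-- Shows the current values of the parameters set above.
SELECT name, value
FROM   v$parameter
WHERE  name IN ('job_queue_processes', 'aq_tm_processes', 'global_names',
                'compatible', 'log_parallelism');
```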
Next we create a stream administrator, a stream queue table and a database link on the source database.
CONN sys/password@DBA1 AS SYSDBA

CREATE USER strmadmin IDENTIFIED BY strmadminpw
DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;

GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE TO strmadmin;

GRANT EXECUTE ON DBMS_AQADM            TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM  TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_APPLY_ADM        TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK        TO strmadmin;

BEGIN
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
    grantee      => 'strmadmin',
    grant_option => FALSE);
END;
/

BEGIN
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ,
    grantee      => 'strmadmin',
    grant_option => FALSE);
END;
/

CONNECT strmadmin/strmadminpw@DBA1

EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();

CREATE DATABASE LINK dba2 CONNECT TO strmadmin IDENTIFIED BY strmadminpw USING 'DBA2';
This process must be repeated on the destination database (DBA2). The reverse database link is not necessary in this example but the following grant must be added.
GRANT ALL ON scott.dept TO strmadmin;
Next we create a new tablespace to hold the LogMiner tables on the source database.
CONN sys/password@DBA1 AS SYSDBA

CREATE TABLESPACE logmnr_ts DATAFILE '/u01/app/oracle/oradata/DBA1/logmnr01.dbf'
  SIZE 25 M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('logmnr_ts');
The apply process requires additional information for some actions, so we must configure supplemental logging of primary key information for the tables of interest.
CONN sys/password@DBA1 AS SYSDBA

ALTER TABLE scott.dept ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (deptno) ALWAYS;
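To check that the log group was created as intended, the data dictionary can be queried. This is a sketch only, assuming the standard DBA_LOG_GROUPS and DBA_LOG_GROUP_COLUMNS views and the SCOTT.DEPT table used throughout this article.

```sql
CONN sys/password@DBA1 AS SYSDBA

-- One row per supplemental log group defined on the table.
SELECT log_group_name, table_name, always
FROM   dba_log_groups
WHERE  owner      = 'SCOTT'
AND    table_name = 'DEPT';

-- The columns covered by each log group, in order.
SELECT log_group_name, column_name, position
FROM   dba_log_group_columns
WHERE  owner      = 'SCOTT'
AND    table_name = 'DEPT'
ORDER BY log_group_name, position;
```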
Configure the propagation process on DBA1.
CONNECT strmadmin/strmadminpw@DBA1

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name             => 'scott.dept',
    streams_name           => 'dba1_to_dba2',
    source_queue_name      => 'strmadmin.streams_queue',
    destination_queue_name => 'strmadmin.streams_queue@dba2',
    include_dml            => true,
    include_ddl            => true,
    source_database        => 'dba1');
END;
/
The propagation is performed using a job which can be monitored.
SELECT job,
       TO_CHAR(last_date, 'DD-Mon-YYYY HH24:MI:SS') last_date,
       TO_CHAR(next_date, 'DD-Mon-YYYY HH24:MI:SS') next_date,
       what
FROM   dba_jobs;
Configure the capture process on DBA1.
CONNECT strmadmin/strmadminpw@DBA1

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name   => 'scott.dept',
    streams_type => 'capture',
    streams_name => 'capture_simp',
    queue_name   => 'strmadmin.streams_queue',
    include_dml  => true,
    include_ddl  => true);
END;
/
The instantiation SCN of the source table must be configured in the destination table before the apply process will work. If the destination table is already present this can be accomplished using a metadata only export/import.
exp userid=scott/tiger@dba1 FILE=dept_instant.dmp TABLES=dept OBJECT_CONSISTENT=y ROWS=n
imp userid=scott/tiger@dba2 FILE=dept_instant.dmp IGNORE=y COMMIT=y LOG=import.log STREAMS_INSTANTIATION=y
During the transfer of the metadata the supplemental logging was also transferred. Since no capture is done on DBA2 this can be removed.
CONN sys/password@DBA2 AS SYSDBA

ALTER TABLE scott.dept DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;
Alternatively the instantiation SCN can be set using the DBMS_APPLY_ADM package.
CONNECT strmadmin/strmadminpw@dba1

DECLARE
  v_scn  NUMBER;
BEGIN
  v_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@DBA2(
    source_object_name   => 'scott.dept',
    source_database_name => 'dba1',
    instantiation_scn    => v_scn);
END;
/
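Whichever method is used, the recorded instantiation SCN can be checked on the destination database. The query below is a minimal sketch, assuming the DBA_APPLY_INSTANTIATED_OBJECTS view is visible to strmadmin through the SELECT_CATALOG_ROLE granted earlier.

```sql
CONNECT strmadmin/strmadminpw@DBA2

-- A row for SCOTT.DEPT indicates the instantiation SCN has been set.
SELECT source_database,
       source_object_owner,
       source_object_name,
       instantiation_scn
FROM   dba_apply_instantiated_objects
WHERE  source_object_owner = 'SCOTT'
AND    source_object_name  = 'DEPT';
```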
Configure the apply process on the destination database (DBA2).
CONNECT strmadmin/strmadminpw@DBA2

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'scott.dept',
    streams_type    => 'apply',
    streams_name    => 'apply_simp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     => true,
    include_ddl     => true,
    source_database => 'dba1');
END;
/
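The rules generated by the DBMS_STREAMS_ADM calls can be listed to confirm that both DML and DDL rules exist for the table. This is a sketch, assuming the DBA_STREAMS_TABLE_RULES view; it can be run on either database, and the rule names it returns are system generated.

```sql
CONNECT strmadmin/strmadminpw@DBA2

-- Expect one DML rule and one DDL rule per streams process for SCOTT.DEPT.
SELECT streams_name,
       streams_type,
       rule_type,
       rule_owner,
       rule_name
FROM   dba_streams_table_rules
WHERE  table_owner = 'SCOTT'
AND    table_name  = 'DEPT'
ORDER BY streams_type, rule_type;
```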
Start the apply process on the destination database (DBA2) and prevent errors from stopping the process.
CONNECT strmadmin/strmadminpw@DBA2

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_simp',
    parameter  => 'disable_on_error',
    value      => 'n');

  DBMS_APPLY_ADM.START_APPLY(
    apply_name => 'apply_simp');
END;
/
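With disable_on_error set to 'n', a failed transaction no longer stops the apply process, so failures are easy to miss. They are recorded in the apply error queue, which can be inspected as sketched below, assuming the DBA_APPLY_ERROR view is visible to strmadmin.

```sql
CONNECT strmadmin/strmadminpw@DBA2

-- Each row is a transaction that the apply process could not apply.
SELECT apply_name,
       source_database,
       local_transaction_id,
       error_number,
       error_message
FROM   dba_apply_error;
```

An empty result suggests no apply errors have been recorded so far. Once the cause has been fixed, failed transactions can be retried or discarded using the EXECUTE_ERROR and DELETE_ERROR procedures of the DBMS_APPLY_ADM package.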
Start the capture process on the source database (DBA1).
CONNECT strmadmin/strmadminpw@DBA1

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name => 'capture_simp');
END;
/
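Before testing replication it is worth confirming that both processes are running. The queries below are a minimal sketch using the DBA_CAPTURE and DBA_APPLY views; a STATUS of ENABLED is expected for each process.

```sql
-- On the source database (DBA1).
CONNECT strmadmin/strmadminpw@DBA1
SELECT capture_name, queue_name, status FROM dba_capture;

-- On the destination database (DBA2).
CONNECT strmadmin/strmadminpw@DBA2
SELECT apply_name, queue_name, status FROM dba_apply;
```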
With the streams activated we can see that DML changes to the source table are visible in the destination table.
CONNECT scott/tiger@dba1
INSERT INTO dept (deptno, dname, loc) VALUES (99, 'Test Dept', 'UK');
COMMIT;

SELECT * FROM dept;

    DEPTNO DNAME          LOC
---------- -------------- -------------
        10 ACCOUNTING     NEW YORK
        20 RESEARCH       DALLAS
        30 SALES          CHICAGO
        40 OPERATIONS     BOSTON
        99 Test Dept      UK

5 rows selected.

CONNECT scott/tiger@dba2
SELECT * FROM dept;

    DEPTNO DNAME          LOC
---------- -------------- -------------
        10 ACCOUNTING     NEW YORK
        20 RESEARCH       DALLAS
        30 SALES          CHICAGO
        40 OPERATIONS     BOSTON
        99 Test Dept      UK

5 rows selected.
We can also see that DDL changes to the source table are reflected in the destination table.
CONNECT scott/tiger@dba1
ALTER TABLE dept ADD (
  new_col NUMBER(10)
)
/

DESC dept

 Name                         Null?    Type
 ---------------------------- -------- --------------
 DEPTNO                       NOT NULL NUMBER(2)
 DNAME                                 VARCHAR2(14)
 LOC                                   VARCHAR2(13)
 NEW_COL                               NUMBER(10)

CONNECT scott/tiger@dba2
DESC dept

 Name                         Null?    Type
 ---------------------------- -------- --------------
 DEPTNO                       NOT NULL NUMBER(2)
 DNAME                                 VARCHAR2(14)
 LOC                                   VARCHAR2(13)
 NEW_COL                               NUMBER(10)
The contents of the streams can be viewed.
SELECT s.user_data.getTypeName()
FROM   streams_queue_table s;

SET SERVEROUTPUT ON
DECLARE
  v_anydata   SYS.ANYDATA;
  v_lcr       SYS.LCR$_ROW_RECORD;
  v_row_list  SYS.LCR$_ROW_LIST;
  v_result    PLS_INTEGER;
BEGIN
  SELECT user_data
  INTO   v_anydata
  FROM   strmadmin.streams_queue_table
  WHERE  rownum < 2;

  v_result := ANYDATA.GetObject(
                self => v_anydata,
                obj  => v_lcr);

  DBMS_OUTPUT.PUT_LINE('Command Type         : ' || v_lcr.Get_Command_Type);
  DBMS_OUTPUT.PUT_LINE('Object Owner         : ' || v_lcr.Get_Object_Owner);
  DBMS_OUTPUT.PUT_LINE('Object Name          : ' || v_lcr.Get_Object_Name);
  DBMS_OUTPUT.PUT_LINE('Source Database Name : ' || v_lcr.Get_Source_Database_Name);
END;
/
All rules can be identified and removed using the following statements.
BEGIN
  FOR cur_rec IN (SELECT rule_owner, rule_name FROM dba_rules) LOOP
    DBMS_RULE_ADM.DROP_RULE(
      rule_name => cur_rec.rule_owner || '.' || cur_rec.rule_name,
      force     => TRUE);
  END LOOP;
END;
/
All capture and apply processes can be identified, stopped and dropped.
BEGIN
  FOR cur_rec IN (SELECT capture_name FROM dba_capture) LOOP
    DBMS_CAPTURE_ADM.STOP_CAPTURE(
      capture_name => cur_rec.capture_name);
    DBMS_CAPTURE_ADM.DROP_CAPTURE(
      capture_name => cur_rec.capture_name);
  END LOOP;

  FOR cur_rec IN (SELECT apply_name FROM dba_apply) LOOP
    DBMS_APPLY_ADM.STOP_APPLY(
      apply_name => cur_rec.apply_name);
    DBMS_APPLY_ADM.DROP_APPLY(
      apply_name => cur_rec.apply_name);
  END LOOP;
END;
/
All streams information relating to a specific object can be purged.
BEGIN
  DBMS_STREAMS_ADM.PURGE_SOURCE_CATALOG(
    source_database    => 'dba1',
    source_object_name => 'scott.dept',
    source_object_type => 'TABLE');
END;
/
Hope this helps. Regards Tim...