8i | 9i | 10g | 11g | 12c | 13c | 18c | 19c | 21c | 23ai | Misc | PL/SQL | SQL | RAC | WebLogic | Linux
package allows a workload associated with a base table to be broken down into smaller chunks which can be run in parallel. The examples in this article are based around a simple update statement, which in reality would be more efficiently coded as a single parallel DML statement, but it serves to explain the usage of the package nicely. In reality you should use this package where straight parallel DML is not appropriate. Using the DBMS_PARALLEL_EXECUTE
package involves several distinct stages.
At the end of the article there are some complete examples, using some of the techniques discussed below.
The user controlling the process needs the CREATE JOB
The examples used in this article require the following table to be created and populated.
CONN test/test DROP TABLE test_tab; CREATE TABLE test_tab ( id NUMBER, description VARCHAR2(50), num_col NUMBER, session_id NUMBER, CONSTRAINT test_tab_pk PRIMARY KEY (id) ); INSERT /*+ APPEND */ INTO test_tab SELECT level, 'Description for ' || level, CASE WHEN MOD(level, 5) = 0 THEN 10 WHEN MOD(level, 3) = 0 THEN 20 ELSE 30 END, NULL FROM dual CONNECT BY level <= 500000; COMMIT; EXEC DBMS_STATS.gather_table_stats(USER, 'TEST_TAB', cascade => TRUE); SELECT num_col, COUNT(*) FROM test_tab GROUP BY num_col ORDER BY num_col; NUM_COL COUNT(*) ---------- ---------- 10 100000 20 133333 30 266667 SQL>
Create a task
procedure is used to create a new task. It requires a task name to be specified, but can also include an optional task comment.
BEGIN DBMS_PARALLEL_EXECUTE.create_task (task_name => 'test_task'); END; /
Information about existing tasks is displayed using the [DBA|USER]_PARALLEL_EXECUTE_TASKS
COLUMN task_name FORMAT A10 SELECT task_name, status FROM user_parallel_execute_tasks; TASK_NAME STATUS ---------- ------------------- test_task CREATED SQL>
function returns a unique task name if you do not want to name the task manually.
SELECT DBMS_PARALLEL_EXECUTE.generate_task_name FROM dual; GENERATE_TASK_NAME -------------------------------------------------------------------------------- TASK$_726 SQL>
Split the workload into chunks
The workload is associated with a base table, which can be split into subsets or chunks of rows. There are three methods of splitting the workload into chunks.
The chunks associated with a task can be dropped using the DROP_CHUNKS
procedure splits the data by rowid into chunks specified by the CHUNK_SIZE
parameter. If the BY_ROW
parameter is set to TRUE, the CHUNK_SIZE
refers to the number of rows, otherwise it refers to the number of blocks.
BEGIN DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name => 'test_task', table_owner => 'TEST', table_name => 'TEST_TAB', by_row => TRUE, chunk_size => 10000); END; /
Once the operation is complete the task status is changed to 'CHUNKED'.
COLUMN task_name FORMAT A10 SELECT task_name, status FROM user_parallel_execute_tasks; TASK_NAME STATUS ---------- ------------------- test_task CHUNKED SQL>
views display information about the individual chunks.
SELECT chunk_id, status, start_rowid, end_rowid FROM user_parallel_execute_chunks WHERE task_name = 'test_task' ORDER BY chunk_id; CHUNK_ID STATUS START_ROWID END_ROWID ---------- -------------------- ------------------ ------------------ 287 UNASSIGNED AAASjoAAEAAAAIwAAA AAASjoAAEAAAAI3CcP 288 UNASSIGNED AAASjoAAEAAAAI4AAA AAASjoAAEAAAAI/CcP ... 450 UNASSIGNED AAASjoAAEAAAAIIAAA AAASjoAAEAAAAIPCcP 451 UNASSIGNED AAASjoAAEAAAAIoAAA AAASjoAAEAAAAIvCcP 88 rows selected. SQL>
procedure divides the workload up based on a number column. It uses the specified columns min and max values along with the chunk size to split the data into approximately equal chunks. For the chunks to be equally sized the column must contain a continuous sequence of numbers, like that generated by a sequence.
BEGIN DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col(task_name => 'test_task', table_owner => 'TEST', table_name => 'TEST_TAB', table_column => 'ID', chunk_size => 10000); END; /
views display information about the individual chunks.
SELECT chunk_id, status, start_id, end_id FROM user_parallel_execute_chunks WHERE task_name = 'test_task' ORDER BY chunk_id; CHUNK_ID STATUS START_ID END_ID ---------- -------------------- ---------- ---------- 600 UNASSIGNED 1 10000 601 UNASSIGNED 10001 20000 ... 648 UNASSIGNED 480001 490000 649 UNASSIGNED 490001 500000 50 rows selected. SQL>
procedure divides the workload based on a user-defined query. If the BY_ROWID
parameter is set to TRUE, the query must return a series of start and end rowids. If it's set to FALSE, the query must return a series of start and end IDs.
DECLARE l_stmt CLOB; BEGIN l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab'; DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => 'test_task', sql_stmt => l_stmt, by_rowid => FALSE); END; /
views display information about the individual chunks.
SELECT chunk_id, status, start_id, end_id FROM user_parallel_execute_chunks WHERE task_name = 'test_task' ORDER BY chunk_id; CHUNK_ID STATUS START_ID END_ID ---------- -------------------- ---------- ---------- 650 UNASSIGNED 10 10 651 UNASSIGNED 30 30 652 UNASSIGNED 20 20 3 rows selected. SQL>
Run the task
Running a task involves running a specific statement for each defined chunk of work. The documentation only shows examples using updates of the base table, but this is not the only use of this functionality. The statement associated with the task can be a procedure call, as shown in one of the examples at the end of the article.
There are two ways to run a task and several procedures to control a running task.
procedure runs the specified statement in parallel by scheduling jobs to process the workload chunks. The statement specifying the actual work to be done must include a reference to the ':start_id' and ':end_id', which represent a range of rowids or column IDs to be processed, as specified in the chunk definitions. The degree of parallelism is controlled by the number of scheduled jobs, not the number of chunks defined. The scheduled jobs take an unassigned workload chunk, process it, then move on to the next unassigned chunk.
DECLARE l_sql_stmt VARCHAR2(32767); BEGIN l_sql_stmt := 'UPDATE test_tab t SET t.num_col = t.num_col + 10, t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'') WHERE rowid BETWEEN :start_id AND :end_id'; DBMS_PARALLEL_EXECUTE.run_task(task_name => 'test_task', sql_stmt => l_sql_stmt, language_flag => DBMS_SQL.NATIVE, parallel_level => 10); END; /
procedure waits for the task to complete. On completion, the status of the task must be assessed to know what action to take next.
You can see the activity ran in parallel by checking the SESSION_ID
column that was set during the update.
SELECT session_id, COUNT(*) FROM test_tab GROUP BY session_id ORDER BY session_id; SESSION_ID COUNT(*) ---------- ---------- 232639 65041 232640 54029 232641 53571 232642 44622 232643 45264 232644 54242 232645 47314 232646 44746 232647 46309 232648 44862 10 rows selected. SQL>
The maximum level of parallelism will be limited by the JOB_QUEUE_PROCESSES
parameter and the number of available chunks.
User-defined framework
package allows you to manually code the task run. The GET_ROWID_CHUNK
procedures return the next available unassigned chunk. You can than manually process the chunk and set its status. The example below shows the processing of a workload chunked by rowid.
DECLARE l_sql_stmt VARCHAR2(32767); l_chunk_id NUMBER; l_start_rowid ROWID; l_end_rowid ROWID; l_any_rows BOOLEAN; BEGIN l_sql_stmt := 'UPDATE test_tab t SET t.num_col = t.num_col + 10, t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'') WHERE rowid BETWEEN :start_id AND :end_id'; LOOP -- Get next unassigned chunk. DBMS_PARALLEL_EXECUTE.get_rowid_chunk(task_name => 'test_task', chunk_id => l_chunk_id, start_rowid => l_start_rowid, end_rowid => l_end_rowid, any_rows => l_any_rows); EXIT WHEN l_any_rows = FALSE; BEGIN -- Manually execute the work. EXECUTE IMMEDIATE l_sql_stmt USING l_start_rowid, l_end_rowid; -- Set the chunk status as processed. DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task', chunk_id => l_chunk_id, status => DBMS_PARALLEL_EXECUTE.PROCESSED); EXCEPTION WHEN OTHERS THEN -- Record chunk error. DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task', chunk_id => l_chunk_id, status => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR, err_num => SQLCODE, err_msg => SQLERRM); END; -- Commit work. COMMIT; END LOOP; END; /
Task control
A running task can be stopped and restarted using the STOP_TASK
procedures respectively.
procedure deletes all chunks with a status of 'PROCESSED' or 'PROCESSED_WITH_ERROR'.
routines have the same function as their namesakes, but they allow the operations to performed on tasks owned by other users. In order to use these routines the user must have been granted the ADM_PARALLEL_EXECUTE_TASK
Check the task status
The simplest way to check the status of a task is to use the TASK_STATUS
function. After execution of the task, the only possible return values are the 'FINISHED' or 'FINISHED_WITH_ERROR' constants. If the status is not 'FINISHED', then the task can be resumed using the RESUME_TASK
DECLARE l_try NUMBER; l_status NUMBER; BEGIN -- If there is error, RESUME it for at most 2 times. l_try := 0; l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task'); WHILE(l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) Loop l_try := l_try + 1; DBMS_PARALLEL_EXECUTE.resume_task('test_task'); l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task'); END LOOP; END; /
The status of the task and the chunks can also be queried.
COLUMN task_name FORMAT A10 SELECT task_name, status FROM user_parallel_execute_tasks; TASK_NAME STATUS ---------- ------------------- test_task FINISHED SQL>
If there were errors, the chunks can be queried to identify the problems.
SELECT status, COUNT(*) FROM user_parallel_execute_chunks GROUP BY status ORDER BY status; STATUS COUNT(*) -------------------- ---------- PROCESSED 88 SQL>
views contain a record of the JOB_PREFIX
used when scheduling the chunks of work.
SELECT job_prefix FROM user_parallel_execute_tasks WHERE task_name = 'test_task'; JOB_PREFIX ------------------------------ TASK$_368 SQL>
This value can be used to query information about the individual jobs used during the process. The number of jobs scheduled should match the degree of parallelism specified in the RUN_TASK
COLUMN job_name FORMAT A20 SELECT job_name, status FROM user_scheduler_job_run_details WHERE job_name LIKE (SELECT job_prefix || '%' FROM user_parallel_execute_tasks WHERE task_name = 'test_task'); JOB_NAME STATUS -------------------- ------------------------------ TASK$_368_1 SUCCEEDED TASK$_368_6 SUCCEEDED TASK$_368_2 SUCCEEDED TASK$_368_9 SUCCEEDED TASK$_368_10 SUCCEEDED TASK$_368_8 SUCCEEDED TASK$_368_7 SUCCEEDED TASK$_368_4 SUCCEEDED TASK$_368_5 SUCCEEDED TASK$_368_3 SUCCEEDED 10 rows selected. SQL>
Drop the task
Once the job is complete you can drop the task, which will drop the associated chunk information also.
BEGIN DBMS_PARALLEL_EXECUTE.drop_task('test_task'); END; /
Complete examples
The following example shows the processing of a workload chunked by rowid.
DECLARE l_task VARCHAR2(30) := 'test_task'; l_sql_stmt VARCHAR2(32767); l_try NUMBER; l_status NUMBER; BEGIN DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task); DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name => l_task, table_owner => 'TEST', table_name => 'TEST_TAB', by_row => TRUE, chunk_size => 10000); l_sql_stmt := 'UPDATE test_tab t SET t.num_col = t.num_col + 10, t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'') WHERE rowid BETWEEN :start_id AND :end_id'; DBMS_PARALLEL_EXECUTE.run_task(task_name => l_task, sql_stmt => l_sql_stmt, language_flag => DBMS_SQL.NATIVE, parallel_level => 10); -- If there is error, RESUME it for at most 2 times. l_try := 0; l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task); WHILE(l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) Loop l_try := l_try + 1; DBMS_PARALLEL_EXECUTE.resume_task(l_task); l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task); END LOOP; DBMS_PARALLEL_EXECUTE.drop_task(l_task); END; /
The following example shows the processing of a workload chunked by a number column. Notice that the workload is actually a stored procedure in this case.
CREATE OR REPLACE PROCEDURE process_update (p_start_id IN NUMBER, p_end_id IN NUMBER) AS BEGIN UPDATE test_tab t SET t.num_col = t.num_col + 10, t.session_id = SYS_CONTEXT('USERENV','SESSIONID') WHERE id BETWEEN p_start_id AND p_end_id; END; / DECLARE l_task VARCHAR2(30) := 'test_task'; l_sql_stmt VARCHAR2(32767); l_try NUMBER; l_status NUMBER; BEGIN DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task); DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col(task_name => l_task, table_owner => 'TEST', table_name => 'TEST_TAB', table_column => 'ID', chunk_size => 10000); l_sql_stmt := 'BEGIN process_update(:start_id, :end_id); END;'; DBMS_PARALLEL_EXECUTE.run_task(task_name => l_task, sql_stmt => l_sql_stmt, language_flag => DBMS_SQL.NATIVE, parallel_level => 10); -- If there is error, RESUME it for at most 2 times. l_try := 0; l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task); WHILE(l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) Loop l_try := l_try + 1; DBMS_PARALLEL_EXECUTE.resume_task(l_task); l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task); END LOOP; DBMS_PARALLEL_EXECUTE.drop_task(l_task); END; /
The following example shows a workload chunked by a SQL statement and processed by a user-defined framework.
DECLARE l_task VARCHAR2(30) := 'test_task'; l_stmt CLOB; l_sql_stmt VARCHAR2(32767); l_chunk_id NUMBER; l_start_id NUMBER; l_end_id NUMBER; l_any_rows BOOLEAN; BEGIN DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task); l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab'; DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => l_task, sql_stmt => l_stmt, by_rowid => FALSE); l_sql_stmt := 'UPDATE test_tab t SET t.num_col = t.num_col, t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'') WHERE num_col BETWEEN :start_id AND :end_id'; LOOP -- Get next unassigned chunk. DBMS_PARALLEL_EXECUTE.get_number_col_chunk(task_name => l_task, chunk_id => l_chunk_id, start_id => l_start_id, end_id => l_end_id, any_rows => l_any_rows); EXIT WHEN l_any_rows = FALSE; BEGIN -- Manually execute the work. EXECUTE IMMEDIATE l_sql_stmt USING l_start_id, l_end_id; -- Set the chunk status as processed. DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => l_task, chunk_id => l_chunk_id, status => DBMS_PARALLEL_EXECUTE.PROCESSED); EXCEPTION WHEN OTHERS THEN -- Record chunk error. DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => l_task, chunk_id => l_chunk_id, status => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR, err_num => SQLCODE, err_msg => SQLERRM); END; -- Commit work. COMMIT; END LOOP; DBMS_PARALLEL_EXECUTE.drop_task(l_task); END; /
For more information see:
Hope this helps. Regards Tim...