DBMS_PARALLEL_EXECUTE

The DBMS_PARALLEL_EXECUTE package allows a workload associated with a base table to be broken down into smaller chunks, which can be run in parallel. The examples in this article are based around a simple update statement, which would be more efficiently coded as a single parallel DML statement, but it serves to explain the usage of the package. In practice, you should use this package where straight parallel DML is not appropriate. Using the DBMS_PARALLEL_EXECUTE package involves several distinct stages: creating a task, splitting the workload into chunks, running the task, checking the task status and dropping the task.

At the end of the article there are some complete examples, using some of the techniques discussed below.

The user controlling the process needs the CREATE JOB privilege.

CONN / AS SYSDBA
GRANT CREATE JOB TO test;

The examples used in this article require the following table to be created and populated.

CONN test/test

DROP TABLE test_tab;

CREATE TABLE test_tab (
  id          NUMBER,
  description VARCHAR2(50),
  num_col     NUMBER,
  session_id  NUMBER,
  CONSTRAINT test_tab_pk PRIMARY KEY (id)
);

INSERT /*+ APPEND */ INTO test_tab
SELECT level,
       'Description for ' || level,
       CASE
         WHEN MOD(level, 5) = 0 THEN 10
         WHEN MOD(level, 3) = 0 THEN 20
         ELSE 30
       END,
       NULL
FROM   dual
CONNECT BY level <= 500000;
COMMIT;

EXEC DBMS_STATS.gather_table_stats(USER, 'TEST_TAB', cascade => TRUE);

SELECT num_col, COUNT(*)
FROM   test_tab
GROUP BY num_col
ORDER BY num_col;

   NUM_COL   COUNT(*)
---------- ----------
        10     100000
        20     133333
        30     266667

SQL>

Create a task

The CREATE_TASK procedure is used to create a new task. It requires a task name to be specified, but can also include an optional task comment.

BEGIN
  DBMS_PARALLEL_EXECUTE.create_task (task_name => 'test_task');
END;
/
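
As a minimal sketch of the optional task comment, the call below passes it as the second positional argument. The task name 'test_task_2' and the comment text are made up for illustration, and the task is dropped again straight away so it doesn't interfere with the other examples.

```sql
BEGIN
  -- The second argument is the optional task comment (hypothetical text).
  DBMS_PARALLEL_EXECUTE.create_task('test_task_2', 'Example task with a comment');

  -- Clean up the illustrative task.
  DBMS_PARALLEL_EXECUTE.drop_task('test_task_2');
END;
/
```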

Information about existing tasks is displayed using the [DBA|USER]_PARALLEL_EXECUTE_TASKS views.

COLUMN task_name FORMAT A10
SELECT task_name,
       status
FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  CREATED

SQL>

The GENERATE_TASK_NAME function returns a unique task name if you do not want to name the task manually.

SELECT DBMS_PARALLEL_EXECUTE.generate_task_name
FROM   dual;

GENERATE_TASK_NAME
--------------------------------------------------------------------------------
TASK$_726

SQL>
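
The generated name would typically be captured in a variable and passed to CREATE_TASK. The following is a sketch of that pattern, not a required approach. The task is dropped at the end purely to keep the example self-contained.

```sql
SET SERVEROUTPUT ON

DECLARE
  l_task VARCHAR2(30);
BEGIN
  -- Let the package pick a unique name rather than hard-coding one.
  l_task := DBMS_PARALLEL_EXECUTE.generate_task_name;

  DBMS_PARALLEL_EXECUTE.create_task(task_name => l_task);
  DBMS_OUTPUT.put_line('Created task: ' || l_task);

  -- Remove the illustrative task.
  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/
```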

Split the workload into chunks

The workload is associated with a base table, which can be split into subsets or chunks of rows. There are three methods of splitting the workload into chunks: by rowid, by a number column, or by a user-defined SQL statement.

The chunks associated with a task can be dropped using the DROP_CHUNKS procedure.
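
This article reuses the 'test_task' task for each of the chunking methods, so a call like the one below is one way to clear the existing chunks before trying a different method. It assumes the task already exists.

```sql
BEGIN
  -- Remove all chunk definitions for the task, leaving the task itself in place.
  DBMS_PARALLEL_EXECUTE.drop_chunks(task_name => 'test_task');
END;
/
```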

CREATE_CHUNKS_BY_ROWID

The CREATE_CHUNKS_BY_ROWID procedure splits the data by rowid into chunks specified by the CHUNK_SIZE parameter. If the BY_ROW parameter is set to TRUE, the CHUNK_SIZE refers to the number of rows, otherwise it refers to the number of blocks.

BEGIN
  DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => 'test_task',
                                               table_owner => 'TEST',
                                               table_name  => 'TEST_TAB',
                                               by_row      => TRUE,
                                               chunk_size  => 10000);
END;
/

Once the operation is complete the task status is changed to 'CHUNKED'.

COLUMN task_name FORMAT A10

SELECT task_name,
       status
FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  CHUNKED

SQL>

The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.

SELECT chunk_id, status, start_rowid, end_rowid
FROM   user_parallel_execute_chunks
WHERE  task_name = 'test_task'
ORDER BY chunk_id;

  CHUNK_ID STATUS               START_ROWID        END_ROWID
---------- -------------------- ------------------ ------------------
       287 UNASSIGNED           AAASjoAAEAAAAIwAAA AAASjoAAEAAAAI3CcP
       288 UNASSIGNED           AAASjoAAEAAAAI4AAA AAASjoAAEAAAAI/CcP
...
       450 UNASSIGNED           AAASjoAAEAAAAIIAAA AAASjoAAEAAAAIPCcP
       451 UNASSIGNED           AAASjoAAEAAAAIoAAA AAASjoAAEAAAAIvCcP

88 rows selected.

SQL>
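
To chunk by blocks rather than rows, the BY_ROW parameter can be set to FALSE. The sketch below assumes the row-based chunks created above are dropped first, and the chunk size of 100 blocks is an arbitrary value picked for illustration.

```sql
BEGIN
  -- Remove the row-based chunks created above.
  DBMS_PARALLEL_EXECUTE.drop_chunks(task_name => 'test_task');

  -- With BY_ROW set to FALSE, CHUNK_SIZE is a number of blocks, not rows.
  DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => 'test_task',
                                               table_owner => 'TEST',
                                               table_name  => 'TEST_TAB',
                                               by_row      => FALSE,
                                               chunk_size  => 100);
END;
/
```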

CREATE_CHUNKS_BY_NUMBER_COL

The CREATE_CHUNKS_BY_NUMBER_COL procedure divides the workload up based on a number column. It uses the specified column's minimum and maximum values, along with the chunk size, to split the data into ranges of values. For the chunks to contain an equal number of rows, the column must hold a continuous run of numbers with no gaps, like that generated by a sequence.

BEGIN
  DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col(task_name    => 'test_task',
                                                    table_owner  => 'TEST',
                                                    table_name   => 'TEST_TAB',
                                                    table_column => 'ID',
                                                    chunk_size   => 10000);
END;
/

The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.

SELECT chunk_id, status, start_id, end_id
FROM   user_parallel_execute_chunks
WHERE  task_name = 'test_task'
ORDER BY chunk_id;

  CHUNK_ID STATUS                 START_ID     END_ID
---------- -------------------- ---------- ----------
       600 UNASSIGNED                    1      10000
       601 UNASSIGNED                10001      20000
 ...
       648 UNASSIGNED               480001     490000
       649 UNASSIGNED               490001     500000

50 rows selected.

SQL>
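
The number of chunks can be estimated from the column's range and the chunk size. The query below is only rough arithmetic to illustrate the idea, not the package's actual algorithm, and the column aliases are made up for the example.

```sql
-- Estimate the chunk count for a chunk size of 10000.
SELECT MIN(id) AS min_id,
       MAX(id) AS max_id,
       CEIL((MAX(id) - MIN(id) + 1) / 10000) AS expected_chunks
FROM   test_tab;
```

For the test table, where ID runs from 1 to 500000, this gives 50, which matches the 50 chunks listed above.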

CREATE_CHUNKS_BY_SQL

The CREATE_CHUNKS_BY_SQL procedure divides the workload based on a user-defined query. If the BY_ROWID parameter is set to TRUE, the query must return a series of start and end rowids. If it's set to FALSE, the query must return a series of start and end IDs.

DECLARE
  l_stmt CLOB;
BEGIN
  l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab';

  DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => 'test_task',
                                             sql_stmt  => l_stmt,
                                             by_rowid  => FALSE);
END;
/

The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.

SELECT chunk_id, status, start_id, end_id
FROM   user_parallel_execute_chunks
WHERE  task_name = 'test_task'
ORDER BY chunk_id;

  CHUNK_ID STATUS                 START_ID     END_ID
---------- -------------------- ---------- ----------
       650 UNASSIGNED                   10         10
       651 UNASSIGNED                   30         30
       652 UNASSIGNED                   20         20

3 rows selected.

SQL>
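
The query can also return genuine ranges rather than single values. As a sketch, the statement below uses the NTILE analytic function to build 20 ID ranges holding roughly equal numbers of rows, which may be useful when the ID column has gaps. The bucket count of 20 and the GRP alias are arbitrary choices for illustration, and the existing chunks are dropped first.

```sql
DECLARE
  l_stmt CLOB;
BEGIN
  -- Clear any chunks left over from a previous method.
  DBMS_PARALLEL_EXECUTE.drop_chunks(task_name => 'test_task');

  -- Each NTILE bucket becomes one chunk: (lowest ID, highest ID).
  l_stmt := 'SELECT MIN(id), MAX(id)
             FROM   (SELECT id, NTILE(20) OVER (ORDER BY id) AS grp
                     FROM   test_tab)
             GROUP BY grp';

  DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => 'test_task',
                                             sql_stmt  => l_stmt,
                                             by_rowid  => FALSE);
END;
/
```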

Run the task

Running a task involves running a specific statement for each defined chunk of work. The documentation only shows examples using updates of the base table, but this is not the only use of this functionality. The statement associated with the task can be a procedure call, as shown in one of the examples at the end of the article.

There are two ways to run a task and several procedures to control a running task.

RUN_TASK

The RUN_TASK procedure runs the specified statement in parallel by scheduling jobs to process the workload chunks. The statement specifying the actual work to be done must reference the ':start_id' and ':end_id' bind variables, which represent the range of rowids or column IDs to be processed, as specified in the chunk definitions. The degree of parallelism is controlled by the number of scheduled jobs, not the number of chunks defined. Each scheduled job takes an unassigned workload chunk, processes it, then moves on to the next unassigned chunk.

DECLARE
  l_sql_stmt VARCHAR2(32767);
BEGIN
  l_sql_stmt := 'UPDATE test_tab t 
                 SET    t.num_col = t.num_col + 10,
                        t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'')
                 WHERE rowid BETWEEN :start_id AND :end_id';

  DBMS_PARALLEL_EXECUTE.run_task(task_name      => 'test_task',
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);
END;
/

The RUN_TASK procedure waits for the task to complete. On completion, the status of the task must be assessed to know what action to take next.

You can see that the activity ran in parallel by checking the SESSION_ID column, which was set during the update.

SELECT session_id, COUNT(*)
FROM   test_tab
GROUP BY session_id
ORDER BY session_id;

SESSION_ID   COUNT(*)
---------- ----------
    232639      65041
    232640      54029
    232641      53571
    232642      44622
    232643      45264
    232644      54242
    232645      47314
    232646      44746
    232647      46309
    232648      44862

10 rows selected.

SQL>

The maximum level of parallelism will be limited by the JOB_QUEUE_PROCESSES parameter and the number of available chunks.
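
The current setting can be checked before picking a PARALLEL_LEVEL. Both forms below assume the connected user has access to V$PARAMETER, which an ordinary test user may not have by default.

```sql
-- SQL*Plus command.
SHOW PARAMETER job_queue_processes

-- Equivalent query.
SELECT value
FROM   v$parameter
WHERE  name = 'job_queue_processes';
```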

User-defined framework

The DBMS_PARALLEL_EXECUTE package also allows you to code the task run manually. The GET_ROWID_CHUNK and GET_NUMBER_COL_CHUNK procedures return the next available unassigned chunk. You can then process the chunk manually and set its status. The example below shows the processing of a workload chunked by rowid.

DECLARE
  l_sql_stmt    VARCHAR2(32767);
  l_chunk_id    NUMBER;
  l_start_rowid ROWID;
  l_end_rowid   ROWID;
  l_any_rows    BOOLEAN;
BEGIN
  l_sql_stmt := 'UPDATE test_tab t 
                 SET    t.num_col = t.num_col + 10,
                        t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'')
                 WHERE rowid BETWEEN :start_id AND :end_id';
 
  LOOP
    -- Get next unassigned chunk.
    DBMS_PARALLEL_EXECUTE.get_rowid_chunk(task_name   => 'test_task',
                                          chunk_id    => l_chunk_id,
                                          start_rowid => l_start_rowid,
                                          end_rowid   => l_end_rowid,
                                          any_rows    => l_any_rows);

    EXIT WHEN l_any_rows = FALSE;
 
    BEGIN
      -- Manually execute the work.
      EXECUTE IMMEDIATE l_sql_stmt USING l_start_rowid, l_end_rowid;

      -- Set the chunk status as processed.
      DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                             chunk_id  => l_chunk_id,
                                             status    => DBMS_PARALLEL_EXECUTE.PROCESSED);
    EXCEPTION
      WHEN OTHERS THEN
        -- Record chunk error.
        DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                               chunk_id  => l_chunk_id,
                                               status    => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR,
                                               err_num   => SQLCODE,
                                               err_msg   => SQLERRM);
    END;

    -- Commit work.
    COMMIT;
  END LOOP;
END;
/

Task control

A running task can be stopped and restarted using the STOP_TASK and RESUME_TASK procedures respectively.
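
Since RUN_TASK waits for the task to complete, a stop request would normally be issued from a different session. The calls below are a minimal sketch of that, assuming 'test_task' is currently running.

```sql
-- From a second session: stop the running task.
BEGIN
  DBMS_PARALLEL_EXECUTE.stop_task('test_task');
END;
/

-- Later: pick the task up again.
BEGIN
  DBMS_PARALLEL_EXECUTE.resume_task('test_task');
END;
/
```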

The PURGE_PROCESSED_CHUNKS procedure deletes all chunks with a status of 'PROCESSED' or 'PROCESSED_WITH_ERROR'.
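
A sketch of the call, assuming 'test_task' has already processed some or all of its chunks:

```sql
BEGIN
  -- Remove the chunks that have already been processed, with or without errors.
  DBMS_PARALLEL_EXECUTE.purge_processed_chunks(task_name => 'test_task');
END;
/
```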

The ADM_DROP_CHUNKS, ADM_DROP_TASK, ADM_TASK_STATUS and ADM_STOP_TASK routines have the same function as their namesakes, but they allow the operations to be performed on tasks owned by other users. In order to use these routines, the user must have been granted the ADM_PARALLEL_EXECUTE_TASK role.
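
The following sketch grants the role and checks the status of a task owned by the TEST user. The ADMIN_USER account and its password are hypothetical and are assumed to exist already.

```sql
CONN / AS SYSDBA
GRANT ADM_PARALLEL_EXECUTE_TASK TO admin_user;  -- hypothetical user

CONN admin_user/password
SET SERVEROUTPUT ON

DECLARE
  l_status NUMBER;
BEGIN
  -- The ADM_ routines take the task owner in addition to the task name.
  l_status := DBMS_PARALLEL_EXECUTE.adm_task_status(task_owner => 'TEST',
                                                    task_name  => 'test_task');
  DBMS_OUTPUT.put_line('Status code: ' || l_status);
END;
/
```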

Check the task status

The simplest way to check the status of a task is to use the TASK_STATUS function. After execution of the task, the only possible return values are the 'FINISHED' or 'FINISHED_WITH_ERROR' constants. If the status is not 'FINISHED', then the task can be resumed using the RESUME_TASK procedure.

DECLARE
  l_try NUMBER;
  l_status NUMBER;
BEGIN
  -- If there is an error, resume the task at most twice.
  l_try := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');
  WHILE (l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED)
  LOOP
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task('test_task');
    l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');
  END LOOP;
END;
/
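
The TASK_STATUS function returns a number, so it may help to translate it into a name when logging. The block below is a sketch that compares the value against some of the package's status constants. The L_TEXT variable and the output format are illustrative only.

```sql
SET SERVEROUTPUT ON

DECLARE
  l_status NUMBER;
  l_text   VARCHAR2(60);
BEGIN
  l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');

  -- Map the numeric status to a readable name.
  l_text := CASE l_status
              WHEN DBMS_PARALLEL_EXECUTE.CREATED             THEN 'CREATED'
              WHEN DBMS_PARALLEL_EXECUTE.CHUNKED             THEN 'CHUNKED'
              WHEN DBMS_PARALLEL_EXECUTE.PROCESSING          THEN 'PROCESSING'
              WHEN DBMS_PARALLEL_EXECUTE.FINISHED            THEN 'FINISHED'
              WHEN DBMS_PARALLEL_EXECUTE.FINISHED_WITH_ERROR THEN 'FINISHED_WITH_ERROR'
              WHEN DBMS_PARALLEL_EXECUTE.CRASHED             THEN 'CRASHED'
              ELSE 'OTHER (' || l_status || ')'
            END;

  DBMS_OUTPUT.put_line('Task status: ' || l_text);
END;
/
```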

The status of the task and the chunks can also be queried.

COLUMN task_name FORMAT A10
SELECT task_name,
       status
FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  FINISHED

SQL>

If there were errors, the chunks can be queried to identify the problems.

SELECT status, COUNT(*)
FROM   user_parallel_execute_chunks
GROUP BY status
ORDER BY status;

STATUS                 COUNT(*)
-------------------- ----------
PROCESSED                    88

SQL>
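
To see the errors themselves rather than just the counts, the ERROR_CODE and ERROR_MESSAGE columns of the same view can be queried. This is a sketch that filters on the 'test_task' task used in this article. It returns no rows when every chunk was processed successfully.

```sql
COLUMN error_message FORMAT A50

-- List the failed chunks along with the recorded error.
SELECT chunk_id, error_code, error_message
FROM   user_parallel_execute_chunks
WHERE  task_name = 'test_task'
AND    status = 'PROCESSED_WITH_ERROR'
ORDER BY chunk_id;
```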

The [DBA|USER]_PARALLEL_EXECUTE_TASKS views contain a record of the JOB_PREFIX used when scheduling the chunks of work.

SELECT job_prefix
FROM   user_parallel_execute_tasks
WHERE  task_name = 'test_task';

JOB_PREFIX
------------------------------
TASK$_368

SQL>

This value can be used to query information about the individual jobs used during the process. The number of jobs scheduled should match the degree of parallelism specified in the RUN_TASK procedure.

COLUMN job_name FORMAT A20

SELECT job_name, status
FROM   user_scheduler_job_run_details
WHERE  job_name LIKE (SELECT job_prefix || '%'
                      FROM   user_parallel_execute_tasks
                      WHERE  task_name = 'test_task');

JOB_NAME             STATUS
-------------------- ------------------------------
TASK$_368_1          SUCCEEDED
TASK$_368_6          SUCCEEDED
TASK$_368_2          SUCCEEDED
TASK$_368_9          SUCCEEDED
TASK$_368_10         SUCCEEDED
TASK$_368_8          SUCCEEDED
TASK$_368_7          SUCCEEDED
TASK$_368_4          SUCCEEDED
TASK$_368_5          SUCCEEDED
TASK$_368_3          SUCCEEDED

10 rows selected.

SQL>

Drop the task

Once the task is complete you can drop it, which also drops the associated chunk information.

BEGIN
  DBMS_PARALLEL_EXECUTE.drop_task('test_task');
END;
/

Complete examples

The following example shows the processing of a workload chunked by rowid.

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_sql_stmt VARCHAR2(32767);
  l_try      NUMBER;
  l_status   NUMBER;
BEGIN
  DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task);

  DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => l_task,
                                               table_owner => 'TEST',
                                               table_name  => 'TEST_TAB',
                                               by_row      => TRUE,
                                               chunk_size  => 10000);

  l_sql_stmt := 'UPDATE test_tab t 
                 SET    t.num_col = t.num_col + 10,
                        t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'')
                 WHERE rowid BETWEEN :start_id AND :end_id';

  DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);

  -- If there is an error, resume the task at most twice.
  l_try := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  WHILE (l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED)
  LOOP
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task(l_task);
    l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  END LOOP;

  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/

The following example shows the processing of a workload chunked by a number column. Notice that the workload is actually a stored procedure in this case.

CREATE OR REPLACE PROCEDURE process_update (p_start_id IN NUMBER, p_end_id IN NUMBER) AS
BEGIN
  UPDATE test_tab t 
  SET    t.num_col = t.num_col + 10,
         t.session_id = SYS_CONTEXT('USERENV','SESSIONID')
  WHERE id BETWEEN p_start_id AND p_end_id;
END;
/

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_sql_stmt VARCHAR2(32767);
  l_try      NUMBER;
  l_status   NUMBER;
BEGIN
  DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task);

  DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col(task_name    => l_task,
                                                    table_owner  => 'TEST',
                                                    table_name   => 'TEST_TAB',
                                                    table_column => 'ID',
                                                    chunk_size   => 10000);

  l_sql_stmt := 'BEGIN process_update(:start_id, :end_id); END;';

  DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);

  -- If there is an error, resume the task at most twice.
  l_try := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  WHILE (l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED)
  LOOP
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task(l_task);
    l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  END LOOP;

  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/

The following example shows a workload chunked by a SQL statement and processed by a user-defined framework.

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_stmt     CLOB;
  l_sql_stmt VARCHAR2(32767);
  l_chunk_id NUMBER;
  l_start_id NUMBER;
  l_end_id   NUMBER;
  l_any_rows BOOLEAN;
BEGIN
  DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task);

  l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab';

  DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => l_task,
                                             sql_stmt  => l_stmt,
                                             by_rowid  => FALSE);

  l_sql_stmt := 'UPDATE test_tab t 
                 SET    t.num_col = t.num_col,
                        t.session_id = SYS_CONTEXT(''USERENV'',''SESSIONID'')
                 WHERE num_col BETWEEN :start_id AND :end_id';

  LOOP
    -- Get next unassigned chunk.
    DBMS_PARALLEL_EXECUTE.get_number_col_chunk(task_name   => l_task,
                                               chunk_id    => l_chunk_id,
                                               start_id    => l_start_id,
                                               end_id      => l_end_id,
                                               any_rows    => l_any_rows);

    EXIT WHEN l_any_rows = FALSE;
 
    BEGIN
      -- Manually execute the work.
      EXECUTE IMMEDIATE l_sql_stmt USING l_start_id, l_end_id;

      -- Set the chunk status as processed.
      DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => l_task,
                                             chunk_id  => l_chunk_id,
                                             status    => DBMS_PARALLEL_EXECUTE.PROCESSED);
    EXCEPTION
      WHEN OTHERS THEN
        -- Record chunk error.
        DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => l_task,
                                               chunk_id  => l_chunk_id,
                                               status    => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR,
                                               err_num   => SQLCODE,
                                               err_msg   => SQLERRM);
    END;

    -- Commit work.
    COMMIT;
  END LOOP;

  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/

Hope this helps. Regards Tim...