Oracle DB. Using dbms_alert for queuing and parallel data processing.



Oracle has an interesting package to provide an event-based logic named dbms_alert. In this article I will use it to create a classic queue processing.

First, we need to create a simple table, which store queue message ID and creation time.

create table t_queue
(
id_queue number primary key,
d_add timestamp default systimestamp not null 
);

And a sequence to generate an ID for every message.

create sequence seq_t_queue;

The Idea:
Process #1 creates a message and waiting for an answer. At the same time process #2 reading a message, doing some stuff, and replying. Process #1 receive reply and feeling happy.

Let’s create a handler procedure.

create or replace procedure sendRandom is
  l_id_queue t_queue.id_queue%type;
  l_cur sys_refcursor;
  l_random number;
begin
  loop
    open l_cur for
    select id_queue
    from t_queue
    order by d_add asc
    for update skip locked;
    
    fetch l_cur into l_id_queue;
    
    if l_id_queue is not null
      then
        delete from t_queue where id_queue = l_id_queue;        
        l_random := dbms_random.value;
        dbms_alert.signal(l_id_queue, l_random);
        commit;
      else
        rollback;
        dbms_lock.sleep(0.1);
    end if;
  end loop;  
end sendRandom;

And run an infinity job for a handler.

begin
  dbms_scheduler.create_job(
                            job_name        => 'job_queue_thread_1'
                           ,job_type        => 'PLSQL_BLOCK'
                           ,job_action      => 'begin sendRandom; end;'
                           ,start_date      => systimestamp
                           ,end_date        => NULL
                           ,enabled         => TRUE
                           );
end;

This procedure pulling the oldest message and blocking it. You need to block it in case you have >1 parallel handler processes. Next, procedure generate random value and send reply using l_id_queue as event name.

As you probably already thought, you should not use infinite cycles on real projects. A good solution would be to have a process manager adaptively spawning jobs with limit lifetime.
If you want to recompile procedure while job is running, first stop a job or it will hang infinetly.

Job is running, so we can create our first queue message.

declare
  c_id_queue constant number := seq_t_queue.nextval;
  l_status   number;
  l_answer   varchar2(64);
begin
  insert into t_queue(id_queue) values (c_id_queue);
  commit;
  
  dbms_alert.register(c_id_queue);
  dbms_alert.waitone(c_id_queue, l_answer, l_status, 5);
  
  if l_status = 0
    then
      commit;
      dbms_output.put_line('Success: '||l_answer);
    else
      rollback;
      dbms_output.put_line('Timeout');
  end if;
end;

Here, we are creating new message and commiting it. Next we register our process using c_id_queue as event name and waiting for a 5 seconds. If handler will success processing our message, we will get a random number into l_answer variable.

Leave a Reply

Your email address will not be published. Required fields are marked *