Oracle DB. Advanced Queuing. Simple subscriber example.



In this article I will show you how to create an Oracle queue, push messages to it, and pull them automatically using a subscriber.

Before we begin, grant the execute privilege on dbms_aqadm and dbms_aq to the user who will own the queue.
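The grants might look like the sketch below, run from a privileged account. The user name aq_user is only a placeholder for whatever schema you use.

```sql
-- aq_user is a hypothetical schema name; replace it with your own user
grant execute on dbms_aqadm to aq_user;
grant execute on dbms_aq to aq_user;
```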

A queue table uses an object type to carry the message data. Let’s create that type first.

create or replace type tp_queue as object(n number);

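Second, we can create the queue table. The multiple_consumers flag is needed because we are going to attach a subscriber to the queue later.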

begin
  dbms_aqadm.create_queue_table('tq_test','tp_queue', multiple_consumers => true);
end;

Check that it was created:
select * from user_queue_tables;
Or query the queue table directly:
select * from tq_test;

Next, we can create a queue and link it to the queue table.

begin
  dbms_aqadm.create_queue('q_test', 'tq_test');
end;

And start it:

begin
  dbms_aqadm.start_queue('q_test');
end;

The new queue will appear here:
select * from user_queues;

That’s it! The queue is now ready, and we can push and pull messages manually. But let’s go further and automate the pull side a little using a subscriber.
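For reference, a manual round trip could look roughly like the sketch below. Since tq_test is a multi-consumer queue table and no subscriber exists yet, the message has to name a recipient explicitly. The consumer name manual_consumer and the payload value 42 are made up for illustration.

```sql
declare
  l_enq_opts   dbms_aq.enqueue_options_t;
  l_deq_opts   dbms_aq.dequeue_options_t;
  l_enq_props  dbms_aq.message_properties_t;
  l_deq_props  dbms_aq.message_properties_t;
  l_msg_id     raw(16);
  l_obj        tp_queue;
begin
  -- push: address the message to a named recipient (hypothetical name)
  l_enq_props.recipient_list(1) := sys.aq$_agent('manual_consumer', null, null);
  dbms_aq.enqueue('q_test', l_enq_opts, l_enq_props, tp_queue(42), l_msg_id);
  commit;

  -- pull: dequeue as that recipient, without waiting if nothing is there
  l_deq_opts.consumer_name := 'manual_consumer';
  l_deq_opts.wait := dbms_aq.no_wait;
  dbms_aq.dequeue('q_test', l_deq_opts, l_deq_props, l_obj, l_msg_id);
  commit;

  -- visible only when server output is enabled in your client
  dbms_output.put_line(l_obj.n);
end;
```

With no_wait, the dequeue call raises an error instead of blocking when no message is available for that consumer.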
Before we create the subscriber procedure, let’s create a log table so we can check that the subscriber actually works.

create table t_log
(
d_add timestamp default systimestamp not null,
s_msg varchar2(4000)
);

Now we are ready to create a package with the subscriber procedure and a logging helper.
Head

create or replace package pkg_queue is

  procedure addLog(pi_msg in t_log.s_msg%type);
  
  procedure subscriber(
                       context raw
                      ,reginfo sys.aq$_reg_info
                      ,descr sys.aq$_descriptor
                      ,payload raw
                      ,payloadl number
                      );

end pkg_queue;

Body

create or replace package body pkg_queue is

  procedure addLog(pi_msg in t_log.s_msg%type)
  as
  begin
    insert into t_log(s_msg) values(pi_msg);
  end;
  
  procedure subscriber(
                       context raw
                      ,reginfo sys.aq$_reg_info
                      ,descr sys.aq$_descriptor
                      ,payload raw
                      ,payloadl number
                      )
  as
    l_tr_obj      tp_queue;
    l_msg_props   dbms_aq.message_properties_t;
    l_queue_opts  dbms_aq.dequeue_options_t;
    l_msg_id      raw(16);
  begin
    -- dequeue exactly the message we were notified about, as the notified consumer
    l_queue_opts.consumer_name := descr.consumer_name;
    l_queue_opts.msgid := descr.msg_id;
    dbms_aq.dequeue(descr.queue_name, l_queue_opts, l_msg_props, l_tr_obj, l_msg_id);
    
    -- heavy processing could go here; for now just log the payload
    addLog(to_char(l_tr_obj.n));
  exception
    when others
      then 
        addLog(sqlerrm);
  end;
   
end pkg_queue;

The logic is simple. The subscriber pulls a message (dbms_aq.dequeue) and logs it.
IMPORTANT. The subscriber procedure must have exactly this signature, even if you are not using all of the arguments.

Now we can add a subscriber to our queue and register the procedure as its callback.

begin
  dbms_aqadm.add_subscriber('q_test', sys.aq$_agent('test_subscriber', null, null));
  dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('q_test:test_subscriber', dbms_aq.namespace_aq, 'plsql://pkg_queue.subscriber', hextoraw('FF'))), 1);
end;
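To confirm that both steps took effect, you can look at the data dictionary, in the same way as for the queue table and the queue. This is only a quick check, and it assumes the standard user_queue_subscribers and user_subscr_registrations views are available in your version.

```sql
-- subscribers added with dbms_aqadm.add_subscriber
select * from user_queue_subscribers;

-- callbacks registered with dbms_aq.register
select * from user_subscr_registrations;
```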

All done! Now we can add a message to the queue…

declare
  l_msg_props   dbms_aq.message_properties_t;
  l_queue_opts  dbms_aq.enqueue_options_t;
  l_msg_id      raw(16);
begin
  dbms_aq.enqueue('q_test', l_queue_opts, l_msg_props, tp_queue(1), l_msg_id);
  commit;
end;

…and check the log table to prove that the subscriber works.

select * from t_log;

20.01.16 14:54:30,774600	1

Oracle Advanced Queuing is a mature, full-featured product.
With the built-in queue administration packages you can tune the retry count, retry interval, message lifetime, delivery delay, HTTP and email subscribers, and much more.
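As a rough illustration of a few of these settings, the sketch below changes the retry behaviour of q_test and then enqueues a message with a delay and a lifetime. All numeric values are arbitrary examples, not recommendations. The `/` lines are the SQL*Plus-style block terminators needed when running two blocks in one script.

```sql
begin
  -- retry a failed dequeue up to 5 times, 10 seconds apart (example values)
  dbms_aqadm.alter_queue('q_test', max_retries => 5, retry_delay => 10);
end;
/

declare
  l_msg_props   dbms_aq.message_properties_t;
  l_queue_opts  dbms_aq.enqueue_options_t;
  l_msg_id      raw(16);
begin
  l_msg_props.delay      := 30;   -- seconds before the message becomes available
  l_msg_props.expiration := 600;  -- seconds the message may stay available before it expires
  dbms_aq.enqueue('q_test', l_queue_opts, l_msg_props, tp_queue(2), l_msg_id);
  commit;
end;
/
```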

But before you leave, there is something you should know.

Oracle queues have one quirk: a bug you may run into when you recompile a package that the subscriber depends on.

Imagine you have a call to another package inside the subscriber procedure:

addLog(l_tr_obj.n * pkg_util.getRandom);

If one process is running the subscriber, which calls pkg_util, while another process is compiling pkg_util, you can get the following error:

ORA-06508: PL/SQL: could not find program unit being called

If your system is under high load, the subscriber process will not be restarted, and you will keep getting this error for every message it tries to pull.

This bug appears often when a package head is compiled and rarely when only a package body is compiled.

There are two ways to stop it once you have hit this error:
1) Stop pushing messages and wait a little.
2) Kill the process that got the error. After that, a new process will spawn automatically, without the error.
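One possible way to do the first option on the database side is to disable enqueuing on the queue for a while and then enable it again. This is only a sketch, under the assumption that your producers can tolerate enqueue errors during the pause; it is not a fix for the bug itself.

```sql
begin
  -- block new messages but leave dequeuing enabled
  dbms_aqadm.stop_queue('q_test', enqueue => true, dequeue => false);
end;
/

-- ...wait until the subscriber process has gone away, then:

begin
  -- enable enqueuing (and dequeuing) again
  dbms_aqadm.start_queue('q_test');
end;
/
```

While the queue is stopped for enqueuing, any dbms_aq.enqueue call against it fails with an error, so producers need to handle or retry that.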

Whether to use Advanced Queuing or not is up to you. As for me, I would rather use my own queue engine for small projects.
