Oracle DB. Advanced Queuing. Простой пример c Subscriber.

В это статье я бы хотел рассказать о том, как можно быстро и просто создавать очереди в Oracle, а так же о том как автоматически выгребать сообщения из очереди при помощи процедуры-подписчика(Subscriber).


Для корректной работы с очередями пользователю должны быть выданы привилегии на выполнение пакетов dbms_aqadm и dbms_aq.


Перед созданием очереди необходимо создать таблицу, в которой будут храниться её сообщения. Также, для передачи данных процессу, который обрабатывает сообщение из очереди, необходимо создать объектный тип, который будет содержать в себе эти данные.
Создаём тип:

create or replace type tp_queue as object(n number);

Создаём таблицу для будущей очереди, указав её имя и созданный тип:

begin
  dbms_aqadm.create_queue_table('tq_test','tp_queue', multiple_consumers => true);
end;

Проверить, что таблица создана успешно, можно обратившись в системную таблицу
select * from user_queue_tables;
Также можно выполнить простой селект к таблице(на момент создания она еще пуста):
select * from tq_test;


Далее, необходимо создать саму очередь в привязке к созданной таблице:

begin
  dbms_aqadm.create_queue('q_test', 'tq_test');
end;

И запустить её:

begin
  dbms_aqadm.start_queue('q_test');
end;

Проверить, что очередь создана успешно, можно обратившись в системную таблицу
select * from user_queues;


На этом этапе очередь уже готова к работе и в неё можно вставлять сообщения и вычитывать их вручную. Однако мы на этом не останавливаемся и автоматизируем процесс вычитывания сообщений из очереди при помощи процедуры-подписчика.

Перед тем как создать процедуру-подписчик, создадим таблицу в которую будут складываться факты её срабатывания:

create table t_log
(
d_add timestamp default systimestamp not null,
s_msg varchar2(4000)
);

Теперь можно создать пакет, в котором будет две процедуры: подписчик и складывание сообщения в таблицу логов.
Заголовок пакета

create or replace package pkg_queue is

  procedure addLog(pi_msg in t_log.s_msg%type);
  
  procedure subscriber(
                       context raw
                      ,reginfo sys.aq$_reg_info
                      ,descr sys.aq$_descriptor
                      ,payload raw
                      ,payloadl number
                      );

end pkg_queue;

Тело пакета

create or replace package body pkg_queue is

  procedure addLog(pi_msg in t_log.s_msg%type)
  as
  begin
    insert into t_log(s_msg) values(pi_msg);
  end;
  
  procedure subscriber(
                       context raw
                      ,reginfo sys.aq$_reg_info
                      ,descr sys.aq$_descriptor
                      ,payload raw
                      ,payloadl number
                      )
  as
    l_tr_obj      tp_queue;
    l_msg_props   dbms_aq.message_properties_t;
    l_queue_opts  dbms_aq.dequeue_options_t;
    l_msg_id      raw(16);
  begin
    l_queue_opts.consumer_name := descr.consumer_name;
    l_queue_opts.msgid := descr.msg_id;
    dbms_aq.dequeue(descr.queue_name, l_queue_opts, l_msg_props, l_tr_obj, l_msg_id);
    
    --здесь должна быть полезная нагрузка на основе данных из объектного типа
    --в данном случае в качестве нагрузки логируется запись из типа
    addLog(l_tr_obj.n);
  exception
    when others
      then 
        addLog(sqlerrm);
  end;
   
end pkg_queue;

Процедура-подписчик вычитывает(dequeue) сообщение с указанным msg_id и выполняет на основе его полезную нагрузку.
ВАЖНО!. Процедура-подписчик всегда должна иметь точно такую спецификацию, иначе её работа не гарантируется.

После этого можно добавить подписчика к выбранной очереди и зарегистрировать его:

begin
  dbms_aqadm.add_subscriber('q_test', sys.aq$_agent('test_subscriber', null, null));
  dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('q_test:test_subscriber', dbms_aq.namespace_aq, 'plsql://pkg_queue.subscriber', hextoraw('FF'))), 1);
end;

Всё готово. Теперь достаточно добавить сообщение в очередь…

declare
  l_msg_props   dbms_aq.message_properties_t;
  l_queue_opts  dbms_aq.enqueue_options_t;
  l_msg_id      raw(16);
begin
  dbms_aq.enqueue('q_test', l_queue_opts, l_msg_props, tp_queue(1), l_msg_id);
  commit;
end;

…и проверить таблицу логов, чтобы удостовериться что подписчик отработал нормально.

select * from t_log;
20.01.16 14:54:30,774600	1

Очереди в Oracle это достаточно продвинутый инструмент.
Стандартными средствами вы можете настроить количество повторов при неуспешной вычитке, их интервал, время жизни сообщения, задержка его выполнения, http-запросы и e-mail в качестве процедуры-подписчика и многое другое.
В этой статье описан лишь простейший пример работы с ними.


Вообще, очереди в Oracle достаточно интересный, хоть и громоздкий, инструмент. Но есть в них одна особенность, которая может не позволить вам их использовать. Дело в одном баге, который проявляется при перекомпиляции пакетов.

Допустим, в процедуре-подписчике в качестве полезной нагрузки вызывается сторонний пакет, например так:

addLog(l_tr_obj.n * pkg_util.getRandom);

В случае, если в один момент времени существует процесс, который использует процедуру-подписчик, и перекомпилируется пакет pkg_util, процедура подписчик выпадет с ошибкой:

ORA-06508: PL/SQL: could not find program unit being called

Если ваша система высоконагружена, и сообщения в очередь поступают непрерывно, данный процесс продолжит выпадать с этой ошибкой на все сообщения из очереди.

Данный баг чаще выпадает при компиляции заголовка пакета и реже при компиляции тела.

Существует два известных мне решения этой проблемы:
1) запретить запись сообщений в очередь и немного подождать
2) убить процесс, выпавший с ошибкой, после чего автоматически создастся другой процесс, подхватив новую версию пакета

Использовать или нет очереди в своих проектах — решать вам. Для себя я решил следующее: если задача относительно небольшая и на её реализацию не потребуется много времени, лучше напишу свой велосипед.

Оставьте комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *