Oracle DB. Advanced Queuing. Простой пример c Subscriber.

В это статье я бы хотел рассказать о том, как можно быстро и просто создавать очереди в Oracle, а так же о том как автоматически выгребать сообщения из очереди при помощи процедуры-подписчика(Subscriber).

Примечание

Перед созданием очереди необходимо создать таблицу, в которой будут храниться её сообщения. Также, для передачи данных процессу, который обрабатывает сообщение из очереди, необходимо создать объектный тип, который будет содержать в себе эти данные.
Создаём тип:

create or replace type tp_queue as object(n number);

Создаём таблицу для будущей очереди, указав её имя и созданный тип:

begin
  dbms_aqadm.create_queue_table('tq_test','tp_queue', multiple_consumers => true);
end;
Примечание

Далее, необходимо создать саму очередь в привязке к созданной таблице:

begin
  dbms_aqadm.create_queue('q_test', 'tq_test');
end;

И запустить её:

begin
  dbms_aqadm.start_queue('q_test');
end;
Примечание

На этом этапе очередь уже готова к работе и в неё можно вставлять сообщения и вычитывать их вручную. Однако мы на этом не останавливаемся и автоматизируем процесс вычитывания сообщений из очереди при помощи процедуры-подписчика.
Перед тем как создать процедуру-подписчик, создадим таблицу в которую будут складываться факты её срабатывания:

create table t_log
(
d_add timestamp default systimestamp not null,
s_msg varchar2(4000)
);

Теперь можно создать пакет, в котором будет две процедуры: подписчик и складывание сообщения в таблицу логов.

Заголовок пакета
Тело пакета
Процедура-подписчик вычитывает(dequeue) сообщение с указанным msg_id и выполняет на основе его полезную нагрузку.
ВАЖНО! Процедура-подписчик всегда должна иметь точно такую спецификацию, иначе её работа не гарантируется.

После этого можно добавить подписчика к выбранной очереди и зарегистрировать его:

begin
  dbms_aqadm.add_subscriber('q_test', sys.aq$_agent('test_subscriber', null, null));
  dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('q_test:test_subscriber', dbms_aq.namespace_aq, 'plsql://pkg_queue.subscriber', hextoraw('FF'))), 1);
end;

Всё готово. Теперь достаточно добавить сообщение в очередь…

declare
  l_msg_props   dbms_aq.message_properties_t;
  l_queue_opts  dbms_aq.enqueue_options_t;
  l_msg_id      raw(16);
begin
  dbms_aq.enqueue('q_test', l_queue_opts, l_msg_props, tp_queue(1), l_msg_id);
  commit;
end;

…и проверить таблицу логов, чтобы удостовериться что подписчик отработал нормально.

select * from t_log;
Output
Примечание

Вообще, очереди в Oracle достаточно интересный, хоть и громоздкий, инструмент. Но есть в них одна особенность, которая может не позволить вам их использовать. Дело в одном баге, который проявляется при перекомпиляции пакетов.
Допустим, в процедуре-подписчике в качестве полезной нагрузки вызывается сторонний пакет, например так:

addLog(l_tr_obj.n * pkg_util.getRandom);

В случае, если в один момент времени существует процесс, который использует процедуру-подписчик, и перекомпилируется пакет pkg_util, процедура подписчик выпадет с ошибкой:

ORA-06508: PL/SQL: could not find program unit being called

Если ваша система высоконагружена, и сообщения в очередь поступают непрерывно, данный процесс продолжит выпадать с этой ошибкой на все сообщения из очереди.
Данный баг чаще выпадает при компиляции заголовка пакета и реже при компиляции тела.
Существует два известных мне решения этой проблемы:
1) запретить запись сообщений в очередь и немного подождать
2) убить процесс, выпавший с ошибкой, после чего автоматически создастся другой процесс, подхватив новую версию пакета

Использовать или нет очереди в своих проектах – решать вам. Для себя я решил следующее: если задача относительно небольшая и на её реализацию не потребуется много времени, лучше напишу свой велосипед.

Leave a Reply

Your email address will not be published. Required fields are marked *