В это статье я бы хотел рассказать о том, как можно быстро и просто создавать очереди в Oracle, а так же о том как автоматически выгребать сообщения из очереди при помощи процедуры-подписчика(Subscriber).
Для корректной работы с очередями пользователю должны быть выданы привилегии на выполнение пакетов dbms_aqadm и dbms_aq.
Перед созданием очереди необходимо создать таблицу, в которой будут храниться её сообщения. Также, для передачи данных процессу, который обрабатывает сообщение из очереди, необходимо создать объектный тип, который будет содержать в себе эти данные.
Создаём тип:
create or replace type tp_queue as object(n number);
Создаём таблицу для будущей очереди, указав её имя и созданный тип:
begin dbms_aqadm.create_queue_table('tq_test','tp_queue', multiple_consumers => true); end;
Проверить, что таблица создана успешно, можно обратившись в системную таблицу
select * from user_queue_tables;
Также можно выполнить простой селект к таблице(на момент создания она еще пуста):
select * from tq_test;
Далее, необходимо создать саму очередь в привязке к созданной таблице:
begin dbms_aqadm.create_queue('q_test', 'tq_test'); end;
И запустить её:
begin dbms_aqadm.start_queue('q_test'); end;
Проверить, что очередь создана успешно, можно обратившись в системную таблицу
select * from user_queues;
На этом этапе очередь уже готова к работе и в неё можно вставлять сообщения и вычитывать их вручную. Однако мы на этом не останавливаемся и автоматизируем процесс вычитывания сообщений из очереди при помощи процедуры-подписчика.
Перед тем как создать процедуру-подписчик, создадим таблицу в которую будут складываться факты её срабатывания:
create table t_log ( d_add timestamp default systimestamp not null, s_msg varchar2(4000) );
Теперь можно создать пакет, в котором будет две процедуры: подписчик и складывание сообщения в таблицу логов.Заголовок пакета
create or replace package pkg_queue is procedure addLog(pi_msg in t_log.s_msg%type); procedure subscriber( context raw ,reginfo sys.aq$_reg_info ,descr sys.aq$_descriptor ,payload raw ,payloadl number ); end pkg_queue;
Тело пакета
create or replace package body pkg_queue is procedure addLog(pi_msg in t_log.s_msg%type) as begin insert into t_log(s_msg) values(pi_msg); end; procedure subscriber( context raw ,reginfo sys.aq$_reg_info ,descr sys.aq$_descriptor ,payload raw ,payloadl number ) as l_tr_obj tp_queue; l_msg_props dbms_aq.message_properties_t; l_queue_opts dbms_aq.dequeue_options_t; l_msg_id raw(16); begin l_queue_opts.consumer_name := descr.consumer_name; l_queue_opts.msgid := descr.msg_id; dbms_aq.dequeue(descr.queue_name, l_queue_opts, l_msg_props, l_tr_obj, l_msg_id); --здесь должна быть полезная нагрузка на основе данных из объектного типа --в данном случае в качестве нагрузки логируется запись из типа addLog(l_tr_obj.n); exception when others then addLog(sqlerrm); end; end pkg_queue;
Процедура-подписчик вычитывает(dequeue) сообщение с указанным msg_id и выполняет на основе его полезную нагрузку.
ВАЖНО!. Процедура-подписчик всегда должна иметь точно такую спецификацию, иначе её работа не гарантируется.
После этого можно добавить подписчика к выбранной очереди и зарегистрировать его:
begin dbms_aqadm.add_subscriber('q_test', sys.aq$_agent('test_subscriber', null, null)); dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('q_test:test_subscriber', dbms_aq.namespace_aq, 'plsql://pkg_queue.subscriber', hextoraw('FF'))), 1); end;
Всё готово. Теперь достаточно добавить сообщение в очередь…
declare l_msg_props dbms_aq.message_properties_t; l_queue_opts dbms_aq.enqueue_options_t; l_msg_id raw(16); begin dbms_aq.enqueue('q_test', l_queue_opts, l_msg_props, tp_queue(1), l_msg_id); commit; end;
…и проверить таблицу логов, чтобы удостовериться что подписчик отработал нормально.
select * from t_log;
20.01.16 14:54:30,774600 1
Очереди в Oracle это достаточно продвинутый инструмент.
Стандартными средствами вы можете настроить количество повторов при неуспешной вычитке, их интервал, время жизни сообщения, задержка его выполнения, http-запросы и e-mail в качестве процедуры-подписчика и многое другое.
В этой статье описан лишь простейший пример работы с ними.
Вообще, очереди в Oracle достаточно интересный, хоть и громоздкий, инструмент. Но есть в них одна особенность, которая может не позволить вам их использовать. Дело в одном баге, который проявляется при перекомпиляции пакетов.
Допустим, в процедуре-подписчике в качестве полезной нагрузки вызывается сторонний пакет, например так:
addLog(l_tr_obj.n * pkg_util.getRandom);
В случае, если в один момент времени существует процесс, который использует процедуру-подписчик, и перекомпилируется пакет pkg_util, процедура подписчик выпадет с ошибкой:
ORA-06508: PL/SQL: could not find program unit being called
Если ваша система высоконагружена, и сообщения в очередь поступают непрерывно, данный процесс продолжит выпадать с этой ошибкой на все сообщения из очереди.
Данный баг чаще выпадает при компиляции заголовка пакета и реже при компиляции тела.
Существует два известных мне решения этой проблемы:
1) запретить запись сообщений в очередь и немного подождать
2) убить процесс, выпавший с ошибкой, после чего автоматически создастся другой процесс, подхватив новую версию пакета
Использовать или нет очереди в своих проектах — решать вам. Для себя я решил следующее: если задача относительно небольшая и на её реализацию не потребуется много времени, лучше напишу свой велосипед.