В состав Oracle включен интересный пакет dbms_alert, позволяющий реализовывать асинхронные событийные уведомления. В этой статье я покажу простейшую реализацию обработки сообщений из классической очереди.
Имеется таблица очереди, в которой содержится идентификатор сообщения и дата его добавления:
create table t_queue ( id_queue number primary key, d_add timestamp default systimestamp not null );
Также имеется последовательность, из которой будут генерироваться ID сообщений в очередь(id_queue):
create sequence seq_t_queue;
Идея такова. Один процесс создает в этой таблице запись и ждет ответ. В это же время другой процесс вычитывает запись из этой таблицы, выполняет полезную нагрузку и отдает ответ. Первый процесс получает ответ. Все счастливы.
Для начала реализуем процесс-обработчик. Для этого напишем простую процедуру-обработчик…
create or replace procedure sendRandom is
l_id_queue t_queue.id_queue%type;
l_cur sys_refcursor;
l_random number;
begin
loop
open l_cur for
select id_queue
from t_queue
order by d_add asc
for update skip locked;
fetch l_cur into l_id_queue;
if l_id_queue is not null
then
delete from t_queue where id_queue = l_id_queue;
l_random := dbms_random.value;
dbms_alert.signal(l_id_queue, l_random);
commit;
else
rollback;
dbms_lock.sleep(0.1);
end if;
end loop;
end sendRandom;
…и джоб для нее
begin
dbms_scheduler.create_job(
job_name => 'job_queue_thread_1'
,job_type => 'PLSQL_BLOCK'
,job_action => 'begin sendRandom; end;'
,start_date => systimestamp
,end_date => NULL
,enabled => TRUE
);
end;
Процедура выгребает самую старую запись, блокирует её. Блокировка необходима для случаев, когда записи из очереди выгребает не один процесс, как сейчас, а несколько. Далее процедура выполняет полезную нагрузку(в данном случае расчет рандомного числа) и отправляет ответ другому процессу с именем события l_id_queue используя dbms_alert.signal.
Как вы уже наверно подумали, не стоит использовать бесконечные циклы при решении реальных задач. Хорошим решением будет установить время жизни у процедуры-обработчика, а также иметь интеллектуальный процесс-менеджер джобов, поднимающий их в зависимости от нагрузки системы.
Если захотите перекомпилировать процедуру после разворачивания джоба, сначала отключите его, иначе компиляция зависнет навечно.
Джоб зупащен, а это значит что мы можем использовать процесс-инициатор:
declare
c_id_queue constant number := seq_t_queue.nextval;
l_status number;
l_answer varchar2(64);
begin
insert into t_queue(id_queue) values (c_id_queue);
commit;
dbms_alert.register(c_id_queue);
dbms_alert.waitone(c_id_queue, l_answer, l_status, 5);
if l_status = 0
then
commit;
dbms_output.put_line('Success: '||l_answer);
else
rollback;
dbms_output.put_line('Timeout');
end if;
end;
В этом процессе мы создаем запись в таблице очереди и фиксируем её. После этого мы регистрируем имя нашего процесса(c_id_queue) и при помощи dbms_alert.waitone ждем пока джоб в фоновом процессе обработает запись из очереди. Если джоб обработал запись в течении 5 сек. l_status будет равен нулю и мы получим ответ(рандомное число) в переменной l_answer.

Полезно!