В состав Oracle включен интересный пакет dbms_alert, позволяющий реализовывать асинхронные событийные уведомления. В этой статье я покажу простейшую реализацию обработки сообщений из классической очереди.
Имеется таблица очереди, в которой содержится идентификатор сообщения и дата его добавления:
create table t_queue ( id_queue number primary key, d_add timestamp default systimestamp not null );
Также имеется последовательность, из которой будут генерироваться ID сообщений в очередь(id_queue):
create sequence seq_t_queue;
Идея такова. Один процесс создает в этой таблице запись и ждет ответ. В это же время другой процесс вычитывает запись из этой таблицы, выполняет полезную нагрузку и отдает ответ. Первый процесс получает ответ. Все счастливы.
Для начала реализуем процесс-обработчик. Для этого напишем простую процедуру-обработчик…
create or replace procedure sendRandom is l_id_queue t_queue.id_queue%type; l_cur sys_refcursor; l_random number; begin loop open l_cur for select id_queue from t_queue order by d_add asc for update skip locked; fetch l_cur into l_id_queue; if l_id_queue is not null then delete from t_queue where id_queue = l_id_queue; l_random := dbms_random.value; dbms_alert.signal(l_id_queue, l_random); commit; else rollback; dbms_lock.sleep(0.1); end if; end loop; end sendRandom;
…и джоб для нее
begin dbms_scheduler.create_job( job_name => 'job_queue_thread_1' ,job_type => 'PLSQL_BLOCK' ,job_action => 'begin sendRandom; end;' ,start_date => systimestamp ,end_date => NULL ,enabled => TRUE ); end;
Процедура выгребает самую старую запись, блокирует её. Блокировка необходима для случаев, когда записи из очереди выгребает не один процесс, как сейчас, а несколько. Далее процедура выполняет полезную нагрузку(в данном случае расчет рандомного числа) и отправляет ответ другому процессу с именем события l_id_queue используя dbms_alert.signal.
Как вы уже наверно подумали, не стоит использовать бесконечные циклы при решении реальных задач. Хорошим решением будет установить время жизни у процедуры-обработчика, а также иметь интеллектуальный процесс-менеджер джобов, поднимающий их в зависимости от нагрузки системы.
Если захотите перекомпилировать процедуру после разворачивания джоба, сначала отключите его, иначе компиляция зависнет навечно.
Джоб зупащен, а это значит что мы можем использовать процесс-инициатор:
declare c_id_queue constant number := seq_t_queue.nextval; l_status number; l_answer varchar2(64); begin insert into t_queue(id_queue) values (c_id_queue); commit; dbms_alert.register(c_id_queue); dbms_alert.waitone(c_id_queue, l_answer, l_status, 5); if l_status = 0 then commit; dbms_output.put_line('Success: '||l_answer); else rollback; dbms_output.put_line('Timeout'); end if; end;
В этом процессе мы создаем запись в таблице очереди и фиксируем её. После этого мы регистрируем имя нашего процесса(c_id_queue) и при помощи dbms_alert.waitone ждем пока джоб в фоновом процессе обработает запись из очереди. Если джоб обработал запись в течении 5 сек. l_status будет равен нулю и мы получим ответ(рандомное число) в переменной l_answer.
Полезно!