Oracle DB. Использование dbms_alert для реализации очереди и параллельной обработки данных.

В состав Oracle включен интересный пакет dbms_alert, позволяющий реализовывать асинхронные событийные уведомления. В этой статье я покажу простейшую реализацию обработки сообщений из классической очереди.

Имеется таблица очереди, в которой содержится идентификатор сообщения и дата его добавления:

create table t_queue
(
id_queue number primary key,
d_add timestamp default systimestamp not null 
);

Также имеется последовательность, из которой будут генерироваться ID сообщений в очередь(id_queue):

create sequence seq_t_queue;

Идея такова. Один процесс создает в этой таблице запись и ждет ответ. В это же время другой процесс вычитывает запись из этой таблицы, выполняет полезную нагрузку и отдает ответ. Первый процесс получает ответ. Все счастливы.

Для начала реализуем процесс-обработчик. Для этого напишем простую процедуру-обработчик…

create or replace procedure sendRandom is
  l_id_queue t_queue.id_queue%type;
  l_cur sys_refcursor;
  l_random number;
begin
  loop
    open l_cur for
    select id_queue
    from t_queue
    order by d_add asc
    for update skip locked;
    
    fetch l_cur into l_id_queue;
    
    if l_id_queue is not null
      then
        delete from t_queue where id_queue = l_id_queue;        
        l_random := dbms_random.value;
        dbms_alert.signal(l_id_queue, l_random);
        commit;
      else
        rollback;
        dbms_lock.sleep(0.1);
    end if;
  end loop;  
end sendRandom;

…и джоб для нее

begin
  dbms_scheduler.create_job(
                            job_name        => 'job_queue_thread_1'
                           ,job_type        => 'PLSQL_BLOCK'
                           ,job_action      => 'begin sendRandom; end;'
                           ,start_date      => systimestamp
                           ,end_date        => NULL
                           ,enabled         => TRUE
                           );
end;

Процедура выгребает самую старую запись, блокирует её. Блокировка необходима для случаев, когда записи из очереди выгребает не один процесс, как сейчас, а несколько. Далее процедура выполняет полезную нагрузку(в данном случае расчет рандомного числа) и отправляет ответ другому процессу с именем события l_id_queue используя dbms_alert.signal.

Важно!

Джоб зупащен, а это значит что мы можем использовать процесс-инициатор:

declare
  c_id_queue constant number := seq_t_queue.nextval;
  l_status   number;
  l_answer   varchar2(64);
begin
  insert into t_queue(id_queue) values (c_id_queue);
  commit;
  
  dbms_alert.register(c_id_queue);
  dbms_alert.waitone(c_id_queue, l_answer, l_status, 5);
  
  if l_status = 0
    then
      commit;
      dbms_output.put_line('Success: '||l_answer);
    else
      rollback;
      dbms_output.put_line('Timeout');
  end if;
end;

В этом процессе мы создаем запись в таблице очереди и фиксируем её. После этого мы регистрируем имя нашего процесса(c_id_queue) и при помощи dbms_alert.waitone ждем пока джоб в фоновом процессе обработает запись из очереди. Если джоб обработал запись в течении 5 сек. l_status будет равен нулю и мы получим ответ(рандомное число) в переменной l_answer.

Leave a Reply

Your email address will not be published. Required fields are marked *