Технология Oracle Streams. Настраиваем потоки данных, экономим время и деньги::Журнал СА 12.2009
www.samag.ru
     
Поиск   
              
 www.samag.ru    Web  0 товаров , сумма 0 руб.
E-mail
Пароль  
 Запомнить меня
Регистрация | Забыли пароль?
Журнал "Системный администратор"
Журнал «БИТ»
Наука и технологии
Подписка
Где купить
Авторам
Рекламодателям
Архив номеров
Контакты
   

  Опросы
  Статьи

Электронный документооборот  

5 способов повысить безопасность электронной подписи

Область применения технологий электронной подписи с каждым годом расширяется. Все больше задач

 Читать далее...

Рынок труда  

Системные администраторы по-прежнему востребованы и незаменимы

Системные администраторы, практически, есть везде. Порой их не видно и не слышно,

 Читать далее...

Учебные центры  

Карьерные мечты нужно воплощать! А мы поможем

Школа Bell Integrator открывает свои двери для всех, кто хочет освоить перспективную

 Читать далее...

Гость номера  

Дмитрий Галов: «Нельзя сказать, что люди становятся доверчивее, скорее эволюционирует ландшафт киберугроз»

Использование мобильных устройств растет. А вместе с ними быстро растет количество мобильных

 Читать далее...

Прошу слова  

Твердая рука в бархатной перчатке: принципы soft skills

Лауреат Нобелевской премии, специалист по рынку труда, профессор Лондонской школы экономики Кристофер

 Читать далее...

1001 и 1 книга  
19.03.2018г.
Просмотров: 9934
Комментарии: 0
Потоковая обработка данных

 Читать далее...

19.03.2018г.
Просмотров: 8144
Комментарии: 0
Релевантный поиск с использованием Elasticsearch и Solr

 Читать далее...

19.03.2018г.
Просмотров: 8251
Комментарии: 0
Конкурентное программирование на SCALA

 Читать далее...

19.03.2018г.
Просмотров: 5225
Комментарии: 0
Машинное обучение с использованием библиотеки Н2О

 Читать далее...

12.03.2018г.
Просмотров: 5909
Комментарии: 0
Особенности киберпреступлений в России: инструменты нападения и защита информации

 Читать далее...

Друзья сайта  

 Технология Oracle Streams. Настраиваем потоки данных, экономим время и деньги

Архив номеров / 2009 / Выпуск №12 (85) / Технология Oracle Streams. Настраиваем потоки данных, экономим время и деньги

Рубрика: Администрирование /  Продукты и решения

АНТОН ПИЩУЛИН, главный специалист ОАО «ОТП Банк», занимается разработкой и интеграцией корпоративных информационных систем

Технология Oracle Streams
Настраиваем потоки данных, экономим время и деньги

Режим DownStream технологии Oracle Streams помогает повысить эффективность использования ресурсов в высоконагруженных информационных системах.

При эксплуатации автоматических банковских систем (АБС) в условиях высокой нагрузки, а также параллельно существующих подотчетных систем (в том числе и хранилищ данных), которые зависимы от АБС и должны, например, раз в сутки забирать данные из нее, в период пиковой нагрузки может не хватить ресурсов для своевременного выполнения всех бизнес-задач. Причина понятна – на первом месте всегда стоит задача сдачи отчетности, от которой зависит напрямую деятельность банка. Помимо этой проблемы существует задача аудита данных, для которой также нужны ресурсы. Решить задачи помогает технология Oracle Streams. При помощи этой технологии поток данных с центрального сервера АБС оказывается вполне допустимым и не нагружающим систему. Однако всегда присутствует желание снизить эту нагрузку еще.

Дело в том, что классическая архитектура технологии Oracle Streams состоит из трех процессов – захвата изменений (capture), распространения изменений (propagation) и применения изменений (apply). Два первых процесса в этом случае работают на системе-источнике, то есть на стороне АБС, что не всегда допустимо не только из‑за ограниченности ресурсов, но и из соображений отказоустойчивости и безопасности. Выходом из положения является режим захвата изменений на стороне-приемнике, режим DownStream. При этом на АБС настраивается только дополнительное логирование для системы-приемника. Процессы захвата и применения изменений работают на системе-приемнике.

Между системой-источником и системой-приемником настраивается лог-транспорт. Таким образом, снимается нагрузка с АБС и повышается эффективность работы всех задач банка.

Итак, для начала работы (демонстрации) нам необходимы два экземпляра Oracle с учебной схемой данных пользователя HR (входит в учебный дистрибутив Oracle по OTN лицензии). В общем случае потоки данных могут быть между базами данных, схемами, таблицами и т.д. На практике чаще всего требуется создать поток из одной схемы в другую или из одного набора таблиц в другой. Рассмотрим поток из одной таблицы в другую. Масштабируя данный подход, можно получить набор возможностей для решения достаточно большого объема задач.

Перед началом настройки данные на сервере-источнике (site1) и сервере-приемнике (site2) должны быть синхронизированы. Вся инсталляция программного обеспечения Oracle была выполнена в каталоги, которые предлагает инсталлятор по умолчанию (на платформе Windows – исключительно для целей обучения). Все скрипты написаны в общем виде на PLSQL так, чтобы их легко можно было масштабировать.

Часть 1. Сервер-источник (site1)

В этой части будем настраивать сервер, на котором будет формироваться поток изменений.

Настройка параметров инициализации экземпляра Oracle

  •  Настраиваем параметр global_names. Этот параметр накладывает стандарт на название dblink, а именно названия должны совпадать с именем базы данных (или имя базы.домен).

global_names = true

  •  Режим archivelog должен быть включен.
  •  Возможность создания логов на удаленный сервер-приемник должна быть включена:

remote_archive_enable = true

  •  Настраиваем лог-транспорт на удаленный сервер-приемник

log_archive_dest_2 = 'SERVICE=site2 noregister reopen=300 template=C:\oracle\streams\site1_%t_%s_%r.arc'

Папку C:\oracle\streams я создал вручную.

  •   На всякий случай проверяем параметр (необходим для лог-транспорта):

log_archive_dest_state_2 = 'enable'

  •  Создаем пользователя – администратора streams и даем ему роль dba:
create user strmadm identified by strmadm;

grant dba to strmadm;

BEGIN

  DBMS_STREAMS_AUTH.grant_admin_privilege(

    grantee          => 'strmadm',  

    grant_privileges => true);

    commit;

END;

Настройка реплицируемых объектов (далее все от пользователя strmadm)

Будем настраивать поток из базы site1 объекта hr.employees в базу данных site2.

Все объекты, участвующие в репликации, должны быть специальным образом настроены. На сервере-источнике для идентификации записей, для процессов streams, в журнальные логи должна попадать дополнительная информация (в формате sys.anydata) об операциях над объектами. Эта дополнительная информация будет считана процессом захвата изменений (capture) на сервере-приемнике.

DECLARE

TYPE tp IS TABLE OF VARCHAR2(20);

  tb tp := tp(

  –-Определяем схему и имя реплицируемого объекта

  'hr.employees'

  );

 

BEGIN

  FOR i IN tb.FIRST .. tb.LAST

  LOOP

  --Включаем дополнительное логирование на таблицу на основе ключей

  DBMS_CAPTURE_ADM.prepare_table_instantiation(

    table_name          => tb(i),

    supplemental_logging => 'keys');

  END LOOP;

END;

Часть 2. Сервер-приемник (site2)

В этой части будем настраивать сервер, который будет принимать поток изменений с сервера-источника.

Настройка параметров инициализации экземпляра Oracle

  • Настраиваем параметр global_names. Этот параметр накладывает стандарт на название dblink, а именно названия должны совпадать с именем базы данных (или имя базы. домен).

global_names = true

  • Режим archivelog должен быть включен.
  • Формат архивных журнальных логов log_archive_format сделаем таким же, как на системе-источнике с точностью до sid, то есть site2_%t_%s_%r.arc.
  • Создаем администратора streams и даем ему роль dba.
create user strmadm identified by strmadm;

grant dba to strmadm;

BEGIN

  DBMS_STREAMS_AUTH.grant_admin_privilege(

    grantee          => 'strmadm',  

    grant_privileges => true);

    commit;

END;

Настройка реплицируемых объектов (далее все от пользователя strmadm)

  • Создаем dblink на сервер-источник для автоматизации настройки процесса захвата изменений (capture):
CREATE DATABASE LINK site1 CONNECT TO strmadm IDENTIFIED BY strmadm

   USING '(DESCRIPTION =

    (ADDRESS_LIST =

     (ADDRESS = (PROTOCOL = TCP)(HOST = localhost)(PORT = 1521))

    )

    (CONNECT_DATA =

      (SID = site1)

    )

  )';
  • Включаем логирование на необходимые таблицы (по аналогии с сервером-источником):
DECLARE

TYPE tp IS TABLE OF VARCHAR2(20);

  tb tp := tp(

  'hr.employees'

  );

BEGIN

  FOR i IN tb.FIRST .. tb.LAST

  LOOP

  --Включаем дополнительное логирование на таблицу на основе ключей

  DBMS_CAPTURE_ADM.prepare_table_instantiation(

    table_name          => tb(i),

    supplemental_logging => 'keys');

  END LOOP;

END;
  • Создаем очереди сообщений. Архитектура Oracle Streams в режиме DownStream устроена следующим образом: процесс захвата изменений (capture) формирует сообщения из журнальных логов, которые порождает сервер-источник (site1), и помещает их в очередь типа sys.anydata, затем процесс применения (apply) сканирует очередь сообщений и применяет изменения на сервере-приемнике (site2).
BEGIN

DBMS_STREAMS_ADM.set_up_queue(

   --Таблица очереди сообщений

   queue_table => 'site1_queue_table',

  --Название очереди сообщений

   queue_name  => 'site1_queue'

   );

END;
  • Создаем процесс захвата изменений (capture):
BEGIN

    DBMS_CAPTUR_ADM.create_capture(

   --Название очереди сообщений

    queue_name => 'strmadm.site1_queue',

    --Название процесса захвата

    capture_name => 'capture_site1',

    -–Использовать dblink для настройки

    use_database_link => true,

    --База-источник

    source_database => 'site1'

    );

END;
  • Создаем правила для процесса захвата изменений. Дело в том, что архитектурой Oracle Streams предусмотрено, что каждый процесс должен иметь хотя бы одно правило (условие), при выполнении которого сообщение пропускается дальше или нет. Правила бывают положительные и отрицательные, то есть при срабатывании положительного правила сообщение пропускается, при срабатывании отрицательного – нет. В данной статье используются самые простые положительные правила.
DECLARE

TYPE tp IS TABLE OF VARCHAR2(20);

  tb tp := tp(

  'hr.employees'

  );

BEGIN

  FOR i IN tb.FIRST .. tb.LAST

  LOOP

--Создание положительного правила для таблицы

 DBMS_STREAMS_ADM.add_table_rules(

  table_name => tb(i),

  --Тип процесса streams

  streams_type => 'capture',

  --Название процесса streams

  streams_name => 'capture_site1',

  --Очередь сообщений streams

  queue_name => 'strmadm.site1_queue',

  --Включить dml операции

  include_dml => true,

  --Включить ddl операции

  include_ddl => true,

  --База-источник

  source_database => 'site1',

  --Положительное правило

  inclusion_rule => true

  );

 END LOOP;

END;
  • Добавляем сервисные поля в реплицируемые объекты. В начале статьи упоминалось о задаче аудита данных. Здесь предлагается на уровне записи отслеживать, когда запись была вставлена, обновлена или удалена. Для этого добавляются два сервисных поля – одно типа date (дата транзакции над записью), другое – varchar2 (для отслеживания типа транзакции над записью).
DECLARE

TYPE tp IS TABLE OF VARCHAR2(20);

  tb tp := tp(

  'hr.employees'

  );


BEGIN

  FOR i IN tb.FIRST .. tb.LAST

  LOOP

  execute immediate 'alter table '||tb(i)||' add status_rec varchar2(1)';

  execute immediate 'alter table '||tb(i)||' add status_dt date';

 END LOOP;


END;
  • Создаем функцию трансформации для процесса применения изменений. Она необходима для заполнения сервисных полей на системе-приемнике, созданных в предыдущем пункте. Функция трансформации в общем случае привязывается к любому правилу, соответственно к любому процессу Streams и является расширением простых правил. В данной статье функция трансформации привязывается к процессу применения изменений (apply), как это будет видно ниже.
create or replace function site1_trans(evt in sys.anydata)

--Получаем сообщение и отдаем его дальше после трансформации

return sys.anydata is

  lcr       sys.lcr$_row_record;

  lcr_val   SYS.LCR$_ROW_LIST;

  obj_name  varchar2(30);

  rc        number;

  cmd_tp    varchar2(10);

  st        varchar2(1);

  dt        date;

begin


  --Если сообщение dml

  if evt.GetTypeName='SYS.LCR$_ROW_RECORD' then

     rc := evt.GetObject(lcr);

     obj_name := lcr.get_object_name();


        --Определяем тип транзакции

        cmd_tp := lcr.get_command_type();

        st := upper(substr(cmd_tp,1,1));

        --Если удаление, то трансформируем в обновление

        if st = 'D' then

          lcr_val := lcr.get_values('OLD');

          lcr.set_command_type('UPDATE');     

          lcr.set_values('OLD',lcr_val);   

         

          dt := lcr.get_source_time();

          --Заполняем сервисные поля

          lcr.ADD_COLUMN('NEW','status_dt',sys.anydata.convertdate(dt));

          lcr.ADD_COLUMN('NEW','status_rec',sys.anydata.ConvertVarchar2(st));

 

          return sys.anydata.convertobject(lcr);

        end if;

        --Заполняем сервисные поля

        lcr.ADD_COLUMN('NEW','status_dt',sys.anydata.convertdate(sysdate));

        lcr.ADD_COLUMN('NEW','status_rec',sys.anydata.ConvertVarchar2(st));     

     return sys.anydata.convertobject(lcr);

       end if;    

  return evt;

end site1_trans;
  • Cоздаем процесс применения изменения (apply) сразу вместе с простыми положительными правилами.
DECLARE

TYPE tp IS TABLE OF VARCHAR2(50);

  tb tp := tp(

  'hr.employees'

  );

BEGIN

  FOR i IN tb.FIRST .. tb.LAST

  LOOP

 DBMS_STREAMS_ADM.add_table_rules(

  table_name => tb(i),

  --Тип процесса streams

  streams_type => 'apply',

  --Название процесса streams

  streams_name => 'apply_site1',

  --Очередь сообщений streams

  queue_name => 'strmadm.site1_queue',

  --Включить dml операции

  include_dml => TRUE,

  --Включить ddl операции

  include_ddl => true,

  --База-источник

  source_database => 'site1',

  --Положительное правило

  inclusion_rule => true

  );     

 END LOOP;

END;

Ниже приведен код, устанавливающий вручную scn (снапшот), с которого процесс захвата (capture) начинает отслеживать изменения на сервере-источнике. Данный код не обязателен, так как данная операция производится автоматически при выполнении кода выше. Однако на практике иногда возникает необходимость выполнить это вручную.

DECLARE

TYPE tp IS TABLE OF VARCHAR2(50);

  tb tp := tp(

  'hr.employees'

  );

BEGIN

  FOR i IN tb.FIRST .. tb.LAST

  LOOP

    DECLARE

     iscn  NUMBER;         --scn

    BEGIN

     --Получаем scn базы сервера-источника

     iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER@site1;

     --Устанавливаем scn на таблицу сервера-приемника

     DBMS_APPLY_ADM.set_table_instantiation_scn(

       source_object_name    => tb(i),

       source_database_name  => 'site1',

       instantiation_scn     => iscn);

    END;

  END LOOP;

END;
  • Настраиваем функции трансформации. Привязываем функции трансформации к правилам процесса применения изменений (apply).
DECLARE
 
--Осуществляем выборку искомых правил

CURSOR c1 (s1 varchar2) IS

select c.rule_name from dba_streams_rules c where c.streams_type='APPLY'

and c.rule_type = 'DML';

ss1 varchar2(200);

BEGIN

  OPEN c1(ss1);

  LOOP

     begin

     FETCH c1 INTO ss1;

     EXIT WHEN c1%NOTFOUND;

     --Привязываем существующую функцию трансформации к правилам

     DBMS_STREAMS_ADM.set_rule_transform_function(ss1,'site1_trans');

  exception when others then null;

     end;  

  END LOOP;

END;

Часть 3. Тестирование Oracle Streams

Сейчас мы готовы протестировать всю проделанную нами работу. Для начала необходимо стартовать созданные нами процессы.

--Старт процессов захвата и применения изменений

BEGIN

  dbms_apply_adm.start_apply('apply_site1');

  dbms_capture_adm.start_capture('capture_site1');

END;
  • На сервере-источнике (site1) выполним следующую транзакцию типа update.

update hr.employees t set t.first_name='anton' where t.employee_id=198;

commit;

--Попереключаем несколько раз журнальные логи, так как мы быстрее хотим дождаться результата теста

alter system switch logfile;

Через некоторое время мы должны увидеть на сервере-приемнике следующее.

select t.first_name,t.status_rec,t.status_dt from hr.employees t where t.employee_id=198;

anton            U       01.11.2009 20:16:25 (в вашем случае дата будет другая)
  •  На сервере-источнике (site1) выполним следующую транзакцию типа delete.

delete from hr.employees t where t.employee_id=199;

commit;

--Попереключаем несколько раз журнальные логи

alter system switch logfile;

Через некоторое время мы должны увидеть на сервере-приемнике следующее.

select t.first_name,t.status_rec,t.status_dt from hr.employees t where t.employee_id=199;

Douglas    D      02.11.2009 8:33:23  (в вашем случае дата будет другая)
  • На сервере-источнике (site1) выполним следующую транзакцию типа insert.

insert into hr.employees

select 207, e.first_name, e.last_name, 'letanton@mail.ru', e.phone_number, e.hire_date, e.job_id, e.salary, e.commission_pct, e.manager_id, e.department_id

from hr.employees e where e.employee_id=198;

commit;

--Попереключаем несколько раз журнальные логи

alter system switch logfile;

Через некоторое время мы должны увидеть на сервере-приемнике следующее.

select t.first_name,t.status_rec,t.email,t.status_dt from hr.employees t where t.employee_id=207;

anton      I    letanton@mail.ru   02.11.2009 8:43:5 (в вашем случае дата будет другая)

Вот и все, теперь еще пара слов об основных системных представлениях, необходимых для мониторинга нашего потока:

--Просмотр состояния процесса захвата изменений

Select * from dba_capture

--Просмотр состояния процесса применения изменений

select * from dba_apply;

--Детализация ошибок процесса применения

select * from dba_apply_error;

--Мониторинг работы процесса захвата изменений

   SELECT c.CAPTURE_NAME,

       SUBSTR(s.PROGRAM,INSTR(s.PROGRAM,'(')+1,4) PROCESS_NAME,

       c.SID,

       c.SERIAL#,

       c.STATE,

       c.TOTAL_MESSAGES_CAPTURED,

       c.TOTAL_MESSAGES_ENQUEUED

  FROM V$STREAMS_CAPTURE c, V$SESSION s

  WHERE c.SID = s.SID AND

        c.SERIAL# = s.SERIAL#;

--Мониторинг работы процесса применения изменений

SELECT r.APPLY_NAME,

       DECODE(ap.APPLY_CAPTURED,

                'YES','Captured LCRS',

                'NO','User-enqueued messages','UNKNOWN') APPLY_CAPTURED,

       SUBSTR(s.PROGRAM,INSTR(s.PROGRAM,'(')+1,4) PROCESS_NAME,

       r.STATE,

       r.TOTAL_MESSAGES_DEQUEUED,

       R.TOTAL_MESSAGES_SPILLED

       FROM V$STREAMS_APPLY_READER r, V$SESSION s, DBA_APPLY ap

       WHERE r.SID = s.SID AND

             r.SERIAL# = s.SERIAL# AND

             r.APPLY_NAME = ap.APPLY_NAME;

--Мониторинг работы процесса применения изменений

SELECT r.APPLY_NAME,

DECODE(ap.APPLY_CAPTURED,

'YES','Captured LCRS',

'NO','User-enqueued messages','UNKNOWN') APPLY_CAPTURED,

SUBSTR(s.PROGRAM,INSTR(s.PROGRAM,'(')+1,4) PROCESS_NAME,

r.STATE,

r.TOTAL_MESSAGES_DEQUEUED,

R.TOTAL_MESSAGES_SPILLED

FROM V$STREAMS_APPLY_READER r, V$SESSION s, DBA_APPLY ap

WHERE r.SID = s.SID AND

r.SERIAL# = s.SERIAL# AND

r.APPLY_NAME = ap.APPLY_NAME;

За более подробной информацией необходимо обратиться к оригинальной документации Oracle [1].

  1. http://download.oracle.com/docs/cd/B19306_01/server.102/b14228/mon_rep.htm#i1012369.

Комментарии отсутствуют

Добавить комментарий

Комментарии могут оставлять только зарегистрированные пользователи

               Copyright © Системный администратор

Яндекс.Метрика
Tel.: (499) 277-12-41
Fax: (499) 277-12-45
E-mail: sa@samag.ru