Oracle создание jms очереди

Обновлено: 04.07.2024

Oracle Data Integration, Cloud, Spatial and Analytics (GoldenGate, ODI, Cloud, Spatial, Exadata)

В последнее время довольно часто встречаюсь с задачей, когда данные, захватываемые GoldenGate, нужно доставлять в нестандартные приемники.

Под стандартными я понимаю:

  • базы данных Oracle, MSSQL, DB2, MySQL, Sybase, TimesTen и др.
  • плоские файлы.

В случае нестандартного источника GoldenGate не знает, как правильно записать информацию в целевую систему и приходится либо использовать возможности целевой системы для загрузки данных, либо использовать Java API для написания собственные процедур. Но есть третий путь – использовать адаптер GoldenGate for JMS. О последнем вариант мы и поговорим.

Применение

Где это может быть использовано? Вот довольно типовые задачи:

Все эти задачи объединяются общими характеристиками:

  • Real-Time (или близкий к Real-Time) режим работы
  • изначальная нерегламентированность задач (база данных и приложение разрабатывались до появления этих требований)
  • нежелание грузить базу данных (она обычно и так работает на пределе железа).

В пределе это классическая задача, где источником является база данных, а целевая система – Complex Event Processing.

Способов решения существует несколько:

image

  1. Запустить приложение, которое в цикле будет постоянно опрашивать таблицу и отсылать уведомления (ресурсоемко и немасштабируемо).
  2. Использовать возможности DBMS_CHANGE_NOTIFICATION (обработку уведомлений нужно будет писать на PL/SQL или C, существует только в СУБД Oracle)
  3. Использовать GoldenGate со специальным адаптером JMS (быстро, масштабируемо, нулевая дополнительная нагрузка на источник, гибко). Схема решения следующая:

Инсталляция необходимых компонент

  1. Агент GoldenGate, который будет перелопачивать журналы СУБД в формат, понятный GoldenGate (в Trail-файлы). Этот компонент специфичен для каждой СУБД+ОС. (раздел Oracle GoldenGate on Oracle Media Pack или Oracle GoldenGate for Non Oracle Media Pack)
  2. Агент GoldenGate, который будет заниматься преобразованием данных, захваченных в п.1 и помещением их в JMS-очередь (Oracle GoldenGate Application Adapters for JMS and Flat File Media Pack).
  3. Провайдер очереди JMS. В нашем случае это будет Weblogic, но могут быть и другие – ActiveMQ, Websphere и т.д.

Я буду для простоты рассматривать вариант, когда у меня источник СУБД Oracle.

Подготовка среды Oracle для GoldenGate

0. Стандартным образом инсталлируем GoldenGate для Oracle

Здесь не буду приводить подробности. При необходимости можно заглянуть вот сюда.

1. Конфигурируем процесс Manager на источнике
2. Создаем табличку для тестов на источнике
3. Добавляем Supplemental Logging
4. Настраиваем захват и передачу изменений на целевой сервер
Подготовка целевой системы
1. Инсталляция Oracle GoldenGate for Java
  • ggs_JavaAdapter_xxxxxxxxxxx.tar
  • ggs_xxxxx_Generic_xxxxxxxxx.zip

image

image

В Windows первый файл уже распакова

В результате структура содержимого каталога будет такой:

image

2. Инсталлируем Weblogic

Инсталлируем Java, а затем запускаем инсталляцию скачанного Weblogic:

java -jar wls1036_generic.jar

Далее на каталог с Weblogic будем называть <WLSinst>, по умолчанию этот каталог называется wlserver_10.3/ и располагается на уровень ниже Fusion Middleware Home <FMWhome>.

Для того, чтобы работать с Weblogic сформируем набор клиентских библиотек:

a. cd <WLSinst>/server/lib
b. java -jar wljarbuilder.jar

3. Создаем и запускаем Weblogic Domain
  • <WLSinst>/common/bin/config.sh
  • <FMWhome>/user_projects/domains/<DomainName>/startWeblogic.sh
4. Создаем JMS Connection Factory и Queue

image

Настройка GoldenGate для JMS

В качестве шаблона будем использовать следующий текст (в поставке GoldenGate for Java есть этот файл в каталоге sample-dirprm.

Все параметры должны писаться в одну строку. Особое внимание нужно обратить на параметр javawriter.bootoptions -Djava.class.path – каталоги и файлы должны разделяться в Windows с помощью точки с запятой и с помощью двоеточия в Linux.

Синтаксис при настройке параметров JMS-обработчиков следующий :
gg.handlerlist=<somename_1>,<somename_2>,…,<somename_n>
а затем :
gg.handler.<somename_1>.<property_a>=<value_w>
gg.handler.<somename_1>.<property_b>=<value_x>
gg.handler.<somename_2>.<property_a>=<value_y>
gg.handler.<somename_2>.<property_b>=<value_z>

Имена обработчиков (в примере это myjms1) никак не связаны c именами, используемыми в WebLogic Server. Имена свойств чувствительны к регистру.

Такой подход к настройке параметров позволяет определить несколько обработчиков в одном файле свойств. Обработчики, не перечисленные в параметре gg.handlerlist, игнорируются.

2. Настройка переменной PATH для доступа к JVM

Следующие настройку нужно будет сделать до запуска процесса MGR, поскольку процесс Extract наследует переменные среды от MGR.

На системе, где мы инсталлируем GoldenGate for Java, нужно будет настроить переменные PATH (Windows) или LD_LIBRARY_PATH (UNIX), чтобы GoldenGate мог найти JVM. Можно также в переменную PATH включить путь к запусковому файлу java, но это не обязательно, поскольку GoldenGate использует динамическую библиотеку jvm.dll (libjvm.so).

Для Windows я использовал следующие команды:

set JAVA_HOME=C:\Program Files\Java\jdk1.7.0_02
set PATH=%JAVA_HOME%\bin;%PATH%
set PATH=%JAVA_HOME%\jre\bin\server;%PATH%

Обратите внимание: на Windows в качестве разделителя используется точка с запятой (;), а на Unix – двоеточие(:).

3. Генерируем файл определений на приемнике (DEF)
4. Настраиваем менеджер для GoldenGate for Java

Повторяем все, что делали в разделе “Конфигурируем процесс Manager на источнике”.

5. Настройка процесса Extract для доставки данных в очередь

Мы создаем Extract ejms (имя может быть любое).

EXTRACT ejms
setEnv ( GGS_USEREXIT_CONF = "dirprm/javaue.properties" )
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
GetEnv (LIBPATH)
GetEnv (CLASSPATH)
CUserExit ggjava_ue.dll CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES
sourceDefs .\dirdef\emp.defs
getUpdateBefores
TABLE ggtest.employees;

Ключевые параметры здесь

6. Создаем и запуска Extact для JMS очереди

GGSCI> ADD extract ejms, extTrailSource dirdat/tb
GGSCI> START ejms
GGSCI> INFO ejms

Проверка работы

1. Генерируем нагрузку на источник

ALTER SESSION SET CURRENT_SCHEMA=GGTEST;

declare
n number;

Заключение

GoldenGate – это инструмент для интеграции данных. Он позволяет реплицировать базы данных, осуществлять безостановочную миграцию, создавать хранилища, работающие в реальном времени.

Использование модуля GoldenGate for Java позволяет пойти гораздо дальше. Я показал, как использовать GoldenGate для доставки изменений в очередь JMS, а это открывает весь мир SOA. С моей точки зрения – наиболее интересным применением может стать подача данных в Oracle Complex Event Processing, который, в свою очередь, открывает мир событийно-управляемых приложений.

Are you aware how much time I've spent learning for details of Java? Thread management, dynamics, CORBA.

среда, 29 июня 2011 г.

Транзакционное чтение из JMS-очереди в Oracle SOA Suite

Одним из наиболее применяемых паттернов интеграции информационных систем является паттерн "Фильтры и трубы". При построении интеграционного решения на Oracle SOA Suite в качестве "фильтров" выступают композиты, а в качестве "труб" удобно использовать JMS-очереди.

Создание демонстрационного проекта

Для демонстрации создадим проект JMSTransactionDemo. Вся разработка композитов для Oracle SOA Suite будет производиться в интегрированной среде разработки JDeveloper.


В создаваемом проекте создадим пустой композит SOA Suite.




Укажем подключение к серверу приложений, выбрав его из списка. Если у вас нет подключений к серверам приложений, то необходимо создать такое подключение, нажав на кнопку с изображением большого зеленого плюса.






На последнем шаге мастера конфигурирования JMS-адаптера нас проинформируют о том, что определение адаптера завершено и что после нажатия кнопки Finish будут созданы файлы с его описанием.


Сконфигурированный адаптер отобразится в окне редактирования композита. Так как мы выбрали операцию Consume Message, то данный адаптер отобразится в левой части композита, т.е. он представляет собой сервис, через который можно создавать новые экземпляры композита.



В окне редактирования композита созданный BPEL процесс необходимо соединить проводом с JMS-адаптером. После этого в BPEL-процессе будет доступна партнерская ссылка InJMSAdapter, которую нужно использовать для настройки активности Receive.




Активность Throw по-умолчанию не настроена. Если по активности щелкнуть два раза, то появится соответствующий диалог настройки.


Обязательными для заполнения являются поля группы Fault QName: Namespace URI и Local Part. Заполняются данные поля с помощью специального диалога, доступного по кнопке с изображением лупы.


  • System Faults - системные ошибки;
  • Project WSDL Fault - ошибки, определенные в WSDL файлах проекта, например, исключительные ситуации, определенные в методах веб-сервисов;
  • Partner Links - ошибки, определенные в файлах определения партнерских ссылок.

Сейчас в демонстрационном проекте доступны только системные ошибки, информации об ошибках из других групп просто нет. Поэтому выберем для активности Throw системную ошибку, например rollback.


Информация о системной ошибке rollback будет скопирована в определение активности Throw.


Настройка транзакций в BPEL-процессе

Прежде всего следует указать тип распространения транзакции в BPEL-процессе. Делается это установкой свойства bpel.config.transaction компонента BPELProcess. Возможны значения: required - если на момент запуска процесса транзакция уже создана, то процесс исполняется в рамках данной транзакции, если транзакция не создана, то она создается, и requiresNew - всегда создается новая траназакция, если же транзакция на момент запуска процесса существует, то она приостанавливается. Значением данного свойства по-умолчанию является required.

Значения всех свойств прописываются с помощью тега property в xml-описании композита. Полное описание композита демонстрационного примера, в котором выставлены все значения описанных свойств, следующее:


Тестирование работы демонстрационного композита

Протестируем работу композита. Для этого развернем его на сервере SOA Suite. В Enterprise Manager откроем вкладку с информацией о композите JMSTransactionDemo и убедимся, что запущеных экземпляров данного композита нет.





В окне трассировки демонстрационного композита видно, что было создано пять экземпляров BPEL-процесса DemoBPELProcess, при этом каждый экземпляр завершился неуспешно.


Ограничения, накладываемые на транзакционное чтение из очереди

В Oracle SOA Suite имеется ряд ограничений, накладываемых на транзакционное чтение из JMS-очереди. Рассмотрим их подробнее.


Начинаем работу

Настройка готового и создание собственного адаптера

Основная роль адаптера - преобразование данных, поступающих из различных источников в формат событий Oracle CEP. Затем эти события передаются следующим по цепочке компонентам сети обработки событий. Обычно, адаптер - это точка входа или выхода событий в/из приложения Oracle CEP.

В поставке Oracle CEP идут следующие готовые адаптеры:

Если у вас другой источник, например:

  • датчики со своими API
  • плоские файлы
  • RSS, twitter
  • Reuters, Wombat или Bloomberg

то пишется Java-класс, обрабатывающий событие. Мы рассмотрим 2 варианта получения событий: с помощью JMS и с помощью адаптера собственной разработки.

Создаем тип события jmsEvent
  1. Как было оговорено - на вход сети будут поступать события, содержащие 2 атрибуты username и message. Для эффективной работы с этими событиями создадим класс Java - cep.event.JMSEvent. В этом классе создадим 2 private поля - username и message:
  2. Для эти полей реализуем соответствующие методы get и set. Для этого кликнем правой кнопкой в редакторе кода и выберем Source->Generate Getter And Setter methods.
  3. Теперь необходимо зарегистрировать тип события в EPN Editor на закладке Event Types.
Создаем потребителя событий (Sink)

Обычно событий, поступающие из адаптера перерабатываются, просеиваются, анализируются и агрегируются. В нашем простейшем случае - мы сразу направим поток событий потребителю для вывода на экран. Выводом на экран будет заниматься специальный класс cep.sink.simpleListener.

  1. Создаем новый класс - cep.sink.simpleListener. Для того, чтобы включить его в качестве потребителя в сеть, необходимо в этом классе реализовать соответствующий интерфейс - StreamSink.
  2. Вывод на экран организовываем с помощью System.out:

Основная терминология JMS

JMS Server - это контейнер для управления ресурсами JMS модуля. JMS server требуется для создания JMS module

JMS module - это модуль который содержит такие ресурсы как queues и topics Модуль JMS требуется для того, чтобы создать JMS queue.
Subdeployment - jms модули могут находиться на нескольких экземплярах WebLogic сервера(targets-ах). Ресурсы в JMS модуле , такие как queues и topics также могут находиться на сервере JMS или экземплярах сервера WLS. Subdeployment представляет собой группировку targets-ов. Connection Factory - connection factory в Weblogic JMS это ресурс который позволяет JMS клиентам создавать соединения к JMS destinations.
Зная основные понятия, давайте настроим WebLogic JMS и создадим ADF приложение для работы с ним. Приступим.

Начнем с WebLogic

Сначала нужно создать JMS Server. Он создается на вкладке Services->Messaging->JMS Servers


Вам предложат дать имя серверу и выбрать тип работы с данными(сохранять или нет данные)


Далее вам нужно выбрать экземпляр WebLogic сервера на котором будет находиться JMS Server и нажать Finish.


После этого нужно создать JMS Module. Он создается на вкладке Services->Messaging->JMS Modules. В первом окне введите имя и нажмите Next.



Далее нужно выбрать необходимый вам target, нажать на Next, а потом Finish.



После этого нужно зайти в созданный JMS Module, открыть вкладку Subdeployment и нажать на New.


И выбрать сервер который мы создали ранее


Далее нужно создать Connection Factory, для этого в созданном JMS модуле нужно зайти на вкладку Configuration и нажать New


Выбрать Connection Factory


Дать ему имя и JNDI имя


Выбрать созданный ранее Subdeplyment и нажать Finish.


Теперь собственно можно создать саму Queue (или Topic). Для этого нужно вернуться на вкладку Configuration в вашем JMS Module, там нажать на New, а потом Queue


Дать ему имя, JNDI имя и нажать на Finish.


jspx должен выглядеть примерно так:


Теперь для работы с JMS нужно добавить две jar-ки :
- javax.jms._1.1.4.jar (для Jdeveloper 12.1.3.0.0 находится в Oracle_Home\oracle_common\modules)
- weblogic.jar (Oracle_Home\wlserver\server\lib)


Для отправки и приема код должен выглядеть так :




В прошлой статье про Oracle CEP я писал, как настроить среду Eclipse для работы с Oracle CEP. Кроме того, я рассказал о том, как запустить приложение HelloWorld, которое идет в качестве примера, а также разобрал в деталях элементы приложения Oracle CEP.


Начинаем работу

  1. Инсталлируем плагин для Oracle CEP и запускаем сервер Oracle CEP. Это детально описано — в предыдущей статье.
  2. Создаем новое приложение для Oracle CEP.
  3. Задаем параметры приложения Oracle CEP. При этом выбираем не использовать базовые приложения
  4. Приложение создано и мы можем приступать к настройке работы приложения.

Настройка готового и создание собственного адаптера

Основная роль адаптера — преобразование данных, поступающих из различных источников в формат событий Oracle CEP. Затем эти события передаются следующим по цепочке компонентам сети обработки событий. Обычно, адаптер — это точка входа или выхода событий в/из приложения Oracle CEP.

В поставке Oracle CEP идут следующие готовые адаптеры:

Если у вас другой источник, например:

  • датчики со своими API
  • плоские файлы
  • RSS, twitter
  • Reuters, Wombat или Bloomberg

то пишется Java-класс, обрабатывающий событие. Мы рассмотрим 2 варианта получения событий: с помощью JMS и с помощью адаптера собственной разработки.

Создаем тип события jmsEvent

Создаем потребителя событий (Sink)

Обычно событий, поступающие из адаптера перерабатываются, просеиваются, анализируются и агрегируются. В нашем простейшем случае — мы сразу направим поток событий потребителю для вывода на экран. Выводом на экран будет заниматься специальный класс cep.sink.simpleListener.

  1. Создаем новый класс — cep.sink.simpleListener. Для того, чтобы включить его в качестве потребителя в сеть, необходимо в этом классе реализовать соответствующий интерфейс — StreamSink.
  2. Вывод на экран организовываем с помощью System.out:
  3. Ну и последнее — нужно связать выход адаптера со входом listener. Для этого в EPN Editor жмем правой кнопкой на полотне и выбираем New->Event Bean. Затем кликаем на иконке адаптера и, удерживая левую кнопку мыши, тащит стрелку на eventBean. После соединения адатера и listener стрелкой появляется ошибка (около адаптера загорается красный фонарик) — он пропадет после того, как мы свяжем ранее описанный класс с элементом сети eventBean.

Разворачиваем приложение и тестируем.

Для того, чтобы развернуть приложение на сервере:

    Запускаем сервер CEP

    (60 Мб) (7 Кб) запускаем следующей командой

Результат работы


Добавляем процессор

Мы только что создали простейшее приложение:


Давайте добавим обработчик событий. Для того мы создаем 2 канала и процессор, а также коммутируем их:



Кликнем два раза по каждому каналу и укажем какой тип события следует ожидать по каждому из них:



Как можно видеть процессору сопоставлен 1 запрос на CQL и тот закомментирован

Давайте раскомментируем его и зададим условия на события (используется regular expression), которые процессор будет пропускать дальше по цепочке:


Заключение

Приложение, которое мы только что создали обеспечивает непрерывный анализ потока поступающих событий и вычленение из него событий, интересных нам. Обработка относительно простая, но ее очень просто можно расширить до более сложной.

Читайте также: