Camel apache копирование файлов

Обновлено: 01.07.2024

Apache Camel – это интеграционный фреймворк, который помогает в решении такой сложной задачи, как интеграции многих, часто сильно разнородных систем, в единое целое. На мой взгляд, в современном мире интеграция различных сервисов друг с другом играет первостепенное значение. Агрегаторы сервисов, интеграция с платежными системами, системами обработки и анализа данных. Часто ваша система уже не является просто изолированным ото всего мира приложением. И тут вам нужно средство, которое поможет соединить набор совершенно различных систем, с разными API и форматами данных, в единое целое.
Здесь то на сцену и выходит Camel.

Что может быть сложного в том, чтобы подключиться к какому-то стороннему сервису и начать использовать его API? Есть интерфейс, есть некий SLA – работаем. Какие у нас могут возникнуть основные проблемы? Давайте попробуем перечислить некоторые из них:

Настольная книга интегратора — EIP.

Давайте теперь наконец обратимся к самому фреймворку. В этом разделе мы познакомимся с основами Camel, его концепциями и терминами, а также напишем свое первое приложение с
помощью него.
В сфере интеграции приложений существует свой Hello World! И заключается он в том, чтобы скопировать(передать) файл из одной папки в другую. Прежде чем заниматься интеграцией с реальными системами, мы знакомимся с основами вот на таком простом примере.

Как бы вы сделали тоже самое с помощью Java? Просто скопировать файл из одного места в другое заняло бы уже довольно немало кода, но зачем писать что-то заново, если кто-то уже это сделал за вас?
Итак, приступим. Создайте новый проект в IDEA и с помощью вашего любимого сборщика(Gradle или Maven) подключите к проекту Camel. Я использую Maven:

Весь код по копированию файла укладывается буквально в две строчки:

Одно из основных понятий Camel – маршрут.

Маршрут — это некоторый путь из точки А в точку Б. В нашем случае он заключается в перемещении данных из директории source в директорию destination. Об этом мы и сообщаем Camel, добавляя свой маршрут:

В метод addRoutes() мы передаем объект анонимного класса RouteBuilder, задача которого, как следует из названия — построить маршрут(или сразу несколько маршрутов!).
Мы могли бы вынести свой RouteBuilder в собственный класс. Это улучшит переиспользуемость и тестируемость нашего RouteBuilder:
Затем мы стартуем Camel, ждем 10 секунд, пока отработает наш маршрут и произойдет копирование файлов:

В конце мы просто останавливаем Camel. Давайте вернемся к нашему примеру и повторим все, что только что изучили.



Сколько вы знаете шаблонов интеграции приложений(EIP)? Сколько из них вы можете использовать?

Симпатичный «верблюд» снова здесь, а значит, представляю вам продолжение серии статей об Apache Camel. В этой статье найдутся как самые необходимые, так и очень любопытные шаблоны интеграции. Расскажу о том, как они ложатся на нашу интеграцию.

Если вы знакомы с шаблонами, но решаете стоит ли связываться с «верблюдом», то наши примеры помогут разобраться. Если вам интересен путь от сценариев использования к реализации интеграции, то эта статья как раз об этом. Прошу под кат.

Напомню, мы построили свою сервисную шину на Apache Camel. Предыстория описывались в предыдущей части. Сейчас Camel для нас уже данность, с которой надо бороться. В нашем “зоопарке” прибавилось, кроме последнего, мы имеем две системы. Первая — это наша основная система, которая построена на классической трёхуровневой “клиент-серверной” архитектуре. Она представляет собой BPMS систему, сложность которой обусловлена многолетним процессом “допиливания” мелких “хотелок”. Вторая — небольшая и простая, как валенок, система той же архитектуры. Её будем называть офисом продаж. Разбирать варианты использования будем как раз на её примере.

Анкетная система с потребностью интеграции.


Наш офис продаж — приложение, которое не имеет сложной логики. Оно реализует процесс регистрации клиентов и ввод документов, необходимых для заказа. Эта система открыта для клиентов после регистрации, поэтому требования ко времени отклика и пропускной способности более высокого порядка, чем к основной системе. Ограничения безопасности, пропускной способности, независимой настройки и необходимость поддержания обособленного цикла разработки привели к выделению офиса продаж в самостоятельное приложение. Для обслуживания процесса продаж нам требовалась его интеграция в ИТ-инфраструктуру нашего заказчика.
Знакомство с интеграцией начнём с вариантов использования офиса продаж. Мы выделили в бизнес-процессе нашего приложения две основные роли: покупателя и администратора. Именно эти группы пользователей с ним работают.



На схеме разобраны основные варианты использования. Обратите внимание на правую часть схемы, несколько прецедентов выходят за рамки офиса продаж и требуют определённых действий со стороны основной системы. Именно эти прецеденты мы будем разбирать дальше.

Для формирования целостного представления о нашем офисе разберём схему. Наши покупатели подают заявки для участия в аукционах. Проведение аукционов — основной процесс офиса продаж. Для участия в них покупатели должны проходить регистрацию и оформлять заявки, заполняя большое количество скучных регистрационных форм. Администратор выполняет функции подготовки и проведения аукционов. Для этого по каждому из аукционов подготавливается специальный шаблон заявки — анкета, поэтому класс таких систем иногда называют анкетными. Процесс проведения аукциона завершается для офиса продаж передачей информации по поданным заявкам в основную систему для выбора победителя и оформления сделки.

  • Потери нормативно-справочной информации допустимы и не должны приводить к нарушению процесса функционирования подсистем;
  • Обращение во внешние системы не должно нарушать процесс проведения аукциона в случае отказа внешних систем и/или каналов связи;
  • Заявки должны гарантировано передаваться и доставляться между подсистемами;
  • Файлы должны гарантировано передаваться и доставляться между подсистемами;
  • Пользовательская нагрузка на проведение аукционов должна полностью ложиться на офис продаж, разгружая основную систему. Поэтому, взаимодействия между системами должны быть сведены к минимуму;
  • В распоряжении пользователя офиса продаж не должно быть механизмов воздействия на основную систему.
  1. Организация транспорта;
  2. Распределение функций между подсистемами;
  3. Синхронизация справочников;
  4. Передача зависимых сущностей;
  5. Наблюдение за процессом обмена;
  6. Передача файлов.

Организация транспорта.

В состав транспорта входят обе описанных выше системы и Camel. О последнем уже много было сказано в предыдущей части статьи, поэтому пойдём дальше.
Все три системы связанны брокером ActiveMQ по протоколу AMQP.

Напомню, системы обмениваются пакетами, используя JMS. Полезной нагрузкой этих пакетов решили сделать XML и сериализовать в него объекты одной из систем, используя JAXB. Но какие объекты взять за основу, чтобы при этом минимизировать временные затраты на создание интеграции? Систем две — значит, и форматов может быть два. Решили остановится на объектах офиса продаж, эта система более легковесная, объекты области домена связаны с другими архитектурными слоями этого приложения только JPA аннотациями. Выделение на её основе транспортных объектов не составляло труда. Альтернативное решение (использовать объекты основной системы) представлялось практически невыполнимым из-за наличия большого количества метаданных и сложных связей с другими бизнес-сущностями, выходящими за границы интересующего нас процесса обслуживания офиса продаж. Ещё один оставшийся вариант — создание новых транспортных объектов. Его даже не рассматривали, так как он требовал реализацию процедуры и импорта, и экспорта в обеих системах, от которых в первом случае для подсистемы офиса продаж можно было отказаться.


Наверное, вас удивило, что мы выбрали XML в качестве полезной нагрузки наших пакетов? Но, уверяю, этому были причины. Сериализация, использующая стандарт JAXB, включена в JVM — это упрощает работу с ней и не требует дополнительных модулей. На момент разработки интеграции у нас уже был опыт работы с JAXB, так что никакого overhead-а на ознакомление не требовалось. Ещё одна “плюшка” — то, что XML — текстовый формат, а значит, при возникновении нештатных ситуаций можно вмешиваться в его структуру и вносить необходимые коррективы. Но были и недостатки: известно, что структура формата XML заметно увеличивает объём данных. Однако, информация, которой мы планировали обмениваться, по первоначальным оценкам не превышала для справочной информации — 100 мб, для заявок — 1 мб на заявку. Это, согласитесь, не очень большие цифры. Кроме этого, обмен информацией должен был происходить единовременно, и требований к оперативности не предъявлялось.


Распределение функций между подсистемами.

Синхронизация справочников.


Процесс синхронизации справочников составлен на основе приведённых ранее прецедентов. Схема получается такая:



Передача зависимых сущностей.


Для начала, составим схему процесса публикации лота с оглядкой на диаграмму прецедентов.



Как видно процесс передачи сравнительно прост. Можно переложить схему процесса на EIP, получится уже известная по предыдущей части схема:



В простоте есть слабое место, обратите внимание на подробную схему публикации лотов в нотации BPMN.


Наблюдение за процессом обмена.


Разберём эту задачу на примере процесса получения и регистрации результатов, который остаётся ключевым и для интеграции, и для офиса продаж. Схема процесса продемонстрирована на рисунке:



Процесс начинается в основной системе отправкой запроса на получение предложений по лоту, эта часть не представлена на схеме потому, что подобный маршрут уже обсуждался в предыдущем разделе. Далее, когда запрос приходит в удалённый офис, начинается сбор и проверка поданных покупателями заявки по лоту. Обрабатываются только полностью заполненные и проверенные заявки на момент поступления запроса от основной системы. Заявки делятся на фрагменты. Разделение на фрагменты позволяет использовать меньше памяти и ускорять процесс обработки. Когда пакет с заявками попадет в основную систему, начинается его разбор и создание на его основе сущностей основной системы (импорт). Мы не стали здесь экспериментировать с параллельностью, чтобы не сталкиваться с проблемами консистентности БД в параллельных транзакциях. Все длительные операции передачи и импорта заявок разбиты на фазы, каждая фаза завершается отправкой уведомления и пользователь имеет возможность наблюдать за процессом импорта.
Схема передачи заявок в паттернах EIP.


Пример настойки роутов:

  • надо было контролировать порядок фрагментов;
  • реализовать собственный механизм восстановления файла из фрагментов;
  • отладить передачу.


Для полноты картины — вот роут, который мы используем:

Как видно на рисунке выше, файлы копируются в три этапа. На первом — файлы копируются из файлового хранилища в транспортную папку офиса продаж. Маршрутизацию в сервисной шине поясняет роут, он запускается только тогда, когда файлы становятся доступны в транспортной папке подсистемы удаленного офиса. Сервисная шина постоянно сканирует эту папку и, как только файл туда попадает, сразу же перемещает его в транспортную папку основной системы. Далее сервисная шина создаёт уведомление о перемещении файла и отправляет его в очередь office.files.import . В этом роуте мы используем механизм обработки исключений, он гарантирует, что если файл попал в транспортную папку офиса продаж, то основная система получит оповещение независимо от успеха или неудачи перемещения файла. Пора подводить итоги.


Итоги.

Об ошибках и способах их решения я планирую написать в следующей статье (своего рода “Продолжение следует…”). Поэтому до новых встреч.

Сценарии интеграции с Apache Camel

Сколько вы знаете шаблонов интеграции приложений(EIP)? Сколько из них вы можете использовать?

Симпатичный «верблюд» снова здесь, а значит, представляю вам продолжение серии статей об Apache Camel. В этой статье найдутся как самые необходимые, так и очень любопытные шаблоны интеграции. Расскажу о том, как они ложатся на нашу интеграцию.

Если вы знакомы с шаблонами, но решаете стоит ли связываться с «верблюдом», то наши примеры помогут разобраться. Если вам интересен путь от сценариев использования к реализации интеграции, то эта статья как раз об этом. Прошу под кат.

Напомню, мы построили свою сервисную шину на Apache Camel. Предыстория описывались в предыдущей части. Сейчас Camel для нас уже данность, с которой надо бороться. В нашем “зоопарке” прибавилось, кроме последнего, мы имеем две системы. Первая — это наша основная система, которая построена на классической трёхуровневой “клиент-серверной” архитектуре. Она представляет собой BPMS систему, сложность которой обусловлена многолетним процессом “допиливания” мелких “хотелок”. Вторая — небольшая и простая, как валенок, система той же архитектуры. Её будем называть офисом продаж. Разбирать варианты использования будем как раз на её примере.

Анкетная система с потребностью интеграции.

Наш офис продаж — приложение, которое не имеет сложной логики. Оно реализует процесс регистрации клиентов и ввод документов, необходимых для заказа. Эта система открыта для клиентов после регистрации, поэтому требования ко времени отклика и пропускной способности более высокого порядка, чем к основной системе. Ограничения безопасности, пропускной способности, независимой настройки и необходимость поддержания обособленного цикла разработки привели к выделению офиса продаж в самостоятельное приложение. Для обслуживания процесса продаж нам требовалась его интеграция в ИТ-инфраструктуру нашего заказчика.

Знакомство с интеграцией начнём с вариантов использования офиса продаж. Мы выделили в бизнес-процессе нашего приложения две основные роли: покупателя и администратора. Именно эти группы пользователей с ним работают.

Подробная схема прецедентов

На схеме разобраны основные варианты использования. Обратите внимание на правую часть схемы, несколько прецедентов выходят за рамки офиса продаж и требуют определённых действий со стороны основной системы. Именно эти прецеденты мы будем разбирать дальше.

Для формирования целостного представления о нашем офисе разберём схему. Наши покупатели подают заявки для участия в аукционах. Проведение аукционов — основной процесс офиса продаж. Для участия в них покупатели должны проходить регистрацию и оформлять заявки, заполняя большое количество скучных регистрационных форм. Администратор выполняет функции подготовки и проведения аукционов. Для этого по каждому из аукционов подготавливается специальный шаблон заявки — анкета, поэтому класс таких систем иногда называют анкетными. Процесс проведения аукциона завершается для офиса продаж передачей информации по поданным заявкам в основную систему для выбора победителя и оформления сделки.

Качественные требования к процессам интеграции:

  • Потери нормативно-справочной информации допустимы и не должны приводить к нарушению процесса функционирования подсистем;
  • Обращение во внешние системы не должно нарушать процесс проведения аукциона в случае отказа внешних систем и/или каналов связи;
  • Заявки должны гарантировано передаваться и доставляться между подсистемами;
  • Файлы должны гарантировано передаваться и доставляться между подсистемами;
  • Пользовательская нагрузка на проведение аукционов должна полностью ложиться на офис продаж, разгружая основную систему. Поэтому, взаимодействия между системами должны быть сведены к минимуму;
  • В распоряжении пользователя офиса продаж не должно быть механизмов воздействия на основную систему.

При разборе обозначенных прецедентов возникли следующие архитектурные задачи:

  1. Организация транспорта;
  2. Распределение функций между подсистемами;
  3. Синхронизация справочников;
  4. Передача зависимых сущностей;
  5. Наблюдение за процессом обмена;
  6. Передача файлов.

Разберём решения этих архитектурных задач на примере сценариев использования и соотнесём с шаблонами EIP.

Организация транспорта.

В состав транспорта входят обе описанных выше системы и Camel. О последнем уже много было сказано в предыдущей части статьи, поэтому пойдём дальше.

Все три системы связанны брокером ActiveMQ по протоколу AMQP.

Напомню, системы обмениваются пакетами, используя JMS. Полезной нагрузкой этих пакетов решили сделать XML и сериализовать в него объекты одной из систем, используя JAXB. Но какие объекты взять за основу, чтобы при этом минимизировать временные затраты на создание интеграции? Систем две — значит, и форматов может быть два. Решили остановится на объектах офиса продаж, эта система более легковесная, объекты области домена связаны с другими архитектурными слоями этого приложения только JPA аннотациями. Выделение на её основе транспортных объектов не составляло труда. Альтернативное решение (использовать объекты основной системы) представлялось практически невыполнимым из-за наличия большого количества метаданных и сложных связей с другими бизнес-сущностями, выходящими за границы интересующего нас процесса обслуживания офиса продаж. Ещё один оставшийся вариант — создание новых транспортных объектов. Его даже не рассматривали, так как он требовал реализацию процедуры и импорта, и экспорта в обеих системах, от которых в первом случае для подсистемы офиса продаж можно было отказаться.

Наверное, вас удивило, что мы выбрали XML в качестве полезной нагрузки наших пакетов? Но, уверяю, этому были причины. Сериализация, использующая стандарт JAXB, включена в JVM — это упрощает работу с ней и не требует дополнительных модулей. На момент разработки интеграции у нас уже был опыт работы с JAXB, так что никакого overhead-а на ознакомление не требовалось. Ещё одна “плюшка” — то, что XML — текстовый формат, а значит, при возникновении нештатных ситуаций можно вмешиваться в его структуру и вносить необходимые коррективы. Но были и недостатки: известно, что структура формата XML заметно увеличивает объём данных. Однако, информация, которой мы планировали обмениваться, по первоначальным оценкам не превышала для справочной информации — 100 мб, для заявок — 1 мб на заявку. Это, согласитесь, не очень большие цифры. Кроме этого, обмен информацией должен был происходить единовременно, и требований к оперативности не предъявлялось.

Распределение функций между подсистемами.

Посмотрим на это распределение с точки зрения архитектуры интеграции. Офис продаж отвечает за:

  • передачу нормативно-справочной информации;
  • передачу информации о лотах и аукционах;
  • передача данных заявок.
  • подготовку и отправку нормативно-справочной информации;
  • инициацию и отправку задания на проведение аукциона;
  • построение дальнейшего бизнес-процесса обработки заявок;

Идём дальше, теперь реальные примеры.

Синхронизация справочников.

Процесс синхронизации справочников составлен на основе приведённых ранее прецедентов. Схема получается такая:

На схеме используются несколько паттернов EIP:

К счастью, наши справочники не столь велики, и простота вышла на первое место. Но эту проблему не стоит недооценивать, в настройках брокера для каждого канала выделяется ограниченный объём оперативной памяти, превышение которого может привести к критической ошибке.

подробностиprivate static class CatalogItemAggregationStrategy implements AggregationStrategy
public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
String newBody = newExchange.getIn().getBody(String.class);

if( StringUtils.equals( oldCode, newCode) )
String oldBody = oldExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(
StringUtils.substringBeforeLast( oldBody, "n</catalogs>") +
StringUtils.substringAfter( newBody, "?>") + "n</catalogs>" );
return oldExchange;
>
>

StringBuilder builder = new StringBuilder( newBody );
builder.insert( builder.indexOf("n") + 1, "<catalogs>n");
builder.append( "</catalogs>n" );
newExchange.getIn().setBody( builder.toString());
return newExchange;
>
>

Передача зависимых сущностей.

Для начала, составим схему процесса публикации лота с оглядкой на диаграмму прецедентов.

Как видно процесс передачи сравнительно прост. Можно переложить схему процесса на EIP, получится уже известная по предыдущей части схема:

В простоте есть слабое место, обратите внимание на подробную схему публикации лотов в нотации BPMN.

Вот и получается, что выгрузить лот можно только после того, как будет создан аукцион. Процесс публикации аукциона изображён на первой схеме этого раздела серым цветом. Для решения проблемы связанных процессов предлагались два решения:

В итоге, использованное нами решение — передавать связанные объекты для сохранения полноты изменений -, стало хорошей альтернативой распределённым транзакциям. Идём дальше.

Наблюдение за процессом обмена.

Разберём эту задачу на примере процесса получения и регистрации результатов, который остаётся ключевым и для интеграции, и для офиса продаж. Схема процесса продемонстрирована на рисунке:

Процесс начинается в основной системе отправкой запроса на получение предложений по лоту, эта часть не представлена на схеме потому, что подобный маршрут уже обсуждался в предыдущем разделе. Далее, когда запрос приходит в удалённый офис, начинается сбор и проверка поданных покупателями заявки по лоту. Обрабатываются только полностью заполненные и проверенные заявки на момент поступления запроса от основной системы. Заявки делятся на фрагменты. Разделение на фрагменты позволяет использовать меньше памяти и ускорять процесс обработки. Когда пакет с заявками попадет в основную систему, начинается его разбор и создание на его основе сущностей основной системы (импорт). Мы не стали здесь экспериментировать с параллельностью, чтобы не сталкиваться с проблемами консистентности БД в параллельных транзакциях. Все длительные операции передачи и импорта заявок разбиты на фазы, каждая фаза завершается отправкой уведомления и пользователь имеет возможность наблюдать за процессом импорта.

Схема передачи заявок в паттернах EIP.

Пример настойки роутов:

from("jms:topic:system.audit")
.filter( PredicateBuilder.and(
header("callbackUUID").isNotNull(),
header("fcntp.audit").isNull() ) )
.setHeader("system.audit", simple( "true", Boolean.class ))
.inOnly("fcntpJms:topic:fcntp.audit?timeToLive=10000");

Новые использованные шаблоны EIP:

Передача файлов.

Передача файлов — задача не очень простая, особенно если решать её средствами JMS. Разберём этот кейс. В требованиях к этой задаче ничего формализованного не было, нужно было передавать данные неограниченного объёма. В Camel-е не нашлось готового решения для того, чтобы перегнать файл с одного сервера на другой. Реализация разбиения файлов на фиксированные куски с последующей склейкой встречала ряд сложностей:

  • надо было контролировать порядок фрагментов;
  • реализовать собственный механизм восстановления файла из фрагментов;
  • отладить передачу.

Мы решили пойти другим путём: в Camel копирование файлов между локальными папками реализовано просто и гибко, этот механизм нам подходил. Полная схема обработки файлов, следующая: файл кладётся одной системой в сетевую папку, Camel находит его и копирует в другую папку, а после копирования сообщает основной системе, что файл был передан. Вот схема:

Для полноты картины — вот роут, который мы используем:

Как видно на рисунке выше, файлы копируются в три этапа. На первом — файлы копируются из файлового хранилища в транспортную папку офиса продаж. Маршрутизацию в сервисной шине поясняет роут, он запускается только тогда, когда файлы становятся доступны в транспортной папке подсистемы удаленного офиса. Сервисная шина постоянно сканирует эту папку и, как только файл туда попадает, сразу же перемещает его в транспортную папку основной системы. Далее сервисная шина создаёт уведомление о перемещении файла и отправляет его в очередь office.files.import. В этом роуте мы используем механизм обработки исключений, он гарантирует, что если файл попал в транспортную папку офиса продаж, то основная система получит оповещение независимо от успеха или неудачи перемещения файла. Пора подводить итоги.

Об ошибках и способах их решения я планирую написать в следующей статье (своего рода “Продолжение следует…”). Поэтому до новых встреч.

У меня есть маршрут, который предполагает, что различные файлы будут скопированы во входящую папку . Маршрут переместит эти файлы во временную папку, где он будет делать другие вещи. Маршрут следующий:

Проблема в том, что эти файлы могут быть довольно большими. Допустим, 1 ГБ. Чтобы скопировать этот файл во входящую папку, может потребоваться, скажем, 10 секунд. В течение этих 10 секунд Потребитель опрашивает каталог, и выдается исключение, поскольку частичный файл все еще копируется. Какой обходной путь я могу использовать?

Я использовал readLock все стратегии (в основном измененные), но получаю исключение:

Измененный uri выглядит следующим образом:

Все еще не повезло

4 ответа

Проверьте параметры readLock в файловом компоненте

Параметр readLock = changed в этом случае кажется подходящим. Могут возникнуть проблемы, если у вас очень медленный производитель, записывающий файлы во входящую папку.

Другой вариант - использовать готовое имя файла. Вы можете заставить исходного производителя создать готовый файл после завершения записи файла.

чаще всего используется один готовый файл для каждого целевого файла. Это означает, что существует корреляция 1: 1. Для этого вы должны использовать динамические заполнители в опции doneFileName. В настоящее время Camel поддерживает следующие два динамических токена: file: name и file: name.noext, которые должны быть заключены в $ <>. Потребитель поддерживает только статическую часть имени файла done как префикс или суффикс (но не оба сразу).

from ("file: bar? doneFileName = $ .done");
В этом примере будут опрашиваться только файлы, если существует готовый файл с именем file. name.done.

Что-то вроде этого получится. На всякий случай, если его система NON-Camel копирует ваш большой файл в InputDir, вам нужно позаботиться о создании файла .DONE после того, как файл будет скопирован. Как только файл .DONE станет доступным, начнется обработка маршрута.

Возможно, это поздно, но используйте fileExist=Append в URI маршрута. Пример:

Читайте также: