Jan Bumbala
22.7.2011

Spring Integration + WebSphere MQ



Na projektu u zákazníka jsme řešili připojení k externímu systému na Websphere MQ. Bohužel na MQ nebylo z nějakého důvodu zpřístupněno JMS, takže bylo potřeba používat nativní API. Pokud by bylo možné použít JMS rozhraní, integrace do aplikace by byla triviální. Protože jsme používali Spring Integration, stačilo by nakonfigurovat springový JMS inbound channel adapter. Takto bylo potřeba vytvořit vlastní adapter a použít nativní MQ rozhraní.

Pro MQ existuje několik verzí java klienta, zdarma je ovšem pouze klient bez podpory transakcí, klient s podporou transakcí je součástí MQ instalace, a tak je potřeba licence k MQ. Knihovna pro zpřístupnění MQ pomocí JMS se nazývá „MQ classes for Java Message Service“, knihovna pro přístup k MQ pomocí nativního rozhraní, která nás nyní zajímá, se označuje jako “MQ classes for Java”. Ke knihovně existuje docela podrobná dokumentace pod názvem “WebSphere MQ, Using Java”, jen jsem měl trochu problém se k ní na stránkách IBM doklikat.

MQ Inbound Channel Adapter

Scénář pro naši implementaci byl jednoduchý: poslouchat na frontě a vybírat odtud všechny zprávy, tělo MQ zprávy se přečte jako String a pošle se do výstupního kanálu nakonfigurovaného ve Spring Integration pro další transformování a zpracování. Celou logiku pro přístup k MQ jsem zapouzdřil do nového inbound adapteru, který má za předka org.springframework.integration.endpoint.AbstractEndpoint a implementuje org.springframework.integration.core.MessageSource.

public class MQAdapter extends AbstractEndpoint implements MessageSource{ ... }

 Ve Spring konfiguraci lze potom adapter nadefinovat jednoduše takto:

<!-- namespace http://www.springframework.org/schema/integration -->
<int:inbound-channel-adapter ref="mqAdapter" channel="mqOutput">
  <int:poller fixed-rate="1000" max-messages-per-poll="10"/>
</int:inbound-channel-adapter>
<bean name="mqAdapter" class="example.MQAdapter"> ... </bean> 

Takto se každou sekundu otestuje fronta, zda nepřišla nová zpráva, v našem příkladu se zpracuje maximálně deset zpráv za sekundu. Co se týče implementace MQ adapteru, neobsahuje nic složitého. Nejprve se naváže spojení k MQ queue manageru které se udržuje po celou dobu životnosti adapteru. Podle nastavení polleru se pak periodicky zjišťuje, zda nepřišla nová zpráva. Spadlé spojení není potřeba nijak ošetřovat, adapter jednoduše naváže nové spojení při následujícím pokusu o čtení z fronty, není tedy nutný žádný daemon thread. Časově nejnáročnější bylo proniknout do nuancí MQ API, dále tedy následuje popis použití tohoto rozhraní.

WebSphere MQ API

Pro připojení k MQ je nejprve potřeba navázat spojení ke queue manageru který je reprezentovaný třídou com.ibm.mq.MQQueueManager. Pro tento účel je třeba znát a nastavit parametry připojení, jako je jméno queue managera, jméno fronty, IP adresa a port atd.. Toto lze udělat dvojím způsobem. Buď nastavit statická pole ve třídě com.ibm.mq.MQEnvironment (instance MQQueueManager si je načte při inicializaci), nebo parametry připojení naplnit do Hashtable a tu předat konstruktoru třídy MQQueueManager. Z nějakého důvodu mi druhý způsob nefungoval, takže jsem použil způsob první.

//nastaveni statickych poli 
MQEnvironment.hostname = hostname;
MQEnvironment.port = port;
MQEnvironment.channel = channel;
MQEnvironment.CCSID = ccsid;
MQEnvironment.userID = userId;

//konstruktor cte z MQEnvironment 
MQQueueManager qMgr = new MQQueueManager(queueManagerName);

Dalším krokem je vytvoření samotného spojení na MQ frontu, tj. vytvořit instanci třídy com.ibm.mq.MQQueue. Frontu je možné otevřít v několika modech (zejména: sdílené nebo exkluzivní čtení, zápis, prohlížení obsahu fronty) a lze nastavit další různé detaily jak se fronta bude chovat. Všechny tyto parametry se nastavují jako bitová maska a předávají se konstruktoru MQQueue.

int openOptions = MQC.MQOO_INQUIRE |= MQC.MQOO_INPUT_SHARED |= MQC.MQOO_BROWSE;
MQQueue queue = qMgr.accessQueue(this.queueName, openOptions);

 Jakmile máme vytvořenou instanci MQQueue, můžeme číst zprávy z fronty. Pro čtení je opět potřeba nadefinovat sadu parametrů, které ovlivňují jak bude čtení probíhat; tyto parametry jsou reprezentovány třídou com.ibm.mq.MQGetMessageOptions. Původně jsem se snažil nastavit parametry tak, aby volání queue.get() klienta blokovalo do té doby, než se na frontě objeví zpráva, tudíž bez zbytečného pollingu, nějak takto:

MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.matchOptions = MQC.MQMO_NONE;
gmo.options = MQC.MQGMO_WAIT;
gmo.waitInterval = MQC.MQWI_UNLIMITED;

Ovšem záhy se ukázalo, že potom není možné volání get() předčasně ukončit: interrupt na threadu nemá vliv, volat metodu close() na MQQueue také nelze, protože jak get() tak close() je synchronizováno na instanci MQQueue. Takže jsem se pollingu nevyhnul. Celý průbeh čtení z fronty vypadá následovně:

MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.matchOptions = MQC.MQMO_NONE;
//nefiltrovat, cist vsechny zpravy ve fronte 
gmo.options = MQC.MQGMO_NO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING; 
MQMessage messageBuffer = new MQMessage(); 
queue.get(messageBuffer, gmo); 

Při takovéto konfiguraci se klient pokusí přečíst zprávu z fronty a pokud je fronta prázdná, volání neblokuje, ale vyhodí výjimku. Toto je nepříjemné, protože MQ klient loguje defaultně přímo do System.err, pak se v logu hromadí hlášení že nebyla nalezena zpráva: MQJE001: Completion Code 2, Reason 2033 Po delším googlování jsem nalezl dvě řešení, jak se vyhnout zahlcovaní logu. První způsob je přesměrovat logování nastavením statického pole com.ibm.mq.MQException.log (kdo má rád radikální řešení, lze nastavit na null a potlačit logování úplně). Ale více mi vyhovoval druhý způsob, filtrovat error kódy, které nechceme logovat:

MQException.logExclude(MQException.MQRC_NO_MSG_AVAILABLE);

Posledním krokem v implementaci adapteru je čtení těla zprávy, což už je jednoduchá věc.

byte[] buffer = new byte[messageBuffer.getDataLength()];
//MQMessage messageBuffer 
messageBuffer.readFully(buffer, messageBuffer.getDataOffset(), messageBuffer.getDataLength());
String payload = new String(buffer, charset); 

Vaše emailová adresa nebude zveřejněna

Komentáře

Děkujeme za váš komentář
Další
  • Břéťa Kadlec

    Trocha cpp-like programování je určitě dobrá duševní hygiena. Proč nebylo zpřístupněno JMS?

  • Jan Bumbala

    JMS: jednalo se o system treti strany a hned na zacatku bylo receno ze se nejedna o JMS frontu. Cim konkretne bylo toto omezeni zpusobeno bohuzel nevim.

  • pawelrychlik

    I think you should write in English :)