Jan Bumbala
27.3.2012

Quartz Scheduler a dynamické plánování úloh



Na zákaznickém projektu bylo potřeba umožnit asynchronní spouštění různých handlerů, registrovaných jako Spring bean. Typicky jde o případ, kdy uživatel iniciuje vykonání nějaké činnosti, která může trvat desítky sekund nebo dokonce minut. Zároveň jsme potřebovali, aby řešení bylo persistentní, a tedy aby se naplánované úlohy neztrácely při pádu serveru. Protože jsme již na projektu využívali Quartz Scheduler pro spouštění pravidelných úloh, použili jsme ho i pro účel tohoto asynchronního zpracování. 

Scénář použití Quartz Scheduleru, se kterým jsem se nejčastěji setkal, je takový, že v systému jsou předem nakonfigurované joby, které chceme spouštět, a více či méně složité triggery. Každý job je jednoúčelový a spouští předem definovanou business logiku. Pokud chceme umožnit vyvolávat nějaký nový handler, je potřeba přidat a nakonfigurovat nový Quartz job.

Zde popisované řešení je trochu jiné v tom, že je definované obecné API pro naplánování asynchronního zpracování. Motivací bylo umožnit asynchronní procesing handleru bez nutnosti statické konfigurace Quartz jobu pro každý takový handler. Pro každou naplánovanou úlohu je použit společný Quartz job, který v okamžiku vyvolání vyhledá požadovaný handler ve Spring contextu a spustí jej. Přidání nového handleru, nového typu úlohy kterou chceme asynchronně spouštet, pak nevyžaduje žádné další konfigurační změny Quartz scheduleru, pouze konfiguraci nového beanu ve Spring kontextu. Pro vyvolání metody na Spring beanu již existuje MethodInvokingJobDetailFactoryBean, jenže tento způsob neumožňuje předat metodě parametry. Navíc i zde je potřeba mít pro každý bean nakonfigurovaný vlastní trigger.
V našem scénáři nechceme pro každý springový bean definovat nový trigger, pouze definujeme a nakonfigurujeme univerzální scheduler vrstvu. Popsané řešení je sice závislé na Springu, nicméně klientský kód je oddělený interfacem, takže není až takový problém vytvořit implementaci, která Spring používat nebude.

Implementace

Celý mechanismus je skrytý za API, které tvoří 2 třídy:
SchedulerDAO – určeno pro přístup z business logiky, pomocí tohoto interfacu se naplánuje asynchronní zpracování
Task – rozhraní, které musí naimplementovat handler, který chceme asynchronně spouštět

public interface SchedulerDAO { 
   /** 
    * Schedule task for asynchronous processing. 
    * @param taskClass task that is about to be executed asynchronously 
    * @param properties data to be passed to the task 
    */ 
    <T extends Task> void schedule(Class<T> taskClass, Properties properties); 
}

 

public interface Task { 
  /** 
   * run scheduled action 
   * @param properties data the task was scheduled with 
   */ 
   void run(Properties properties); 
}

 

Jednoduchá implementace SchedulerDAO může vypadat takto:

public class SimpleSchedulerDAO implements SchedulerDAO {
  //...instance variables omitted.. 
  public SimpleSchedulerDAO(Scheduler scheduler, String jobName, String jobGroup, String classNameProperty) {
    this.scheduler = scheduler;
    this.jobName = jobName;
    this.jobGroup = jobGroup;
    this.classNameProperty = classNameProperty;
  } 
  public <T extends Task> void schedule(Class<T> taskClass, Properties properties) {
    JobDataMap jobDataMap = new JobDataMap(properties); 
    //todo validation that classNameProperty not present in passed properties jobDataMap.put(classNameProperty, taskClass.getName()); 
    try { scheduler.triggerJob(jobName, jobGroup, jobDataMap); } 
    catch (SchedulerException e) { throw new RuntimeException(e); } 
  } 
}

Navržená implementace naplánuje jeden univerzální job, společný pro všechny Task implementace. Pokaždé je vytvořen nový trigger, který je okamžitě naplánován ke spuštění. Složitější implementace může např. získávat instanci triggeru pomocí trigger factory, kde bude vyřešena různá konfigurace triggeru pro různé typy handlerů. Důležité je, že do persistentních dat daného triggeru (jobDataMap) se uloží název třídy/interface, který chceme asynchronně spustit. Tj. název třídy, která byla jako parametr předána metodě schedule(..). V okamžiku, kdy Quartz vyvolá takto vytvořený trigger, se podle uloženého názvu třídy vyhledá Springový bean, který chceme zavolat (viz dále). Třída jobu je konfigurovatelná, stejně jako název skupiny a klíč, pod kterým je uložený persistovaný název třídy. Ukázková konfigurace Spring contextu:

<bean id="simpleSchedulerDAO" class="com.aspectworks.example.scheduler.impl.SimpleSchedulerDAO">
  <constructor-arg index="0" ref="scheduler"/>
  <constructor-arg index="1" value="taskExecutorJob"/>
  <constructor-arg index="2" value="taskGroup"/>
  <constructor-arg index="3" value="@className"/>
</bean>
<bean name="taskExecutorJob" class="org.springframework.scheduling.quartz.JobDetailBean">
  <property name="jobClass" value="com.aspectworks.example.scheduler.impl.TaskExecutorJob"/>
  <property name="group" value="taskGroup"/>
</bean> 
<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy">
  <property name="jobFactory">
    <bean class="org.springframework.scheduling.quartz.SpringBeanJobFactory"/>
  </property>
  <property name="applicationContextSchedulerContextKey" value="applicationContext"/>
  <property name="schedulerContextAsMap">
    <map>
      <entry key="classNameProperty" value="@className"/>
    </map>
  </property>
  <property name="jobDetails">
    <list>
      <ref bean="taskExecutorJob"/>
    </list>
  </property>
</bean>

 

V konfiguraci je vidět název třídy společného Quartz jobu – TaskExecutorJob, který je spouštěn všemi triggery. Job se chová tak, že podle názvu interface uloženého v persistentních datech triggeru nalezne Springový bean, a na něm spustí metodu run(). Za zmínku stojí ještě čtvrtý parametr konstruktoru SimpleSchedulerDAO, jedná se o název klíče, pod kterým bude v persistentních datech triggeru uložen název třídy Spring beanu, zde tedy hodnota "@className". V implementaci SimpleSchedulerDAO je vhodné dodělat kontrolu, že předané properties neobsahují klíč, který by kolidoval s hodnotou classNameProperty, a ještě lépe tuto hodnotu pevně stanovit a zdokumentovat v API. Na závěr tedy ještě implementace samotného jobu, zde se vyhledá a spustí naplánovaný handler.

public class TaskExecutorJob implements Job{
  private String classNameProperty;
  private ApplicationContext applicationContext;
  public void execute(JobExecutionContext context) throws JobExecutionException { 
    //read class from persisted data Class 
    taskClass = readClass(context, classNameProperty);
    //lookup bean instance 
    Task task = lookupJobBean(taskClass);
    if (task != null) {
      Properties properties = new Properties();
      properties.putAll(context.getTrigger().getJobDataMap());
      properties.remove(classNameProperty);
      //finally execute the target task 
      task.run(properties); 
    }
  }
   
  /** 
   * read persisted Task class name from trigger's jobDataMap 
   */ 
   protected Class readClass(JobExecutionContext context, String beanClassKey) {
    String beanClassName = context.getTrigger().getJobDataMap().getString(beanClassKey);
    try {
      if (beanClassName != null) {
        return (Class) Class.forName(beanClassName); 
      }
    } catch (ClassNotFoundException e) { 
      //log error
    } 
    return null; 
  }
   
  /** 
   * lookup Task implementation from Spring context 
   */ 
   protected T lookupJobBean(Class clazz){
    if (clazz != null){
      try {
        return applicationContext.getBean(clazz);
      } catch (BeansException e) {
         //log error 
      } 
    }
    return null; 
  } 
}

 

Možné nevýhody

Existuje zde ovšem také jeden nedostatek. Klientský kód je sice oddělený dvěma rozhraními (Task a SchedulerDAO), nicméně popsané řešení je orientované na Spring framework a toto má jeden dopad: je potřeba, aby handler, který chceme asynchronně spouštět, definoval nové rozhraní rozšiřující interface Task. Například takto:

public interface TestingTask extends Task{ }
public class TestingTaskImpl implements TestingTask {
  public void run(Properties properties) {
    //do something..
  }
}

 

Nelze definovat přímo implementaci interface Task a tuto se snažit naplánovat přes SchedulerDAO. Jde o to, jak funguje Springový lookup BeanFactory.getBean(Class requiredType): pokud Spring pro AOP používá jdk proxy, nebude možné provést lookup Spring beanu podle jeho implementace, pouze podle interface. Dále je nutné, aby v kontextu byla registrovaná od každého tohoto rozhraní pouze jedna implementace. Popsané řešení by bylo možné změnit např. tak, že scheduling nebude probíhat na základě rozhraní handleru, ale pomocí jména beanu. Toto by ovšem do kódu zaneslo mnohem větší závislosti na Spring frameworku. Budu rád, pokud se podělíte se svými názory, či jste třeba řesili něco podobného, nebo zda v tom vidíte nějaké problémy. Ukázkový funkční příklad si můžete stáhnout zde: quartz-example.zip

Vaše emailová adresa nebude zveřejněna

Komentáře

Děkujeme za váš komentář
Další
  • Tomas

    Zdravim, jen takovy pitomy dotaz - tohle mi zrovna neprijde jako typicka uloha pro scheduled job (planovanui tasku dynamicky). Nebylo by vhodnejsi pouzit JMS popripade nejakou formu queues (Amazon SQS nebo neco podobneho) ?

    1. Jan Bumbala

      Ano máte pravdu, JMS se zde nabízí. Důvod byl hlavně ten, že jsme již v aplikaci Quartz používali, a rozhodli jsme se, že nebudeme zanášet další technologii.

  • Roman

    Zdravim, aku verziu Springu a Quartzu pouzivate? Prave dnes som riesil spustanie jobov v Springu 3.1.1 a musim pouzivat Quartz 1.8.6 (posledny 1.x). 2.x sa zda byt stale nepodporovane. Motivaciou je pekne nove flow API, ale inicializacia Springu bohuzial lahne:-/

    1. Jan Bumbala

      Bohuzel jsme narazili na ten samy problem, konkretne jsme pracovali se Spring 3.0.6. a k tomu pouzili verzi Quartz 1.8.5. Ve Springu na tento problem maji par ticketu: napr. https://jira.springsource.org/browse/SPR-8359 a https://jira.springsource.org/browse/SPR-8275, tvari se, ze jsou vyresene, tak treba tam najdete neco uzitecneho..