Designing a Generic DB Scheduler in Spring Boot: A Comprehensive Guide

Hi folks!!

Recently, during my internship, I was assigned the task of writing a generic DB scheduler. The use case was to schedule various kinds of jobs, mainly one-time tasks (OneTimeTask), in a generic way, so that we don't have to explicitly create a bean for every task, and we don't have to autowire each task wherever we schedule it.

So I decided to use Kagkarlsson's db-scheduler, adding the dependency below to my project. It is a very lightweight open-source library that, according to its README, can handle 2k-10k executions per second, which satisfied my use case.

<dependency>
    <groupId>com.github.kagkarlsson</groupId>
    <artifactId>db-scheduler</artifactId>
    <version>12.4.0</version>
</dependency>

Since I am using Postgres as my database, I also created the table that db-scheduler stores its executions in:

create table scheduled_tasks (
  task_name text not null,
  task_instance text not null,
  task_data bytea,
  execution_time timestamp with time zone not null,
  picked BOOLEAN not null,
  picked_by text,
  last_success timestamp with time zone,
  last_failure timestamp with time zone,
  consecutive_failures INT,
  last_heartbeat timestamp with time zone,
  version BIGINT not null,
  PRIMARY KEY (task_name, task_instance)
);

CREATE INDEX execution_time_idx ON scheduled_tasks (execution_time);
CREATE INDEX last_heartbeat_idx ON scheduled_tasks (last_heartbeat);

Designing the Scheduler

I designed the scheduler as follows:

Setting Up the Utilities

  • TaskSchedulerType: I needed to reference every task by a unique key, so that while scheduling I could fetch the corresponding bean from the ApplicationContext. And what better way to group a set of named values than an enum? This enum lists all the tasks, each with its unique key stored in the taskSchedulerName field.
package com.db.scheduler.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author Raj Aryan Mishra, created on 23/11/2023
 */

@Getter
@AllArgsConstructor
public enum TaskSchedulerType {
    MULTI_OFFICE_SMS_ESCALATION_TASK_SCHEDULER("multi-office-sms-escalation-task"),
    SMS_ESCALATION_TASK_SCHEDULER("sms-escalation-task");
    // Add more such tasks
    private final String taskSchedulerName;
}
  • TaskSchedulerJob class: All the tasks I create share some common functionality, so I put it in an abstract class that implements the Runnable interface (which makes every task a Runnable) and declares a getTaskSchedulerType() method that returns the task's enum value. The tasks themselves are then built by a TaskFactory, following the factory pattern.

package com.db.scheduler.jobs;

import com.db.scheduler.enums.TaskSchedulerType;

/**
 * @author Raj Aryan Mishra, created on 23/11/2023
 */
public abstract class TaskSchedulerJob implements Runnable {
    public abstract TaskSchedulerType getTaskSchedulerType();

    // can also add centralized logging and error handling
}

  • TASK_SCHEDULER_MAP: Next, I needed to map each unique key to its job class, so that from a single key I can get both the job class and the task bean. For this I created an immutable Guava map of type Map<String, Class<? extends TaskSchedulerJob>>:

package com.db.scheduler.constant;

import com.google.common.collect.ImmutableMap;
import com.db.scheduler.enums.TaskSchedulerType;
import com.db.scheduler.jobs.MultiOfficeSmsEscalationJob;
import com.db.scheduler.jobs.SmsEscalationJob;
import com.db.scheduler.jobs.TaskSchedulerJob;

import java.util.Map;

/**
 * @author Raj Aryan Mishra, created on 22/11/2023
 */
public class SchedulerConstants {
    private SchedulerConstants() {
    }

    public static final Map<String, Class<? extends TaskSchedulerJob>> TASK_SCHEDULER_MAP =
            ImmutableMap.<String, Class<? extends TaskSchedulerJob>>builder()
                    .put(TaskSchedulerType.SMS_ESCALATION_TASK_SCHEDULER.getTaskSchedulerName(), SmsEscalationJob.class)
                    .put(TaskSchedulerType.MULTI_OFFICE_SMS_ESCALATION_TASK_SCHEDULER.getTaskSchedulerName(), MultiOfficeSmsEscalationJob.class)
                    // add more entries to the map
                    .build();
}
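TASK_SCHEDULER_MAP refers to SmsEscalationJob and MultiOfficeSmsEscalationJob, whose bodies aren't shown. The standalone sketch below is one way such a job could look. The officeId and message fields and the describe() helper are hypothetical, the enum and base class are trimmed stand-ins for the ones above, and Map.of replaces Guava's ImmutableMap so the example runs with the JDK alone. Two constraints matter for this design: the job needs an accessible no-arg constructor, because SchedulerService instantiates it reflectively, and its state should live in simple fields, because GsonSerializer writes the job object into the task_data column.

```java
import java.util.Map;

public class SmsEscalationJobSketch {

    // Trimmed stand-in for the article's TaskSchedulerType enum (one constant shown).
    enum TaskSchedulerType {
        SMS_ESCALATION_TASK_SCHEDULER("sms-escalation-task");

        private final String taskSchedulerName;

        TaskSchedulerType(String taskSchedulerName) {
            this.taskSchedulerName = taskSchedulerName;
        }

        String getTaskSchedulerName() {
            return taskSchedulerName;
        }
    }

    // Stand-in for the article's TaskSchedulerJob base class.
    abstract static class TaskSchedulerJob implements Runnable {
        abstract TaskSchedulerType getTaskSchedulerType();
    }

    // Hypothetical concrete job. State is kept in plain fields so a JSON
    // serializer such as Gson can store it and restore it later.
    public static class SmsEscalationJob extends TaskSchedulerJob {
        private String officeId; // hypothetical payload field
        private String message;  // hypothetical payload field

        // Public no-arg constructor: needed for reflective instantiation.
        public SmsEscalationJob() {
        }

        public SmsEscalationJob(String officeId, String message) {
            this.officeId = officeId;
            this.message = message;
        }

        @Override
        TaskSchedulerType getTaskSchedulerType() {
            return TaskSchedulerType.SMS_ESCALATION_TASK_SCHEDULER;
        }

        // Hypothetical helper; real code would call an SMS gateway in run().
        String describe() {
            return "Escalating to office " + officeId + ": " + message;
        }

        @Override
        public void run() {
            System.out.println(describe());
        }
    }

    // Mirrors TASK_SCHEDULER_MAP using the JDK's immutable Map.of instead of Guava.
    static final Map<String, Class<? extends TaskSchedulerJob>> TASK_SCHEDULER_MAP = Map.of(
            TaskSchedulerType.SMS_ESCALATION_TASK_SCHEDULER.getTaskSchedulerName(), SmsEscalationJob.class);

    public static void main(String[] args) {
        new SmsEscalationJob("office-42", "ticket unresolved for 2 hours").run();
    }
}
```

Note that scheduleOneTimeJob() in SchedulerService always creates the job through the no-arg constructor, so fields like these stay null unless the service is extended to accept a pre-populated job instance.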

Creating the TaskFactory

Up to this point, I had set up all the required utilities. Next came the task factory. Writing this class tested my understanding of Java generics. I also couldn't simply annotate createOneTimeTask(String taskName, Class<T> taskClass) with @Bean: Spring resolves the parameters of a @Bean method from the application context, so a single @Bean method can't be called once per task with different arguments to produce one bean for each.

createOneTimeTask takes a task's unique key (from the TaskSchedulerType enum, as stored in TASK_SCHEDULER_MAP) together with its job class, builds a OneTimeTask for it, and registers that task as a bean named after the key.

To register the bean I used ConfigurableListableBeanFactory.registerSingleton(), which registers the already-built task object as a singleton bean under the given name, again fitting my use case.

The bounded type parameter <T extends TaskSchedulerJob> in the method signature ensures at compile time that every task class passed in extends TaskSchedulerJob.

I also added a retry mechanism. getDbSchedulerRetryConfig() returns a failure handler that allows up to 3 retries of a failed task and waits before each retry using exponential backoff: the first delay is 5 minutes, and the delay doubles after each retry.

package com.db.scheduler.config;

import com.github.kagkarlsson.scheduler.task.FailureHandler;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import com.db.scheduler.jobs.TaskSchedulerJob;
import io.micrometer.core.instrument.Metrics;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.time.Duration;

/**
 * @author Raj Aryan Mishra, created on 22/11/2023
 */

@Slf4j
@Component
public class TaskFactory {

    private final ApplicationContext applicationContext;

    public TaskFactory(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }


    public <T extends TaskSchedulerJob> OneTimeTask<T> createOneTimeTask(String taskName, Class<T> taskClass) {
        OneTimeTask<T> task = Tasks.oneTime(taskName, taskClass)
                .onFailure(getDbSchedulerRetryConfig())
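                .execute((inst, ctx) -> {
                    try {
                        log.info("Executing one-time task '{}' ({}), instance {}", taskName, taskClass.getName(), inst.getId());
                        inst.getData().run();
                    } catch (Exception e) {
                        // Tag by task name only: a per-instance tag (the UUID id) would create a new metric series per execution.
                        Metrics.counter("db.scheduler.task.failures", "task", taskName).increment();
                        log.error("Exception occurred while executing one-time task '{}' ({})", taskName, taskClass.getName(), e);
                        // Rethrowing marks the execution as failed, so the configured FailureHandler decides on a retry.
                        throw new IllegalStateException(e);
                    }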
                });
        ConfigurableListableBeanFactory beanFactory = (ConfigurableListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
        beanFactory.registerSingleton(taskName, task);
        return task;
    }

    private <T> FailureHandler.MaxRetriesFailureHandler<T> getDbSchedulerRetryConfig() {
        return new FailureHandler.MaxRetriesFailureHandler<>(3,
                new FailureHandler.ExponentialBackoffFailureHandler<>(Duration.ofMinutes(5), 2));
    }
}
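To make the retry policy concrete, here is a small plain-Java sketch of the delays described above. It models the article's description (3 retries, a first delay of 5 minutes, doubling each time) and does not call db-scheduler; retryDelays() is a hypothetical helper, and the library's internal formula may differ in detail, so treat the numbers as illustrative.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class RetryScheduleSketch {

    /**
     * Delays before each retry, assuming retry n (1-based) waits
     * initialDelay * multiplier^(n - 1) and at most maxRetries retries happen.
     */
    static List<Duration> retryDelays(Duration initialDelay, double multiplier, int maxRetries) {
        List<Duration> delays = new ArrayList<>();
        for (int retry = 1; retry <= maxRetries; retry++) {
            long millis = Math.round(initialDelay.toMillis() * Math.pow(multiplier, retry - 1));
            delays.add(Duration.ofMillis(millis));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same parameters as getDbSchedulerRetryConfig(): 3 retries, 5 minutes, factor 2.
        List<Duration> delays = retryDelays(Duration.ofMinutes(5), 2, 3);
        Duration total = Duration.ZERO;
        for (int i = 0; i < delays.size(); i++) {
            total = total.plus(delays.get(i));
            System.out.printf("retry %d after %d min (%d min of back-off in total)%n",
                    i + 1, delays.get(i).toMinutes(), total.toMinutes());
        }
    }
}
```

Under these assumptions the retries are spaced 5, 10 and 20 minutes apart (35 minutes of back-off in total, not counting execution time), and a fourth consecutive failure makes MaxRetriesFailureHandler stop retrying.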

Creating DBSchedulerConfig

The DBSchedulerConfig class contains all the configuration related to the scheduler. This is where everything above comes together.

I used constructor injection for the DataSource and TaskFactory beans. Because the task beans are created at runtime, the TaskFactory bean must exist before DBSchedulerConfig uses it. Injecting TaskFactory through the constructor guarantees this, since Spring has to create TaskFactory before it can construct DBSchedulerConfig. Likewise, the task beans are registered while the Scheduler bean is being built, so any bean that depends on Scheduler (such as SchedulerService below) is created only after they exist. The rest is pretty simple.

The getAllTasks() method iterates over TASK_SCHEDULER_MAP and creates each task through the TaskFactory, logging and skipping any task that fails to build. createScheduler() is annotated with @Bean and is responsible for creating and configuring the scheduler: it passes the injected DataSource and the tasks to Scheduler.create(), registers a shutdown hook, uses a GsonSerializer to serialize task data, and sets the executor service returned by getExecutorService(). Finally, it starts the scheduler and returns it.

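The getExecutorService() method creates the ExecutorService, a ThreadPoolExecutor, that runs the task executions. It builds the executor from the configured core pool size and maximum pool size, a 60-second keep-alive time, and a task queue, and falls back to 10/20 threads if the configured values are invalid. Note that because the queue is an unbounded LinkedBlockingQueue, the pool never grows beyond the core pool size, so the maximum pool size has no practical effect.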

package com.db.scheduler.configuration;

import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.serializer.GsonSerializer;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.db.scheduler.config.TaskFactory;
import com.db.scheduler.constant.SchedulerConstants;
import com.db.scheduler.jobs.TaskSchedulerJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author Raj Aryan Mishra, created on 22/11/2023
 */

@Slf4j
@Configuration
public class DBSchedulerConfig {
    private final DataSource dataSource;
    private final TaskFactory taskFactory;

    @Value("${db.scheduler.core.pool.size:10}")
    private int corePoolSize;

    @Value("${db.scheduler.executor.max.thread.pool.size:20}")
    private int maxPoolSize;

    // Constructor injection: Spring must create TaskFactory before this configuration class.
    @Autowired
    public DBSchedulerConfig(DataSource dataSource, TaskFactory taskFactory) {
        this.dataSource = dataSource;
        this.taskFactory = taskFactory;
    }

    // No @Transactional: building and starting the scheduler is not a database transaction.
    @Bean
    public Scheduler createScheduler() {
        Scheduler scheduler = Scheduler.create(dataSource, getAllTasks())
                .registerShutdownHook()
                .serializer(new GsonSerializer())
                .executorService(getExecutorService())
                .build();

        scheduler.start();
        return scheduler;
    }

    // Builds (and registers as beans) one OneTimeTask per map entry; tasks that fail to build are skipped.
    private List<Task<?>> getAllTasks() {
        return SchedulerConstants.TASK_SCHEDULER_MAP.entrySet().stream()
                .map(this::getOneTimeTask)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public OneTimeTask<? extends TaskSchedulerJob> getOneTimeTask(Map.Entry<String, Class<? extends TaskSchedulerJob>> entry) {
        try {
            return taskFactory.createOneTimeTask(entry.getKey(), entry.getValue());
        } catch (Exception e) {
            log.error("Unable to instantiate task '{}' of class '{}': {}", entry.getKey(), entry.getValue().getName(), e.getMessage(), e);
            return null;
        }
    }

    public ExecutorService getExecutorService() {
        try {
            // With an unbounded LinkedBlockingQueue the pool never grows past corePoolSize,
            // so maxPoolSize only takes effect with a bounded queue.
            return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());
        } catch (IllegalArgumentException e) {
            // Thrown for invalid sizes, e.g. a negative corePoolSize or corePoolSize > maxPoolSize.
            log.error("Invalid db scheduler pool sizes (core={}, max={}), falling back to 10/20", corePoolSize, maxPoolSize, e);
            return new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());
        }
    }
}
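ThreadPoolExecutor only starts threads beyond corePoolSize when its queue refuses a task, so with the unbounded LinkedBlockingQueue used in getExecutorService() the db.scheduler.executor.max.thread.pool.size property never takes effect. The standalone demo below shows this with small pool sizes (core 2, max 4) chosen only for illustration; poolSizeAfterSubmitting() is a hypothetical helper, not part of the project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    /**
     * Builds a pool with core=2 and max=4 on the given queue, submits `tasks`
     * jobs that block until released, and returns how many threads were started.
     */
    static int poolSizeAfterSubmitting(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return executor.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: tasks beyond the 2 core threads wait in the queue,
        // so the pool stays at 2 threads even though max is 4.
        System.out.println("unbounded queue: " + poolSizeAfterSubmitting(new LinkedBlockingQueue<>(), 6));
        // Bounded queue (capacity 2): once it is full, extra threads are started
        // up to max, giving 4 threads for the same 6 tasks.
        System.out.println("bounded queue:   " + poolSizeAfterSubmitting(new ArrayBlockingQueue<>(2), 6));
    }
}
```

If a fixed number of worker threads is the goal, setting corePoolSize equal to maxPoolSize makes the configuration say what it actually does. A bounded queue would let the pool grow, but once both the queue and the pool are full, the default policy rejects new tasks with a RejectedExecutionException.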

Creating SchedulerService

I created this service so that scheduling is generic as well: inject it into any service and schedule a task in a single line:

schedulerService.scheduleOneTimeJob(TaskSchedulerType.MULTI_OFFICE_SMS_ESCALATION_TASK_SCHEDULER, ChronoUnit.SECONDS, 5);

The scheduleOneTimeJob method takes a task type, a time unit, and a delay, and attempts to schedule a one-time job of that type. It fetches the task bean from the ApplicationContext using the key from the TaskSchedulerType enum, instantiates the job class through its no-arg constructor (so every job class needs an accessible one), and schedules a new task instance, identified by a random UUID, to run after the given delay. Any failure is logged rather than rethrown.

package com.db.scheduler.service;

import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.db.scheduler.constant.SchedulerConstants;
import com.db.scheduler.enums.TaskSchedulerType;
import com.db.scheduler.jobs.TaskSchedulerJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

/**
 * @author Raj Aryan Mishra, created on 24/11/2023
 */

@Service
@Slf4j
public class SchedulerService {

    private final ApplicationContext applicationContext;
    private final Scheduler scheduler;

    public SchedulerService(Scheduler scheduler, ApplicationContext applicationContext) {
        this.scheduler = scheduler;
        this.applicationContext = applicationContext;
    }

    @SuppressWarnings("unchecked")
    public <T extends TaskSchedulerJob> void scheduleOneTimeJob(TaskSchedulerType taskSchedulerType, ChronoUnit timeUnit, long time) {
        try {
            OneTimeTask<T> task = applicationContext.getBean(taskSchedulerType.getTaskSchedulerName(), OneTimeTask.class);
            Class<T> taskClass = (Class<T>) SchedulerConstants.TASK_SCHEDULER_MAP.get(taskSchedulerType.getTaskSchedulerName());
            if (taskClass != null) {
                T instance = taskClass.getDeclaredConstructor().newInstance();
                scheduler.schedule(task.instance(UUID.randomUUID().toString(), instance), Instant.now().plus(time, timeUnit));
            } else {
                log.error("Task Class not found in TASK_SCHEDULER_MAP for key {}", taskSchedulerType.getTaskSchedulerName());
            }
        } catch (Exception e) {
            log.error("Exception occurred while scheduling job: {}", taskSchedulerType.getTaskSchedulerName(), e);
        }
    }
    // schedule other different type of tasks
}
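scheduleOneTimeJob() relies on an unchecked cast to Class<T> and on Instant.plus(time, timeUnit). The standalone sketch below isolates the parts that don't need db-scheduler: resolving the job class through a wildcard type instead of the unchecked cast, instantiating it via its no-arg constructor, and computing the execution time. ScheduleRequest and prepare() are hypothetical names; in the real service the result would go to scheduler.schedule(task.instance(id, data), executionTime). The up-front unit check is also an addition: Instant only accepts ChronoUnit values up to DAYS, so a call with ChronoUnit.WEEKS or MONTHS would otherwise fail at runtime and, in the original service, only be logged.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.UUID;

public class ScheduleRequestSketch {

    // Trimmed stand-ins for the article's enum, base class and one job.
    enum TaskSchedulerType {
        SMS_ESCALATION_TASK_SCHEDULER("sms-escalation-task");

        private final String taskSchedulerName;

        TaskSchedulerType(String taskSchedulerName) {
            this.taskSchedulerName = taskSchedulerName;
        }

        String getTaskSchedulerName() {
            return taskSchedulerName;
        }
    }

    abstract static class TaskSchedulerJob implements Runnable {
    }

    public static class SmsEscalationJob extends TaskSchedulerJob {
        public SmsEscalationJob() {
        }

        @Override
        public void run() {
        }
    }

    static final Map<String, Class<? extends TaskSchedulerJob>> TASK_SCHEDULER_MAP =
            Map.of("sms-escalation-task", SmsEscalationJob.class);

    /** Hypothetical holder for the arguments that would be passed to scheduler.schedule(...). */
    static final class ScheduleRequest {
        final String taskName;
        final String instanceId;
        final TaskSchedulerJob data;
        final Instant executionTime;

        ScheduleRequest(String taskName, String instanceId, TaskSchedulerJob data, Instant executionTime) {
            this.taskName = taskName;
            this.instanceId = instanceId;
            this.data = data;
            this.executionTime = executionTime;
        }
    }

    static ScheduleRequest prepare(TaskSchedulerType type, ChronoUnit unit, long amount, Instant now) {
        // Instant supports units up to DAYS; WEEKS, MONTHS, etc. would throw in plus().
        if (!now.isSupported(unit)) {
            throw new IllegalArgumentException("Unsupported delay unit for Instant: " + unit);
        }
        String key = type.getTaskSchedulerName();
        // Wildcard type instead of an unchecked cast to Class<T>.
        Class<? extends TaskSchedulerJob> jobClass = TASK_SCHEDULER_MAP.get(key);
        if (jobClass == null) {
            throw new IllegalArgumentException("No job class registered for key " + key);
        }
        TaskSchedulerJob data;
        try {
            data = jobClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Job class needs an accessible no-arg constructor: " + jobClass, e);
        }
        return new ScheduleRequest(key, UUID.randomUUID().toString(), data, now.plus(amount, unit));
    }

    public static void main(String[] args) {
        ScheduleRequest request = prepare(TaskSchedulerType.SMS_ESCALATION_TASK_SCHEDULER,
                ChronoUnit.SECONDS, 5, Instant.now());
        System.out.println(request.taskName + " / " + request.instanceId + " at " + request.executionTime);
    }
}
```

Passing `now` in as a parameter, rather than calling Instant.now() inside, keeps the delay calculation deterministic and easy to test.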

That's all for this article. If you have any suggestions for improvement, please share them in the comments.

Happy learning!!