Overview

In FluxFlow, jobs play a crucial role in asynchronously performing tasks within a workflow at specified times. Jobs enable the parallel execution of tasks, facilitate distributed processing, and offer flexibility in managing complex workflows.

A job represents an independent unit of work executed asynchronously within a workflow. It encapsulates specific tasks or workloads to be performed at designated times. Jobs can be scheduled and rescheduled, allowing for automation of periodic tasks and timely execution.

Jobs can interact with other workflow components: they can exchange data, update the workflow state, generate notifications, and initiate further actions.

Jobs enhance workflow execution by providing asynchronous task execution, flexibility, and automation. This section will explore job configuration, scheduling, interaction with workflow components, and best practices for incorporating jobs into workflow designs using FluxFlow.

Job definition

A job definition determines a job’s behavior. This section outlines how a job can be declared.

Payload

A job’s behavior can be defined by implementing a new class that is annotated with @Job.

Example job definition

@Job
class SendReminderJob {
    fun doSendMail() {
        // Do work here
    }
}

The doSendMail method (in this example) holds the functionality that is going to be executed, once the scheduled time has come. It doesn’t matter how the method is named. As long as the following conditions hold true, FluxFlow will be able to discover and use it:

Requirements regarding a job's payload method

  1. the method is public
  2. the method is not abstract
  3. the method is the only public method within its type or is annotated with `@JobPayload`
  4. all declared parameters (if present) only refer to
    1. an object that can be obtained from the IoC container
    2. the `Workflow<*>` API object
    3. the `Job` API object
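The discovery rules above can be approximated with plain Java reflection. The following is a minimal sketch, not FluxFlow's actual implementation: `PayloadMarker` is a hypothetical stand-in for `@JobPayload`, `findPayloadMethod` is an illustrative helper, and the parameter checks from rule 4 are omitted. The sample classes declare no properties, because property getters would also count as public methods in this simplified check.

```kotlin
import java.lang.reflect.Method
import java.lang.reflect.Modifier

// Hypothetical stand-in for FluxFlow's @JobPayload annotation.
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class PayloadMarker

// Returns the payload method, or null if none can be determined unambiguously.
fun findPayloadMethod(type: Class<*>): Method? {
    // Rules 1 and 2: only public, non-abstract methods are candidates.
    val candidates = type.declaredMethods.filter {
        Modifier.isPublic(it.modifiers) &&
            !Modifier.isAbstract(it.modifiers) &&
            !it.isSynthetic
    }
    // Rule 3: prefer the annotated method, otherwise accept a single candidate.
    val annotated = candidates.filter { it.isAnnotationPresent(PayloadMarker::class.java) }
    return when {
        annotated.size == 1 -> annotated.single()
        annotated.isEmpty() && candidates.size == 1 -> candidates.single()
        else -> null
    }
}

class SingleMethodJob {
    fun run() {}
}

class AnnotatedJob {
    fun helper() {}

    @PayloadMarker
    fun payload() {}
}

class AmbiguousJob {
    fun first() {}
    fun second() {}
}

fun main() {
    println(findPayloadMethod(SingleMethodJob::class.java)?.name) // prints "run"
    println(findPayloadMethod(AnnotatedJob::class.java)?.name)    // prints "payload"
    println(findPayloadMethod(AmbiguousJob::class.java)?.name)    // prints "null"
}
```

The third case shows why a type with several public methods needs the annotation: without it, no single method can be selected.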

Parameters

Most of the time, we need a way to dynamically customize a job’s behavior. Sticking to the previous example, we might want to specify the mail receiver during job creation.

This functionality is provided by "job parameters". A job parameter is declared by creating a public property.

Job definition accepting parameters

@Job
class SendReminderJob(
    val receiverAddress: String // (1)
) {
    fun doSendMail() {
        // Do work here
    }
}
  1. Note that the property is public.

When a job is scheduled, FluxFlow will inspect the assigned property values and persist them for later use. As soon as the job is about to be executed, the values will be restored from the database and passed into the job.

Contrary to step data, job parameters cannot be updated after the owning job has been scheduled.

In order for FluxFlow to be able to detect and process job parameters, the following conditions have to be met.

  1. the property must be public

  2. the property value’s type must be serializable

During job activation, the parameter values are passed into the job’s constructor. Each constructor parameter is matched to the originating job parameter by name. Therefore, the property and the constructor parameter must be named identically.

FluxFlow currently only supports constructor initialization for job parameters. Setter-based injection is therefore unavailable.

Constructor injection

In order to access external functionality from within a job, constructor injection can be used to receive certain objects. During job construction FluxFlow tries to resolve a value for each declared constructor parameter. The priority and logic of said resolution is given by the table "Job constructor injection priorities".

Job constructor injection priorities

| Priority | Source | Description | Prerequisites |
|---|---|---|---|
| 0 | Workflow model | The workflow model associated with the job’s workflow. | The workflow model’s type must be assignable to the parameter’s type. |
| 1 | Job parameters | The parameters as they were set when the job was scheduled. See Parameters for more information. | (1) The job parameter’s type must be assignable to the constructor parameter’s type. (2) The constructor parameter’s name must match the job parameter’s name (which is obtained from the associated property). |
| 2 | IoC container | FluxFlow tries to obtain an instance of the requested type from the inversion of control container (e.g. a Spring Bean). | (1) There is an IoC container. (2) The IoC container is able to provide an instance that is assignable to the parameter’s type. |

A typical use case for this functionality is to inject services that do the heavy lifting and can be shared among different kinds of jobs. In our example, this might be a service that provides the actual mail sending capabilities. Using this approach, we avoid having to reimplement that functionality for each job which might send a mail notification.

Example on how to inject external functionality using constructor injection

@Service // (3)
class MailService {
    fun sendMail(receiver: String) {
        // send actual mail
    }
}

@Job
class SendReminderJob(
    val receiverAddress: String, // (2)
    private val mailService: MailService // (1)
) {
    fun doSendMail() {
        mailService.sendMail(receiverAddress)
    }
}
  1. The job declares a private primary constructor property of type MailService. As the resulting property is not public, FluxFlow will not try to resolve the value using a job parameter.

  2. Contrary to that, the receiverAddress will be obtained from the job’s parameters, as there is a public property with a matching name (due to the constructor parameter being declared as val without a private or protected modifier).

  3. The dependency to be fetched from the inversion of control container must have been registered. In this example, we use Spring and registered the service with the @Service annotation.
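The resolution order from the table "Job constructor injection priorities" can be illustrated without any reflection. The following is a simplified sketch under stated assumptions: `ParameterSpec` and `resolve` are hypothetical names, the IoC container is simulated by a plain list, and FluxFlow's real resolution logic may differ in detail.

```kotlin
// Hypothetical description of a single constructor parameter.
data class ParameterSpec(val name: String, val type: Class<*>)

class VacationRequest
class MailService

// Tries the three sources in priority order and returns the first match.
fun resolve(
    parameter: ParameterSpec,
    workflowModel: Any,
    jobParameters: Map<String, Any>,
    container: List<Any>
): Any? {
    // Priority 0: the workflow model, if its type is assignable to the parameter's type.
    if (parameter.type.isInstance(workflowModel)) return workflowModel

    // Priority 1: a job parameter with a matching name and an assignable type.
    val byName = jobParameters[parameter.name]
    if (byName != null && parameter.type.isInstance(byName)) return byName

    // Priority 2: any instance the simulated IoC container can provide.
    return container.firstOrNull { parameter.type.isInstance(it) }
}

fun main() {
    val model = VacationRequest()
    val mailService = MailService()
    val jobParameters = mapOf<String, Any>("receiverAddress" to "receiver@example.com")
    val container = listOf<Any>(mailService)

    // Resolved from the job parameters (priority 1).
    println(resolve(ParameterSpec("receiverAddress", String::class.java), model, jobParameters, container))
    // prints "receiver@example.com"

    // Resolved from the simulated container (priority 2).
    println(resolve(ParameterSpec("mailService", MailService::class.java), model, jobParameters, container) === mailService)
    // prints "true"

    // Resolved from the workflow model (priority 0).
    println(resolve(ParameterSpec("request", VacationRequest::class.java), model, jobParameters, container) === model)
    // prints "true"
}
```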

Payload function injection

As already mentioned in "Requirements regarding a job's payload method", a payload function can declare parameters. Similar to constructor injection, this can be useful to access external functionality or to obtain information regarding the current execution. Parameter resolution is done as outlined in the table "Payload function injection priorities".

Payload function injection priorities

| Priority | Source | Description | Prerequisites |
|---|---|---|---|
| 0 | Job API object | The Job API object, representing the currently executing job. | The parameter’s type must be assignable from Job. |
| 1 | Workflow<TModel> API object | The Workflow<TModel> API object, representing the currently executing job’s workflow. | The parameter’s type must be assignable from Workflow. |
| 2 | IoC container | FluxFlow tries to obtain an instance of the requested type from the inversion of control container (e.g. a Spring Bean). | (1) There is an IoC container. (2) The IoC container is able to provide an instance that is assignable to the parameter’s type. |

The direct injection of a workflow’s model is currently unsupported due to technical limitations. Inject the Workflow<TModel> instead and access its model using the .model property.

It is recommended to use constructor injection for a job’s general prerequisites (e.g. dependencies vital to a job’s functionality that are unspecific to the workflow) and function injection for dependencies that are related to the currently executing workflow.

Sticking to the previous example, we can now also send the notification to other "workflow observers".

Example of using payload function injection to access the current job and workflow

class VacationRequest(
    val otherObservers: List<String>
)

@Job
class SendReminderJob(
    val receiverAddress: String,
    private val mailService: MailService
) {
    fun doSendMail(
        job: Job,
        workflow: Workflow<VacationRequest>
    ) {
        println("Executing job: ${job.identifier}")
        mailService.sendMail(receiverAddress)
        // Notify every additional observer stored in the workflow model
        workflow.model.otherObservers.forEach { observerAddress ->
            mailService.sendMail(observerAddress)
        }
    }
}

State changes

A job might change the workflow state and data of its owning workflow. All changes applied to it will be persisted after the job has run successfully. If an exception occurs, the changes will not be committed and will instead be rolled back.

Example of a job modifying its workflow’s data

class VacationRequest(
    val otherObservers: List<String>,
    var notificationSent: Boolean = false
)

@Job
class SendReminderJob(
    val receiverAddress: String,
    private val mailService: MailService
) {
    fun doSendMail(
        job: Job,
        workflow: Workflow<VacationRequest> // (1)
    ) {
        // send actual mail
        workflow.model.notificationSent = true // (2)
    }
}
  1. Inject the owning workflow using payload function injection.

  2. Access and modify its data.
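The commit-or-rollback behavior described in this section can be pictured with a small in-memory model. This is only an illustrative sketch: `InMemoryWorkflowStore` and `runJob` are hypothetical names, and FluxFlow's actual persistence mechanism is not shown here. The payload works on a copy of the model, which is only stored if the payload completes without an exception.

```kotlin
data class VacationRequest(var notificationSent: Boolean = false)

// Hypothetical in-memory stand-in for the workflow persistence layer.
class InMemoryWorkflowStore(initial: VacationRequest) {
    var persisted: VacationRequest = initial
        private set

    // Runs the payload on a working copy; returns true if the changes were committed.
    fun runJob(payload: (VacationRequest) -> Unit): Boolean {
        val workingCopy = persisted.copy()
        return try {
            payload(workingCopy)
            persisted = workingCopy // commit after a successful run
            true
        } catch (e: Exception) {
            false // roll back by discarding the working copy
        }
    }
}

fun main() {
    val store = InMemoryWorkflowStore(VacationRequest())

    // A failing job: its change is discarded.
    store.runJob { model ->
        model.notificationSent = true
        throw IllegalStateException("mail server unavailable")
    }
    println(store.persisted.notificationSent) // prints "false"

    // A successful job: its change is persisted.
    store.runJob { model -> model.notificationSent = true }
    println(store.persisted.notificationSent) // prints "true"
}
```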

Adding metadata to jobs

Like other FluxFlow objects, jobs can have metadata added to them and their definition. You can apply them by annotating the job definition type with an appropriate annotation. More details on how to create a metadata annotation can be found in "Definition of metadata".

@Metadata("jobType")
@Retention
annotation class JobType(
    val type: String
)

@JobType("housekeeping")
class ClearStaleWorkflowJob {
    fun clear() {
        // ...
    }
}

Scheduling

As far as FluxFlow is concerned, all jobs are scheduled for a fixed and absolute time. This is an intentional design decision that reduces scheduling complexity while leaving developers free to build scheduling logic on top of the application’s domain logic.
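Because only absolute times are accepted, any relative or calendar-based rule has to be turned into an `Instant` by the application itself. The following sketch shows one possible domain rule using only `java.time`; the helper name `nextWorkdayAt` and the rule "next weekday at a given time" are illustrative assumptions, not part of FluxFlow.

```kotlin
import java.time.DayOfWeek
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneId

// Computes the next occurrence of the given time that falls on a weekday.
fun nextWorkdayAt(now: Instant, zone: ZoneId, time: LocalTime): Instant {
    val zonedNow = now.atZone(zone)
    var date = zonedNow.toLocalDate()
    // If the requested time has already passed today, start with tomorrow.
    if (!zonedNow.toLocalTime().isBefore(time)) {
        date = date.plusDays(1)
    }
    // Skip weekends.
    while (date.dayOfWeek == DayOfWeek.SATURDAY || date.dayOfWeek == DayOfWeek.SUNDAY) {
        date = date.plusDays(1)
    }
    return date.atTime(time).atZone(zone).toInstant()
}

fun main() {
    // 2024-03-15 is a Friday; 18:00 is after 09:00, so the result is Monday morning.
    val fridayEvening = Instant.parse("2024-03-15T18:00:00Z")
    println(nextWorkdayAt(fridayEvening, ZoneId.of("UTC"), LocalTime.of(9, 0)))
    // prints "2024-03-18T09:00:00Z"
}
```

The resulting `Instant` can then be used as the execution time when scheduling a job.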

Using a continuation

FluxFlow provides the JobContinuation, which indicates to the workflow engine that a new job should be scheduled. As with every kind of Continuation, there are multiple ways to request its execution. These are described in detail in the following sections.

In order to construct a JobContinuation the Continuation.job(...) function should be utilized. This function expects two parameters, the first one being the time the job should be executed, while the second parameter specifies the actual job to be executed. The job passed into the function is usually an instance of a job definition as described in "Job definition".

@Job
class SendMailJob(
    val receiverAddress: String // public, so that FluxFlow treats it as a job parameter
) {
    fun execute() {
        // do actual work
    }
}

Continuation.job( // (3)
    Instant.now().plus(Duration.ofMinutes(5)), // (1)
    SendMailJob("receiver@example.com") // (2)
)
  1. construct the time the job should be scheduled for (based on domain logic)
  2. construct an instance of the desired job definition
  3. use the returned continuation to tell FluxFlow to schedule the job for execution

If the scheduled time has already passed, the job will be up for immediate execution. There are no guarantees on how this immediate execution will be performed. Depending on the scheduler, the job might be executed synchronously within the current thread and context or asynchronously with a small technical delay.

Using a step action continuation

If an instance of JobContinuation is returned by a step action, it is automatically picked up and scheduled by FluxFlow. There is no need to schedule it explicitly.

Scheduling a mail notification once a step action is executed

@Step
class CheckVacationRequest {
    @Action
    fun permit(): Continuation<*> {
       return Continuation.job(
            Instant.now().plus(Duration.ofMinutes(5)),
            SendMailJob("receiver@example.com")
        )
    }
}

Using a job’s payload function

Another common way is to return a JobContinuation directly from another job’s payload function. This way it is possible to mimic recurring scheduling behavior.

Assuming we want to notify a user every ten minutes, we could return a new JobContinuation every time it is run.

@Job
class SendReminderJob(
    private val mailService: MailService,
    val receiver: String
) {
    fun sendReminder(): Continuation<*> {
        mailService.sendReminder(receiver)
        return Continuation.job( // (1)
            Instant.now().plus(Duration.ofMinutes(10)), // (2)
            SendReminderJob(
                mailService,
                receiver
            )
        )
    }
}
  1. Return a continuation that schedules a new job, which

  2. should be executed in ten minutes.

When rescheduling a job, it is important to include an exit condition to avoid infinite loops or unexpected executions.
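One way to express such an exit condition is to carry a counter as a job parameter and stop rescheduling once it is used up. The sketch below simulates this outside of FluxFlow: `ScheduleRequest` is a hypothetical stand-in for a job continuation, `remainingReminders` is an assumed parameter name, and the loop in `main` plays the role of the scheduler.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical stand-in for a scheduling request; not part of the FluxFlow API.
data class ScheduleRequest(val at: Instant, val job: ReminderJob)

class ReminderJob(
    val receiver: String,
    val remainingReminders: Int
) {
    // Returns a follow-up request, or null once the exit condition is reached.
    fun sendReminder(now: Instant): ScheduleRequest? {
        println("Reminding $receiver ($remainingReminders left)")
        val left = remainingReminders - 1
        return if (left > 0) {
            ScheduleRequest(now.plus(Duration.ofMinutes(10)), ReminderJob(receiver, left))
        } else {
            null // exit condition: do not reschedule
        }
    }
}

fun main() {
    var now = Instant.parse("2024-01-01T00:00:00Z")
    var job: ReminderJob? = ReminderJob("receiver@example.com", 3)
    var runs = 0
    // Simulated scheduler: keeps executing jobs as long as a follow-up is requested.
    while (job != null) {
        val next = job.sendReminder(now)
        runs++
        now = next?.at ?: now
        job = next?.job
    }
    println(runs) // prints "3"
}
```

Other exit conditions, such as checking a flag in the workflow model, follow the same pattern: the payload decides on each run whether a follow-up is still needed.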

Using a service method

Scheduling a job this way is the fallback solution. In most cases, one of the other ways should be preferred.

Both the WorkflowService and JobService provide a function that can receive a JobContinuation. While the WorkflowService.start(...) function would be used if a new workflow should be started for the job, the JobService.schedule function can be used if a job should be scheduled for a preexisting workflow.

Cancellation

FluxFlow supports job cancellation based on a "cancellation key". A cancellation key is a unique string identifier assigned to a workflow’s job. The idea is that there must never be more than one scheduled job for each combination of workflow and cancellation key.

Note that jobs can only be canceled as long as they are within the Scheduled status. As soon as execution has begun, they can no longer be canceled.

Cancellation keys are workflow-scoped. Canceling any given cancellation key will not affect jobs having the same cancellation key, as long as they belong to other workflows.
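The scoping rule can be modeled by treating the pair of workflow and cancellation key as the identity of a scheduled job. The following in-memory sketch is purely illustrative: `ScheduledJob` and `InMemoryScheduler` are hypothetical types and do not reflect how FluxFlow stores or cancels jobs internally.

```kotlin
// Hypothetical record of a scheduled job.
data class ScheduledJob(
    val workflowId: String,
    val cancellationKey: String?,
    val name: String
)

class InMemoryScheduler {
    private val jobs = mutableListOf<ScheduledJob>()

    // Scheduling with a cancellation key replaces jobs with the same key in the same workflow.
    fun schedule(job: ScheduledJob) {
        if (job.cancellationKey != null) {
            cancel(job.workflowId, job.cancellationKey)
        }
        jobs += job
    }

    // Cancels only the jobs of the given workflow; other workflows are untouched.
    fun cancel(workflowId: String, cancellationKey: String) {
        jobs.removeAll { it.workflowId == workflowId && it.cancellationKey == cancellationKey }
    }

    fun scheduled(): List<ScheduledJob> = jobs.toList()
}

fun main() {
    val scheduler = InMemoryScheduler()
    scheduler.schedule(ScheduledJob("workflow-1", "alarm", "in 10 minutes"))
    scheduler.schedule(ScheduledJob("workflow-2", "alarm", "in 10 minutes"))
    scheduler.schedule(ScheduledJob("workflow-1", "alarm", "in 1 hour"))

    // workflow-1 keeps only its latest "alarm" job; workflow-2 is unaffected.
    println(scheduler.scheduled().map { "${it.workflowId}: ${it.name}" })
    // prints "[workflow-2: in 10 minutes, workflow-1: in 1 hour]"
}
```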

Implicit cancellation by scheduling a new job

One way of canceling a job is to replace it with a new job. This can be achieved very easily by specifying the same cancellation key twice during scheduling.

Canceling a job by replacing it

@Job
class SoundAlarmClockJob {
    fun timeToWakeUp() {
        // Sound alarm as loud as possible
    }
}

@Step
class SleepStep {
    @Action(statusBehavior = ImplicitStatusBehavior.Preserve)
    fun wakeMeUpIn10Minutes(): JobContinuation<SoundAlarmClockJob> {
        return Continuation.job(
            Instant.now().plus(10, ChronoUnit.MINUTES),
            SoundAlarmClockJob(),
            CancellationKey("alarm") // (1)
        )
    }

    @Action(statusBehavior = ImplicitStatusBehavior.Preserve)
    fun wakeMeUpIn1Hour(): JobContinuation<SoundAlarmClockJob> {
        return Continuation.job(
            Instant.now().plus(1, ChronoUnit.HOURS),
            SoundAlarmClockJob(),
            CancellationKey("alarm") // (1)
        )
    }
}
  1. Reusing the cancellation key "alarm" for both actions causes jobs created by wakeMeUpIn10Minutes to be replaced by jobs created by wakeMeUpIn1Hour and vice versa.

Explicit job cancellation

If replacing the job with a new one is not an option, one might return Continuation.cancelJobs. This will cancel all jobs having the same cancellation key.

Cancel a job without starting a new one

@Job
class SoundAlarmClockJob {
    fun timeToWakeUp() {
        // Sound alarm as loud as possible
    }
}

@Step
class SleepStep {
    @Action(statusBehavior = ImplicitStatusBehavior.Preserve)
    fun wakeMeUpIn10Minutes(): JobContinuation<SoundAlarmClockJob> {
        return Continuation.job(
            Instant.now().plus(10, ChronoUnit.MINUTES),
            SoundAlarmClockJob(),
            CancellationKey("alarm")
        )
    }

    @Action(statusBehavior = ImplicitStatusBehavior.Preserve)
    fun neverGonnaWakeMeUp(): Continuation<*> {
        return Continuation.cancelJobs( // (1)
            CancellationKey("alarm")
        )
    }
}
  1. This will cancel all previously scheduled jobs that have "alarm" as their cancellation key.

Execution interception

Since jobs are almost always executed asynchronously and outside a Spring request context,
it may sometimes be necessary to perform custom setup and/or teardown actions,
such as populating context information or preparing a transition.

FluxFlow supports these scenarios by allowing developers to define custom JobExecutionInterceptors.
Once registered, FluxFlow invokes them right before a job is executed.

Defining a job execution interceptor

To create and register a job execution interceptor, declare a Spring Bean that implements the JobExecutionInterceptor interface:

import de.lise.fluxflow.api.interceptors.InterceptionToken
import de.lise.fluxflow.api.job.interceptors.JobExecutionContext
import de.lise.fluxflow.api.job.interceptors.JobExecutionInterceptor
import org.springframework.stereotype.Component

@Component
class MyMinimalInterceptor : JobExecutionInterceptor {
    override fun intercept(token: InterceptionToken<JobExecutionContext>) {
        // do something
    }
}

See the following sections for more detailed examples.

Creating a setup/teardown interceptor

A setup/teardown interceptor can perform initialization before continuing the interceptor chain with a call to token.next(). Teardown operations can be placed immediately after .next() returns.

import de.lise.fluxflow.api.interceptors.InterceptionToken
import de.lise.fluxflow.api.job.Job
import de.lise.fluxflow.api.job.interceptors.JobExecutionContext
import de.lise.fluxflow.api.job.interceptors.JobExecutionInterceptor
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class CustomJobExecutionSetupInterceptor : JobExecutionInterceptor {
    private val log = LoggerFactory.getLogger(CustomJobExecutionSetupInterceptor::class.java)!!

    override fun intercept(token: InterceptionToken<JobExecutionContext>) {
        setup(token.context.job)
        try {
            token.next()
        } finally {
            teardown()
        }
    }

    private fun setup(
        job: Job
    ) {
        log.debug("Setting up execution context for job '{}'", job.identifier)
        // custom setup actions
    }

    private fun teardown() { /** custom teardown actions **/ }
}

Note that a subsequent interceptor may cancel the job execution. If teardown actions depend on whether the job was actually executed, inspect the status returned by token.next() or the token.status property:

    override fun intercept(token: InterceptionToken<JobExecutionContext>) {
        setup(token.context.job)
        when(token.next()) {
            InterceptionTokenStatus.Executed -> teardown()
            else -> {}
        }
    }
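To make the interplay of next(), abort(), and the returned status more tangible, the following self-contained sketch models an interceptor chain. It is not FluxFlow's implementation: `ChainToken`, `Interceptor`, `TokenStatus`, and `runWithInterceptors` are hypothetical names, and the sketch assumes that the chain continues implicitly when an interceptor calls neither next() nor abort().

```kotlin
enum class TokenStatus { Pending, Executed, Aborted }

fun interface Interceptor {
    fun intercept(token: ChainToken)
}

// Hypothetical token handed to one interceptor; it knows the rest of the chain.
class ChainToken(
    private val remaining: List<Interceptor>,
    private val job: () -> Unit
) {
    var status = TokenStatus.Pending
        private set

    // Continues with the next interceptor, or runs the job at the end of the chain.
    fun next(): TokenStatus {
        if (status != TokenStatus.Pending) return status
        status = if (remaining.isEmpty()) {
            job()
            TokenStatus.Executed
        } else {
            val inner = ChainToken(remaining.drop(1), job)
            remaining.first().intercept(inner)
            inner.next() // assumed: continue implicitly if the interceptor did not decide
        }
        return status
    }

    // Marks the execution as aborted; the job will not run.
    fun abort() {
        if (status == TokenStatus.Pending) status = TokenStatus.Aborted
    }
}

fun runWithInterceptors(interceptors: List<Interceptor>, job: () -> Unit): TokenStatus =
    ChainToken(interceptors, job).next()

fun main() {
    val events = mutableListOf<String>()
    val setupTeardown = Interceptor { token ->
        events += "setup"
        if (token.next() == TokenStatus.Executed) {
            events += "teardown"
        }
    }
    val abortAll = Interceptor { token -> token.abort() }

    println(runWithInterceptors(listOf(setupTeardown)) { events += "job" }) // prints "Executed"
    println(events) // prints "[setup, job, teardown]"

    events.clear()
    println(runWithInterceptors(listOf(setupTeardown, abortAll)) { events += "job" }) // prints "Aborted"
    println(events) // prints "[setup]"
}
```

In the second run, the later interceptor aborts the execution, so the first interceptor sees a status other than Executed and skips its teardown.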

Creating a "filtering" interceptor

Although it is generally recommended to prefer domain logic or workflow-native features, interceptors can also be used to abort job executions. This is done by calling token.abort() within the intercept method:

@Component
class JobFilteringInterceptor: JobExecutionInterceptor {
    override fun intercept(token: InterceptionToken<JobExecutionContext>) {
        if (!isExecutionStillNecessary(token.context.job)) { // abort only if the job is no longer needed
            token.abort()
        }
    }

    private fun isExecutionStillNecessary(job: Job): Boolean {
        // custom logic
        return true
    }
}

Important

Aborting a job execution causes FluxFlow to skip the entire job, including all side effects. This means that there will be no status updates, no continuations, and no emitted events.

After an abort, the job remains in the Scheduled status, appearing as if the scheduler failed to run it. Therefore, aborting interceptors must perform the required side effects themselves.

@Component
class JobFilteringInterceptor(
    private val jobService: JobService
) : JobExecutionInterceptor {
    override fun intercept(token: InterceptionToken<JobExecutionContext>) {
        if (!isExecutionStillNecessary(token.context.job)) { // abort only if the job is no longer needed
            token.abort()
            jobService.deleteAll(
                setOf(
                    token.context.job.identifier
                )
            )
        }
    }

    private fun isExecutionStillNecessary(job: Job): Boolean {
        // custom logic
        return true
    }
}