Sagas
When building distributed systems, it is crucial to ensure that the system remains consistent even in the presence of failures. One way to achieve this is by using the Saga pattern.
Sagas are a way to manage transactions that span multiple services. Each step of the transaction is paired with a compensation that undoes it. If the transaction fails halfway through, the compensations for the steps that already ran are executed, so the system ends up in a consistent state.
Implementing Sagas in Restate
Let’s assume we want to build a travel booking application. The core of our application is a workflow that first tries to book the flight, then rents a car, and finally processes the customer’s payment before confirming the flight and car rental. When the payment fails, we want to undo the flight booking and car rental.
Restate lets us implement this purely in user code:
- Wrap your business logic in a try-block, and throw a terminal error for cases where you want to compensate and finish.
- For each step you do in your try-block, add a compensation to a list.
- In the catch block, in case of a terminal error, you run the compensations in reverse order, and rethrow the error.
Restate guarantees that the handler code runs to completion. So if a terminal error is thrown, all compensations will run:
TypeScript:

import * as restate from "@restatedev/restate-sdk";

const bookingWorkflow = restate.service({
  name: "BookingWorkflow",
  handlers: {
    run: async (ctx: restate.Context, req: BookingRequest) => {
      // Create a list of undo actions
      const compensations = [];
      try {
        // For each action, we register a compensation that will be executed on failure
        compensations.push(() =>
          ctx.run("Cancel flight", () => flightClient.cancel(req.customerId))
        );
        await ctx.run("Book flight", () =>
          flightClient.book(req.customerId, req.flight)
        );

        compensations.push(() =>
          ctx.run("Cancel car", () => carRentalClient.cancel(req.customerId))
        );
        await ctx.run("Book car", () =>
          carRentalClient.book(req.customerId, req.car)
        );

        compensations.push(() =>
          ctx.run("Cancel hotel", () => hotelClient.cancel(req.customerId))
        );
        await ctx.run("Book hotel", () =>
          hotelClient.book(req.customerId, req.hotel)
        );
      } catch (e) {
        // Terminal errors are not retried by Restate, so undo previous actions and fail the workflow
        if (e instanceof restate.TerminalError) {
          // Restate guarantees that all compensations are executed
          for (const compensation of compensations.reverse()) {
            await compensation();
          }
        }
        throw e;
      }
    },
  },
});

restate.endpoint().bind(bookingWorkflow).listen(9080);
Java:

@Service
public class BookingWorkflow {

  public record BookingRequest(
      String customerId, FlightRequest flight, CarRequest car, HotelRequest hotel) {}

  @Handler
  public void run(Context ctx, BookingRequest req) throws TerminalException {
    // Create a list of undo actions
    List<Runnable> compensations = new ArrayList<>();
    try {
      // For each action, we register a compensation that will be executed on failure
      compensations.add(() -> ctx.run("Cancel flight", () -> FlightClient.cancel(req.customerId())));
      ctx.run("Book flight", () -> FlightClient.book(req.customerId(), req.flight()));

      compensations.add(() -> ctx.run("Cancel car", () -> CarRentalClient.cancel(req.customerId())));
      ctx.run("Book car", () -> CarRentalClient.book(req.customerId(), req.car()));

      compensations.add(() -> ctx.run("Cancel hotel", () -> HotelClient.cancel(req.customerId())));
      ctx.run("Book hotel", () -> HotelClient.book(req.customerId(), req.hotel()));
    }
    // Terminal exceptions are not retried by Restate. We undo previous actions and fail the workflow.
    catch (TerminalException e) {
      // Restate guarantees that all compensations are executed.
      // Run them in reverse order of registration.
      for (int i = compensations.size() - 1; i >= 0; i--) {
        compensations.get(i).run();
      }
      throw e;
    }
  }

  public static void main(String[] args) {
    RestateHttpServer.listen(Endpoint.bind(new BookingWorkflow()));
  }
}
Kotlin:

@Service
class BookingWorkflow {

  @Handler
  suspend fun run(ctx: Context, req: BookingRequest) {
    // Create a list of undo actions
    val compensations = mutableListOf<suspend () -> Unit>()
    try {
      // For each action, we register a compensation that will be executed on failure
      compensations.add { ctx.runBlock("Cancel flight") { cancelFlight(req.customerId) } }
      ctx.runBlock("Book flight") { bookFlight(req.customerId, req.flight) }

      compensations.add { ctx.runBlock("Cancel car") { cancelCar(req.customerId) } }
      ctx.runBlock("Book car") { bookCar(req.customerId, req.car) }

      compensations.add { ctx.runBlock("Cancel hotel") { cancelHotel(req.customerId) } }
      ctx.runBlock("Book hotel") { bookHotel(req.customerId, req.hotel) }
    }
    // Terminal exceptions are not retried by Restate. We undo previous actions and fail the workflow.
    catch (e: TerminalException) {
      // Restate guarantees that all compensations are executed
      compensations.reversed().forEach { it() }
      throw e
    }
  }
}

fun main() {
  RestateHttpServer.listen(endpoint { bind(BookingWorkflow()) })
}
Python:

import restate
from restate.exceptions import TerminalError

booking_workflow = restate.Service("BookingWorkflow")


@booking_workflow.handler()
async def run(ctx: restate.Context, req: BookingRequest):
    # Create a list of undo actions
    compensations = []
    try:
        # For each action, we register a compensation that will be executed on failure
        compensations.append(lambda: ctx.run("Cancel flight", flight_client.cancel, args=(req.customer_id,)))
        await ctx.run("Book flight", flight_client.book, args=(req.customer_id, req.flight))

        compensations.append(lambda: ctx.run("Cancel car", car_rental_client.cancel, args=(req.customer_id,)))
        await ctx.run("Book car", car_rental_client.book, args=(req.customer_id, req.car))

        compensations.append(lambda: ctx.run("Cancel hotel", hotel_client.cancel, args=(req.customer_id,)))
        await ctx.run("Book hotel", hotel_client.book, args=(req.customer_id, req.hotel))

    # Terminal errors are not retried by Restate, so undo previous actions and fail the workflow
    except TerminalError as e:
        # Restate guarantees that all compensations are executed
        for compensation in reversed(compensations):
            await compensation()
        raise e


app = restate.app([booking_workflow])

if __name__ == "__main__":
    import asyncio
    import hypercorn
    import hypercorn.asyncio  # the asyncio submodule must be imported explicitly

    conf = hypercorn.Config()
    conf.bind = ["0.0.0.0:9080"]
    asyncio.run(hypercorn.asyncio.serve(app, conf))
Go:

package main

import (
	"context"
	"log"

	restate "github.com/restatedev/sdk-go"
	"github.com/restatedev/sdk-go/server"
)

type BookingWorkflow struct{}

func (BookingWorkflow) Run(ctx restate.Context, req BookingRequest) (err error) {
	// Create a list of undo actions
	var compensations []func() (restate.Void, error)

	// Run compensations in reverse order at the end if err != nil
	defer func() {
		if err != nil {
			for i := len(compensations) - 1; i >= 0; i-- {
				if _, compErr := compensations[i](); compErr != nil {
					err = compErr
				}
			}
		}
	}()

	compensations = append(compensations, func() (restate.Void, error) {
		return restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
			return CancelFlight(req.CustomerId)
		})
	})
	if _, err = restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
		return BookFlight(req.CustomerId, req.Flight)
	}); err != nil {
		return err
	}

	compensations = append(compensations, func() (restate.Void, error) {
		return restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
			return CancelCar(req.CustomerId)
		})
	})
	if _, err = restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
		return BookCar(req.CustomerId, req.Car)
	}); err != nil {
		return err
	}

	compensations = append(compensations, func() (restate.Void, error) {
		return restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
			return CancelHotel(req.CustomerId)
		})
	})
	if _, err = restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
		return BookHotel(req.CustomerId, req.Hotel)
	}); err != nil {
		return err
	}

	return nil
}

func main() {
	if err := server.NewRestate().
		Bind(restate.Reflect(BookingWorkflow{})).
		Start(context.Background(), ":9080"); err != nil {
		log.Fatal(err)
	}
}
When to use Sagas
Restate runs invocations to completion, with infinite retries and recovery of partial progress. In that sense, you do not need to run compensations between retries. Restate starts each retry attempt from the point where the invocation failed.
However, there can still be cases in your business logic where you want to stop a handler from executing any further and run compensations for the work done so far.
You will also need sagas to end up in a consistent state when you cancel an invocation (via the CLI or programmatically). For example, if an invocation gets stuck because an external system is not responding, you might want to stop executing the invocation while keeping the overall system state consistent.
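The difference between a failure that is simply retried and a terminal error that triggers compensations can be modeled with a small sketch. It does not use the Restate SDK: `TerminalError` here is a local stand-in for `restate.TerminalError`, `retryStep` is a hypothetical helper that imitates retries of a single step, and the attempt limit exists only so the sketch always terminates. Everything is synchronous for brevity.

```typescript
// Stand-in for restate.TerminalError so the sketch runs without the SDK
class TerminalError extends Error {
  constructor(message: string) {
    super(message);
    Object.setPrototypeOf(this, TerminalError.prototype);
  }
}

// Hypothetical helper modeling retries: a step is re-run until it succeeds,
// unless it throws a TerminalError, which is propagated immediately.
// The attempt limit is an assumption that keeps the sketch finite.
function retryStep<T>(step: () => T, maxAttempts = 5): T {
  for (let attempt = 1; ; attempt++) {
    try {
      return step();
    } catch (e) {
      if (e instanceof TerminalError || attempt >= maxAttempts) throw e;
    }
  }
}

function bookTrip(log: string[], cardValid: boolean): void {
  const compensations: Array<() => void> = [];
  let flightAttempts = 0;
  try {
    compensations.push(() => log.push("cancel flight"));
    retryStep(() => {
      // Transient failure on the first attempt: retried, no compensation needed
      if (++flightAttempts === 1) throw new Error("flight API timeout");
      log.push("book flight");
    });

    compensations.push(() => log.push("refund payment"));
    retryStep(() => {
      // Business-level failure: retrying cannot help, so stop and compensate
      if (!cardValid) throw new TerminalError("card declined");
      log.push("charge card");
    });
  } catch (e) {
    if (e instanceof TerminalError) {
      // Undo the work done so far, most recent step first
      for (const compensation of compensations.reverse()) {
        compensation();
      }
    }
    throw e;
  }
}
```

Calling `bookTrip(log, false)` leaves the log as book flight, refund payment, cancel flight: the timeout is absorbed by the retry, while the declined card stops the handler and runs the compensations in reverse order.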
Registering compensations
Because this is all implemented in pure user code, there are no restrictions on what you can do in compensations, as long as they are idempotent.
For example, you can reset the state of the service, call other services to undo previously executed calls, or run ctx.run actions to delete previously inserted rows in a database.
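Idempotency matters because a compensation may run more than once, for example when it is retried after a failure. The following is a minimal sketch of what an idempotent compensation can look like. The `bookings` map, the `released` set, and the function names are hypothetical in-memory stand-ins for a real database or service, not part of any Restate API.

```typescript
// Hypothetical in-memory stand-in for a table of bookings
const bookings = new Map<string, string>();

// Idempotent by construction: deleting a row that is already gone is a no-op
function deleteBooking(bookingId: string): void {
  bookings.delete(bookingId);
}

// A counter update is not idempotent on its own, so we remember
// which bookings have already been released and skip repeats
let availableSeats = 9;
const released = new Set<string>();

function releaseSeat(bookingId: string): void {
  if (released.has(bookingId)) return;
  released.add(bookingId);
  availableSeats += 1;
}

// Running each compensation twice leaves the same state as running it once
bookings.set("b-1", "flight LH123");
deleteBooking("b-1");
deleteBooking("b-1");
releaseSeat("b-1");
releaseSeat("b-1");
```

After these calls, `bookings` is empty and `availableSeats` is 10, regardless of how many extra times the two compensations run.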
Adding compensations
Depending on the characteristics of the API, adding the compensation might look different:
- The flight and car rental APIs require you to first create a reservation, and then use the ID you get back to confirm or cancel it. In this case, we add the compensation after creating the reservation (because we need the ID).
- The payment API requires you to generate an idempotency key yourself, and executes in one shot. Here, we add the compensation before performing the action, using the same UUID. This way, even if the payment call throws a terminal error, the compensation is already registered and ensures that the payment does not go through.
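The two orderings can be sketched side by side. This is an illustration under assumptions, not the APIs from the example application: `flights` and `payments` are hypothetical in-memory clients, the idempotency key is passed in by the caller rather than generated, and the sketch is synchronous and compensates on any error, whereas a Restate handler would do so only for terminal errors.

```typescript
type Compensation = () => void;

// Hypothetical flight API: reserve first, then confirm or cancel by ID
const reservations = new Map<string, "reserved" | "cancelled">();
let nextId = 1;
const flights = {
  reserve(): string {
    const id = `flight-${nextId++}`;
    reservations.set(id, "reserved");
    return id;
  },
  cancel(id: string): void {
    reservations.set(id, "cancelled");
  },
};

// Hypothetical one-shot payment API keyed by a caller-generated idempotency key
const charges = new Set<string>();
const payments = {
  charge(key: string, amount: number): void {
    if (amount <= 0) throw new Error("payment rejected");
    charges.add(key);
  },
  refund(key: string): void {
    charges.delete(key); // no-op if the charge never went through
  },
};

function bookTrip(paymentKey: string, amount: number): string {
  const compensations: Compensation[] = [];
  try {
    // Reserve-then-confirm API: the compensation needs the ID, so add it after
    const flightId = flights.reserve();
    compensations.push(() => flights.cancel(flightId));

    // One-shot API: the key is known up front, so add the compensation before
    compensations.push(() => payments.refund(paymentKey));
    payments.charge(paymentKey, amount);

    return flightId;
  } catch (e) {
    // Undo in reverse order, then fail the booking
    for (const compensation of compensations.reverse()) {
      compensation();
    }
    throw e;
  }
}
```

Registering the refund first means it runs even when the outcome of `charge` is unknown, which is safe only because `refund` is a no-op for a key that was never charged.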