Mastering transactions with Knex.js and Objection.js
Add Postgres transactions to a Node.js application with Knex.js and Objection.js while avoiding the pitfalls.
A transaction represents a unit of work that lets a group of queries be processed in isolation. It provides two guarantees:
- Consistency: if one of the queries in the transaction fails, the whole transaction is rolled back.
- Isolation: if a piece of data used in a transaction is changed concurrently, the transaction can be aborted, depending on the isolation level.
In the age of serverless, our applications must be fault tolerant. Transactions are therefore crucial to ensure the consistency of the database over the long term. Finding yourself with partial or incomplete data in the database can produce bugs that are very difficult to identify and correct.
Managing transactions is a real challenge. In this article, I will give you my tips for smooth Postgres transaction management with Knex.js and Objection.js.
Knex.js
Knex.js is a SQL query builder compatible with many SQL databases, including Postgres. It lets you compose SQL queries in a simple and intuitive way. It may seem superfluous, but in a complex application it is common to have to add a where clause to a query conditionally; Knex.js makes this easy:
    const query = knex('users')
    if (activated) {
      query.where('activated', activated)
    }
    const users = await query
Knex.js also manages transactions through its transaction method:
    async function createUserAndMovie() {
      await knex.transaction(async (trx) => {
        // On Postgres, insert() only returns the inserted rows when returning() is used
        const [[user], [movie]] = await Promise.all([
          trx('users').insert({ name: 'James' }).returning('*'),
          trx('movies').insert({ name: 'Matrix' }).returning('*'),
        ])
        await trx('users_movies').insert({ userId: user.id, movieId: movie.id })
      })
    }
Objection.js
Objection.js is an ORM built on Knex.js. It adds a "Model" layer with validation and extensive relationship management.
    const { Model } = require('objection')

    class User extends Model {
      static get tableName() {
        return 'users'
      }
      static get relationMappings() {
        return {
          children: {
            relation: Model.HasManyRelation,
            modelClass: User,
            join: {
              from: 'users.id',
              to: 'users.parentId',
            },
          },
        }
      }
    }
Objection.js also manages transactions, either by passing a transaction object to each query or by binding models to a transaction:
    try {
      const scrappy = await Person.transaction(async (trx) => {
        const james = await Person.query(trx).insert({
          firstName: 'James',
          lastName: 'Bond',
        })
        const scrappy = await james.$relatedQuery('pets', trx).insert({ name: 'Scrappy' })
        return scrappy
      })
      console.log('Great success! Both James and Scrappy were inserted')
    } catch (err) {
      console.log('Something went wrong. Neither James nor Scrappy were inserted')
    }
Transaction issues
On paper, whether using Knex.js or Objection.js, it is quite easy to add transactions; but in practice, it's a different story! When adding transactions to an application, there are usually several issues that arise.
Non-transaction queries
Let's take the example from the Objection.js documentation:
    const scrappy = await Person.transaction(async (trx) => {
      const james = await Person.query(trx).insert({
        firstName: 'James',
        lastName: 'Bond',
      })
      const scrappy = await james.$relatedQuery('pets', trx).insert({ name: 'Scrappy' })
      return scrappy
    })
- Person.transaction starts a transaction on the Knex instance used by the Person model. It does not bind the model's queries to that transaction by itself: a query only joins the transaction when it receives trx.
- trx represents the transaction. It is equivalent to a Knex instance: passing it to a query runs that query inside the transaction, whichever model it targets.
- $relatedQuery('pets', trx) calls the Pet model and, thanks to trx, the transaction is used here too.
This example fits in a few lines, but in a real application your code is very likely spread across several functions (and even several files):
    async function createUser({ firstName, lastName }, trx) {
      return Person.query(trx).insert({ firstName, lastName })
    }
    async function createUserPet(user, { name }) {
      return user.$relatedQuery('pets').insert({ name })
    }
    const scrappy = await Person.transaction(async (trx) => {
      const james = await createUser({ firstName: 'James', lastName: 'Bond' }, trx)
      const scrappy = await createUserPet(james, { name: 'Scrappy' })
      return scrappy
    })
This code may look correct, but it has a bug:
- The person is created in the transaction because trx is passed to Person.query.
- The pet is not created in the transaction because trx is not passed to $relatedQuery.
This kind of bug is very common: even when you are careful, the chances of forgetting a trx are high. When it happens, there are two possible outcomes.
- An explicit error
A query outside the transaction depends on an object created inside it. We notice the failure quickly and fix it, so this case is not a real concern.
- No error
When the operations are independent of one another, we will not notice anything. However, there is a side effect: a transaction uses a dedicated connection to the database. When a query runs outside the transaction while it is open, two connections are used simultaneously, which can result in several errors:
- DBError: too many connections for role xxx: there are too many connections to the database. In this case you must adjust the pool.max value so that it does not exceed the number of connections the database allows.
- TimeoutError: Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?: the pool does not have enough connections, which creates a bottleneck.
Orchestration
Another common problem is orchestration, for example when we go through an "event broker" to delegate processing to another process.
Until a transaction is committed, the data it inserts or modifies is not visible outside of it.
    async function createUser({ firstName, lastName }, trx) {
      const person = await Person.query(trx).insert({ firstName, lastName })
      await publishPerson(person.id)
      return person
    }
    async function createUserPet(user, { name }, trx) {
      return user.$relatedQuery('pets', trx).insert({ name })
    }
    const scrappy = await Person.transaction(async (trx) => {
      const james = await createUser({ firstName: 'James', lastName: 'Bond' }, trx)
      const scrappy = await createUserPet(james, { name: 'Scrappy' }, trx)
      return scrappy
    })
When publishPerson is executed, james has not been committed to the database yet: if another process tries to retrieve this object outside of the transaction, it will not be found.
To avoid this we can obviously move the publishPerson call out of the transaction, but in modular code this is not always easy.
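The visibility rule can be illustrated without a database. The sketch below is a toy in-memory model, not how Postgres works internally: FakeDatabase, begin and findOutsideTransaction are hypothetical names invented for the illustration.

```javascript
// Toy model of transaction visibility. This is NOT Knex.js or Postgres code:
// every name here is hypothetical and only illustrates the ordering problem.
class FakeDatabase {
  constructor() {
    this.committed = new Map() // rows visible to every connection
  }

  // Start a transaction: writes are buffered until commit() is called.
  begin() {
    const pending = new Map()
    return {
      insert: (id, row) => pending.set(id, row),
      commit: () => {
        for (const [id, row] of pending) this.committed.set(id, row)
      },
    }
  }

  // What another process (for example an event consumer) sees on its own connection.
  findOutsideTransaction(id) {
    return this.committed.get(id)
  }
}

const db = new FakeDatabase()
const trx = db.begin()
trx.insert(1, { firstName: 'James' })

// A consumer reacting to publishPerson(1) at this point finds nothing.
const seenBeforeCommit = db.findOutsideTransaction(1)

trx.commit()

// Once the transaction is committed, the row becomes visible.
const seenAfterCommit = db.findOutsideTransaction(1)

console.log(seenBeforeCommit, seenAfterCommit)
```

The consumer only finds the row on the second lookup, which is why anything published from inside the transaction can arrive too early.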
Mastering transactions
After several years of using transactions in my projects, I have developed several techniques to avoid these problems.
Standalone transactions
The binding that Objection.js offers with the model is often a source of error, so I try not to use it. Instead I define a transaction function which is not bound to any model. Knex's native transaction method does not support "chaining", which is why we define a new one.
    /**
     * @param {any} maybeTrx
     * @returns {maybeTrx is import('objection').TransactionOrKnex & { executionPromise: Promise<any> }}
     */
    function checkIsTransaction(maybeTrx) {
      return Boolean(maybeTrx && maybeTrx.executionPromise)
    }
    /**
     * @param {import('objection').TransactionOrKnex | undefined | ((trx: import('objection').TransactionOrKnex) => Promise.<any>)} trxOrCallback
     * @param {(trx: import('objection').TransactionOrKnex) => Promise.<any>} [maybeCallback]
     * @returns {Promise.<any>}
     */
    export const transaction = (trxOrCallback, maybeCallback) => {
      if (!knex) {
        throw new Error(`transaction is not initialized`)
      }
      if (maybeCallback === undefined) {
        if (typeof trxOrCallback !== 'function') {
          throw new Error(`Invalid transaction call`)
        }
        return knex.transaction(trxOrCallback)
      }
      if (checkIsTransaction(trxOrCallback)) {
        return maybeCallback(trxOrCallback)
      }
      return knex.transaction(maybeCallback)
    }
To use a transaction, we explicitly pass trx wherever it is needed:
    const scrappy = await transaction(async (trx) => {
      const james = await Person.query(trx).insert({
        firstName: 'James',
        lastName: 'Bond',
      })
      const scrappy = await james.$relatedQuery('pets', trx).insert({ name: 'Scrappy' })
      return scrappy
    })
Pass trx everywhere
Each function that uses models must accept an options object carrying the transaction. This way, any operation can be executed in a transaction:
    async function createUser({ firstName, lastName }, { trx } = {}) {
      const person = await Person.query(trx).insert({ firstName, lastName })
      return person
    }
    async function createUserPet(user, { name }, { trx } = {}) {
      return user.$relatedQuery('pets', trx).insert({ name })
    }
    const scrappy = await transaction(async (trx) => {
      const james = await createUser({ firstName: 'James', lastName: 'Bond' }, { trx })
      const scrappy = await createUserPet(james, { name: 'Scrappy' }, { trx })
      return scrappy
    })
I mentioned "chaining" a little earlier: it lets us write an intermediate function that must run in a transaction and still include it in a larger transaction. In our case it would be like creating a createUserAndPet function:
    async function createUser({ firstName, lastName }, { trx } = {}) {
      const person = await Person.query(trx).insert({ firstName, lastName })
      return person
    }
    async function createUserPet(user, { name }, { trx } = {}) {
      return user.$relatedQuery('pets', trx).insert({ name })
    }
    // This function can be used inside another transaction or standalone:
    // it is always executed in a transaction.
    export async function createUserAndPet(user, pet, { trx } = {}) {
      return transaction(trx, async (trx) => {
        const james = await createUser(user, { trx })
        const scrappy = await createUserPet(james, pet, { trx })
        return scrappy
      })
    }
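To see the reuse-or-create behaviour in isolation, here is a minimal sketch that runs without a database. It assumes a stub in place of the Knex instance: fakeKnex, opened and the simplified createUserAndPet below are hypothetical and only mimic the executionPromise property the helper checks. The transaction function is a condensed version of the helper defined earlier, without its argument validation.

```javascript
// Stub standing in for a Knex instance. Hypothetical: it only counts how many
// transactions are opened and mimics the `executionPromise` property.
let opened = 0
const fakeKnex = {
  async transaction(callback) {
    opened += 1
    const trx = { id: opened, executionPromise: Promise.resolve() }
    return callback(trx)
  },
}

// Same decision logic as the `transaction` helper, written against the stub.
function transaction(trxOrCallback, maybeCallback) {
  if (maybeCallback === undefined) return fakeKnex.transaction(trxOrCallback)
  const isTransaction = Boolean(trxOrCallback && trxOrCallback.executionPromise)
  return isTransaction
    ? maybeCallback(trxOrCallback) // reuse the caller's transaction
    : fakeKnex.transaction(maybeCallback) // none given: open a new one
}

// Hypothetical operation that must always run inside a transaction.
// It returns the id of the transaction it actually ran in.
async function createUserAndPet({ trx } = {}) {
  return transaction(trx, async (innerTrx) => innerTrx.id)
}

async function demo() {
  // Standalone call: the helper opens transaction #1.
  const standalone = await createUserAndPet()
  // Nested call: the outer transaction (#2) is reused, no third one is opened.
  const nested = await transaction((outerTrx) => createUserAndPet({ trx: outerTrx }))
  return { standalone, nested, opened }
}

const demoResult = demo()
demoResult.then((result) => console.log(result))
```

The nested call reports the id of the outer transaction and the counter stays at two, which is the property that makes createUserAndPet safe to compose.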
Orchestration
I believe that you should not sacrifice the readability and modularity of your code for technical reasons. Based on this principle, calls that must wait for the end of the transaction should not have to be moved out of the transaction.
To avoid access problems, I therefore use a method that allows you to add a callback that will be executed once the transaction is complete:
    /**
     * Wait for a transaction to be complete.
     * @param {import('objection').TransactionOrKnex} [trx]
     */
    async function waitForTransaction(trx) {
      return Promise.resolve(checkIsTransaction(trx) ? trx.executionPromise : null)
    }
    /**
     * Run a callback when the transaction is done.
     * @param {import('objection').TransactionOrKnex | undefined} trx
     * @param {Function} callback
     */
    export function runAfterTransaction(trx, callback) {
      waitForTransaction(trx).then(
        () => {
          // If the transaction succeeds, run the callback
          return Promise.resolve(callback()).catch((error) => {
            setTimeout(() => {
              throw error
            })
          })
        },
        () => {
          // Ignore transaction error
        },
      )
    }
Thanks to this method, we can wait for the end of a transaction anywhere in the code:
    async function createUser({ firstName, lastName }, { trx } = {}) {
      const person = await Person.query(trx).insert({ firstName, lastName })
      runAfterTransaction(trx, async () => {
        await publishPerson(person.id)
      })
      return person
    }
    async function createUserPet(user, { name }, { trx } = {}) {
      return user.$relatedQuery('pets', trx).insert({ name })
    }
    const scrappy = await transaction(async (trx) => {
      const james = await createUser({ firstName: 'James', lastName: 'Bond' }, { trx })
      const scrappy = await createUserPet(james, { name: 'Scrappy' }, { trx })
      return scrappy
    })
The downside of this method is error handling: the callback is executed outside of the main promise chain, so the caller cannot catch its errors.
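One possible way to soften this downside is to hand callback errors to an explicit handler instead of rethrowing them asynchronously. The sketch below is only an illustration under assumptions: the onError parameter is hypothetical, the helper returns a promise (unlike the version above) so the result can be observed, and the trx objects are fakes exposing nothing but executionPromise.

```javascript
// Variant of runAfterTransaction that reports callback failures to a handler.
// `onError` is a hypothetical parameter added for this sketch; it is not part
// of Knex.js or Objection.js.
function runAfterTransaction(trx, callback, onError = console.error) {
  // Anything without `executionPromise` (plain Knex instance, undefined) is
  // treated as "no transaction": the callback runs right away, asynchronously.
  const done = Promise.resolve(trx && trx.executionPromise ? trx.executionPromise : null)
  return done.then(
    () => Promise.resolve().then(callback).catch(onError),
    () => {
      // The transaction was rolled back: skip the callback.
    },
  )
}

// Fake transactions exposing only the property the helper relies on.
const committedTrx = { executionPromise: Promise.resolve() }
const rolledBackTrx = { executionPromise: Promise.reject(new Error('rollback')) }

const events = []
const finished = Promise.all([
  // Runs, because the transaction committed.
  runAfterTransaction(committedTrx, () => events.push('published')),
  // Skipped, because the transaction was rolled back.
  runAfterTransaction(rolledBackTrx, () => events.push('never published')),
  // Fails, and the failure is routed to the handler.
  runAfterTransaction(
    committedTrx,
    () => {
      throw new Error('broker unavailable')
    },
    (error) => events.push(`reported: ${error.message}`),
  ),
])

finished.then(() => console.log(events))
```

In a real application the handler would typically forward the error to your logger or error tracker; the caller of the transaction still does not see it, so this only makes the failure visible, it does not make it recoverable.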
Avoid forgetting trx
All these techniques make it possible to manage transactions cleanly within the application. However, they do not guarantee that a trx has not been forgotten.
As I said, when we forget a trx we necessarily use at least two connections to the database: one for the transaction and one for the query left outside the transaction.
It is therefore possible to detect omissions by configuring Knex.js so that it only uses a single connection:
    const knex = require('knex')({
      client: 'postgres',
      connection: {
        host: '127.0.0.1',
        user: 'your_database_user',
        password: 'your_database_password',
        database: 'myapp_test',
      },
      pool: { min: 1, max: 1 },
    })
Obviously these settings are strongly discouraged in production, but for running unit tests they are an excellent way to detect omissions. If you forget a trx, the test will not finish and will time out.
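Why a single-connection pool exposes the omission can be sketched with a toy pool, without Knex.js or Postgres. Everything below is hypothetical (FakePool, withTransaction, queryWithoutTrx, the 50 ms timeout); it only reproduces the mechanism: the transaction holds the only connection, so a query issued without trx waits for a connection that never becomes free.

```javascript
// Minimal fixed-size connection pool. Hypothetical stand-in for the Knex.js
// pool: acquire() waits until a connection is free or the timeout expires.
class FakePool {
  constructor(max, acquireTimeoutMs) {
    this.free = max
    this.waiting = []
    this.acquireTimeoutMs = acquireTimeoutMs
  }

  acquire() {
    if (this.free > 0) {
      this.free -= 1
      return Promise.resolve()
    }
    return new Promise((resolve, reject) => {
      const waiter = () => {
        clearTimeout(timer)
        resolve()
      }
      const timer = setTimeout(() => {
        this.waiting = this.waiting.filter((w) => w !== waiter)
        reject(new Error('Timeout acquiring a connection'))
      }, this.acquireTimeoutMs)
      this.waiting.push(waiter)
    })
  }

  release() {
    const next = this.waiting.shift()
    if (next) next()
    else this.free += 1
  }
}

// A "transaction" holds one connection for the whole duration of the callback.
async function withTransaction(pool, callback) {
  await pool.acquire()
  try {
    return await callback({ query: async () => 'ran inside the transaction' })
  } finally {
    pool.release()
  }
}

// A query issued without trx needs its own connection from the pool.
async function queryWithoutTrx(pool) {
  await pool.acquire()
  pool.release()
  return 'ran outside the transaction'
}

const pool = new FakePool(1, 50)

// trx is used: a single connection is enough.
const withTrx = withTransaction(pool, (trx) => trx.query())

// trx is forgotten: the inner query waits for the connection that the
// transaction itself is holding, so it ends in a timeout.
const forgotten = withTrx.then(() =>
  withTransaction(pool, () => queryWithoutTrx(pool)).catch((error) => error.message),
)

forgotten.then((message) => console.log(message))
```

With a larger pool the second scenario would succeed silently, which is exactly why the omission goes unnoticed outside of tests.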
Thanks to these methods, my team and I manage to guarantee the consistency of the data on a large project. Hope they will be of use to you too!