Maîtrisez les transactions avec Knex.js et Objection.js

Photo par Holly Stratton sur Unsplash
Cet article est aussi disponible en : English

Ajouter des transactions Postgres dans une application Node.js avec Knex.js et Objection.js en évitant les pièges.

Les transactions symbolisent une unité de travail qui permet de traiter un groupe de requêtes de manière isolée. Elles assurent deux choses :

  • La cohérence : si une des requêtes de la transaction échoue, la transaction est invalidée.
  • L'isolation : si une donnée utilisée dans une transaction change au cours de celle-ci, la transaction est invalidée.

A l'heure du serverless, nos applications se doivent d'être tolérantes à un arrêt brutal. Les transactions sont donc cruciales pour assurer la cohérence de la base de données sur le long terme. Se retrouver avec des données partielles ou incomplètes en base peut donner lieu à des bugs très difficiles à identifier et à corriger.

La gestion des transactions est un réel challenge. Dans cet article, je vous donnerai toutes les astuces pour avoir une gestion des transactions Postgres aux petits oignons en utilisant Knex.js et Objection.js.

Knex.js

Knex.js est un SQL query builder compatible avec de nombreuses bases SQL dont Postgres. Il permet de composer des requêtes SQL de manière simple et intuitive. Cela peut paraître superflu mais il est courant dans une application complexe d'avoir à ajouter un where à une requête de manière conditionnelle; Knex.js rend cela facile :

const query = knex('users')
if (activated) {
query.where('activated', activated)
}
const users = await query

Knex.js gère également les transactions grâce à une méthode transaction :

async function createUserAndMovie() {
await knex.transaction(async (trx) => {
const [user, movie] = await Promise.all([
trx('users').insert({ name: 'James' }),
trx('movies').insert({ name: 'Matrix' }),
])
await trx('users_movies').insert({ userId: user.id, movieId: movie.id })
})
}

Objection.js

Objection.js est un ORM basé sur Knex.js, il ajoute une couche "Model" avec de la validation et une gestion des relations poussées.

class User extends Model {
static get tableName() {
return 'users'
}
static get relationMappings() {
return {
children: {
relation: Model.HasManyRelation,
modelClass: User,
join: {
from: 'users.id',
to: 'users.parentId',
},
},
}
}
}

Objection.js gère également les transactions, en liant un ou plusieurs modèles à une transaction :

try {
const scrappy = await Person.transaction(async (trx) => {
const james = await Person.query(trx).insert({
firstName: 'James',
lastName: 'Bond',
})
const scrappy = await james
.$relatedQuery('pets', trx)
.insert({ name: 'Scrappy' })
return scrappy
})
console.log('Great success! Both James and Scrappy were inserted')
} catch (err) {
console.log('Something went wrong. Neither James nor Scrappy were inserted')
}

Problèmes liées aux transactions

Sur le papier, qu'on utilise Knex.js ou Objection.js, il est assez simple d'ajouter des transactions; mais dans la pratique, c'est une autre paire de manches ! Lorsqu'on ajoute des transactions dans une application, on fait généralement face à plusieurs problèmes.

Requêtes hors transaction

Reprenons l'exemple de la documentation d'Objection.js :

const scrappy = await Person.transaction(async (trx) => {
const james = await Person.query(trx).insert({
firstName: 'James',
lastName: 'Bond',
})
const scrappy = await james
.$relatedQuery('pets', trx)
.insert({ name: 'Scrappy' })
return scrappy
})
  • Person.transaction démarre une transaction sur le modèle Person, ce qui signifie que toutes les requêtes issues de ce modèle seront liées à la transaction.
  • trx représente la transaction, c'est équivalent à une instance de Knex, en passant cet objet dans une query on peut utiliser la transaction pour d'autres requêtes que celles liées au modèle Person.
  • $relatedQuery('pets', trx) fait appel au modèle Pet et grâce à trx, la transaction est utilisée ici aussi.

Cet exemple tient sur quelques lignes mais dans un cas réel il est fort probable que votre code soit réparti dans plusieurs fonctions (et même plusieurs fichiers) :

async function createUser({ firstName, lastName }) {
return Person.query(trx).insert({
firstName,
lastName,
})
}
async function createUserPet(user, { name }) {
return user.$relatedQuery('pets').insert({ name })
}
const scrappy = await Person.transaction(async () => {
const james = await createUser({ firstName, lastName })
const scrappy = await createUserPet(user, { name: 'Scrappy ' })
return scrappy
})

Ce code peut sembler correct, mais il comporte un bug :

  • Le modèle Person est bien créé dans la transaction car le modèle est lié.
  • Le pet n'est pas créé dans la transaction car trx n'est pas passé au $relatedQuery.

Ce genre de bug est très fréquent, même en étant vigilant, les chances d'oublier de passer un trx sont nombreuses. En cas d'oubli, cela peut se traduire par plusieurs choses.

  1. Une erreur explicite

On dépend d'un objet créé dans une transaction en dehors de la transaction, on le remarque assez vite et on le corrige, pas de réel soucis dans ce cas.

  1. Aucune erreur

Lorsque les opérations sont décorellées l'une de l'autre, on ne va pas s'en apercevoir. Cependant il se produit un effet de bord, une transaction utilise une connexion à la base dédiée. Lorsqu'on effectue des requêtes en parallèle d'une transaction, on utilise alors deux connexions simultanément, cela peut se traduire par plusieurs erreurs :

  • DBError: too many connections for role xxx : trop de connexions à la base, dans ce cas il faut ajuster la valeur pool.max afin de ne pas dépasser le nombre de connexions autorisées à la base.
  • TimeoutError: Knex: Timeout acquiring a connection. The pool is probably full. A re you missing a .transacting(trx) call? : le nombre de connexions à la base n'est pas suffisant et crée un goulot d'étranglement.

Orchestration

L'autre problème rencontré c'est l'orchestration. Par exemple lorsqu'on passe par un "event broker" pour déléguer le traitement à un autre process.

Tant qu'une transaction n'est pas "commited", toutes les données qu'elle insert ou modifie ne sont pas pris en compte tant que la transaction n'est pas "commited".

async function createUser({ firstName, lastName }) {
const person = await Person.query(trx).insert({
firstName,
lastName,
})
await publishPerson(james.id)
return person
}
async function createUserPet(user, { name }) {
return user.$relatedQuery('pets').insert({ name })
}
const scrappy = await Person.transaction(async () => {
const james = await createUser({ firstName, lastName })
const scrappy = await createUserPet(user, { name: 'Scrappy ' })
return scrappy
})

Au moment où publishPerson(james.id) est exécuté, james n'est pas encore dans la base, si on tente de récupérer cet objet en dehors de la transaction, il sera alors introuvable.

Pour éviter cela on peut évidemment sortir le publishPerson de la transaction, mais dans un code modulaire ce n'est pas toujours évident.

Maîtriser les transactions

Après plusieurs anneés à utiliser les transactions dans mes projets, j'ai mis au point plusieurs techniques permettant d'éviter ces problèmes.

Transactions standalone

La liaison que propose Objection.js avec le modèle est souvent source d'erreur, je m'efforce donc de ne pas l'utiliser. A la place je définis une méthode transaction qui n'est liée à aucun modèle. La méthode transaction native de Knex ne supporte pas le "chaining", c'est pour cela qu'on en définit une nouvelle.

/**
* @param {any} maybeTrx
* @returns {maybeTrx is import('objection').TransactionOrKnex & { executionPromise: Promise<any> }}
*/
function checkIsTransaction(maybeTrx) {
return Boolean(maybeTrx && maybeTrx.executionPromise)
}
/**
* @param {import('objection').TransactionOrKnex | undefined | ((trx: import('objection').TransactionOrKnex) => Promise.<any>)} trxOrCallback
* @param {(trx: import('objection').TransactionOrKnex) => Promise.<any>} [maybeCallback]
* @returns {Promise.<any>}
*/
export const transaction = (trxOrCallback, maybeCallback) => {
if (!knex) {
throw new Error(`transaction is not initialized`)
}
if (maybeCallback === undefined) {
if (typeof trxOrCallback !== 'function') {
throw new Error(`Invalid transaction call`)
}
return knex.transaction(trxOrCallback)
}
if (checkIsTransaction(trxOrCallback)) {
return maybeCallback(trxOrCallback)
}
return knex.transaction(maybeCallback)
}

Pour utiliser une transaction, on passera donc explicitement le trx partout où il est nécessaire :

const scrappy = await transaction(async (trx) => {
const james = await Person.query(trx).insert({
firstName: 'James',
lastName: 'Bond',
})
const scrappy = await james
.$relatedQuery('pets', trx)
.insert({ name: 'Scrappy' })
return scrappy
})

Passage systématique du trx

Chaque méthode faisant appel à des modèles doit accepter des options permettant de passer une transaction. De cette manière on pourra exécuter n'importe quelle opération dans une transaction :

async function createUser({ firstName, lastName }, { trx } = {}) {
const person = await Person.query(trx).insert({
firstName,
lastName,
})
return person
}
async function createUserPet(user, { name }, { trx }) {
return user.$relatedQuery('pets', trx).insert({ name })
}
const scrappy = await transaction(async (trx) => {
const james = await createUser({ firstName, lastName }, { trx })
const scrappy = await createUserPet(user, { name: 'Scrappy ' }, { trx })
return scrappy
})

Je parlais un peu plus haut de "chaining", cela permet de créer une méthode intermédiaire qui doit s'exécuter dans une transaction et de pouvoir l'inclure dans une transaction plus large. Dans notre cas ça reviendrait à créer une méthode createUserAndPet :

async function createUser({ firstName, lastName }, { trx } = {}) {
const person = await Person.query(trx).insert({
firstName,
lastName,
})
return person
}
async function createUserPet(user, { name }, { trx }) {
return user.$relatedQuery('pets', trx).insert({ name })
}
// This method can be used inside another transaction, or standalone
// it will always executed in a transaction.
export async function createUserAndPet(user, pet, { trx }) {
return await transaction(trx, async (trx) => {
const james = await createUser(user, { trx })
const scrappy = await createUserPet(user, pet, { trx })
return scrappy
})
}

Orchestration

Je pars du principe qu'on ne doit pas réduire la lisibilité et la modularité de son code pour des problématiques techniques. Partant de ce principe, on ne doit pas sortir de la transaction les appels qui doivent attendre la fin de celle-ci.

Pour éviter les problèmes d'accès, j'utilise donc une méthode qui permet d'ajouter un callback qui sera exécuté une fois la transaction terminée :

/**
* Wait for a transaction to be complete.
* @param {import('objection').TransactionOrKnex} [trx]
*/
async function waitForTransaction(trx) {
return Promise.resolve(checkIsTransaction(trx) ? trx.executionPromise : null)
}
/**
* Run a callback when the transaction is done.
* @param {import('objection').TransactionOrKnex | undefined} trx
* @param {Function} callback
*/
export function runAfterTransaction(trx, callback) {
waitForTransaction(trx).then(
() => {
// If transaction success, then run action
return Promise.resolve(callback()).catch((error) => {
setTimeout(() => {
throw error
})
})
},
() => {
// Ignore transaction error
},
)
}

Grâce à cette méthode, on peut attendre la fin d'une transaction n'importe où dans le code :

async function createUser({ firstName, lastName }, { trx } = {}) {
const person = await Person.query(trx).insert({
firstName,
lastName,
})
runAfterTransaction(trx, async () => {
await publishPerson(james.id)
})
return person
}
async function createUserPet(user, { name }, { trx }) {
return user.$relatedQuery('pets', trx).insert({ name })
}
const scrappy = await transaction(async (trx) => {
const james = await createUser({ firstName, lastName }, { trx })
const scrappy = await createUserPet(user, { name: 'Scrappy ' }, { trx })
return scrappy
})

L'inconvénient de cette méthode, c'est la gestion des erreurs, le code est exécuté hors de la promise principale.

Éviter les oublis de trx

Toutes ces techniques permettent de gérer les transactions de manière propre au sein de l'application. En revanche elles ne permettent pas de s'assurer qu'un trx n'a pas été oublié.

Comme je l'ai dit, lorsqu'on oublie un trx on va forcément utiliser deux connexions à la base : une pour la transaction et une pour la requête oubliée hors transaction.

Il est donc possible de détecter les oublis en configurant Knex.js pour qu'il n'utilise une seule connexion :

const knex = require('knex')({
client: 'postgres',
connection: {
host: '127.0.0.1',
user: 'your_database_user',
password: 'your_database_password',
database: 'myapp_test',
},
pool: { min: 1, max: 1 },
})

Évidemment ces réglages sont fortement déconseillés en production, en revanche pour l'exécution des tests unitaires c'est un excellent moyen pour détecter les oublis. En cas d'oubli, le test ne se terminera pas et tombera en timeout.


Grâce à ces méthodes, mon équipe et moi arrivons à garantir la cohérence des données sur un projet conséquent. J'espère qu'elles vous seront utiles aussi !

Discuter sur TwitterÉditer sur GitHub