Intro
사이드 프로젝트를 하나 진행하다가 Neo4J database, MongoDB를 같이 사용하게 되었는데, 문제가 있었다. 두 DB간의 Transaction의 관리를 하려다 보니, 아래 코드와 같이 business logic 안에서 불필요한 try , catch문과 함꼐 transaction을 계속 선언 해야 하는 문제가 생겼다.
async create (createDto: CreateDto): Promise<void> {
const neo4jTransaction = this.neo4jService
.getWriteSession()
.beginTransaction();
const mongoSession = await this.mongoConnection.startSession();
const mongoTransaction = mongoSession.startTransaction();
try {
//neo4j database repository
await this.neo4jRepository.createBarNode(neo4jModel, neo4jTransaction);
//mongoDB database repository
await this.mongoRepository.create(mongoModel);
await neo4jTransaction.commit();
await mongoSession.commitTransaction();
await mongoSession.endSession();
} catch (e) {
await neo4jTransaction.rollback();
await mongoSession.abortTransaction();
throw e;
}
}
@Transactional()
그래서 이번에 만들어 볼 transactional 환경은 TypeOrm , PostgreSQL 환경에서 돌아가는 일반적인 데코레이터를 만들 것은 아니다. 그 부분에 대해선 이미 다른 사람들의 글들이 많이 있고 거기에 아예 라이브러리까지 나와있기 때문에 그냥 사용하면 될 것 같다.
대신에 위에 나온 것과 같이 다른 DB도 위와 비슷하게 @Transactional()
데코레이터를 만들어서 관리할 수 있다라는 정도의 참고로 받아들여주면 좋을 것 같다.
cls-hooked
우선 @Transactional()
데코레이터를 만들기 위해 핵심이 되는 라이브러리가 바로 cls-hooked
이다. JS runtime인 nodejs / bunjs
에서는 비동기처리가 싱글 스레드에서 일어난다. 멀티스레드 기반의 다른 언어는 단순하게 각 스레드에서 context를 정의하고 그에 대한 정보를 접근 할 수 있는 반면에, JS runtime들은 이와 같은 접근이 불가능하다. 그를 가능하게 만들어주는 라이브러리 cls-hooked이다. (찾아보니까 cls를 wrapping하는 다른 라이브러리도 꽤 많다.)
cls는 각 callback chain에서 접근할 수 있는 로컬 스토리지 같은 것을 제공해 주는데, 이를 이용해서 우리는 transaction에 관한 메타데이터, 그리고 transaction을 처리할 예정이다.
Architecture
위 그림은 cls-hooked를 이용해서 어떻게 transactional 데코레이터를 구현 할 지에 대한 간단한 모식도이다. 크게 다음과 같은 단계를 따른다.
- 각 요청이 interceptor에 도달하기 전에, 각 요청이 사용하는 DB 혹은 transaction type(write , read) 에 대한 metadata를 주입한다.
- Interceptor에서 namespace가 있는지 검사하고 없으면 새로 생성한다.
- namespace에 어떤 DB , transaction type이 들어올 지 metadata를 통해 읽고, 그에 맞춰서 namespace에 관련해서 manage 할 수 있는 객체들을 주입한다.
- Business logic이 실행 되기 전에 앞서서 주입한 transaction managing object들을 이용해서 transaction을 start한다.
- Business logic이 끝나면 다시 주입한 transaction managing object로 transaction을 commit 한다.
- 모든 과정이 완료되면 response를 반환하고 namespace가 더 이상 active하지 않을 경우 namespace를 종료한다.
이를 구현하기 위해서 TransactionalInterceptor()
@Transactional()
TransactionalHelper
그리고 interceptor가 적용되기 전에 다양한 metadata를 주입해주는 decorator가 필요하다.
Inject transaction metadata
우선 가장 먼저 해야할 것은 transaction에 대한 metadata를 주입하는 것이다.
//transactional.interceptor.ts
import {
Injectable,
NestInterceptor,
ExecutionContext,
CallHandler,
} from '@nestjs/common';
import { InjectConnection } from '@nestjs/mongoose';
import { createNamespace, getNamespace } from 'cls-hooked';
import mongoose, { mongo } from 'mongoose';
import { Neo4jService } from 'nest-neo4j/dist';
import { Observable, lastValueFrom } from 'rxjs';
import { from } from 'rxjs';
export const TRANSACTION_NAMESPACE = 'TRANSACTION_NAMESPACE';
export const TRANSACTION_MONGO = 'TRANSACTION_MONGO';
export const TRANSACTION_NEO4J = 'TRANSACTION_NEO4J';
export const MONGO_METADATA = 'mongoMetaData';
export const NEO4J_METADATA = 'neo4jMetaData';
export type MongoMeataData = {
useMongo: boolean;
};
export type Neo4jMetaData = {
useNeo4j: boolean;
txType?: 'READ' | 'WRITE';
};
@Injectable()
export class TrnasactionalInterceptor implements NestInterceptor {
constructor(
private readonly neo4jService: Neo4jService,
@InjectConnection() private readonly mongoConnection: mongoose.Connection,
) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const nameSpace =
getNamespace(TRANSACTION_NAMESPACE) ??
createNamespace(TRANSACTION_NAMESPACE);
const mongoMetaData: MongoMeataData = Reflect.getMetadata(
'mongoMetaData',
context.getHandler(),
);
const neo4jMetaData: Neo4jMetaData = Reflect.getMetadata(
'neo4jMetaData',
context.getHandler(),
);
return from(
nameSpace.runAndReturn(async () => {
await this.setMongoTransaction(mongoMetaData);
await this.setNeo4jTransaction(neo4jMetaData);
return await lastValueFrom(next.handle());
}),
);
}
private async setMongoTransaction(
mongoMeataData: MongoMeataData | undefined,
) {
const namespace = getNamespace(TRANSACTION_NAMESPACE);
if (!namespace || !namespace.active) {
throw new Error(`${TRANSACTION_NAMESPACE} is not active`);
}
if (mongoMeataData === undefined || mongoMeataData.useMongo === false) {
namespace.set('TRANSACTION_MONGO', null);
return;
}
const mongoSession = await this.mongoConnection.startSession();
namespace.set('TRANSACTION_MONGO', mongoSession);
}
private async setNeo4jTransaction(neo4jMetaData: Neo4jMetaData | undefined) {
const namespace = getNamespace(TRANSACTION_NAMESPACE);
if (!namespace || !namespace.active) {
throw new Error(`${TRANSACTION_NAMESPACE} is not active`);
}
if (neo4jMetaData === undefined || neo4jMetaData.useNeo4j === false) {
namespace.set('TRANSACTION_NEO4J', null);
return;
}
if (neo4jMetaData.txType === 'READ') {
const neo4jTransaction = this.neo4jService
.getReadSession()
.beginTransaction();
namespace.set('TRANSACTION_NEO4J', neo4jTransaction);
return;
}
const neo4jTransaction = this.neo4jService
.getWriteSession()
.beginTransaction();
namespace.set('TRANSACTION_NEO4J', neo4jTransaction);
}
}
TransactionalInterceptor가 하는 2가지가 있다.
- Request에서 요청이 들어오면 namespace가 있다 확인해 주고, 만약에 없다면 새로운 namespace를 만든다.
Reflect
에서 metadata를 받아서, mongo DB, Neo4J DB에 대한 connection 정보를 어떻게 설정할지 정하고, 관련된 managing object를 namespace에 주입해준다.
그러면 여기서 한 가지 의문이 생기는데, 메타데이터는 어디서 받는 것일까? 그에 대한 코드가 아래에 있다.
//inject.transaction.ts
import { SetMetadata, UseInterceptors } from '@nestjs/common';
import {
MONGO_METADATA,
MongoMeataData,
NEO4J_METADATA,
TrnasactionalInterceptor,
} from '@util/transaction/transactional.intercepter';
export function InjectMongo(): MethodDecorator {
return (target, key, descriptor) => {
const mongoMeataData: MongoMeataData = {
useMongo: true,
};
SetMetadata(MONGO_METADATA, mongoMeataData)(target, key, descriptor);
UseInterceptors(TrnasactionalInterceptor)(target, key, descriptor);
return descriptor;
};
}
export function InjectWriteNeo4j(): MethodDecorator {
return (target, key, descriptor) => {
const neo4jMetaData = {
useNeo4j: true,
txType: 'WRITE',
};
SetMetadata(NEO4J_METADATA, neo4jMetaData)(target, key, descriptor);
UseInterceptors(TrnasactionalInterceptor)(target, key, descriptor);
return descriptor;
};
}
export function InjectReadNeo4j(): MethodDecorator {
return (target, key, descriptor) => {
const neo4jMetaData = {
useNeo4j: true,
txType: 'READ',
};
SetMetadata(NEO4J_METADATA, neo4jMetaData)(target, key, descriptor);
UseInterceptors(TrnasactionalInterceptor)(target, key, descriptor);
return descriptor;
};
}
export function InjectMongoAndWriteNeo4j(): MethodDecorator {
return (target, key, descriptor) => {
const mongoMeataData: MongoMeataData = {
useMongo: true,
};
const neo4jMetaData = {
useNeo4j: true,
txType: 'WRITE',
};
SetMetadata(MONGO_METADATA, mongoMeataData)(target, key, descriptor);
SetMetadata(NEO4J_METADATA, neo4jMetaData)(target, key, descriptor);
UseInterceptors(TrnasactionalInterceptor)(target, key, descriptor);
return descriptor;
};
}
export function InjectMongoAndReadNeo4j(): MethodDecorator {
return (target, key, descriptor) => {
const mongoMeataData: MongoMeataData = {
useMongo: true,
};
const neo4jMetaData = {
useNeo4j: true,
txType: 'READ',
};
SetMetadata(MONGO_METADATA, mongoMeataData)(target, key, descriptor);
SetMetadata(NEO4J_METADATA, neo4jMetaData)(target, key, descriptor);
UseInterceptors(TrnasactionalInterceptor)(target, key, descriptor);
return descriptor;
};
}
아래의 코드를 보게 되면 Interceptor로 들어가기 전에 SetMetaData로 각 DB에 대한 metadata를 설정함을 알 수 있다. 이렇게 되면 controller에서는 만약에 이 요청이 mongoDB와 neo4j의 write transaction을 사용한다고 하면 다음과 같이 작성할 수 있다.
//controller.ts
@Post()
@InjectMongoAndWriteNeo4j()
async create(@Body() createDto: CreateDto) {
await this.commandService.create(createDto);
return { message: 'Created successfully' };
}
Transactional decorator
Transaction에 대한 metadata가 주입되었으면, 이제 그 namespace에서 주입된 object를 꺼내와서 실제로 transaction을 이용해야 한다.
//transactional.ts
import { InternalServerErrorException } from '@nestjs/common';
import {
TRANSACTION_MONGO,
TRANSACTION_NAMESPACE,
TRANSACTION_NEO4J,
} from '@util/transaction/transactional.intercepter';
import { getNamespace } from 'cls-hooked';
import mongoose, { mongo } from 'mongoose';
import { TransactionPromise } from 'neo4j-driver-core';
export function Transactional() {
return function (
_target: any,
_propertyKey: string | symbol,
descriptor: TypedPropertyDescriptor<any>,
) {
const originalMethod = descriptor.value;
async function transactionWrapped(...args: unknown[]) {
const nameSpace = getNamespace(TRANSACTION_NAMESPACE);
if (!nameSpace || !nameSpace.active)
throw new InternalServerErrorException(
`${TRANSACTION_NAMESPACE} is not active`,
);
const mongoSession: mongoose.ClientSession =
nameSpace.get(TRANSACTION_MONGO);
if (mongoSession === undefined)
throw new InternalServerErrorException(
`Could not find MongoDB session in ${TRANSACTION_NAMESPACE} nameSpace`,
);
// Get Neo4j Transaction
const neo4jTransaction: TransactionPromise =
nameSpace.get(TRANSACTION_NEO4J);
if (neo4jTransaction === undefined)
throw new InternalServerErrorException(
`Could not find Neo4j transaction in ${TRANSACTION_NAMESPACE} nameSpace`,
);
try {
if (mongoSession) {
mongoSession.startTransaction();
}
const result = await originalMethod.apply(this, args);
if (neo4jTransaction) {
neo4jTransaction.commit();
}
if (mongoSession) {
mongoSession.commitTransaction();
}
return result;
} catch (error) {
if (neo4jTransaction) {
neo4jTransaction.rollback();
}
if (mongoSession) {
mongoSession.abortTransaction();
}
throw error;
} finally {
if (mongoSession) {
mongoSession.endSession();
}
}
}
descriptor.value = transactionWrapped;
};
}
그를 위해서 위와 같이 trnasactional decorator를 만들었고 이 데코레이터는 비즈니스 로직이 동작하는 service layer함수의 데코레이터로 붙게된다.
코드가 길지만 결국 하는 일은 namespace에 주입된 transaction을 managing 할 수 있는 객체를 받아서 transaction 실행, transaction commit을 business logic 앞 뒤로 감싸는 역할을 한다.
이를 실제로 사용하는 예시는 다음과 같다.
@Transactional()
async create (createDto: CreateDto): Promise<void> {
//neo4j database repository
await this.neo4jRepository.createBarNode(neo4jModel);
//mongoDB database repository
await this.mongoRepository.create(mongoModel);
}
맨 위에의 예시와 비교해보면 불필요한 코드들이 싹 정리되고 있음을 볼 수 있다.
다만 문제가 하나 있는데, neo4jRepository는 transaction을 managing하는 객체를 원래 함수의 params로 주입받았었다. 그러나 현재 @transactional()
로는 이 객체를 불러올 수 있는 방법이 없기 때문에, neo4jRepository에서 이를 주입받을 수 있는 기능이 필요하다.
TransactionalHelper
repository에서는 transaction에 대한 기능을 따로 주입받을 수 있는 기능이 없기 때문에, 따로 접근할 수 있는 함수를 만들어줘야 했다. 그 기능을 하는 것이 바로 transactionalHelper이다.
import { Injectable, InternalServerErrorException } from '@nestjs/common';
import {
TRANSACTION_NAMESPACE,
TRANSACTION_NEO4J,
} from '@util/transaction/transactional.intercepter';
import { getNamespace } from 'cls-hooked';
import { TransactionPromise } from 'neo4j-driver-core';
export type TransactionManagerList = {
neo4jTransaction: TransactionPromise;
};
@Injectable()
export class TransactionalHelper {
getTransaction(): TransactionManagerList {
const nameSpace = getNamespace(TRANSACTION_NAMESPACE);
if (!nameSpace || !nameSpace.active)
throw new InternalServerErrorException(
`${TRANSACTION_NAMESPACE} is not active`,
);
return { neo4jTransaction: nameSpace.get(TRANSACTION_NEO4J) };
}
}
위와같이 단순하게 getTransaction을 호출하면 현재 namespace에서 가져와서 transaction object를 불러오는 것을 확일할 수 있다.
import { Inject, Injectable } from '@nestjs/common';
import { TransactionalHelper } from '@util/transaction/transactional.helper';
import { TransactionPromise } from 'neo4j-driver-core';
@Injectable()
export abstract class Repository {
constructor(
@Inject(TransactionalHelper)
protected readonly txHelper: TransactionalHelper,
) {}
protected getNeo4jTransaction(): TransactionPromise {
const { neo4jTransaction } = this.txHelper.getTransaction();
return neo4jTransaction;
}
}
이를 Abstract class로 만들어서 getNeo4jTransaction()
에 대한 메서드를 작성하고, 각 repository에 extends로 받으면 this.getNeo4jTransaction()
으로 접근할 수 있다.
@Injectable()
export class ARepository extends Repository {
async createANode(model: Model): Promise<Model> {
const neo4jTransaction = this.getNeo4jTransaction();
// 비즈니스 로직
}
Outro
결과적으로 코드 전체는 깔끔하게 리펙토링 되었다. 다만 몇 가지 개선할 점들이 있는데, 우선 neo4j 자체의 OGM(그래프 DB에서는 ORM 대신에 OGM이라고 한다)을 사용하지 못해서 트랜잭션을 미리 시작하고 주입하는 부분이 있다.
그리고 메타데이터를 동적으로 불러오는 것이 아니라 Intercepor를 통과하기 전에 수동으로 넣어주는 방식이기 때문에 이를 해결하면 더 깔끔한 리펙토링이 될 것 같다. 다만 아직은 해결방법을 못찾기도 했고, 나름 성공적으로 리펙토링 했다고 생각해서 글을 작성했다…! 해결을 하면 그 방법을 포함해서 추후 글을 작성할 예정이다.