TypeScriptで学ぶ導入判断と実装パターン(Express/NestJS)
この記事でわかること
CQRSは「更新(Command)」と「参照(Query)」を分けて、それぞれを最適化する設計です。ここでは、向き不向きの見極め、段階導入の型、TypeScriptでの実装(軽量ExpressとNestJSの2通り)、運用でのハマりどころまでをまとめます。
CQRSとは何か
CQRS(Command Query Responsibility Segregation)は、更新処理と参照処理の責務を分離します。単一モデルに両方を詰め込むと、参照は重く、更新は脆くなりがちです。更新側はドメインルールと整合性に集中し、参照側はDTOや非正規化で高速化します。結果として、スケールと保守性を両立しやすくなります。
期待できる効果
- 読み取り/書き込みを独立にスケールしやすい
- 参照DTO/更新集約の役割分担で実装とテストが単純化
- 異種ストア(RDB+検索/キャッシュ等)を使い分けられる
- 更新境界が明確になり、権限設計がやりやすい
コストとリスク
- コンポーネント増で設計・運用が複雑化
- 別DBなら反映遅延(最終的整合性)が前提に
- 重複/順序/再送など非同期運用の論点が増える
- 小規模CRUDでは過剰設計になりやすい
どんなときに採用する?
- 参照トラフィックが多い、複雑な集計・検索が多い
- 更新ロジックが厳密でビジネスルールが重い
- 読み取りと書き込みで求めるDB特性が大きく異なる
- システム全体ではなく特定の境界コンテキストだけ最適化したい
まずは従来構成で良く、課題が出る箇所から部分的にCQRSを入れるのが現実的です。
段階的なアーキテクチャ
ステップ1:論理分離(同一DBで開始)
- CommandService(更新)とQueryService(参照)を分ける
- 参照はDTO+専用クエリ、更新は集約+トランザクション
- DBは1つのままなので整合性が扱いやすい
ステップ2:フルCQRS(読み書きでDBも分離)
- 書き込み:RDBで厳密管理
- 読み取り:非正規化ビュー、検索エンジン、キャッシュ
- 同期:Outbox→メッセージ配信→プロジェクション(冪等)
TypeScript実装例①:軽量Express版(論理分離→Outbox導入の土台)
依存は
express
, 任意のDBクライアント(例:mysql2
/pg
/better-sqlite3
など想定)。記法はあくまで最小例です。
// domain/User.ts
export class User {
constructor(public readonly id: string, public name: string) {
if (!name.trim()) throw new Error("User.name must not be empty");
}
rename(newName: string) {
if (!newName.trim()) throw new Error("empty name");
this.name = newName;
}
}
// application/commands/CreateUser.ts
export interface CreateUserCommand { name: string; }
export interface IUserRepository {
add(user: User): Promise<void>;
}
export class CreateUserHandler {
constructor(private readonly repo: IUserRepository) {}
async handle(cmd: CreateUserCommand): Promise<{ id: string }> {
const id = crypto.randomUUID();
const user = new User(id, cmd.name);
await this.repo.add(user);
return { id };
}
}
// application/queries/GetUserDetail.ts
export interface GetUserDetailQuery { id: string; }
export type UserDto = { id: string; name: string; createdAt: string; };
export interface IReadDb {
getUserDto(id: string): Promise<UserDto | null>;
}
export class GetUserDetailHandler {
constructor(private readonly readDb: IReadDb) {}
async handle(q: GetUserDetailQuery) { return this.readDb.getUserDto(q.id); }
}
// infrastructure/SqlUserRepository.ts
import type { IUserRepository } from "../application/commands/CreateUser";
import { User } from "../domain/User";
export class SqlUserRepository implements IUserRepository {
constructor(private readonly db: { execute: (sql: string, params?: any[]) => Promise<any> }) {}
async add(user: User): Promise<void> {
await this.db.execute(
"INSERT INTO users(id, name, created_at) VALUES(?, ?, CURRENT_TIMESTAMP)",
[user.id, user.name]
);
// ★Outbox(同一Txで書くのが理想。実運用はTx境界できっちり。)
await this.db.execute(
"INSERT INTO outbox(id, type, payload, created_at) VALUES(?, ?, ?, CURRENT_TIMESTAMP)",
[crypto.randomUUID(), "UserCreated", JSON.stringify({ id: user.id, name: user.name })]
);
}
}
// infrastructure/ReadDb.ts
import type { IReadDb, UserDto } from "../application/queries/GetUserDetail";
export class ReadDb implements IReadDb {
constructor(private readonly db: { query: (sql: string, params?: any[]) => Promise<any[]> }) {}
async getUserDto(id: string): Promise<UserDto | null> {
const rows = await this.db.query("SELECT id, name, created_at FROM users WHERE id = ?", [id]);
if (rows.length === 0) return null;
const r = rows[0];
return { id: r.id, name: r.name, createdAt: r.created_at };
}
}
// presentation/http.ts
import express from "express";
import { CreateUserHandler } from "../application/commands/CreateUser";
import { GetUserDetailHandler } from "../application/queries/GetUserDetail";
import { SqlUserRepository } from "../infrastructure/SqlUserRepository";
import { ReadDb } from "../infrastructure/ReadDb";
const app = express();
app.use(express.json());
// 実際はDBクライアントをDI
const writeDb = /* your write connection */;
const readDb = /* your read connection */;
const createUser = new CreateUserHandler(new SqlUserRepository(writeDb));
const getUser = new GetUserDetailHandler(new ReadDb(readDb));
app.post("/users", async (req, res) => {
try {
const result = await createUser.handle({ name: req.body.name });
res.status(201).json(result);
} catch (e: any) {
res.status(400).json({ error: e.message });
}
});
app.get("/users/:id", async (req, res) => {
const dto = await getUser.handle({ id: req.params.id });
if (!dto) return res.status(404).end();
res.json(dto);
});
app.listen(3000);
次の一手:Outboxのレコードをポーリング or ストリームで拾い、Read用テーブルや検索エンジンに冪等で投影します(イベントIDで重複適用を無害化)。
TypeScript実装例②:NestJS+@nestjs/cqrs版(ハンドラ分離が明確)
依存:
@nestjs/common
,@nestjs/core
,@nestjs/cqrs
など。DBは任意(TypeORM/Prisma/Knex/生SQL 等)。
// commands/create-user.command.ts
export class CreateUserCommand {
constructor(public readonly name: string) {}
}
// commands/create-user.handler.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
constructor(private readonly repo: UserRepository) {}
async execute(cmd: CreateUserCommand): Promise<{ id: string }> {
const id = crypto.randomUUID();
const user = new User(id, cmd.name);
await this.repo.add(user); // ここでOutboxにも記録
return { id };
}
}
// queries/get-user.query.ts
export class GetUserQuery { constructor(public readonly id: string) {} }
// queries/get-user.handler.ts
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
constructor(private readonly readDb: ReadDb) {}
async execute(q: GetUserQuery) {
return this.readDb.getUserDto(q.id); // DTOに最適化
}
}
// users.controller.ts
import { Controller, Get, Param, Post, Body } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
@Controller('users')
export class UsersController {
constructor(private readonly commandBus: CommandBus, private readonly queryBus: QueryBus) {}
@Post()
create(@Body() body: { name: string }) {
return this.commandBus.execute(new CreateUserCommand(body.name));
}
@Get(':id')
findOne(@Param('id') id: string) {
return this.queryBus.execute(new GetUserQuery(id));
}
}
次の一手:書き込み完了時にOutboxへ保存→専用プロセス/ワーカーがメッセージ配送→投影(Read DB 更新)。集約IDでパーティションすると順序保証が取りやすくなります。
運用でハマりやすい点と対策
反映遅延へのUX
- 「反映中」の表示、再読込や通知で完了を伝える
- 参照APIに最終更新時刻を含め、遅延検知の余地を残す
イベント配送の信頼性
- Outbox/Inboxで確実配送、リトライは冪等で安全に
- DLQ(デッドレター)で障害時にイベントを隔離
可観測性
- キュー長・失敗数・プロジェクションの最終更新時刻を監視
- 更新→投影→参照のE2E結合テストを継続実行
導入前チェックリスト
- 参照/更新の比率・SLOを定義し、現状のボトルネックを計測
- 参照DTOと更新集約の境界が明確
- Outbox/リトライ/DLQの設計がある
- 反映遅延時のUXが決まっている
- 最終一致を検証する結合テストがある
まとめ
CQRSは、読み取りと書き込みの非対称性を前提に、責務・モデル・技術選択を分ける設計です。まずは同一DBで論理分離し、効果が見えた箇所からOutbox+投影で段階的に広げるのが安全です。小さく始め、必要なところだけ深くしていきましょう〜。