TypeScript PR

CQRS × アーキテクチャ(Typescript)

記事内に商品プロモーションを含む場合があります

TypeScriptで学ぶ導入判断と実装パターン(Express/NestJS)

この記事でわかること

CQRSは「更新(Command)」と「参照(Query)」を分けて、それぞれを最適化する設計です。ここでは、向き不向きの見極め、段階導入の型、TypeScriptでの実装(軽量ExpressとNestJSの2通り)、運用でのハマりどころまでをまとめます。

CQRSとは何か

CQRS(Command Query Responsibility Segregation)は、更新処理と参照処理の責務を分離します。単一モデルに両方を詰め込むと、参照は重く、更新は脆くなりがちです。更新側はドメインルールと整合性に集中し、参照側はDTOや非正規化で高速化します。結果として、スケールと保守性を両立しやすくなります。

期待できる効果

  • 読み取り/書き込みを独立にスケールしやすい
  • 参照DTO/更新集約の役割分担で実装とテストが単純化
  • 異種ストア(RDB+検索/キャッシュ等)を使い分けられる
  • 更新境界が明確になり、権限設計がやりやすい

コストとリスク

  • コンポーネント増で設計・運用が複雑化
  • 別DBなら反映遅延(最終的整合性)が前提に
  • 重複/順序/再送など非同期運用の論点が増える
  • 小規模CRUDでは過剰設計になりやすい

どんなときに採用する?

  • 参照トラフィックが多い、複雑な集計・検索が多い
  • 更新ロジックが厳密でビジネスルールが重い
  • 読み取りと書き込みで求めるDB特性が大きく異なる
  • システム全体ではなく特定の境界コンテキストだけ最適化したい

まずは従来構成で良く、課題が出る箇所から部分的にCQRSを入れるのが現実的です。

段階的なアーキテクチャ

ステップ1:論理分離(同一DBで開始)

  • CommandService(更新)とQueryService(参照)を分ける
  • 参照はDTO+専用クエリ、更新は集約+トランザクション
  • DBは1つのままなので整合性が扱いやすい

ステップ2:フルCQRS(読み書きでDBも分離)

  • 書き込み:RDBで厳密管理
  • 読み取り:非正規化ビュー、検索エンジン、キャッシュ
  • 同期:Outbox→メッセージ配信→プロジェクション(冪等)

TypeScript実装例①:軽量Express版(論理分離→Outbox導入の土台)

依存は express, 任意のDBクライアント(例:mysql2/pg/better-sqlite3 など想定)。記法はあくまで最小例です。

// domain/User.ts
export class User {
  constructor(public readonly id: string, public name: string) {
    if (!name.trim()) throw new Error("User.name must not be empty");
  }
  rename(newName: string) {
    if (!newName.trim()) throw new Error("empty name");
    this.name = newName;
  }
}

// application/commands/CreateUser.ts
export interface CreateUserCommand { name: string; }
export interface IUserRepository {
  add(user: User): Promise<void>;
}
export class CreateUserHandler {
  constructor(private readonly repo: IUserRepository) {}
  async handle(cmd: CreateUserCommand): Promise<{ id: string }> {
    const id = crypto.randomUUID();
    const user = new User(id, cmd.name);
    await this.repo.add(user);
    return { id };
  }
}

// application/queries/GetUserDetail.ts
export interface GetUserDetailQuery { id: string; }
export type UserDto = { id: string; name: string; createdAt: string; };
export interface IReadDb {
  getUserDto(id: string): Promise<UserDto | null>;
}
export class GetUserDetailHandler {
  constructor(private readonly readDb: IReadDb) {}
  async handle(q: GetUserDetailQuery) { return this.readDb.getUserDto(q.id); }
}

// infrastructure/SqlUserRepository.ts
import type { IUserRepository } from "../application/commands/CreateUser";
import { User } from "../domain/User";
export class SqlUserRepository implements IUserRepository {
  constructor(private readonly db: { execute: (sql: string, params?: any[]) => Promise<any> }) {}
  async add(user: User): Promise<void> {
    await this.db.execute(
      "INSERT INTO users(id, name, created_at) VALUES(?, ?, CURRENT_TIMESTAMP)",
      [user.id, user.name]
    );
    // ★Outbox(同一Txで書くのが理想。実運用はTx境界できっちり。)
    await this.db.execute(
      "INSERT INTO outbox(id, type, payload, created_at) VALUES(?, ?, ?, CURRENT_TIMESTAMP)",
      [crypto.randomUUID(), "UserCreated", JSON.stringify({ id: user.id, name: user.name })]
    );
  }
}

// infrastructure/ReadDb.ts
import type { IReadDb, UserDto } from "../application/queries/GetUserDetail";
export class ReadDb implements IReadDb {
  constructor(private readonly db: { query: (sql: string, params?: any[]) => Promise<any[]> }) {}
  async getUserDto(id: string): Promise<UserDto | null> {
    const rows = await this.db.query("SELECT id, name, created_at FROM users WHERE id = ?", [id]);
    if (rows.length === 0) return null;
    const r = rows[0];
    return { id: r.id, name: r.name, createdAt: r.created_at };
  }
}

// presentation/http.ts
import express from "express";
import { CreateUserHandler } from "../application/commands/CreateUser";
import { GetUserDetailHandler } from "../application/queries/GetUserDetail";
import { SqlUserRepository } from "../infrastructure/SqlUserRepository";
import { ReadDb } from "../infrastructure/ReadDb";

const app = express();
app.use(express.json());

// 実際はDBクライアントをDI
const writeDb = /* your write connection */;
const readDb  = /* your read connection */;

const createUser = new CreateUserHandler(new SqlUserRepository(writeDb));
const getUser    = new GetUserDetailHandler(new ReadDb(readDb));

app.post("/users", async (req, res) => {
  try {
    const result = await createUser.handle({ name: req.body.name });
    res.status(201).json(result);
  } catch (e: any) {
    res.status(400).json({ error: e.message });
  }
});

app.get("/users/:id", async (req, res) => {
  const dto = await getUser.handle({ id: req.params.id });
  if (!dto) return res.status(404).end();
  res.json(dto);
});

app.listen(3000);

次の一手:Outboxのレコードをポーリング or ストリームで拾い、Read用テーブル検索エンジン冪等で投影します(イベントIDで重複適用を無害化)。


TypeScript実装例②:NestJS+@nestjs/cqrs版(ハンドラ分離が明確)

依存:@nestjs/common, @nestjs/core, @nestjs/cqrs など。DBは任意(TypeORM/Prisma/Knex/生SQL 等)。

// commands/create-user.command.ts
export class CreateUserCommand {
  constructor(public readonly name: string) {}
}

// commands/create-user.handler.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(private readonly repo: UserRepository) {}
  async execute(cmd: CreateUserCommand): Promise<{ id: string }> {
    const id = crypto.randomUUID();
    const user = new User(id, cmd.name);
    await this.repo.add(user); // ここでOutboxにも記録
    return { id };
  }
}

// queries/get-user.query.ts
export class GetUserQuery { constructor(public readonly id: string) {} }

// queries/get-user.handler.ts
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
  constructor(private readonly readDb: ReadDb) {}
  async execute(q: GetUserQuery) {
    return this.readDb.getUserDto(q.id); // DTOに最適化
  }
}

// users.controller.ts
import { Controller, Get, Param, Post, Body } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';

@Controller('users')
export class UsersController {
  constructor(private readonly commandBus: CommandBus, private readonly queryBus: QueryBus) {}

  @Post()
  create(@Body() body: { name: string }) {
    return this.commandBus.execute(new CreateUserCommand(body.name));
  }

  @Get(':id')
  findOne(@Param('id') id: string) {
    return this.queryBus.execute(new GetUserQuery(id));
  }
}

次の一手:書き込み完了時にOutboxへ保存→専用プロセス/ワーカーがメッセージ配送→投影(Read DB 更新)。集約IDでパーティションすると順序保証が取りやすくなります。


運用でハマりやすい点と対策

反映遅延へのUX

  • 「反映中」の表示、再読込や通知で完了を伝える
  • 参照APIに最終更新時刻を含め、遅延検知の余地を残す

イベント配送の信頼性

  • Outbox/Inboxで確実配送、リトライは冪等で安全に
  • DLQ(デッドレター)で障害時にイベントを隔離

可観測性

  • キュー長・失敗数・プロジェクションの最終更新時刻を監視
  • 更新→投影→参照のE2E結合テストを継続実行

導入前チェックリスト

  • 参照/更新の比率・SLOを定義し、現状のボトルネックを計測
  • 参照DTOと更新集約の境界が明確
  • Outbox/リトライ/DLQの設計がある
  • 反映遅延時のUXが決まっている
  • 最終一致を検証する結合テストがある

まとめ

CQRSは、読み取りと書き込みの非対称性を前提に、責務・モデル・技術選択を分ける設計です。まずは同一DBで論理分離し、効果が見えた箇所からOutbox+投影で段階的に広げるのが安全です。小さく始め、必要なところだけ深くしていきましょう〜。