Reference implementation

The code behind the simulator and the audit: a typed tracking plan, idempotent delivery and a webhook receiver, with a test suite that runs. Real engineering, not a diagram. Full repo on disk: pipeline-repo/ (Node + TypeScript + Zod + Terraform).

✓ npm test → 15 passed (15) · ✓ tsc --noEmit → clean · vitest · strict TypeScript
$ npm test

 RUN  v1.6.1  pipeline-repo

 ✓ test/events.test.ts  (6 tests)
 ✓ test/track.test.ts   (5 tests)
 ✓ test/webhook.test.ts (4 tests)

 Test Files  3 passed (3)
      Tests  15 passed (15)
src/events.ts — The tracking plan as Zod schemas, and the source of truth: unknown events or malformed properties can't be delivered.
import { z } from 'zod';

/**
 * The tracking plan, expressed as runtime-validated Zod schemas.
 *
 * This object is the Source of Truth: `validate` rejects an event name that is
 * not a key here, and rejects properties that fail that event's schema, so such
 * events are never handed to a sink. Changing the plan is a reviewed PR.
 *
 * NOTE(review): `z.object` strips unrecognised keys by default instead of
 * rejecting them, so unplanned properties are silently dropped, not reported.
 * Switch an event to `.strict()` if extra properties should be a violation.
 */
export const EventSchemas = {
  // Sign-up: how the account was created, plus optional attribution fields.
  'Account Created': z.object({
    method: z.enum(['email', 'wallet', 'oauth']),
    referral_code: z.string().optional(),
    utm_source: z.string().optional(),
  }),
  // KYC approval: the tier granted and a whole, non-negative day count since sign-up.
  'KYC Approved': z.object({
    tier: z.enum(['basic', 'advanced']),
    days_since_signup: z.number().int().nonnegative(),
  }),
  // Deposit: strictly positive USD amount, non-empty asset, funding rail, first-deposit flag.
  'Deposit Completed': z.object({
    amount_usd: z.number().positive(),
    asset: z.string().min(1),
    method: z.enum(['onchain', 'card', 'wire']),
    is_first: z.boolean(),
  }),
  // Fill: non-empty symbol, positive whole contract count, positive notional,
  // non-negative fee (fee units are not stated here — NOTE(review): confirm).
  'Order Filled': z.object({
    symbol: z.string().min(1),
    contracts: z.number().int().positive(),
    notional_usd: z.number().positive(),
    fee: z.number().nonnegative(),
  }),
} as const;

/** Compile-time view of the plan object, shared by the aliases below. */
type TrackingPlan = typeof EventSchemas;

/** Every event name the tracking plan defines (the keys of `EventSchemas`). */
export type EventName = keyof TrackingPlan;
/** Parsed property shape for event `K`, inferred from its Zod schema. */
export type EventProps<K extends EventName> = z.infer<TrackingPlan[K]>;

/**
 * Per-event destination routing (destination ids are plain strings here).
 *
 * All four events currently in the plan reach Mixpanel, AppsFlyer and the
 * warehouse; only 'Account Created' and 'Deposit Completed' also reach GA4.
 * Intent: money/identity events reach AppsFlyer, UI events (none defined yet)
 * would not.
 *
 * Typed as `Record<EventName, …>`, so adding an event to `EventSchemas` without
 * routing it here is a compile error.
 * NOTE(review): nothing in track.ts or webhook.ts reads this map; routing is
 * presumably enforced elsewhere (e.g. Segment destination filters) — confirm.
 */
export const Destinations: Record<EventName, readonly string[]> = {
  'Account Created': ['mixpanel', 'ga4', 'appsflyer', 'warehouse'],
  'KYC Approved': ['mixpanel', 'appsflyer', 'warehouse'],
  'Deposit Completed': ['mixpanel', 'ga4', 'appsflyer', 'warehouse'],
  'Order Filled': ['mixpanel', 'appsflyer', 'warehouse'],
};

/**
 * Outcome of `validate`, discriminated on `ok`: either the list of
 * human-readable issues that made the payload invalid, or the schema-parsed
 * properties for event `K`.
 */
export type ValidationResult<K extends EventName> =
  | { ok: false; errors: string[] }
  | { ok: true; data: EventProps<K> };

/**
 * Validate `props` against the tracking-plan schema for `event`.
 *
 * Event names can arrive from untrusted callers (the webhook casts the raw
 * `body.event` string to `EventName`), so only the plan's OWN keys count.
 * A plain `EventSchemas[event]` lookup would resolve inherited names such as
 * "constructor" or "toString" to Object.prototype members. It would then
 * crash on `.safeParse` instead of reporting schema drift.
 *
 * @param event - Event name; rejected unless it is a key of `EventSchemas`.
 * @param props - Candidate properties, parsed by that event's Zod schema.
 * @returns `{ ok: true, data }` with the parsed properties, or
 *   `{ ok: false, errors }` with one `"path: message"` string per issue
 *   (`(root)` when the issue has no path).
 */
export function validate<K extends EventName>(event: K, props: unknown): ValidationResult<K> {
  if (!Object.prototype.hasOwnProperty.call(EventSchemas, event)) {
    return { ok: false, errors: [`event "${String(event)}" is not in the tracking plan`] };
  }
  const schema = EventSchemas[event];
  const parsed = schema.safeParse(props);
  if (parsed.success) return { ok: true, data: parsed.data as EventProps<K> };
  return {
    ok: false,
    errors: parsed.error.issues.map((i) => `${i.path.join('.') || '(root)'}: ${i.message}`),
  };
}
src/track.ts — Typed, validated, idempotent track(): a deterministic messageId derived from a domain key lets retries dedupe.
import { randomUUID } from 'node:crypto';

import { EventName, EventProps, validate } from './events';
import { Sink, TrackMessage } from './segment';

/**
 * Raised when an event does not conform to the tracking plan.
 *
 * Carries the offending event name and the individual validation messages.
 * The webhook receiver maps this error to an HTTP 422.
 */
export class TrackingError extends Error {
  public readonly event: string;
  public readonly errors: string[];

  constructor(event: string, errors: string[]) {
    super(`Tracking plan violation on "${event}": ${errors.join('; ')}`);
    this.event = event;
    this.errors = errors;
    this.name = 'TrackingError';
  }
}

/**
 * Who an event is attributed to. Both ids are optional here and are copied
 * onto the outgoing message as `userId` / `anonymousId`.
 */
export interface Identity {
  userId?: string;
  anonymousId?: string;
}

/** Per-call options for track(). */
export interface TrackOptions {
  /** Identity the event is attributed to. */
  identity: Identity;
  /**
   * Stable domain key (e.g. a deposit_id). The messageId is derived from it
   * deterministically, so a retried call dedupes instead of double-delivering.
   */
  key?: string;
  /** Explicit messageId; when set it is used verbatim instead of being derived. */
  messageId?: string;
  /**
   * Event timestamp, copied onto the outgoing message unchanged.
   * NOTE(review): the format is not validated here — presumably ISO-8601; confirm with the sink.
   */
  timestamp?: string;
}

/**
 * Build a deterministic messageId of the form `<event_slug>:<key>`.
 *
 * The slug is the event name with every whitespace run replaced by `_`, then
 * lower-cased. The same (event, key) pair always yields the same id, so a retry
 * collapses to a single delivery.
 *
 * @example makeMessageId('Deposit Completed', 'dep_8842') // 'deposit_completed:dep_8842'
 */
export function makeMessageId(event: string, key: string): string {
  const slug = event.replace(/\s+/g, '_').toLowerCase();
  return slug + ':' + key;
}

/**
 * Build a typed, validated track() bound to `sink`.
 *
 * The returned function first validates `props` against the tracking plan. On
 * a violation it throws `TrackingError` without touching the sink. Otherwise it
 * hands exactly one `TrackMessage` to `sink.track` and resolves to that message.
 *
 * messageId selection, first match wins:
 *   1. `opts.messageId` — caller override, used verbatim.
 *   2. `opts.key` — `makeMessageId(event, key)`. A retry with the same domain
 *      key (e.g. the deposit id) produces the same id and dedupes.
 *      Money/identity events should always pass a stable key.
 *   3. `opts.timestamp` — deterministic from identity + timestamp.
 *   4. Neither — a fresh random id. There is nothing stable to derive one from.
 *      A constant per-user fallback would give every keyless event of the same
 *      name from the same user an identical messageId. A sink that dedupes on
 *      messageId would then drop all but the first as "retries".
 */
export function createTracker(sink: Sink) {
  return async function track<K extends EventName>(
    event: K,
    props: EventProps<K>,
    opts: TrackOptions,
  ): Promise<TrackMessage> {
    const result = validate(event, props);
    if (!result.ok) throw new TrackingError(String(event), result.errors);

    const name = String(event);
    const msg: TrackMessage = {
      type: 'track',
      event: name,
      userId: opts.identity.userId,
      anonymousId: opts.identity.anonymousId,
      properties: result.data as Record<string, unknown>,
      messageId: opts.messageId ?? deriveMessageId(name, opts),
      timestamp: opts.timestamp,
    };

    await sink.track(msg);
    return msg;
  };
}

/** Choose the messageId for a call that has no explicit override (rules 2-4 above). */
function deriveMessageId(event: string, opts: TrackOptions): string {
  // Empty strings count as absent: `event:` would collide for every such caller.
  if (opts.key) return makeMessageId(event, opts.key);
  if (opts.timestamp) {
    const who = opts.identity.userId ?? opts.identity.anonymousId ?? 'anon';
    return makeMessageId(event, `${who}:${opts.timestamp}`);
  }
  // Nothing stable to key on: never let unrelated events share an id.
  return makeMessageId(event, randomUUID());
}
src/webhook.ts — Backend-to-third-party receiver: validate, dedupe, forward. Returns 202 on accept, 422 on a plan violation.
import { createTracker, TrackingError } from './track';
import { Sink } from './segment';
import { EventName } from './events';

/** Payload an internal service POSTs to the receiver. */
export interface IncomingEvent {
  /** Event name; it must be defined in the tracking plan or the request is a plan violation. */
  event: string;
  /** Raw, untrusted properties; validated against the event's schema before delivery. */
  properties: unknown;
  userId?: string;
  anonymousId?: string;
  /**
   * Domain key used to derive a deterministic messageId (e.g. deposit_id), so
   * a re-posted event dedupes. Strongly recommended for money events.
   */
  idempotencyKey?: string;
}

/** Framework-neutral response: an HTTP status code plus a JSON-serialisable body. */
export interface HandlerResult {
  status: number;
  body: unknown;
}

/**
 * Framework-agnostic receiver for events POSTed by internal services.
 *
 * The payload goes through track(), which validates it against the tracking
 * plan and, if it is valid, forwards it to `sink`. When `idempotencyKey` is
 * present it serves as the domain key for a deterministic messageId.
 *
 * @returns 202 `{ accepted: true, messageId }` once the sink has the event, or
 *   422 `{ accepted: false, errors }` on a tracking-plan violation, in which
 *   case nothing is delivered.
 * @throws Anything other than `TrackingError` is rethrown so the hosting
 *   server can answer 500.
 */
export async function handleEvent(sink: Sink, body: IncomingEvent): Promise<HandlerResult> {
  const track = createTracker(sink);
  try {
    const { messageId } = await track(body.event as EventName, body.properties as never, {
      identity: { userId: body.userId, anonymousId: body.anonymousId },
      key: body.idempotencyKey,
    });
    return { status: 202, body: { accepted: true, messageId } };
  } catch (err) {
    if (!(err instanceof TrackingError)) throw err; // unexpected: let the server return 500
    return { status: 422, body: { accepted: false, errors: err.errors } };
  }
}
test/track.test.ts — The proof: idempotency and schema-violation tests that actually run.
import { describe, it, expect } from 'vitest';
import { createTracker, makeMessageId, TrackingError } from '../src/track';
import { InMemorySink } from '../src/segment';

const deposit = { amount_usd: 500, asset: 'USDT', method: 'onchain', is_first: true } as const;

/** A fresh sink plus a tracker bound to it, so deliveries never leak between tests. */
function freshPipeline() {
  const sink = new InMemorySink();
  const track = createTracker(sink);
  return { sink, track };
}

/** TrackOptions for user "u1" carrying the given domain key. */
const forU1 = (key: string) => ({ identity: { userId: 'u1' }, key });

describe('idempotent tracking', () => {
  it('delivers a valid money event exactly once', async () => {
    const { sink, track } = freshPipeline();
    await track('Deposit Completed', deposit, forU1('dep_8842'));
    expect(sink.countOf('Deposit Completed')).toBe(1);
  });

  it('dedupes a double-fired deposit with the same domain key', async () => {
    const { sink, track } = freshPipeline();
    for (let attempt = 0; attempt < 2; attempt++) {
      await track('Deposit Completed', deposit, forU1('dep_8842'));
    }
    expect(sink.delivered).toHaveLength(1); // revenue is NOT double-counted
  });

  it('treats different domain keys as distinct deliveries', async () => {
    const { sink, track } = freshPipeline();
    for (const key of ['dep_1', 'dep_2']) {
      await track('Deposit Completed', deposit, forU1(key));
    }
    expect(sink.delivered).toHaveLength(2);
  });

  it('derives a deterministic messageId from the domain key', () => {
    expect(makeMessageId('Deposit Completed', 'dep_8842')).toBe('deposit_completed:dep_8842');
  });

  it('throws TrackingError on a plan violation and delivers nothing', async () => {
    const { sink, track } = freshPipeline();
    const attempt =
      // @ts-expect-error intentionally malformed props
      track('Deposit Completed', { asset: 'USDT' }, forU1('x'));
    await expect(attempt).rejects.toBeInstanceOf(TrackingError);
    expect(sink.delivered).toHaveLength(0);
  });
});
terraform/main.tf — Analytics infrastructure as code: Segment sources and destinations, with encrypted, locked remote state.
# Analytics infrastructure as code: Segment sources + destinations managed via
# the segmentio/segment provider (wraps Segment's Public API). Every change here
# is a reviewed PR with a `terraform plan` diff and an audit trail.
#
# NOTE: TF state stores destination secrets in plaintext -> use an encrypted,
# locked remote backend (never local state).

# Terraform core and provider version pins, plus the remote state backend.
terraform {
  required_version = ">= 1.5"
  required_providers {
    segment = {
      source  = "segmentio/segment"
      version = "~> 0.4" # allows >= 0.4, < 1.0 — NOTE(review): confirm against the provider release actually in use
    }
  }
  # Remote S3 state. The state file holds destination secrets in plaintext,
  # so it is encrypted at rest and locked, and never kept locally.
  backend "s3" {
    bucket         = "bitmex-tf-state"
    key            = "analytics/terraform.tfstate"
    region         = "ap-southeast-1"
    encrypt        = true            # server-side encryption of the state object (it contains plaintext secrets)
    dynamodb_table = "tf-locks"      # state locking so concurrent applies cannot clobber each other
  }
}

# Secrets are supplied at plan/apply time (never committed). `sensitive = true`
# keeps them out of CLI output, but they still land in state, hence the
# encrypted backend.
variable "segment_token" {
  description = "Segment Public API token used to authenticate the segment provider. Pass via TF_VAR_segment_token or a secrets manager; never commit it."
  type        = string
  sensitive   = true
}

variable "mixpanel_token" {
  description = "Mixpanel token written into the settings of the Mixpanel (Production) destination."
  type        = string
  sensitive   = true
}

# Segment provider, authenticated with the sensitive Public API token variable.
provider "segment" {
  token = var.segment_token
}

# --- Sources ---------------------------------------------------------------
# Client-side (analytics.js) source for the web UI.
resource "segment_source" "web_ui" {
  slug = "bitmex-web-ui"
  name = "BitMEX Web UI"
  metadata {
    id = "javascript" # analytics.js source — NOTE(review): confirm this catalog id against the provider's source metadata ids
  }
}

# Server-side Node source: the cloud-mode entry point for money/identity events,
# and the source the Mixpanel destination below attaches to.
resource "segment_source" "server" {
  slug = "bitmex-server"
  name = "BitMEX Server (Node)"
  metadata {
    id = "node" # cloud-mode source for money / identity events — NOTE(review): confirm catalog id
  }
}

# --- Destinations ----------------------------------------------------------
# Production Mixpanel destination, fed by the server source.
resource "segment_destination" "mixpanel_prod" {
  source      = segment_source.server.id
  name        = "Mixpanel (Production)"
  enabled     = true
  metadata_id = "mixpanel"

  # Destination settings as a JSON document. They include the Mixpanel token,
  # which is why the state backend above must be encrypted.
  settings = jsonencode({
    token                 = var.mixpanel_token
    people                = true
    setAllTraitsByDefault = false # explicit traits only, no PII leakage — NOTE(review): presumably stops blanket trait mapping to People; confirm in Mixpanel destination docs
  })
}

# Repeat per destination (GA4, AppsFlyer, Warehouse). Tracking-plan / Protocols
# rules also live in code so schema enforcement is reviewed, not clicked.

Edward Tay · reference implementation for the BitMEX Growth Engineer (Data Systems) role.