The code behind the simulator and the audit: a typed tracking plan, idempotent delivery and a webhook receiver, with a test suite that runs. Real engineering, not a diagram. Full repo on disk: pipeline-repo/ (Node + TypeScript + Zod + Terraform).
$ npm test
RUN v1.6.1 pipeline-repo
✓ test/events.test.ts (6 tests)
✓ test/track.test.ts (5 tests)
✓ test/webhook.test.ts (4 tests)
Test Files 3 passed (3)
Tests 15 passed (15)
import { z } from 'zod';
/** Free-text field that may be omitted. */
const optionalText = z.string().optional();
/** String that must contain at least one character. */
const nonEmptyText = z.string().min(1);
/** Number strictly greater than zero. */
const positiveNumber = z.number().positive();

/**
 * The tracking plan: one runtime-validated Zod object schema per event name.
 * This map is the single source of truth. An event that is missing from it, or
 * whose properties do not satisfy its schema, fails validation and is not
 * delivered. Any change to the plan is made through a reviewed PR.
 */
export const EventSchemas = {
  'Account Created': z.object({
    method: z.enum(['email', 'wallet', 'oauth']),
    referral_code: optionalText,
    utm_source: optionalText,
  }),
  'KYC Approved': z.object({
    tier: z.enum(['basic', 'advanced']),
    days_since_signup: z.number().int().nonnegative(),
  }),
  'Deposit Completed': z.object({
    amount_usd: positiveNumber,
    asset: nonEmptyText,
    method: z.enum(['onchain', 'card', 'wire']),
    is_first: z.boolean(),
  }),
  'Order Filled': z.object({
    symbol: nonEmptyText,
    contracts: z.number().int().positive(),
    notional_usd: positiveNumber,
    fee: z.number().nonnegative(),
  }),
} as const;
/** Union of every event name in the tracking plan (the keys of `EventSchemas`). */
export type EventName = keyof typeof EventSchemas;
/** Property shape for event `K`, inferred from that event's Zod schema in `EventSchemas`. */
export type EventProps<K extends EventName> = z.infer<(typeof EventSchemas)[K]>;
/**
 * Per-event destination routing: for each event in the plan, the names of the
 * destinations it is meant to reach. Typing the table as `Record<EventName, …>`
 * makes the compiler require an entry for every event in the plan.
 * As written, every event lists mixpanel, appsflyer and warehouse; only
 * 'Account Created' and 'Deposit Completed' additionally list ga4.
 * NOTE(review): where this table is consumed is not visible here — confirm the
 * component that enforces the routing.
 */
export const Destinations: Record<EventName, readonly string[]> = {
'Account Created': ['mixpanel', 'ga4', 'appsflyer', 'warehouse'],
'KYC Approved': ['mixpanel', 'appsflyer', 'warehouse'],
'Deposit Completed': ['mixpanel', 'ga4', 'appsflyer', 'warehouse'],
'Order Filled': ['mixpanel', 'appsflyer', 'warehouse'],
};
/**
 * Outcome of validating props for event `K`, discriminated on `ok`:
 * success carries the parsed props, failure carries a list of error strings.
 */
export type ValidationResult<K extends EventName> =
// Props satisfied the schema for K.
| { ok: true; data: EventProps<K> }
// Unknown event or schema mismatch; one string per problem.
| { ok: false; errors: string[] };
/**
 * Validate `props` against the tracking-plan schema for `event`.
 *
 * @param event Event name. Typed as a plan key, but callers that cast untrusted
 *   input (e.g. a request body) may pass any value at runtime.
 * @param props Candidate properties; parsed with the event's Zod schema.
 * @returns `{ ok: true, data }` with the parsed props, or `{ ok: false, errors }`
 *   where each entry is `"<path>: <message>"` (`"(root)"` when the issue has no
 *   path). Event names that are not in the plan are rejected (schema drift).
 */
export function validate<K extends EventName>(event: K, props: unknown): ValidationResult<K> {
  // Own-property check rather than a truthiness test on `EventSchemas[event]`:
  // a plain lookup also resolves names inherited from Object.prototype
  // ("constructor", "toString", ...). Those are truthy but have no safeParse(),
  // so an untrusted event name would crash here instead of being rejected.
  if (!Object.prototype.hasOwnProperty.call(EventSchemas, event)) {
    return { ok: false, errors: [`event "${String(event)}" is not in the tracking plan`] };
  }

  const schema = EventSchemas[event];
  const parsed = schema.safeParse(props);
  if (parsed.success) return { ok: true, data: parsed.data as EventProps<K> };

  return {
    ok: false,
    errors: parsed.error.issues.map((i) => `${i.path.join('.') || '(root)'}: ${i.message}`),
  };
}
import { randomUUID } from 'node:crypto';

import { EventName, EventProps, validate } from './events';
import { Sink, TrackMessage } from './segment';
/**
 * Error raised when an event fails tracking-plan validation.
 * The webhook receiver maps it to an HTTP 422 response.
 */
export class TrackingError extends Error {
  /** Name of the event that failed validation. */
  public readonly event: string;
  /** The individual validation messages, one per problem. */
  public readonly errors: string[];

  constructor(event: string, errors: string[]) {
    const details = errors.join('; ');
    super(`Tracking plan violation on "${event}": ${details}`);
    this.event = event;
    this.errors = errors;
    this.name = 'TrackingError';
  }
}
/**
 * Who an event is attributed to. Both fields are optional at the type level.
 * NOTE(review): presumably at least one should be set for a deliverable event — confirm against the sink.
 */
export interface Identity {
/** Identifier of a known user. */
userId?: string;
/** Identifier used when no userId is available. */
anonymousId?: string;
}
/** Per-call options for a tracker's `track()` function. */
export interface TrackOptions {
/** Identity copied onto the outgoing message as userId / anonymousId. */
identity: Identity;
/** A domain key (e.g. deposit_id) that makes the messageId deterministic so retries dedupe. */
key?: string;
/** Override the generated messageId entirely; when set, `key` is not used for the id. */
messageId?: string;
/**
 * Event time, copied onto the outgoing message unchanged.
 * NOTE(review): presumably an ISO-8601 string — confirm the format the sink expects.
 */
timestamp?: string;
}
/**
 * Derive a messageId from an event name and a domain key. The same inputs
 * always give the same id, so a retried delivery carries the id of the first.
 *
 * @param event Event name; each whitespace run becomes `_` and the result is lower-cased.
 * @param key Domain key, appended verbatim after a `:` separator.
 * @returns `"<normalised event>:<key>"`.
 */
export function makeMessageId(event: string, key: string): string {
  const words = event.split(/\s+/);
  const slug = words.join('_').toLowerCase();
  return [slug, key].join(':');
}
/**
 * Build a typed, validated, idempotent `track()` bound to a sink.
 *
 * The returned function validates `props` against the tracking plan, derives a
 * messageId, sends one `track` message to the sink and resolves with that message.
 *
 * messageId resolution, in priority order:
 *  1. `opts.messageId`, used verbatim;
 *  2. `opts.key`, which gives a deterministic id so retries of the same domain
 *     object (e.g. a deposit id) collapse to one delivery;
 *  3. identity + `opts.timestamp`, deterministic for a given user and instant;
 *  4. identity + a random UUID when none of the above is supplied.
 *
 * Money/identity events should always pass a stable `key`: only cases 1–3 give
 * an id that a retry can reproduce.
 *
 * @param sink Destination that receives each validated message.
 * @returns An async `track(event, props, opts)` function.
 * @throws TrackingError (from the returned function) when validation fails;
 *   nothing is sent to the sink in that case.
 */
export function createTracker(sink: Sink) {
  return async function track<K extends EventName>(
    event: K,
    props: EventProps<K>,
    opts: TrackOptions,
  ): Promise<TrackMessage> {
    const result = validate(event, props);
    if (!result.ok) throw new TrackingError(String(event), result.errors);

    const who = opts.identity.userId ?? opts.identity.anonymousId ?? 'anon';
    // Without a timestamp the old fallback was just "<who>:", so every keyless
    // event of one name for one user shared a messageId and a deduping sink
    // dropped all but the first. A random suffix keeps unrelated events distinct.
    const discriminator = opts.timestamp !== undefined ? opts.timestamp : randomUUID();
    const fallbackKey = `${who}:${discriminator}`;
    const messageId = opts.messageId ?? makeMessageId(String(event), opts.key ?? fallbackKey);

    const msg: TrackMessage = {
      type: 'track',
      event: String(event),
      userId: opts.identity.userId,
      anonymousId: opts.identity.anonymousId,
      properties: result.data as Record<string, unknown>,
      messageId,
      timestamp: opts.timestamp,
    };
    await sink.track(msg);
    return msg;
  };
}
import { createTracker, TrackingError } from './track';
import { Sink } from './segment';
import { EventName } from './events';
/** Request body accepted by the webhook receiver. */
export interface IncomingEvent {
/** Event name as sent by the caller; checked against the tracking plan before delivery. */
event: string;
/** Event properties, untyped until validated against the plan. */
properties: unknown;
/** Identifier of a known user, if any. */
userId?: string;
/** Identifier to use when no userId is available. */
anonymousId?: string;
/** Domain key for idempotency, e.g. deposit_id. Strongly recommended for money events. */
idempotencyKey?: string;
}
/** Framework-neutral response description returned by the receiver. */
export interface HandlerResult {
/** HTTP status code to send. */
status: number;
/** Response payload; its shape depends on the outcome. */
body: unknown;
}
/**
 * Framework-agnostic receiver for events POSTed by internal services.
 *
 * The event is validated against the tracking plan, given a messageId (using
 * `idempotencyKey` as the domain key when present) and forwarded to the sink.
 *
 * @param sink Destination the accepted event is delivered to.
 * @param body Parsed request body.
 * @returns 202 with `{ accepted: true, messageId }` when the event is accepted,
 *   or 422 with `{ accepted: false, errors }` on a plan violation, in which
 *   case nothing is delivered.
 * @throws Any error other than a plan violation, so the server can answer 500.
 */
export async function handleEvent(sink: Sink, body: IncomingEvent): Promise<HandlerResult> {
  const send = createTracker(sink);

  try {
    // The casts only satisfy the compiler; the tracker validates both at runtime.
    const name = body.event as EventName;
    const props = body.properties as never;
    const options = {
      identity: { userId: body.userId, anonymousId: body.anonymousId },
      key: body.idempotencyKey,
    };

    const { messageId } = await send(name, props, options);
    return { status: 202, body: { accepted: true, messageId } };
  } catch (failure) {
    // unexpected: let the server return 500
    if (!(failure instanceof TrackingError)) throw failure;
    return { status: 422, body: { accepted: false, errors: failure.errors } };
  }
}
import { describe, it, expect } from 'vitest';
import { createTracker, makeMessageId, TrackingError } from '../src/track';
import { InMemorySink } from '../src/segment';
/** A valid 'Deposit Completed' payload reused by the cases below. */
const deposit = { amount_usd: 500, asset: 'USDT', method: 'onchain', is_first: true } as const;

/** Builds an isolated sink/tracker pair so no case sees another's deliveries. */
function freshTracker() {
  const sink = new InMemorySink();
  return { sink, track: createTracker(sink) };
}

/** Track options for user u1 carrying the given domain key. */
function keyedBy(key: string) {
  return { identity: { userId: 'u1' }, key };
}

describe('idempotent tracking', () => {
  it('delivers a valid money event exactly once', async () => {
    const { sink, track } = freshTracker();

    await track('Deposit Completed', deposit, keyedBy('dep_8842'));

    expect(sink.countOf('Deposit Completed')).toBe(1);
  });

  it('dedupes a double-fired deposit with the same domain key', async () => {
    const { sink, track } = freshTracker();

    await track('Deposit Completed', deposit, keyedBy('dep_8842'));
    await track('Deposit Completed', deposit, keyedBy('dep_8842'));

    // revenue is NOT double-counted
    expect(sink.delivered).toHaveLength(1);
  });

  it('treats different domain keys as distinct deliveries', async () => {
    const { sink, track } = freshTracker();

    await track('Deposit Completed', deposit, keyedBy('dep_1'));
    await track('Deposit Completed', deposit, keyedBy('dep_2'));

    expect(sink.delivered).toHaveLength(2);
  });

  it('derives a deterministic messageId from the domain key', () => {
    const id = makeMessageId('Deposit Completed', 'dep_8842');

    expect(id).toBe('deposit_completed:dep_8842');
  });

  it('throws TrackingError on a plan violation and delivers nothing', async () => {
    const { sink, track } = freshTracker();

    await expect(
      // @ts-expect-error intentionally malformed props
      track('Deposit Completed', { asset: 'USDT' }, keyedBy('x')),
    ).rejects.toBeInstanceOf(TrackingError);

    expect(sink.delivered).toHaveLength(0);
  });
});
# Analytics infrastructure as code: Segment sources + destinations managed via
# the segmentio/segment provider (wraps Segment's Public API). Every change here
# is a reviewed PR with a `terraform plan` diff and an audit trail.
#
# NOTE: TF state stores destination secrets in plaintext -> use an encrypted,
# locked remote backend (never local state).
# Terraform settings: pins the CLI and Segment provider versions and keeps
# state in a remote S3 backend with encryption and locking enabled.
terraform {
# Minimum Terraform CLI version accepted for this configuration.
required_version = ">= 1.5"
required_providers {
segment = {
source = "segmentio/segment"
# Pessimistic constraint: accepts 0.4 and later 0.x releases, but not 1.0.
version = "~> 0.4"
}
}
# Remote state location; bucket, key and region identify the state object.
backend "s3" {
bucket = "bitmex-tf-state"
key = "analytics/terraform.tfstate"
region = "ap-southeast-1"
encrypt = true # state holds secrets in plaintext
dynamodb_table = "tf-locks" # state locking
}
}
# Token passed to the Segment provider. Marked sensitive so Terraform redacts
# the value in plan/apply output.
variable "segment_token" {
type = string
sensitive = true
}
# Mixpanel project token injected into the Mixpanel destination settings.
# Marked sensitive for the same reason.
variable "mixpanel_token" {
type = string
sensitive = true
}
# Segment provider configuration; authenticates with the token supplied via
# var.segment_token.
provider "segment" {
token = var.segment_token
}
# --- Sources ---------------------------------------------------------------
# Source for the web UI. metadata.id selects the source type.
resource "segment_source" "web_ui" {
slug = "bitmex-web-ui"
name = "BitMEX Web UI"
metadata {
id = "javascript" # analytics.js source
}
}
# Source for the Node server. This is the source the Mixpanel production
# destination in this file is attached to.
resource "segment_source" "server" {
slug = "bitmex-server"
name = "BitMEX Server (Node)"
metadata {
id = "node" # cloud-mode source for money / identity events
}
}
# --- Destinations ----------------------------------------------------------
# Mixpanel destination, enabled, attached to the server source only.
# Settings are passed as a JSON-encoded object and include the sensitive
# Mixpanel token, so the value ends up in Terraform state.
resource "segment_destination" "mixpanel_prod" {
source = segment_source.server.id
name = "Mixpanel (Production)"
enabled = true
metadata_id = "mixpanel"
settings = jsonencode({
token = var.mixpanel_token
people = true
setAllTraitsByDefault = false # explicit traits only, no PII leakage
})
}
# Repeat per destination (GA4, AppsFlyer, Warehouse). Tracking-plan / Protocols
# rules also live in code so schema enforcement is reviewed, not clicked.
Edward Tay · reference implementation for the BitMEX Growth Engineer (Data Systems) role.