MarketPulse is a backend/data engineering learning project that simulates a production-grade data pipeline.
It ingests external market data, stores raw payloads, transforms them into structured PostgreSQL records, exposes them through a REST API, caches latest values in Redis, and tracks the ingestion lifecycle through logs and database records.
The crypto market API is used only as an external data source. The main goal of the project is to demonstrate data ingestion, transformation, storage, caching, monitoring, SQL analysis and cloud-ready backend architecture.
MarketPulse is a technical learning project designed to answer one business question:
"What is happening on the crypto market right now, and what happened recently?"
The project demonstrates a complete backend/data pipeline flow:
- External market data ingestion from CoinGecko
- Raw payload storage in PostgreSQL
- ETL-style transformation into structured market records
- Data quality checks before inserting structured data
- Ingestion run tracking for pipeline observability
- Redis cache for frequently accessed latest prices
- REST API exposure through Express.js
- Swagger / OpenAPI documentation through /api-docs
- Scheduled ingestion with node-cron
- Structured JSON logs with Winston
- SQL analysis queries for monitoring and diagnostics
- SQL index and performance documentation
- Unit and API route integration tests for transformation, cache, data quality, ingestion orchestration and Express routes
- GitHub Actions CI for automated format checks and test execution
- AWS architecture mapping for a future cloud version
MarketPulse currently includes:
- Express.js REST API
- Layered route/service/repository architecture
- CoinGecko market data ingestion
- PostgreSQL storage for raw and processed market data
- PostgreSQL ingestion run tracking through ingestion_runs
- Data quality checks stored in data_quality_checks
- ETL-style transformation layer
- Redis cache for latest BTC / ETH / SOL prices
- Scheduled ingestion every 60 seconds with node-cron
- Monitoring API for ingestion runs and quality checks
- Swagger / OpenAPI documentation available at /api-docs
- Structured JSON logs with Winston
- SQL analysis queries for monitoring and diagnostics
- SQL index and performance documentation
- Unit and API route integration tests with Jest and Supertest for transformation, cache, data quality and ingestion orchestration
- Docker Compose setup for PostgreSQL and Redis
- Prettier formatting
- GitHub Actions CI for automated formatting checks and test execution
Planned next steps:
- Deeper end-to-end integration tests with PostgreSQL and Redis
- AWS proof of concept: EventBridge Scheduler → Lambda → S3 → CloudWatch
- ELK / Kibana dashboard
```text
CoinGecko API
      │
      ▼
Scheduled Job / Manual Trigger
(node-cron or POST /api/prices/fetch)
      │
      ▼
Ingestion Service
(fetch BTC, ETH, SOL prices)
      │
      ├──► ingestion_runs
      │    (track status, duration, records, errors)
      │
      ▼
Raw Data Store
PostgreSQL: raw_prices
(raw JSON payload)
      │
      ▼
Transformation Service
(normalize external payload into internal records)
      │
      ▼
Data Quality Checks
PostgreSQL: data_quality_checks
(validate payload and transformed records)
      │
      ▼
Processed Data Store
PostgreSQL: market_data
(structured market records)
      │
      ▼
Redis Cache
latest:BTC / latest:ETH / latest:SOL
      │
      ▼
REST API
Express.js
```
The current implementation is local, but the architecture is designed so each component can be mapped to a cloud-native AWS equivalent later.
For the AWS cloud-ready architecture mapping, see docs/AWS_ARCHITECTURE.md.
The API follows a layered backend structure:
```text
HTTP routes
    │
    ▼
Services
(application logic, cache coordination, ingestion monitoring)
    │
    ▼
Repositories / infrastructure clients
(SQL access, PostgreSQL, Redis, external APIs)
```
Routes are responsible for HTTP requests and responses. Services contain application logic. Repositories isolate SQL/database access.
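The following minimal sketch makes the split concrete by following the price history path through all three layers. It assumes a pg-style client whose query(text, values) resolves to { rows }. The factory names, the 404 for unknown symbols and the limit bounds are hypothetical and are not taken from the project's source.

```javascript
// Hypothetical sketch of the route -> service -> repository layering.
// Function names, the 404 rule and the limit bounds are illustrative assumptions.

// Repository: the only layer that contains SQL. `db` is assumed to expose
// query(text, values) resolving to { rows }, the same shape as pg's Pool.
function createMarketDataRepository(db) {
  return {
    async findHistory(symbol, limit) {
      const { rows } = await db.query(
        `SELECT id, symbol, price, currency, variation24h, captured_at
           FROM market_data
          WHERE symbol = $1
          ORDER BY captured_at DESC
          LIMIT $2`,
        [symbol, limit],
      );
      return rows;
    },
  };
}

// Service: application rules (symbol normalization, limit bounds), no HTTP or SQL.
function createMarketDataService(repository) {
  const tracked = new Set(["BTC", "ETH", "SOL"]);
  return {
    async getHistory(rawSymbol, rawLimit) {
      const symbol = String(rawSymbol).toUpperCase();
      if (!tracked.has(symbol)) {
        const err = new Error(`Unsupported symbol: ${symbol}`);
        err.statusCode = 404; // assumed status for unknown symbols
        throw err;
      }
      // Default of 10 is assumed (the docs only show ?limit=10); the cap of 100 is assumed too.
      const parsed = Number.parseInt(rawLimit ?? "10", 10);
      const limit = Number.isNaN(parsed) ? 10 : Math.min(Math.max(parsed, 1), 100);
      return repository.findHistory(symbol, limit);
    },
  };
}

// Route handler: maps HTTP input/output only. It uses Express's (req, res)
// signature, so it can be mounted with router.get(path, handler).
function historyHandler(service) {
  return async (req, res) => {
    try {
      const data = await service.getHistory(req.params.symbol, req.query.limit);
      res.status(200).json({ status: "SUCCESS", count: data.length, data });
    } catch (err) {
      res.status(err.statusCode ?? 500).json({ status: "ERROR", message: err.message });
    }
  };
}
```

Mounting it would look like router.get("/api/prices/history/:symbol", historyHandler(service)). Because SQL lives only in the repository, route tests can pass in a mocked repository or service without a database.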
MarketPulse includes automated tests for the main backend/data pipeline components and API routes:
- cache service
- transformation service
- data quality service
- ingestion orchestration service
- API route integration tests with Supertest
Run the test suite locally:

```bash
npm test
```

Check formatting:

```bash
npm run format:check
```

Format the codebase:

```bash
npm run format
```

The project also includes a GitHub Actions CI workflow that runs automatically on every push and pull request to main.
The CI pipeline performs:
- dependency installation
- Prettier formatting check
- Jest test suite execution
This ensures that the project can be installed, format-checked and tested successfully in a clean environment, not only on the local machine.
The AWS version is not implemented yet. This table explains how the current local components would map to AWS services in a future version.
| Local Component | AWS Equivalent | Purpose |
|---|---|---|
| node-cron scheduled job | EventBridge Scheduler | Trigger ingestion periodically |
| Ingestion service | AWS Lambda | Fetch data from CoinGecko |
| Raw PostgreSQL JSON storage | S3 raw bucket | Store immutable raw payloads |
| Transformation service | AWS Lambda | Transform raw data into structured records |
| PostgreSQL processed data | RDS PostgreSQL | Store structured market records |
| Redis cache | ElastiCache Redis | Cache latest prices |
| Express REST API | API Gateway + Lambda | Expose data through HTTP endpoints |
| Winston logs | CloudWatch Logs | Centralized logs and monitoring |
Future AWS target flow:
```text
EventBridge Scheduler
      │
      ▼
Lambda Ingestion
      │
      ├──► S3 Raw Bucket
      │
      ├──► EventBridge Event
      │
      ▼
Lambda Transformation
      │
      ├──► RDS PostgreSQL
      ├──► ElastiCache Redis
      │
      ▼
API Gateway + Lambda
```
| Layer | Technology | Purpose |
|---|---|---|
| Runtime | Node.js | Backend runtime |
| API Framework | Express.js | REST API |
| API Documentation | Swagger / OpenAPI | Interactive REST API documentation |
| Database | PostgreSQL | Raw and structured data storage |
| Cache | Redis | Latest price caching |
| HTTP Client | Axios | CoinGecko API calls |
| Scheduler | node-cron | Local scheduled ingestion |
| Logging | Winston | Structured JSON logs |
| Testing | Jest, Supertest | Unit and API route integration tests |
| CI | GitHub Actions | Automated checks on push/PR |
| Containerization | Docker Compose | Local PostgreSQL and Redis |
| Formatting | Prettier | Code formatting |
```text
marketpulse-pipeline/
│
├── README.md
├── docker-compose.yml
├── .env.example
├── package.json
│
├── .github/
│   └── workflows/
│       └── ci.yml
│
├── db/
│   └── init.sql
│
├── docs/
│   ├── ARCHITECTURE.md
│   └── sql-analysis/
│       ├── marketpulse_analysis.sql
│       └── indexes.md
│
├── src/
│   ├── app.js
│   ├── server.js
│   │
│   ├── config/
│   │   ├── database.js
│   │   ├── redis.js
│   │   ├── logger.js
│   │   └── swagger.js
│   │
│   ├── jobs/
│   │   └── priceIngestion.job.js
│   │
│   ├── services/
│   │   ├── health.service.js
│   │   ├── marketData.service.js
│   │   ├── ingestionMonitoring.service.js
│   │   ├── ingestion.service.js
│   │   ├── transformation.service.js
│   │   ├── dataQuality.service.js
│   │   └── cache.service.js
│   │
│   ├── repositories/
│   │   ├── rawData.repository.js
│   │   ├── marketData.repository.js
│   │   ├── ingestionRun.repository.js
│   │   └── dataQuality.repository.js
│   │
│   ├── routes/
│   │   ├── prices.routes.js
│   │   ├── ingestion.routes.js
│   │   └── health.routes.js
│   │
│   └── utils/
│       └── calculateVariation.js
│
└── tests/
    ├── api.test.js
    ├── cache.test.js
    ├── dataQuality.test.js
    ├── ingestion.test.js
    └── transformation.test.js
```
Interactive API documentation is available locally through Swagger UI:
http://localhost:3000/api-docs
| Method | Endpoint | Description |
|---|---|---|
| GET | /health | Checks API, PostgreSQL and Redis health |
| POST | /api/prices/fetch | Manually triggers CoinGecko ingestion |
| GET | /api/prices/latest | Returns latest prices for all tracked symbols |
| GET | /api/prices/latest/:symbol | Returns latest price for BTC, ETH or SOL using Redis cache first |
| GET | /api/prices/history/:symbol?limit=10 | Returns price history for a symbol |
| GET | /api/ingestions/runs | Returns latest ingestion runs |
| GET | /api/ingestions/runs/failed | Returns failed ingestion runs |
| GET | /api/ingestions/runs/:id | Returns details of a specific ingestion run |
| GET | /api/ingestions/runs/:id/quality-checks | Returns data quality checks for a specific ingestion run |
| GET | /api/ingestions/quality-checks/failed | Returns failed data quality checks |
```bash
curl http://localhost:3000/health
```

Example response:

```json
{
  "status": "UP",
  "service": "marketpulse-api",
  "timestamp": "2026-04-24T14:00:00.000Z",
  "dependencies": {
    "database": "UP",
    "redis": "UP"
  }
}
```

```bash
curl -X POST http://localhost:3000/api/prices/fetch
```

Example response:

```json
{
  "status": "SUCCESS",
  "message": "Prices fetched, transformed and stored successfully",
  "data": {
    "ingestionRunId": 7,
    "rawPriceId": 2903,
    "recordsCount": 3,
    "records": [
      {
        "id": 8707,
        "symbol": "BTC",
        "price": "78241.000000",
        "currency": "USD",
        "variation24h": "2.5365",
        "high24h": null,
        "low24h": null,
        "raw_price_id": 2903,
        "captured_at": "2026-05-01T14:40:29.478Z"
      }
    ]
  }
}
```

```bash
curl http://localhost:3000/api/prices/latest
```

```bash
curl http://localhost:3000/api/prices/latest/BTC
```

Example response:

```json
{
  "status": "SUCCESS",
  "source": "cache",
  "data": {
    "id": 1,
    "symbol": "BTC",
    "price": "78000.000000",
    "currency": "USD",
    "variation24h": "0.5124",
    "high24h": null,
    "low24h": null,
    "raw_price_id": 1,
    "captured_at": "2026-04-24T14:00:00.000Z"
  }
}
```

```bash
curl "http://localhost:3000/api/prices/history/BTC?limit=10"
```

```bash
curl http://localhost:3000/api/ingestions/runs
```

```bash
curl http://localhost:3000/api/ingestions/runs/failed
```

```bash
curl http://localhost:3000/api/ingestions/runs/7/quality-checks
```

Example response:

```json
{
  "status": "SUCCESS",
  "count": 6,
  "data": [
    {
      "id": 1,
      "ingestion_run_id": 7,
      "check_name": "PAYLOAD_NOT_EMPTY",
      "status": "PASSED",
      "checked_at": "2026-05-01T14:40:29.483Z",
      "error_message": null
    }
  ]
}
```

```bash
curl http://localhost:3000/api/ingestions/quality-checks/failed
```

The raw_prices table stores the original CoinGecko response before transformation.
```sql
CREATE TABLE IF NOT EXISTS raw_prices (
  id SERIAL PRIMARY KEY,
  payload JSONB NOT NULL,
  fetched_at TIMESTAMP DEFAULT NOW()
);
```

The market_data table stores structured market data records.
```sql
CREATE TABLE IF NOT EXISTS market_data (
  id SERIAL PRIMARY KEY,
  symbol VARCHAR(10) NOT NULL,
  price DECIMAL(18, 6) NOT NULL,
  currency VARCHAR(5) DEFAULT 'USD',
  variation24h DECIMAL(8, 4),
  high24h DECIMAL(18, 6),
  low24h DECIMAL(18, 6),
  raw_price_id INTEGER REFERENCES raw_prices(id),
  captured_at TIMESTAMP NOT NULL
);
```

The ingestion_runs table tracks every execution of the ingestion pipeline.
```sql
CREATE TABLE IF NOT EXISTS ingestion_runs (
  id SERIAL PRIMARY KEY,
  source VARCHAR(50) NOT NULL,
  status VARCHAR(20) NOT NULL,
  started_at TIMESTAMP NOT NULL DEFAULT NOW(),
  ended_at TIMESTAMP,
  duration_ms INTEGER,
  records_fetched INTEGER DEFAULT 0,
  records_inserted INTEGER DEFAULT 0,
  error_message TEXT
);
```

Each ingestion cycle creates one row in ingestion_runs.

Typical statuses:

```text
RUNNING
SUCCESS
FAILED
```
The data_quality_checks table stores the results of the data quality validations executed during each ingestion run.

```sql
CREATE TABLE IF NOT EXISTS data_quality_checks (
  id SERIAL PRIMARY KEY,
  ingestion_run_id INTEGER REFERENCES ingestion_runs(id),
  check_name VARCHAR(100) NOT NULL,
  status VARCHAR(20) NOT NULL,
  checked_at TIMESTAMP DEFAULT NOW(),
  error_message TEXT
);
```

Current quality checks:

```text
PAYLOAD_NOT_EMPTY
EXPECTED_COINS_PRESENT
TRANSFORMED_RECORDS_NOT_EMPTY
SYMBOL_PRESENT
PRICE_NOT_NULL
PRICE_POSITIVE
```
Each successful ingestion cycle creates:

```text
1 row in raw_prices
1 row in ingestion_runs
6 rows in data_quality_checks
3 rows in market_data: BTC, ETH, SOL
```
If a data quality check fails, the quality check results are still stored, the ingestion run is marked as FAILED, and structured market data is not inserted.
```text
node-cron job or POST /api/prices/fetch
      │
      ▼
ingestion.service.fetchTransformAndStorePrices()
      │
      ├── Create ingestion run with status RUNNING
      ├── Fetch prices from CoinGecko
      ├── Store raw JSON payload in raw_prices
      ├── Transform CoinGecko payload into market records
      ├── Run data quality checks
      ├── Store quality check results in data_quality_checks
      ├── Stop the pipeline if quality checks fail
      ├── Store BTC / ETH / SOL records in market_data
      ├── Refresh Redis cache keys:
      │     - latest:BTC
      │     - latest:ETH
      │     - latest:SOL
      └── Mark ingestion run as SUCCESS or FAILED
```
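The ordering above can be sketched as follows. Two rules matter most: quality results are persisted before the run is failed, and market_data is never written after a failed check. This is a hypothetical re-sketch of ingestion.service.fetchTransformAndStorePrices(), not the project's code. Every dependency is injected, and the dependency names and method signatures are assumptions.

```javascript
// Hypothetical sketch of the orchestration order. All dependencies are injected
// so the flow can run against in-memory fakes; their names are illustrative.
async function fetchTransformAndStorePrices(deps) {
  const { runs, fetchPrices, rawRepo, transform, runQualityChecks, qualityRepo, marketRepo, cache } = deps;

  const runId = await runs.start("CoinGecko"); // row created with status RUNNING
  try {
    const payload = await fetchPrices();
    const rawPriceId = await rawRepo.insert(payload); // raw landing zone first
    const records = transform(payload, rawPriceId);

    const checks = runQualityChecks(payload, records);
    await qualityRepo.insertMany(runId, checks); // persisted even if a check failed

    const failed = checks.filter((c) => c.status === "FAILED");
    if (failed.length > 0) {
      // Stop before touching market_data or the cache.
      throw new Error(`Quality checks failed: ${failed.map((c) => c.check_name).join(", ")}`);
    }

    const inserted = await marketRepo.insertMany(records);
    for (const record of inserted) {
      await cache.setLatest(record.symbol, record); // refreshes latest:<SYMBOL>
    }

    await runs.finish(runId, {
      status: "SUCCESS",
      recordsFetched: records.length,
      recordsInserted: inserted.length,
    });
    return { ingestionRunId: runId, rawPriceId, recordsCount: inserted.length, records: inserted };
  } catch (err) {
    await runs.finish(runId, { status: "FAILED", errorMessage: err.message });
    throw err;
  }
}
```

Injecting the dependencies is what makes the success and failure paths testable without PostgreSQL, Redis or network access.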
The local scheduled job uses node-cron. In a cloud version, this would naturally map to EventBridge Scheduler triggering a Lambda function.
MarketPulse is designed to demonstrate backend and data engineering concepts beyond a simple API.
The original CoinGecko payload is stored in raw_prices before transformation. This allows auditability and potential replay if the transformation logic changes.
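Here is a minimal sketch of such a transformation. It assumes the raw payload follows the shape of CoinGecko's /simple/price response with include_24hr_change=true, for example { bitcoin: { usd: 78241, usd_24h_change: 2.5365 } }. The function name, the coin-to-symbol map and the choice to skip missing coins are assumptions.

```javascript
// Hypothetical transformation sketch. Assumes a /simple/price-style payload:
//   { bitcoin: { usd: 78241, usd_24h_change: 2.5365 }, ... }
// The coin-id-to-symbol map and the "skip missing coins" rule are assumptions.
const COIN_SYMBOLS = { bitcoin: "BTC", ethereum: "ETH", solana: "SOL" };

function transformCoinGeckoPayload(payload, rawPriceId, capturedAt = new Date()) {
  const records = [];
  for (const [coinId, symbol] of Object.entries(COIN_SYMBOLS)) {
    const quote = payload?.[coinId];
    if (!quote) continue; // missing coin: skipped here, flagged by the quality checks
    records.push({
      symbol,
      price: quote.usd ?? null,
      currency: "USD",
      variation24h: quote.usd_24h_change ?? null,
      high24h: null, // /simple/price has no 24h high/low fields
      low24h: null,
      raw_price_id: rawPriceId, // link back to the raw_prices row
      captured_at: capturedAt.toISOString(),
    });
  }
  return records;
}
```

Because the function reads only the stored payload, replaying a raw_prices row means calling it again with that row's payload and id.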
Each pipeline execution is tracked in ingestion_runs with:
- source
- status
- start and end timestamps
- duration
- records fetched
- records inserted
- error message if the run fails
This makes the pipeline observable and easier to troubleshoot.
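An in-memory stand-in for ingestion_runs shows how these fields relate to each other. This is a sketch, not the project's repository. The method names and the injectable clock are hypothetical, and duration_ms is assumed to be ended_at minus started_at.

```javascript
// Hypothetical in-memory stand-in for the ingestion_runs table.
// `now` is an injectable clock returning epoch milliseconds.
function createRunTracker(now = () => Date.now()) {
  const rows = new Map();
  let nextId = 1;
  return {
    start(source) {
      const id = nextId++;
      rows.set(id, {
        id,
        source,
        status: "RUNNING",
        started_at: new Date(now()),
        ended_at: null,
        duration_ms: null,
        records_fetched: 0,
        records_inserted: 0,
        error_message: null,
      });
      return id;
    },
    finish(id, { status, recordsFetched = 0, recordsInserted = 0, errorMessage = null }) {
      const row = rows.get(id);
      if (!row) throw new Error(`Unknown ingestion run ${id}`);
      const endedAt = now();
      Object.assign(row, {
        status, // SUCCESS or FAILED
        ended_at: new Date(endedAt),
        duration_ms: endedAt - row.started_at.getTime(),
        records_fetched: recordsFetched,
        records_inserted: recordsInserted,
        error_message: errorMessage,
      });
      return row;
    },
    get: (id) => rows.get(id),
  };
}
```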
Each ingestion run executes quality checks before inserting structured records into market_data.
Current checks validate that:
- the payload is not empty;
- expected coins are present;
- transformed records are generated;
- each record has a symbol;
- prices are not null;
- prices are positive.
If a quality check fails, the check results are stored in data_quality_checks, the ingestion run is marked as FAILED, and the pipeline stops before inserting structured market data.
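The six checks can be expressed as pure functions over the payload and the transformed records. This minimal sketch assumes the payload is keyed by CoinGecko coin id and that each record carries a symbol and a numeric price. The FAILED status string and the error messages are assumptions, since only PASSED appears in the API example.

```javascript
// Hypothetical sketch of the six quality checks. Each result is shaped like a
// data_quality_checks row: { check_name, status, error_message }.
const EXPECTED_COINS = ["bitcoin", "ethereum", "solana"];

function runQualityChecks(payload, records) {
  const result = (name, ok, message) => ({
    check_name: name,
    status: ok ? "PASSED" : "FAILED",
    error_message: ok ? null : message,
  });
  const isObject = payload !== null && typeof payload === "object";
  const missing = EXPECTED_COINS.filter((coin) => !isObject || !(coin in payload));
  const list = Array.isArray(records) ? records : [];

  // every() is true on an empty list, so an empty transform is caught only by
  // TRANSFORMED_RECORDS_NOT_EMPTY.
  return [
    result("PAYLOAD_NOT_EMPTY", isObject && Object.keys(payload).length > 0, "Payload is empty"),
    result("EXPECTED_COINS_PRESENT", missing.length === 0, `Missing coins: ${missing.join(", ")}`),
    result("TRANSFORMED_RECORDS_NOT_EMPTY", list.length > 0, "No records produced by transformation"),
    result("SYMBOL_PRESENT", list.every((r) => typeof r.symbol === "string" && r.symbol.length > 0), "Record without symbol"),
    result("PRICE_NOT_NULL", list.every((r) => r.price !== null && r.price !== undefined), "Record with null price"),
    result("PRICE_POSITIVE", list.every((r) => typeof r.price === "number" && r.price > 0), "Record with non-positive price"),
  ];
}
```

Returning all six results, rather than stopping at the first failure, matches the documented behavior of storing every check result for the run.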
The project includes SQL queries under docs/sql-analysis/ to monitor:
- latest prices per symbol;
- ingestion success rate;
- ingestion duration;
- records inserted per day;
- failed quality checks;
- quality check summary per ingestion run.
The project documents SQL index usage in:
docs/sql-analysis/indexes.md
The main index supports price history queries by symbol and timestamp:
```sql
CREATE INDEX IF NOT EXISTS idx_market_data_symbol_captured_at
  ON market_data(symbol, captured_at DESC);
```

The latest price endpoint checks Redis first:

```text
GET /api/prices/latest/BTC
      │
      ├── Cache hit  → return Redis value with source: "cache"
      └── Cache miss → query PostgreSQL, refresh Redis, return source: "database"
```

Redis keys:

```text
latest:BTC
latest:ETH
latest:SOL
```

Cache TTL: 300 seconds.
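Here is a minimal cache-aside sketch of this flow. It assumes a thin cache wrapper exposing get(key) and set(key, value, ttlSeconds) and a repository method findLatest(symbol). Both names are hypothetical. The key normalization (trim and upper-case) is also an assumption, chosen so that btc and BTC share latest:BTC.

```javascript
// Hypothetical cache-aside sketch for GET /api/prices/latest/:symbol.
const CACHE_TTL_SECONDS = 300;
const KEY_PREFIX = "latest:";
const latestKey = (symbol) => `${KEY_PREFIX}${String(symbol).trim().toUpperCase()}`;

async function getLatestPrice(symbol, { cache, repository }) {
  const key = latestKey(symbol);

  // 1. Try Redis first.
  const cached = await cache.get(key);
  if (cached !== null && cached !== undefined) {
    return { source: "cache", data: JSON.parse(cached) };
  }

  // 2. Cache miss: read PostgreSQL, then refresh the key with a TTL.
  const row = await repository.findLatest(key.slice(KEY_PREFIX.length));
  if (row) {
    await cache.set(key, JSON.stringify(row), CACHE_TTL_SECONDS);
  }
  return { source: "database", data: row ?? null };
}
```

With node-redis v4, the wrapper's set could delegate to client.set(key, value, { EX: ttlSeconds }). Not caching a missing row is a deliberate choice in this sketch: it avoids pinning an empty result for 300 seconds.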
The application uses structured JSON logs with Winston.
Example logs:

```json
{
  "level": "info",
  "message": {
    "event": "INGESTION_START",
    "source": "CoinGecko",
    "coins": ["bitcoin", "ethereum", "solana"]
  },
  "timestamp": "2026-04-24T14:00:00.000Z"
}
```

```json
{
  "level": "info",
  "message": {
    "event": "CACHE_HIT",
    "key": "latest:BTC"
  },
  "timestamp": "2026-04-24T14:00:10.000Z"
}
```

These logs are designed to be compatible with future observability tools such as ELK or CloudWatch.
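The next sketch produces the same entry shape as the examples above: one JSON object per line, with the event payload nested under message. It is a dependency-free stand-in for illustration, not Winston's API. createEventLogger and its injectable write and clock parameters are hypothetical.

```javascript
// Hypothetical stand-in (not Winston) emitting the documented entry shape:
// { level, message: { event, ...fields }, timestamp }, one JSON object per line.
function createEventLogger(
  write = (line) => process.stdout.write(line + "\n"),
  now = () => new Date(),
) {
  const log = (level) => (event, fields = {}) => {
    const entry = { level, message: { event, ...fields }, timestamp: now().toISOString() };
    write(JSON.stringify(entry));
    return entry;
  };
  return { info: log("info"), warn: log("warn"), error: log("error") };
}

// Example: createEventLogger().info("CACHE_HIT", { key: "latest:BTC" });
```

Emitting one object per line makes each event a self-contained JSON document, which is what line-based log shippers typically expect.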
The project includes unit tests for:
- price variation calculation
- CoinGecko payload transformation
- missing coin handling
- Redis cache set/get/delete logic
- cache hit and cache miss behavior
- Redis key normalization
- cache TTL usage
- data quality validation rules
- ingestion orchestration success path
- ingestion orchestration failure path
- API health check route
- manual ingestion trigger route
- latest price routes
- price history route
- ingestion monitoring routes
- quality check monitoring routes
The API route tests use Supertest with mocked repositories and services, so they validate Express routing, status codes and response structures without requiring PostgreSQL, Redis or CoinGecko during CI.
The ingestion orchestration tests verify that:
- data quality checks are persisted before failing an ingestion run;
- market data is not inserted when quality checks fail;
- failed ingestion runs are marked as FAILED;
- successful ingestion runs are marked as SUCCESS.
Run tests:

```bash
npm test
```

Run formatting:

```bash
npm run format
```

Check formatting:

```bash
npm run format:check
```

Prerequisites:

- Node.js >= 18
- Docker
- Docker Compose
- Git

Local setup:

```bash
git clone https://github.com/FekherJ/MarketPulse.git
cd MarketPulse
cp .env.example .env
docker-compose up -d
npm install
npm run dev
```

The API runs on:
http://localhost:3000
Swagger UI is available at:
http://localhost:3000/api-docs
The scheduled ingestion job starts automatically when the server starts.
Stop the containers:

```bash
docker-compose down
```

Check the running containers:

```bash
docker ps
```

Environment variables:

```bash
PORT=3000
NODE_ENV=development
DB_HOST=localhost
DB_PORT=5432
DB_NAME=marketpulse
DB_USER=postgres
DB_PASSWORD=postgres
REDIS_HOST=localhost
REDIS_PORT=6379
COINGECKO_BASE_URL=https://api.coingecko.com/api/v3
FETCH_INTERVAL_SECONDS=60
```

Completed:

- Local architecture design
- Docker Compose setup for PostgreSQL and Redis
- PostgreSQL schema: raw_prices and market_data
- PostgreSQL schema: ingestion_runs
- PostgreSQL schema: data_quality_checks
- Express.js REST API
- Health check endpoint
- CoinGecko ingestion service
- ETL-style transformation layer
- Redis cache for latest prices
- Scheduled ingestion with node-cron
- Ingestion run tracking
- Monitoring API for ingestion runs
- Data quality checks
- API endpoints for quality checks
- Structured JSON logging with Winston
- SQL analysis queries
- SQL index and performance documentation
- Unit tests for transformation logic
- Unit tests for cache service
- Unit tests for data quality service
- Unit tests for ingestion orchestration
- API route integration tests with Supertest
- Swagger / OpenAPI documentation
- Prettier formatting
- GitHub Actions CI

Planned next steps:

- Deeper end-to-end integration tests with PostgreSQL and Redis
- AWS proof of concept: EventBridge Scheduler → Lambda → S3 → CloudWatch
- ELK / Kibana dashboard
MarketPulse was designed to strengthen hands-on understanding of backend, data pipeline and integration engineering concepts through a practical end-to-end implementation.
The project ingests external market data from CoinGecko on a scheduled basis. Raw responses are stored before transformation, which acts as a local raw landing zone similar to what S3 would provide in a cloud architecture. The transformation layer normalizes the payload into structured BTC, ETH and SOL market records, which are stored in PostgreSQL.
The project includes an ingestion monitoring layer through the ingestion_runs table. Each pipeline execution is tracked with a status, duration, number of records fetched, number of records inserted and error message if the run fails.
The project also includes a data quality layer through the data_quality_checks table. Each ingestion run validates that the payload is not empty, expected assets are present, transformed records exist, symbols are present, and prices are valid before storing structured data. If a check fails, the results are persisted and the ingestion run is marked as failed.
Redis is used as a caching layer for frequently requested latest prices. The API checks Redis first, falls back to PostgreSQL on cache miss, and refreshes the cache with a TTL.
The project also includes structured JSON logs, SQL analysis queries, performance notes around indexes, automated tests for the cache, transformation, data quality, API routes and ingestion orchestration layers, and a GitHub Actions CI workflow that checks formatting and runs the test suite on every push and pull request to main.
Swagger/OpenAPI documentation is exposed through /api-docs, making the API easier to explore and validate during development.
Locally, scheduled ingestion is handled with node-cron. In an AWS version, this could map to EventBridge Scheduler triggering Lambda functions, with S3 for raw storage, RDS PostgreSQL for structured data, ElastiCache for Redis, API Gateway for HTTP exposure, and CloudWatch for logs.
The goal of this project is to build practical experience around data ingestion, transformation, quality checks, observability, SQL diagnostics, caching and cloud-ready backend architecture.
Fekher, Technical Business Analyst transitioning toward Data / Integration Engineering
Financial systems, data pipelines, SQL, APIs and cloud-ready architectures
GitHub: FekherJ